'''
igazelib: A low-level library for analyzing gaze-data files, usually
provided by eyetracker devices.
Gaze data is expected to be in JSON format: a list of datapoints, with a
similar dict storing the properties of each point.
Example:
[{xcoordinate:0.4, ...}, {xcoordinate:0.5, ...}, ..., {xcoordinate:-1, ...}]
List elements are called "gazepoints", and dict keys "keys".
The igazelib library is designed to be used with a script file that calls
the library functions to perform different analysis steps.
Created by researchers in Infant Cognition Lab,
University of Tampere, Finland
'''
# Prefix prepended to the progress messages printed by the verbose
# (silent=False) code paths of the functions below.
indent = " "
def mean(lst):
    '''
    Return the arithmetic mean of the numbers in lst.
    An empty list yields NaN instead of raising ZeroDivisionError.
    '''
    if not lst:
        return float('nan')
    return float(sum(lst)) / len(lst)
def first_gazepoints_by_time(data, time_key, timeunits, silent=True):
    '''
    Return copies of the gazepoints recorded during the first timeunits
    time units of data, measured from the timestamp of the first
    gazepoint.

    data: list of gazepoint-dicts
    time_key: key holding an int-convertible timestamp in each gazepoint
    timeunits: length of the clip, in the same units as time_key
    silent: when False, print progress information

    Returns [] for empty input (previously raised IndexError).
    '''
    if not silent:
        print("Picking first " + str(timeunits) +
              " timeunit gazepoints from data...")
        print(indent + "List contains " + str(len(data)) +
              " gazepoints before operation.")
    if not data:
        # no first gazepoint to measure from
        return []
    start = int(data[0][time_key])
    new_data = [gp.copy() for gp in data
                if int(gp[time_key]) - start < timeunits]
    if not silent:
        print(indent + "List contains " + str(len(new_data)) +
              " gazepoints after operation.")
    return new_data
def first_gazepoints(data, gpcount, silent=True):
    '''
    Return copies of the first gpcount gazepoints of data.
    (The original docstring wrongly described time-based clipping;
    this function clips by gazepoint count.)

    data: list of gazepoint-dicts
    gpcount: number of gazepoints to keep; values <= 0 yield []
    silent: when False, print progress information
    '''
    if not silent:
        print("Picking first " + str(gpcount) + " gazepoints from data...")
        print(indent + "List contains " + str(len(data)) +
              " gazepoints before operation.")
    # slice instead of scanning the whole list with an index test;
    # max() keeps negative counts meaning "nothing", as before
    new_data = [gp.copy() for gp in data[:max(gpcount, 0)]]
    if not silent:
        print(indent + "List contains " + str(len(new_data)) +
              " gazepoints after operation.")
    return new_data
def gazepoints_after_time(data, time_key, timeunits, silent=True):
    '''
    Return copies of the gazepoints recorded once timeunits time units
    have passed since the first gazepoint of data.

    data: list of gazepoint-dicts
    time_key: key holding an int-convertible timestamp in each gazepoint
    timeunits: offset from the first timestamp, in the same units
    silent: when False, print progress information

    Returns [] for empty input (previously raised IndexError).
    '''
    if not silent:
        print("Picking gazepoints after " + str(timeunits) +
              " timeunits from data using TETTime...")
        print(indent + "List contains " + str(len(data)) +
              " gazepoints before operation.")
    if not data:
        # no first gazepoint to measure from
        return []
    # the timestamp of the first datapoint defines the zero point
    start = int(data[0][time_key])
    new_data = [gp.copy() for gp in data
                if int(gp[time_key]) - start >= timeunits]
    if not silent:
        print(indent + "List contains " + str(len(new_data)) +
              " gazepoints after operation.")
    return new_data
def gazepoints_containing_value(data, key, value_list, silent=True):
    '''
    Return copies of the gazepoints whose value under key is one of the
    values in value_list.

    data: list of gazepoint-dicts
    key: key inspected in each gazepoint
    value_list: list of accepted values
    silent: when False, print progress information

    Raises TypeError if value_list is not a list.
    '''
    if not silent:
        print("Picking gazepoints with values " + str(value_list) +
              " associated with key " + str(key))
        print(indent + "List contains " + str(len(data)) +
              " gazepoints before operation.")
    # raise instead of assert: asserts are stripped when running with -O
    if not isinstance(value_list, list):
        raise TypeError("value_list must be a list, got " +
                        type(value_list).__name__)
    # collect gazepoints which contain one of the values
    gazepoints_found = [gp.copy() for gp in data if gp[key] in value_list]
    if not silent:
        print(indent + "Datamatrix contains " + str(len(gazepoints_found)) +
              " gazepoints.")
    return gazepoints_found
def gazepoints_not_containing_value(data, key, value_list, silent=True):
    '''
    Return copies of the gazepoints whose value under key is NOT one of
    the values in value_list.

    data: list of gazepoint-dicts
    key: key inspected in each gazepoint
    value_list: list of rejected values
    silent: when False, print progress information

    Raises TypeError if value_list is not a list.
    Now returns copies (previously returned the input dicts themselves,
    unlike the sibling gazepoints_containing_value).
    '''
    if not silent:
        print("Picking gazepoints without values " + str(value_list) +
              " associated with key " + str(key))
        print(indent + "List contains " + str(len(data)) + " gazepoints.")
    # raise instead of assert: asserts are stripped when running with -O
    if not isinstance(value_list, list):
        raise TypeError("value_list must be a list, got " +
                        type(value_list).__name__)
    # copy rows so the caller cannot accidentally mutate the input
    rows_found = [row.copy() for row in data if row[key] not in value_list]
    if not silent:
        print(indent + "List contains " + str(len(rows_found)) +
              " gazepoints.")
    return rows_found
def split_at_change_in_value(data, key, silent=True):
    '''
    Split one gazepoint list into multiple lists. A new list is started
    each time the value under key differs from the previous gazepoint's
    value (the first element never starts a new split by itself).

    data: list of gazepoint-dicts
    key: key whose value changes delimit the splits
    silent: when False, print progress information

    Returns a list of gazepoint lists; [] for empty input (previously
    an empty input produced [[]]). Uses an explicit first-element flag
    instead of a None sentinel, so a legitimate None value under key no
    longer masks subsequent changes.
    '''
    if not silent:
        print("Splitting data when change in value for key: " + str(key))
    if not data:
        if not silent:
            print(indent + "Returning 0 gazepoint lists.")
        return []
    list_of_new_datas = []
    current = []
    previous = None
    have_previous = False
    for gp in data:
        # start a new chunk whenever the tracked value changes
        if have_previous and previous != gp[key]:
            list_of_new_datas.append(current)
            current = []
        current.append(gp.copy())
        previous = gp[key]
        have_previous = True
    # append the last remaining chunk
    list_of_new_datas.append(current)
    if not silent:
        print(indent + "Returning " + str(len(list_of_new_datas)) +
              " gazepoint lists.")
    return list_of_new_datas
def get_value(data, gazepoint, key):
    '''
    Return the value stored under key in the gazepoint found at index
    gazepoint of data.
    '''
    point = data[gazepoint]
    return point[key]
def replace_value(data, key, value_to_replace, value, silent=True):
    '''
    Return a copy of data in which every gazepoint whose value under key
    equals value_to_replace has that value replaced by value.

    data: list of gazepoint-dicts
    key: key whose values are examined
    value_to_replace: old value to be replaced
    value: new value written in its place
    silent: when False, print progress information
    '''
    if not silent:
        # the original message printed the OLD value after "with value";
        # name both values explicitly
        print("Replacing values " + str(value_to_replace) + " for key:" +
              str(key) + " with value " + str(value))
    new_data = []
    for gp in data:
        new_gp = gp.copy()  # never mutate the caller's gazepoints
        if new_gp[key] == value_to_replace:
            new_gp[key] = value
        new_data.append(new_gp)
    if not silent:
        print(indent + "Done.")
    return new_data
def border_violation(data, aoi, xkey, ykey, valkey,
                     accepted_validities, silent=True):
    '''
    Return True if, during a period of non-valid data, the gaze appears
    to have crossed the aoi border; otherwise False.

    data: list of gazepoint-dicts
    aoi: dict with keys "x1", "x2", "y1", "y2" (see inside_aoi)
    xkey, ykey: keys of the gaze coordinates
    valkey: key of the validity marking
    accepted_validities: validity values considered "good"
    silent: when False, print progress information
    '''
    if not silent:
        print("Calculating if a gaze moved over aoi border " +
              "during invalid data...")
    gaze_in_last_good = True
    gaze_okay_before = True
    for index, row in enumerate(data):
        gaze_okay = row[valkey] in accepted_validities
        gaze_in = inside_aoi(aoi, row[xkey], row[ykey])
        # a valid sample that follows an invalid stretch and sits on the
        # other side of the aoi border signals a violation
        if index > 0 and gaze_okay:
            if not gaze_okay_before and gaze_in != gaze_in_last_good:
                if not silent:
                    print(indent + "Border violation detected.")
                return True
        if gaze_okay:
            # NOTE(review): the indentation of this update was ambiguous
            # in the flattened original; assumed to track the aoi-side of
            # valid samples only — confirm against upstream source
            gaze_in_last_good = gaze_in
        gaze_okay_before = gaze_okay
    if not silent:
        print(indent + "No border violation detected.")
    return False
def inside_aoi(aoi, x, y):
    '''
    Return True when the point (x, y) lies strictly inside aoi,
    otherwise False.
    Aoi is expected to be a dict with keys "x1", "x2", "y1", "y2",
    describing a rectangle with top-left corner at (x1, y1) and
    bottom-right corner at (x2, y2). Points on the border count as
    outside.
    '''
    within_x = aoi["x1"] < x < aoi["x2"]
    within_y = aoi["y1"] < y < aoi["y2"]
    return within_x and within_y
def combine_coordinates(data, accepted_validities, rxkey, rykey, rvalkey,
                        lxkey, lykey, lvalkey, silent=True):
    '''
    Merge right-eye and left-eye coordinate keys into single x, y and
    validity sequences.

    For every gazepoint, x and y become the mean of the validly tagged
    eye coordinates (see mean_of_valid_values), and the combined
    validity is the minimum of the two eyes' validity values (the
    minimum is assumed to be the best).

    Returns three parallel lists: x, y, val.
    '''
    if not silent:
        print("Combining two columns...")
    x = []
    y = []
    val = []
    for row in data:
        validities = [row[rvalkey], row[lvalkey]]
        x.append(mean_of_valid_values([row[rxkey], row[lxkey]],
                                      validities, accepted_validities))
        y.append(mean_of_valid_values([row[rykey], row[lykey]],
                                      validities, accepted_validities))
        val.append(min(int(row[rvalkey]), int(row[lvalkey])))
    if not silent:
        print(indent + "Done.")
    return x, y, val
def add_key(data, key, new_values):
    '''
    Return a copy of data where each gazepoint additionally maps key to
    the corresponding element of new_values. new_values must match data
    in length. The input gazepoints are left untouched.
    '''
    new_data = []
    for position, gp in enumerate(data):
        augmented = gp.copy()  # copy so the input gazepoint is untouched
        augmented[key] = new_values[position]
        new_data.append(augmented)
    return new_data
def get_key(data, key):
    '''
    Return the values stored under key in each gazepoint of data, as a
    list preserving the original order.
    '''
    return [gp[key] for gp in data]
def interpolate_using_last_good_value(data, key, valkey, accepted_validities,
                                      silent=True):
    '''
    Interpolate the values under key by replacing each badly tagged
    value with the last good value seen before it. If the beginning of
    the data is bad, the first appearing good value is used instead.
    If there is no good value at all, data is returned unchanged.

    data: list of gazepoint-dicts
    key: key whose values are interpolated
    valkey: key of the validity marking
    accepted_validities: validity values considered "good"
    silent: when False, print progress information

    Returns a new list of copied gazepoints; the input gazepoints are
    no longer mutated in place (the original wrote the interpolated
    values into the caller's dicts, unlike the rest of the library).
    '''
    if not silent:
        print("Interpolating values + " + key +
              ": using last good (or first good) value...")
    # find the first good value, if any
    first_valid = -1
    for gpnum, gp in enumerate(data):
        if gp[valkey] in accepted_validities:
            first_valid = gpnum
            break
    if first_valid == -1:
        # no good data: return the input unchanged
        if not silent:
            print(indent + "Done. No good data available")
        return data
    new_data = []
    last_good = data[first_valid][key]
    for gp in data:
        new_gp = gp.copy()  # never mutate the caller's gazepoints
        if new_gp[valkey] in accepted_validities:
            last_good = new_gp[key]
        else:
            new_gp[key] = last_good
        new_data.append(new_gp)
    if not silent:
        print(indent + "Done.")
    return new_data
def gaze_inside_aoi(data, xcol, ycol, aoi, firstorlast, silent=True):
    '''
    Return the index of either the first or the last row whose gaze
    coordinates fall inside aoi, depending on whether firstorlast is
    "first" or anything else ("last"). Returns -1 when the gaze never
    enters the aoi.
    '''
    if not silent:
        print("Calculating when gaze inside aoi: " + str(aoi) +
              " " + firstorlast + "time...")
    want_first = firstorlast == "first"
    hit = -1
    for rownumber, row in enumerate(data):
        if not inside_aoi(aoi, row[xcol], row[ycol]):
            continue
        hit = rownumber
        if want_first:
            # first hit requested: stop scanning immediately
            break
    if not silent:
        print(indent + "Done.")
    return hit
def gaze_inside_aoi_percentage(data, xcol, ycol, aoi, silent=True):
    '''
    Return the fraction (0..1) of gazepoints whose coordinates fall
    inside aoi. Returns -1 for empty data.
    '''
    if not silent:
        print("Calculating when the portion of gaze inside aoi: " + str(aoi))
    rowcount = len(data)
    if rowcount == 0:
        return -1
    hits = sum(1 for row in data if inside_aoi(aoi, row[xcol], row[ycol]))
    if not silent:
        print(indent + "Done.")
    return hits / float(rowcount)
def longest_non_valid_streak(data, valkey, timekey, accepted_validities,
                             silent=True):
    '''
    Return the duration (in the units of timekey) of the longest
    contiguous run of non-valid gazepoints. 0 when no streak spans more
    than a single point.
    '''
    if not silent:
        print("Calculating longest non-valid streak...")
    streak_start_time = None
    longest = 0
    in_streak = False
    for gp in data:
        if in_streak:
            # extend the ongoing streak up to this point's timestamp
            span = float(gp[timekey]) - float(streak_start_time)
            longest = max(longest, span)
        if gp[valkey] in accepted_validities:
            # valid point ends any ongoing streak
            in_streak = False
        elif not in_streak:
            # invalid point starts a new streak
            streak_start_time = gp[timekey]
            in_streak = True
    if not silent:
        print(indent + "Done.")
    return longest
def valid_gaze_percentage(data, valkey, accepted_validities):
    '''
    Return the fraction (0..1) of gazepoints whose validity marking is
    one of accepted_validities. Returns -1 for empty data.
    '''
    if not data:
        return -1
    good = sum(1 for gp in data if gp[valkey] in accepted_validities)
    return float(good) / float(len(data))
def duration(data, timekey):
    '''
    Return the time spanned by data: last timestamp minus first
    timestamp, as a float. Returns -1 for empty data and 0.0 for a
    single gazepoint.
    '''
    if len(data) == 0:
        return -1
    if len(data) <= 1:
        return 0.0
    return float(data[-1][timekey]) - float(data[0][timekey])
def SRT_index(rtimes, max_rt, min_rt):
    '''
    Return the mean SRT index over the reaction times in rtimes
    (Leppanen et al.): each reaction time rt is scored as
    1 - (max_rt - rt) / (max_rt - min_rt). NaN for empty rtimes.
    '''
    scores = [1 - (max_rt - rtime) / (max_rt - min_rt) for rtime in rtimes]
    if not scores:
        return float('nan')
    return float(sum(scores)) / len(scores)
def group(data, group_key, value_key):
    '''
    Group the values of data by one of its keys: for every datapoint,
    the value under value_key is appended to the list stored under the
    datapoint's group_key value in the returned dict.
    '''
    datas_by_group = {}
    for datapoint in data:
        # setdefault creates the group's list on first encounter
        bucket = datas_by_group.setdefault(datapoint[group_key], [])
        bucket.append(datapoint[value_key])
    return datas_by_group
def group_lists(datas, group_key):
    '''
    Group whole datapoint lists by the group_key value of their first
    element: lists sharing that value are concatenated under one dict
    entry.

    parameter: list of lists of datapoints, grouping key
    datapoint[0][group_key] is expected to exist in each list
    '''
    datas_by_group = {}
    for data in datas:
        label = data[0][group_key]
        if label in datas_by_group:
            # concatenate into a NEW list so the first stored list
            # (which aliases the input) is never mutated
            datas_by_group[label] = datas_by_group[label] + data
        else:
            datas_by_group[label] = data
    return datas_by_group
def mean_of_valid_values(values, validities, accepted_validities):
    '''
    Return the mean of the values whose matching entry in validities is
    one of accepted_validities. Returns -1 when no value is validly
    tagged.

    values: list of numbers (or number-convertible strings)
    validities: list of validity markings, parallel to values
    '''
    goodvalues = [float(item) for pos, item in enumerate(values)
                  if validities[pos] in accepted_validities]
    if not goodvalues:
        return -1
    return float(sum(goodvalues)) / len(goodvalues)