allow pseudorandom draws of settings in active

parent 25e6fedc
......@@ -9,6 +9,7 @@ define_variable_meanings.make_events_json_dict: action_type, action, outcome
import itertools
import numpy as np
import pandas as pd
def get_payoff_settings(ev_diff):
......@@ -130,7 +131,7 @@ def get_payoff_settings(ev_diff):
return payoff_settings
def get_random_payoff_dict(payoff_settings):
def get_random_payoff_dict(payoff_settings, pseudorand=False, df=None):
"""Given an array of possible payoff settings, get a random setting.
Parameters
......@@ -138,6 +139,14 @@ def get_random_payoff_dict(payoff_settings):
payoff_settings : ndarray, shape (n, 8)
Subset of all possible payoff distribution settings.
pseudorand : bool
If True, make a random pick of payoff settings, where the currently
least presented outcome is present. You must specify the df argument
if True. Defauls to False.
df : pd.DataFrame | None
The data to be supplied if pseudorand is True. Defaults to None.
Returns
-------
payoff_dict : dict
......@@ -151,11 +160,15 @@ def get_random_payoff_dict(payoff_settings):
removed.
"""
n, __ = payoff_settings.shape
selected_row = np.random.randint(0, n)
if pseudorand:
assert isinstance(df, pd.DataFrame)
selected_row_idx = provide_balancing_selection(df, payoff_settings)
else:
n, __ = payoff_settings.shape
selected_row_idx = np.random.randint(0, n)
# Form a payoff dictionary from the selected setting
payoff_setting = payoff_settings[selected_row, :]
payoff_setting = payoff_settings[selected_row_idx, :]
payoff_dict = dict()
payoff_dict[0] = [int(payoff_setting[0])] * int(payoff_setting[2]*10)
payoff_dict[0] += [int(payoff_setting[1])] * int(payoff_setting[3]*10)
......@@ -163,13 +176,98 @@ def get_random_payoff_dict(payoff_settings):
payoff_dict[1] += [int(payoff_setting[5])] * int(payoff_setting[7]*10)
# Remore the selected setting from all settings (no replacement)
payoff_settings = np.delete(payoff_settings, [selected_row], axis=0)
payoff_settings = np.delete(payoff_settings, [selected_row_idx], axis=0)
return payoff_dict, payoff_settings
def shuffle_left_right(payoff_dict):
"""Given a payoff dict, randomly change the keys."""
keys = list(payoff_dict.keys())
vals_idx = np.random.permutation(keys)
payoff_dict = {key: payoff_dict[val] for key, val in zip(keys, vals_idx)}
return payoff_dict
def provide_balancing_selection(df, payoff_settings):
"""Provide index for setting containing little sampled stimuli.
Given the previously recorded data, find which stimuli have been presented
only few times and then return an index into payoff_settings that will
resolve to a setting containing such a little sampled stimulus. Here,
stimuli are the numbers 1 to 9, as well as the side they appear on, so
18 distinct stimulus classes.
Parameters
----------
payoff_settings : ndarray, shape (n, 8)
Subset of all possible payoff distribution settings.
df : pd.DataFrame | None
The data collected until this point.
Returns
-------
selected_row_idx : int
The row index of the payoff_settings that should be picked to balance
the currently presented stimuli.
"""
# Get sampling actions and corresponding outcomes so far
actions = df[df['value'].isin([5, 6])]['action'].to_numpy(copy=True,
dtype=int)
outcomes = df[df['value'] == 9]['outcome'].to_numpy(copy=True, dtype=int)
# combine actions and outcomes to code outcomes on the left with negative
# sign outcomes on the right with positive sign ... will end up with stim
# classes
stim_classes = outcomes * (actions*2-1)
# Make a histogram of which stimulus_classes we have collected so far
bins = np.hstack((np.arange(-9, 0), np.arange(1, 11)))
stim_class_hist = np.histogram(stim_classes, bins)
# Make an array from the hist and sort it
stim_class_arr = np.vstack((stim_class_hist[0], stim_class_hist[1][:-1])).T
stim_class_arr_sorted = stim_class_arr[stim_class_arr[:, 0].argsort()]
# From what we have seen so far, take the stim_classes we have seen least
# as priotities to be shown
stim_to_show_i = 0
stim_to_show = stim_class_arr_sorted[stim_to_show_i, 1]
# Now start to look in our payoff settings, which potential settings
# contain the stimulus class to be shown
while True:
number = np.abs(stim_to_show)
side = np.sign(stim_to_show)
# Select only payoff settings that contain the specific number
num_select = payoff_settings[(payoff_settings ==
number).any(axis=1), :]
# From the number specific selection, select only where the number is
# on a specific side
if side == -1:
num_side_select = num_select[np.where(num_select ==
number)[1] <= 1, :]
else:
num_side_select = num_select[np.where(num_select ==
number)[1] > 1, :]
# If we found some candidates, return the selection for a random pick
# from it
if num_side_select.shape[0] > 0:
break
# Else, take the next simulus to be shown in line and try to find a
# selection
stim_to_show_i += 1
if stim_to_show_i == stim_to_show.shape[0]:
# This should never happen ...
raise RuntimeError('We have run out of potential stimuli to show.')
else:
stim_to_show = stim_class_arr_sorted[stim_to_show_i, 1]
# Now we have the selection of payoff_settings and we can pick a random
# row from our selection
n, __ = num_side_select.shape
num_side_select_idx = np.random.randint(0, n)
selected_row = num_side_select[num_side_select_idx]
# And find the index into the payoff_settings
selected_row_idx = np.where((payoff_settings ==
selected_row).all(axis=1))[0][0]
return selected_row_idx
......@@ -34,7 +34,6 @@ from sp_experiment.utils import (utils_fps,
from sp_experiment.psychopy_utils import get_fixation_stim
from sp_experiment.define_payoff_settings import (get_payoff_settings,
get_random_payoff_dict,
shuffle_left_right
)
from sp_experiment.define_ttl_triggers import provide_trigger_dict
......@@ -186,11 +185,16 @@ current_nblocks = 0
current_ntrls = 0
while current_ntrls < max_ntrls:
# For each trial, take a new payoff setting
# For each trial, take a new payoff setting.
# When active condition, read the current data to make a pseudorandom draw
# of a payoff setting. This is to guarantee that also stimuli that have
# been sampled the least so far will be included more often.
if condition == 'active':
payoff_dict, payoff_settings = get_random_payoff_dict(payoff_settings)
# shuffle the payoff dict so distributions can occurr left or right
payoff_dict = shuffle_left_right(payoff_dict)
df_pseudorand = pd.read_csv(data_file, sep='\t')
payoff_dict, payoff_settings = get_random_payoff_dict(payoff_settings,
pseudorand=True,
df=df_pseudorand
)
log_data(data_file, onset=exp_timer.getTime(), trial=current_ntrls,
payoff_dict=payoff_dict)
......
"""Testing the setup of the payoff distributions."""
import os.path as op
import numpy as np
import pandas as pd
import sp_experiment
from sp_experiment.define_payoff_settings import (get_payoff_settings,
get_random_payoff_dict,
shuffle_left_right
)
......@@ -15,32 +18,33 @@ def test_get_payoff_settings():
assert payoff_settings.shape[0] >= 1
for probability in payoff_settings[0, [2, 3, 6, 7]]:
assert probability in np.round(np.arange(0.1, 1, 0.1), 1)
mags = list()
for magnitude in payoff_settings[0, [0, 1, 4, 5]]:
assert magnitude in range(1, 10)
mags.append(magnitude)
assert len(np.unique(mags)) == 4
def test_get_random_payoff_dict():
"""Test getting a payoff_dict off a setup."""
payoff_settings = get_payoff_settings(0.1)
n_settings = payoff_settings.shape[0]
payoff_dict, payoff_settings = get_random_payoff_dict(payoff_settings)
assert isinstance(payoff_dict, dict)
def test_shuffle_left_right():
"""Test that we can shuffle values of a dict."""
d = {0: [1, 2, 3], 1: [4, 5, 6]}
n = 100
was_shuffled = False
for i in range(n):
d_new = shuffle_left_right(d)
# Assert the order of keys is stable after each shuffle
np.testing.assert_array_equal(np.fromiter(d.keys(), int),
np.fromiter(d_new.keys(), int))
# Over the n shuffles, we should experience a switch of values at least
# once
vals_0 = np.asarray(d_new[0])
if np.array_equal(vals_0, d[1]):
was_shuffled = True
assert was_shuffled
# Should be a dict
assert isinstance(payoff_dict, dict)
assert len(list(payoff_dict.values())[0]) == 10
assert len(list(payoff_dict.values())[1]) == 10
# Payoff settings should have been decreased by one
assert payoff_settings.shape[0] == (n_settings - 1)
# Also test with a pseudorandom draw
init_dir = op.dirname(sp_experiment.__file__)
test_data_dir = op.join(init_dir, 'tests', 'data')
fname = '2_trials_no_errors.tsv'
fpath = op.join(test_data_dir, fname)
df = pd.read_csv(fpath, sep='\t')
payoff_dict, payoff_settings = get_random_payoff_dict(payoff_settings,
pseudorand=True,
df=df)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment