04. Preprocess MEG data and compute inverse solution

Here we combine the previous pipeline (preprocessing pipeline, 03. Compute inverse solution) in a unique workflow. This example shows how to combine the different NeuroPycon pipeline to create an analysis workflow.

# Authors: Annalisa Pascarella <a.pascarella@iac.cnr.it>
# License: BSD (3-clause)

# sphinx_gallery_thumbnail_number = 1


import json
import pprint  # noqa

import os.path as op
import nipype.pipeline.engine as pe
from nipype.interfaces.utility import IdentityInterface, Function
import nipype.interfaces.io as nio

from ephypype.nodes import create_iterator, create_datagrabber
from ephypype.pipelines.preproc_meeg import create_pipeline_preproc_meeg  # noqa
from ephypype.pipelines.fif_to_inv_sol import create_pipeline_source_reconstruction  # noqa


# Read experiment params as json
params = json.load(open("params.json"))
pprint.pprint({'parameters': params})

data_type = params["general"]["data_type"]
subject_ids = params["general"]["subject_ids"]
NJOBS = params["general"]["NJOBS"]
if "data_path" in params["general"].keys():
    data_path = params["general"]["data_path"]
else:
    data_path = op.expanduser("~")
print("data_path : %s" % data_path)
{'parameters': {'general': {'NJOBS': 1,
                            'data_path': '/home/pasca/Science/workshop/PracticalMEEG/ds000117/derivatives/meg_derivatives',
                            'data_type': 'fif',
                            'session_ids': ['01', '02'],
                            'short': False,
                            'subject_ids': ['sub-01'],
                            'subjects_dir': '/home/pasca/Science/workshop/PracticalMEEG/ds000117/FSF'},
                'inverse': {'condition': ['face/famous',
                                          'scrambled',
                                          'face/unfamiliar'],
                            'events_id': {'face/famous/first': 5,
                                          'face/famous/immediate': 6,
                                          'face/famous/long': 7,
                                          'face/unfamiliar/first': 13,
                                          'face/unfamiliar/immediate': 14,
                                          'face/unfamiliar/long': 15,
                                          'scrambled/first': 17,
                                          'scrambled/immediate': 18,
                                          'scrambled/long': 19},
                            'method': 'dSPM',
                            'new_name_condition': ['famous',
                                                   'scrambled',
                                                   'unfamiliar'],
                            'parcellation': 'aparc',
                            'snr': 3.0,
                            'spacing': 'oct-6',
                            'tmax': 1.0,
                            'tmin': -0.2,
                            'trans_fname': '*-trans.fif'},
                'preprocessing': {'ECG_ch_name': 'EEG063',
                                  'EoG_ch_name': ['EEG061', 'EEG062'],
                                  'down_sfreq': 300,
                                  'h_freq': 40,
                                  'l_freq': 0.1,
                                  'reject': {'grad': 4e-10, 'mag': 4e-12},
                                  'variance': 0.999}}}
data_path : /home/pasca/Science/workshop/PracticalMEEG/ds000117/derivatives/meg_derivatives

Read the parameters for preprocessing from a json file and print it

pprint.pprint({'preprocessing parameters': params["preprocessing"]})

l_freq = params["preprocessing"]['l_freq']
h_freq = params["preprocessing"]['h_freq']
ECG_ch_name = params["preprocessing"]['ECG_ch_name']
EoG_ch_name = params["preprocessing"]['EoG_ch_name']
variance = params["preprocessing"]['variance']
reject = params["preprocessing"]['reject']
down_sfreq = params["preprocessing"]['down_sfreq']
{'preprocessing parameters': {'ECG_ch_name': 'EEG063',
                              'EoG_ch_name': ['EEG061', 'EEG062'],
                              'down_sfreq': 300,
                              'h_freq': 40,
                              'l_freq': 0.1,
                              'reject': {'grad': 4e-10, 'mag': 4e-12},
                              'variance': 0.999}}

Then, we create our workflow and specify the base_dir which tells nipype the directory in which to store the outputs.

# workflow directory within the `base_dir`
wf_name = 'preprocessing_full_inverse'
main_workflow = pe.Workflow(name=wf_name)
main_workflow.base_dir = data_path

Then we create a node to pass input filenames to DataGrabber from nipype

infosource = create_iterator(['subject_id'], [subject_ids])

and a node to grab data. The template_args in this node iterate upon the values in the infosource node

template_path = '*'

raw_file = '%s/ses-meg/meg_short/*%s*run*sss*.fif'
trans_file = '%s/ses-meg/meg_short/%s%s.fif'
field_template = dict(raw_file=raw_file, trans_file=trans_file)

template_args = dict(
    raw_file=[['subject_id', 'subject_id']],
    trans_file=[['subject_id', 'subject_id', "-trans"]])

datasource = create_datagrabber(data_path, template_path, template_args,
                                field_template=field_template,
                                infields=['subject_id'],
                                outfields=['raw_file', 'trans_file'])

main_workflow.connect(infosource, 'subject_id', datasource, 'subject_id')

Ephypype creates for us a pipeline which can be connected to these nodes we created. The preprocessing pipeline is implemented by the function ephypype.pipelines.preproc_meeg.create_pipeline_preproc_meeg, thus to instantiate this pipeline node, we import it and pass our parameters to it.

*** main_path -> /home/pasca/Science/workshop/PracticalMEEG/ds000117/derivatives/meg_derivatives ***

Function to extract events from the stimulus channel

def run_events_concatenate(list_ica_files, subject):
    '''
    The events are extracted from stim channel 'STI101'. The events are saved
    to the Node directory.
    For each subject, the different run are concatenated in one single raw file
    and saved in the Node directory. We take the different run from the
    preprocessing workflow directory, i.e. the cleaned raw data.
    '''

    print(subject, list_ica_files)
    import os
    import mne

    # could be added in a node to come
    mask = 4096 + 256  # mask for excluding high order bits
    delay_item = 0.0345
    min_duration = 0.015

    print("processing subject: %s" % subject)

    raw_list = list()
    events_list = list()
    fname_events_files = []

    print("  Loading raw data")
    for i, run_fname in enumerate(list_ica_files):
        run = i+1

        raw = mne.io.read_raw_fif(run_fname, preload=True)
        events = mne.find_events(raw, stim_channel='STI101',
                                 consecutive='increasing', mask=mask,
                                 mask_type='not_and',
                                 min_duration=min_duration)

        print("  S %s - R %s" % (subject, run))

        fname_events = os.path.abspath('run_%02d-eve.fif' % run)
        mne.write_events(fname_events, events)
        fname_events_files.append(fname_events)

        delay = int(round(delay_item * raw.info['sfreq']))
        events[:, 0] = events[:, 0] + delay
        events_list.append(events)

        raw_list.append(raw)

    raw, events = mne.concatenate_raws(raw_list, events_list=events_list)
    raw.set_eeg_reference(projection=True)
    raw_file = os.path.abspath('{}_sss_filt_dsamp_ica-raw.fif'.format(subject))
    print(raw_file)

    raw.save(raw_file, overwrite=True)

    event_file = os.path.abspath(
            '{}_sss_filt_dsamp_ica-raw-eve.fif'.format(subject))
    mne.write_events(event_file, events)

    del raw_list
    del raw

    return raw_file, event_file, fname_events_files

Group average on source level

def compute_morph_stc(subject, conditions, cond_files, subjects_dir):
    import os.path as op
    import mne

    print("processing subject: %s" % subject)

    # Morph STCs
    stc_morphed_files = []
    for k, cond_file in enumerate(cond_files):
        print(conditions[k])
        print(cond_file)

        stc = mne.read_source_estimate(cond_file)
        print(stc)

        morph = mne.compute_source_morph(
                        stc, subject_from=subject, subject_to='fsaverage',
                        subjects_dir=subjects_dir)
        print(morph)

        stc_morphed = morph.apply(stc)
        print(stc_morphed)

        stc_morphed_file = op.abspath('mne_dSPM_inverse_morph-%s' % (conditions[k]))  # noqa
        stc_morphed.save(stc_morphed_file)

        stc_morphed_files.append(stc_morphed_file)

    return stc_morphed_files


def show_files(files):
    print(files)
    return files


# building full inverse pipeline
def create_full_inv_pipeline(data_path, params,
                             pipeline_name="full_inv_pipeline"):

    # inverse parameters
    pprint.pprint({'inverse parameters': params["inverse"]})

    events_id = params["inverse"]['events_id']
    condition = params["inverse"]['condition']
    t_min = params["inverse"]['tmin']
    t_max = params["inverse"]['tmax']
    spacing = params["inverse"]['spacing']  # oct-6
    snr = params["inverse"]['snr']
    inv_method = params["inverse"]['method']  # dSPM
    parc = params["inverse"]['parcellation']  # aparc

    subjects_dir = op.join(data_path, params["general"]["subjects_dir"])

    # full inverse pipeline
    inv_pipeline = pe.Workflow(name=pipeline_name)
    inv_pipeline.base_dir = data_path

    # define the inputs of the pipeline
    inputnode = pe.Node(
        IdentityInterface(fields=['list_ica_files', 'subject_id', 'trans_file']),  # noqa
        name='inputnode')

    # We connect the output of infosource node to the one of datasource.
    # So, these two nodes taken together can grab data.
    inv_pipeline.connect(infosource, 'subject_id', datasource,  'subject_id')

    # We define the Node that encapsulates run_events_concatenate function
    concat_event = pe.Node(
        Function(input_names=['list_ica_files', 'subject'],
                 output_names=['raw_file', 'event_file', 'fname_events_files'],
                 function=run_events_concatenate),
        name='concat_event')

    # and its connections to the other nodes
    inv_pipeline.connect(inputnode, 'list_ica_files',
                         concat_event, 'list_ica_files')
    inv_pipeline.connect(inputnode, 'subject_id', concat_event, 'subject')

    ###########################################################################
    # Ephypype creates for us a pipeline to compute inverse solution which can
    # be connected to these nodes we created.
    inv_sol_workflow = create_pipeline_source_reconstruction(
        data_path, subjects_dir, spacing=spacing, inv_method=inv_method,
        is_epoched=True, is_evoked=True, events_id=events_id, condition=condition,  # noqa
        t_min=t_min, t_max=t_max, all_src_space=True, parc=parc, snr=snr)

    inv_pipeline.connect(inputnode, 'subject_id',
                         inv_sol_workflow, 'inputnode.sbj_id')
    inv_pipeline.connect(concat_event, ('raw_file', show_files),
                         inv_sol_workflow, 'inputnode.raw')
    inv_pipeline.connect(concat_event, ('event_file', show_files),
                         inv_sol_workflow, 'inputnode.events_file')

    inv_pipeline.connect(inputnode, 'trans_file',
                         inv_sol_workflow, 'inputnode.trans_file')

    conditions = params["inverse"]['new_name_condition']

    # We define the Node that encapsulates compute_morph_stc function
    morph_stc = pe.Node(
        Function(input_names=['subject', 'conditions', 'cond_files', 'subjects_dir'],  # noqa
                output_names=['stc_morphed_files'],
                function=compute_morph_stc),
        name="morph_stc")

    # and its connections to the other nodes
    inv_pipeline.connect(infosource, 'subject_id', morph_stc, 'subject')
    inv_pipeline.connect(inv_sol_workflow, 'inv_solution.stc_files',
                         morph_stc, 'cond_files')

    morph_stc.inputs.conditions = conditions
    morph_stc.inputs.subjects_dir = subjects_dir

    return inv_pipeline

Defining pipeline to compute inverse solution

inv_pipeline = create_full_inv_pipeline(data_path, params)

# and its connections to the other nodes
main_workflow.connect(preproc_workflow, 'ica.ica_file',
                      inv_pipeline, 'inputnode.list_ica_files')
main_workflow.connect(infosource, 'subject_id',
                      inv_pipeline, 'inputnode.subject_id')
main_workflow.connect(datasource, 'trans_file',
                      inv_pipeline, 'inputnode.trans_file')
{'inverse parameters': {'condition': ['face/famous',
                                      'scrambled',
                                      'face/unfamiliar'],
                        'events_id': {'face/famous/first': 5,
                                      'face/famous/immediate': 6,
                                      'face/famous/long': 7,
                                      'face/unfamiliar/first': 13,
                                      'face/unfamiliar/immediate': 14,
                                      'face/unfamiliar/long': 15,
                                      'scrambled/first': 17,
                                      'scrambled/immediate': 18,
                                      'scrambled/long': 19},
                        'method': 'dSPM',
                        'new_name_condition': ['famous',
                                               'scrambled',
                                               'unfamiliar'],
                        'parcellation': 'aparc',
                        'snr': 3.0,
                        'spacing': 'oct-6',
                        'tmax': 1.0,
                        'tmin': -0.2,
                        'trans_fname': '*-trans.fif'}}
******************** {}

Workflow graph

To do so, we first write the workflow graph (optional)

main_workflow.write_graph(graph2use='colored')  # colored
'/home/pasca/Science/workshop/PracticalMEEG/ds000117/derivatives/meg_derivatives/preprocessing_full_inverse/graph.png'

Take a moment to pause and notice how the connections here correspond to how we connected the nodes.

import matplotlib.pyplot as plt  # noqa
img = plt.imread(op.join(data_path, wf_name, 'graph.png'))
plt.figure(figsize=(7, 7))
plt.imshow(img)
plt.axis('off')

main_workflow.config['execution'] = {'remove_unnecessary_outputs': 'false'}
main_workflow.run(plugin='LegacyMultiProc', plugin_args={'n_procs': NJOBS})
plot 04 preprocessing inverse
/home/pasca/Science/workshop/PracticalMEEG/ds000117/derivatives/meg_derivatives/preprocessing_full_inverse/full_inv_pipeline/_subject_id_sub-01/concat_event/sub-01_sss_filt_dsamp_ica-raw.fif
/home/pasca/Science/workshop/PracticalMEEG/ds000117/derivatives/meg_derivatives/preprocessing_full_inverse/full_inv_pipeline/_subject_id_sub-01/concat_event/sub-01_sss_filt_dsamp_ica-raw.fif
/home/pasca/Science/workshop/PracticalMEEG/ds000117/derivatives/meg_derivatives/preprocessing_full_inverse/full_inv_pipeline/_subject_id_sub-01/concat_event/sub-01_sss_filt_dsamp_ica-raw-eve.fif

<networkx.classes.digraph.DiGraph object at 0x7fa895bb2410>

Finally, we are now ready to execute our workflow.

main_workflow.config['execution'] = {'remove_unnecessary_outputs': 'false'}

# Run workflow locally on 1 CPU
main_workflow.run(plugin='LegacyMultiProc', plugin_args={'n_procs': NJOBS})
/home/pasca/Science/workshop/PracticalMEEG/ds000117/derivatives/meg_derivatives/preprocessing_full_inverse/full_inv_pipeline/_subject_id_sub-01/concat_event/sub-01_sss_filt_dsamp_ica-raw.fif
/home/pasca/Science/workshop/PracticalMEEG/ds000117/derivatives/meg_derivatives/preprocessing_full_inverse/full_inv_pipeline/_subject_id_sub-01/concat_event/sub-01_sss_filt_dsamp_ica-raw.fif
/home/pasca/Science/workshop/PracticalMEEG/ds000117/derivatives/meg_derivatives/preprocessing_full_inverse/full_inv_pipeline/_subject_id_sub-01/concat_event/sub-01_sss_filt_dsamp_ica-raw-eve.fif

<networkx.classes.digraph.DiGraph object at 0x7fa8983b07c0>

Total running time of the script: ( 1 minutes 1.669 seconds)

Gallery generated by Sphinx-Gallery