{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%matplotlib inline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n\n# 02. Preprocess MEG data\nThe `preprocessing pipeline` runs the ICA algorithm to automatically\nremove eye- and heart-related artefacts.\nA report is automatically generated and can be used to correct and/or fine-tune\nthe correction in each subject.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Authors: Annalisa Pascarella\n# License: BSD (3-clause)\n\n# sphinx_gallery_thumbnail_number = 2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import modules\nThe first step is to import the modules we need in the script. We import\nmainly from the |nipype| and |ephypype| packages.\n\n.. |nipype| raw:: html\n\n nipype\n\n.. |ephypype| raw:: html\n\n ephypype\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import json\nimport pprint\n\nimport os.path as op\nimport nipype.pipeline.engine as pe\n\nfrom ephypype.nodes import create_iterator, create_datagrabber\nfrom ephypype.pipelines.preproc_meeg import create_pipeline_preproc_meeg" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define data and variables\nLet us specify the variables that are specific to the data analysis (the\nmain directories where the data are stored, the list of subjects and\nsessions, ...) and the variables specific to the particular pipeline\n(downsampling frequency, EOG and ECG channels, cut-off frequencies, ...) in a\n|params.json| file\n(if the download does not work, go to the GitHub page and right-click \"Save As\" on the Raw button).\n\n.. 
|params.json| replace::\n :download:`json `\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Read experiment params as json\nwith open(\"params.json\") as f:\n    params = json.load(f)\npprint.pprint({'parameters': params[\"general\"]})\n\ndata_type = params[\"general\"][\"data_type\"]\nsubject_ids = params[\"general\"][\"subject_ids\"]\nNJOBS = params[\"general\"][\"NJOBS\"]\nsession_ids = params[\"general\"][\"session_ids\"]\n\nis_short = params[\"general\"][\"short\"]  # to analyze a shorter segment of data\n\n# Fall back to the home directory when no data_path is given\nif \"data_path\" in params[\"general\"]:\n    data_path = params[\"general\"][\"data_path\"]\nelse:\n    data_path = op.expanduser(\"~\")\nprint(\"data_path : %s\" % data_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then, we read the **parameters for preprocessing** from the json file and print\nthem. In the json file we set the names of the EOG and ECG channels, the\nfilter settings, the downsampling frequency, the number of ICA components\nspecified as a fraction of explained variance (0.999) and a reject\ndictionary to exclude time segments.\nThe full list of inputs can be found in the definition of the pipeline\n:func:`~ephypype.pipelines.create_pipeline_preproc_meeg`.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "pprint.pprint({'preprocessing parameters': params[\"preprocessing\"]})\n\nl_freq = params[\"preprocessing\"]['l_freq']\nh_freq = params[\"preprocessing\"]['h_freq']\nECG_ch_name = params[\"preprocessing\"]['ECG_ch_name']\nEoG_ch_name = params[\"preprocessing\"]['EoG_ch_name']\nvariance = params[\"preprocessing\"]['variance']\nreject = params[\"preprocessing\"]['reject']\ndown_sfreq = params[\"preprocessing\"]['down_sfreq']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Specify Nodes\nBefore creating a |workflow| we have to create the |nodes| that define the\nworkflow itself. 
In this example the main Nodes are:\n\n.. |workflow| raw:: html\n\n workflow\n\n.. |nodes| raw:: html\n\n nodes\n\n* ``infosource`` is a Node that just distributes values (`meg_infosourcenode`);\n* ``datasource`` is a |DataGrabber| Node that allows the user to **define flexible search patterns** which can be parameterized by user-defined inputs (`meg_datagrabbernode`);\n* ``preproc_workflow`` is a Node containing the NeuroPycon pipeline created by :func:`~ephypype.pipelines.create_pipeline_preproc_meeg` (`preproc_meg_node`).\n\n.. |DataGrabber| raw:: html\n\n DataGrabber\n\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n### Infosource\nThe ephypype function :func:`~ephypype.nodes.create_iterator` creates the\n``infosource`` node, which distributes values: when we need to feed\nthe different subject names into the workflow we only need a Node that can\nreceive the input and distribute those inputs to the workflow.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "infosource = create_iterator(['subject_id', 'session_id'],\n                             [subject_ids, session_ids])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n### DataGrabber\nThen we create the ``datasource`` node to grab data. The ephypype function\n:func:`~ephypype.nodes.create_datagrabber`\ncreates a node to grab data using |DataGrabber| in Nipype. The DataGrabber\nInterface allows us to define **flexible search patterns** which can be\nparameterized by user-defined inputs (such as subject ID, session, etc.).\n\nIn this example we parameterize the pattern search with ``subject_id`` and\n``session_id``. 
The ``template_args`` in this node iterate over the values\nin the ``infosource`` node.\nWe look for MEG data contained in the ``ses-meg/meg`` folder (or ``ses-meg/meg_short`` for the shorter segment).\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "if is_short:\n    template_path = '%s/ses-meg/meg_short/*%s*run*%s*sss*.fif'\nelse:\n    template_path = '%s/ses-meg/meg/*%s*run*%s*sss*.fif'\ntemplate_args = [['subject_id', 'subject_id', 'session_id']]\ndatasource = create_datagrabber(data_path, template_path, template_args)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n### Preprocessing Node\nEphypype provides a pipeline that can be connected to the\nnodes we created. The preprocessing pipeline is implemented by the function\n:func:`~ephypype.pipelines.create_pipeline_preproc_meeg`, so\nto instantiate this pipeline node we pass our parameters to it.\n\nEach pipeline provided by NeuroPycon requires two different kinds of inputs:\n\n* inputs of the pipeline\n* **inputnode**: particular inputs defined after the creation of the pipeline;\n an inputnode of a pipeline is defined by an output of a previous Node.\n\nFor example, looking at the definition of :func:`~ephypype.pipelines.create_pipeline_preproc_meeg`\nwe have the inputs of the pipeline (e.g., ``main_path``, ``l_freq``) and the\ninputnode fields ``raw_file`` and ``subject_id``. 
\nIn the next section `workflow_meg` we'll see how to specify these inputnode fields.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "preproc_workflow = create_pipeline_preproc_meeg(\n    data_path, pipeline_name=\"preproc_meg_dsamp_pipeline\",\n    l_freq=l_freq, h_freq=h_freq,\n    variance=variance, ECG_ch_name=ECG_ch_name, EoG_ch_name=EoG_ch_name,\n    data_type=data_type, down_sfreq=down_sfreq)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n## Specify Workflows and Connect Nodes\nThe purpose of a Workflow is to guide the sequential execution of Nodes: we\ncreate a main Workflow to connect the different Nodes and define the data\nflow from the outputs of one Node to the inputs of the connected Nodes.\nThe specified connections create our workflow: the created nodes and the\ndependencies between them are represented as a graph\n(see `graph_preproc_meg`), which makes it easy to see which nodes are\nexecuted and in which order.\n\nIt is important to point out that we have to connect the output and input\nfields of each node to the output and input fields of another node.\n\nNow, we create our main workflow and specify the ``base_dir``, which tells\nnipype the directory in which to store the outputs.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "preproc_wf_name = 'preprocessing_dsamp_short_workflow' if is_short \\\n    else 'preprocessing_dsamp_workflow'\n\nmain_workflow = pe.Workflow(name=preproc_wf_name)\nmain_workflow.base_dir = data_path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We then connect the nodes two at a time. First, we connect the two outputs\n(``subject_id`` and ``session_id``) of the `meg_infosourcenode` node to\nthe `meg_datagrabbernode` node. 
So, these two nodes taken together can\ngrab data.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "main_workflow.connect(infosource, 'subject_id', datasource, 'subject_id')\nmain_workflow.connect(infosource, 'session_id', datasource, 'session_id')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similarly, we connect the outputs of these nodes to the inputnode of the\n`preproc_meg_node`. Things will\nbecome clearer in a moment when we plot the graph of the workflow.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "main_workflow.connect(infosource, 'subject_id',\n                      preproc_workflow, 'inputnode.subject_id')\nmain_workflow.connect(datasource, 'raw_file',\n                      preproc_workflow, 'inputnode.raw_file')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

**Note:** The inputnode ``raw_file`` of the pipeline node ``preproc_workflow`` is the output of the ``datasource`` node.
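
As a minimal sketch of where ``raw_file`` comes from: assuming the DataGrabber fills the ``%s`` placeholders of ``template_path`` with the values named in ``template_args`` and then searches for matching files, the search pattern can be previewed with plain string formatting. The subject ID, session ID and file name below are hypothetical, chosen only for illustration.

```python
from fnmatch import fnmatchcase

# Template and argument names as used in this example (non-short case).
template_path = '%s/ses-meg/meg/*%s*run*%s*sss*.fif'
template_args = [['subject_id', 'subject_id', 'session_id']]

# Hypothetical values that infosource might distribute in one iteration.
values = {'subject_id': 'sub-01', 'session_id': '01'}

# Fill each %s with the value of the corresponding argument name.
pattern = template_path % tuple(values[name] for name in template_args[0])
print(pattern)

# Hypothetical file name, used only to check that the pattern would match it.
candidate = 'sub-01/ses-meg/meg/sub-01_ses-meg_run-01_proc-sss_meg.fif'
print(fnmatchcase(candidate, pattern))
```

The resulting pattern is relative: in this example the ``data_path`` passed to ``create_datagrabber`` serves as the directory it is resolved against.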

\n\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run workflow\nAfter we have specified all the nodes and connections of the workflow, the\nlast step is to run it by calling the ``run()`` method. It\u2019s also possible to\ngenerate a static graph representing the nodes and the connections between them by\ncalling the ``write_graph`` method.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "main_workflow.write_graph(graph2use='colored')  # optional" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n### Workflow graph\n\nTake a moment to pause and notice how the connections\nhere correspond to how we connected the nodes. In other words, the\nconnections we specified created the workflow: the nodes and the dependencies\nbetween them are represented as a graph, which makes it easy to see which\nnodes are executed and in which order.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import matplotlib.pyplot as plt  # noqa\nimg = plt.imread(op.join(data_path, preproc_wf_name, 'graph.png'))\nplt.figure(figsize=(6, 6))\nplt.imshow(img)\nplt.axis('off')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

**Note:** We have to connect the output and input fields of each node to the output and input fields of another node.
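
To illustrate how the connections alone determine the execution order, here is a small sketch that does not use nipype: it records the four ``connect`` calls of this example as edges and sorts the nodes with the standard-library ``graphlib`` module (Python 3.9+). The ``connections`` list and the ``execution_order`` name are illustrative only; nipype builds and schedules its own graph internally.

```python
from graphlib import TopologicalSorter

# (source node, output field, destination node, input field)
connections = [
    ('infosource', 'subject_id', 'datasource', 'subject_id'),
    ('infosource', 'session_id', 'datasource', 'session_id'),
    ('infosource', 'subject_id', 'preproc_workflow', 'inputnode.subject_id'),
    ('datasource', 'raw_file', 'preproc_workflow', 'inputnode.raw_file'),
]

# Map each destination node to the set of nodes it depends on.
dependencies = {}
for src, _, dest, _ in connections:
    dependencies.setdefault(dest, set()).add(src)

# A node can only run after all the nodes feeding it have finished.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```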

\n\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run\n\nFinally, we are ready to execute our workflow.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "main_workflow.config['execution'] = {'remove_unnecessary_outputs': 'false'}\n\n# Run the workflow locally, using NJOBS parallel processes\nmain_workflow.run(plugin='LegacyMultiProc', plugin_args={'n_procs': NJOBS})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

**Note:** If we rerun the workflow, only the nodes whose inputs have changed since the last run will be executed again. For all other nodes, the cached results are simply returned. This is achieved by recording a hash of the inputs.
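
The idea of hash-based caching can be sketched in a few lines. This is only an illustration of the principle, not nipype's actual implementation: the ``run_cached`` helper, the in-memory ``cache`` dictionary and the toy ``downsample`` function are hypothetical.

```python
import hashlib
import json

cache = {}   # maps (node name, input hash) to a stored result
calls = []   # records which nodes were really executed


def run_cached(name, func, **inputs):
    # Hash a canonical JSON dump of the inputs; equal inputs give equal keys.
    dump = json.dumps(inputs, sort_keys=True).encode('utf-8')
    key = (name, hashlib.sha256(dump).hexdigest())
    if key not in cache:
        calls.append(name)
        cache[key] = func(**inputs)
    return cache[key]


def downsample(sfreq, factor):
    # Toy stand-in for the work a node would do.
    return sfreq / factor


run_cached('resample', downsample, sfreq=1000, factor=5)  # executed
run_cached('resample', downsample, sfreq=1000, factor=5)  # cached, not rerun
run_cached('resample', downsample, sfreq=1000, factor=4)  # inputs changed
print(calls)
```

Only the first and third calls execute ``downsample``; the second one finds its input hash already recorded and returns the stored value.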

\n\n\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Results\nThe output of this workflow is the preprocessed data stored in the workflow\ndirectory defined by ``base_dir``. Here we find the folder\n``preprocessing_dsamp_workflow`` (or ``preprocessing_dsamp_short_workflow`` when the shorter segment is analyzed), where the results of each iteration are\nsorted by node. The cleaned data will be used in `plot_events_inverse`.\n\nIt\u2019s good practice to inspect the report file saved in the ``ica`` directory and check\nthe excluded ICA components.\n\n

**Note:** You can use this :download:`notebook ` to inspect your ICs more closely.

\n\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import mne # noqa\nfrom ephypype.gather import get_results # noqa\n\nica_files, raw_files = get_results(main_workflow.base_dir,\n main_workflow.name, pipeline='ica')\n\nfor ica_file, raw_file in zip(ica_files, raw_files):\n print(f'*** {raw_file} ***')\n raw = mne.io.read_raw_fif(raw_file)\n ica = mne.preprocessing.read_ica(ica_file)\n ica.plot_properties(raw, picks=ica.exclude, figsize=[4.5, 4.5])\n\n # ica.plot_components()\n # ica.plot_sources(raw)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.8" } }, "nbformat": 4, "nbformat_minor": 0 }