Source code for graphpype.nodes.graph_stats

"""
Preparing, computing stats and permutations
author:: David Meunier <david_meunier_79@univ-amu.fr>
"""
import numpy as np
import os

import itertools as iter

from nipype.interfaces.base import BaseInterface, \
    BaseInterfaceInputSpec, traits, File, TraitedSpec, isdefined


from nipype.utils.filemanip import split_filename as split_f


import graphpype.utils_stats as stats

from graphpype.utils_cor import (return_corres_correl_mat,
                                 return_corres_correl_mat_labels)


# StatsPairBinomial


class StatsPairBinomialInputSpec(BaseInterfaceInputSpec):

    group_coclass_matrix_file1 = File(
        exists=True,
        desc='file of group 1 coclass matrices in npy format',
        mandatory=True)

    group_coclass_matrix_file2 = File(
        exists=True,
        desc='file of group 2 coclass matrices in npy format',
        mandatory=True)

    conf_interval_binom_fdr = traits.Float(
        0.05,
        usedefault=True,
        desc='Alpha value used as FDR implementation',
        mandatory=False)


class StatsPairBinomialOutputSpec(TraitedSpec):

    signif_signed_adj_fdr_mat_file = File(
        exists=True,
        desc="int matrix with corresponding codes to significance")


class StatsPairBinomial(BaseInterface):
    """      """
    input_spec = StatsPairBinomialInputSpec
    output_spec = StatsPairBinomialOutputSpec

    def _run_interface(self, runtime):

        group_coclass_matrix_file1 = self.inputs.group_coclass_matrix_file1
        group_coclass_matrix_file2 = self.inputs.group_coclass_matrix_file2
        conf_interval_binom_fdr = self.inputs.conf_interval_binom_fdr

        # loading group_coclass_matrix1
        group_coclass_matrix1 = np.array(
            np.load(group_coclass_matrix_file1), dtype=float)
        print(group_coclass_matrix1.shape)

        # loading group_coclass_matrix2
        group_coclass_matrix2 = np.array(
            np.load(group_coclass_matrix_file2), dtype=float)
        print(group_coclass_matrix2.shape)

        # check input matrices
        # TODO checks are not done in compute_pairwise_binom_fdr already?
        Ix, Jx, nx = group_coclass_matrix1.shape
        Iy, Jy, ny = group_coclass_matrix2.shape

        assert Ix == Iy
        assert Jx == Jy
        assert Ix == Jx
        assert Iy == Jy

        signif_signed_adj_mat = stats.compute_pairwise_binom_fdr(
            group_coclass_matrix1, group_coclass_matrix2,
            conf_interval_binom_fdr)

        # save pairwise signed stat file
        signif_signed_adj_fdr_mat_file = os.path.abspath(
            'signif_signed_adj_fdr_' + str(conf_interval_binom_fdr) + '.npy')
        np.save(signif_signed_adj_fdr_mat_file, signif_signed_adj_mat)

        return runtime

    def _list_outputs(self):
        outputs = self._outputs().get()

        outputs["signif_signed_adj_fdr_mat_file"] = \
            os.path.abspath('signif_signed_adj_fdr_{}.npy'.format(
                self.inputs.conf_interval_binom_fdr))
        return outputs


# StatsPairTTest


class StatsPairTTestInputSpec(BaseInterfaceInputSpec):

    group_cormat_file1 = File(
        exists=True,
        desc='file of group 1 cormat matrices in npy format', mandatory=True)
    group_cormat_file2 = File(
        exists=True,
        desc='file of group 2 cormat matrices in npy format', mandatory=True)

    t_test_thresh_fdr = traits.Float(
        0.05, usedefault=True,
        desc='Alpha value used as FDR implementation', mandatory=False)

    paired = traits.Bool(True, usedefault=True,
                         desc='Ttest is paired or not', mandatory=False)


class StatsPairTTestOutputSpec(TraitedSpec):

    signif_signed_adj_fdr_mat_file = File(
        exists=True,
        desc="int matrix with corresponding codes to significance")


[docs]class StatsPairTTest(BaseInterface): """ Compute ttest stats between 2 group of matrix - matrix are arranged in group_cormat, with order (Nx,Ny,Nsubj). Nx = Ny (each matricx is square) - t_test_thresh_fdr is optional (default, 0.05) - paired in indicate if ttest is pairde or not. If paired, both group have the same number of samples """ input_spec = StatsPairTTestInputSpec output_spec = StatsPairTTestOutputSpec def _run_interface(self, runtime): print('in plot_cormat') group_cormat_file1 = self.inputs.group_cormat_file1 group_cormat_file2 = self.inputs.group_cormat_file2 t_test_thresh_fdr = self.inputs.t_test_thresh_fdr paired = self.inputs.paired print("loading group_cormat1") group_cormat1 = np.array(np.load(group_cormat_file1), dtype=float) print(group_cormat1.shape) print("loading group_cormat2") group_cormat2 = np.array(np.load(group_cormat_file2), dtype=float) print(group_cormat2.shape) print("compute NBS stats") # check input matrices Ix, Jx, nx = group_cormat1.shape Iy, Jy, ny = group_cormat2.shape assert Ix == Iy assert Jx == Jy assert Ix == Jx assert Iy == Jy # TODO checks are not done in compute_pairwise_ttest_fdr already? signif_signed_adj_mat = stats.compute_pairwise_ttest_fdr( group_cormat1, group_cormat2, t_test_thresh_fdr, paired) print('save pairwise signed stat file') signif_signed_adj_fdr_mat_file = os.path.abspath( 'signif_signed_adj_fdr_' + str(t_test_thresh_fdr) + '.npy') np.save(signif_signed_adj_fdr_mat_file, signif_signed_adj_mat) # return signif_signed_adj_fdr_mat_file return runtime def _list_outputs(self): outputs = self._outputs().get() outputs["signif_signed_adj_fdr_mat_file"] = \ os.path.abspath('signif_signed_adj_fdr_{}.npy'.format( self.inputs.t_test_thresh_fdr)) return outputs
# PrepareCormat class PrepareCormatInputSpec(BaseInterfaceInputSpec): cor_mat_files = traits.List( File(exists=True), desc='list of all correlation matrice files (in npy format) for each \ subject', mandatory=True) coords_files = traits.List( File(exists=True), desc='list of all coordinates in numpy space files (in txt format) \ for each subject', mandatory=True, xor=['labels_files']) labels_files = traits.List(File( exists=True), desc='list of labels (in txt format) for each subject', mandatory=True, xor=['coords_files']) gm_mask_coords_file = File( exists=True, desc='Coordinates in numpy space, corresponding to all possible nodes\ in the original space', mandatory=False, xor=['gm_mask_labels_file']) gm_mask_labels_file = File( exists=True, desc='Labels for all possible nodes - in case coords are varying from\ one indiv to the other', mandatory=False, xor=['gm_mask_coords_file']) class PrepareCormatOutputSpec(TraitedSpec): group_cormat_file = File( exists=True, desc="all cormat matrices of the group in .npy (pickle format)") avg_cormat_file = File( exists=True, desc="average of cormat matrix of the group in .npy (pickle format)") group_vect_file = File( exists=True, desc="degree by nodes * indiv of the group in .npy (pickleformat)")
[docs]class PrepareCormat(BaseInterface): """ Average correlation matrices, within a common reference (based on labels, or coordinates) Inputs: cor_mat_files: type = List of File, (exists=True), desc='list of all correlation matrice files (in npy format) for each subject', mandatory=True coords_files: type = List of File, (exists=True), desc='list of all coordinates in numpy space files for each subject', mandatory=True, xor = ['labels_files'] labels_files: type = List of File, (exists=True), desc='list of labels (in txt format) for each subject', mandatory=True, xor = ['coords_files']) gm_mask_coords_file: type = File(exists=True, desc='Coordinates in numpy space, corresponding to all possible nodes in the original space', mandatory=False, xor = ['gm_mask_labels_file']) gm_mask_labels_file : type = File, (exists=True), desc='Labels for all possible nodes - in case coords are varying from one indiv to the other (source space for example)', mandatory=False, xor = ['gm_mask_coords_file']) """ input_spec = PrepareCormatInputSpec output_spec = PrepareCormatOutputSpec def _prep_coords_cormat(self): cor_mat_files = self.inputs.cor_mat_files nb_cormats = len(cor_mat_files) coords_files = self.inputs.coords_files gm_mask_coords_file = self.inputs.gm_mask_coords_file # loading gm mask corres gm_mask_coords = np.array(np.loadtxt(gm_mask_coords_file), dtype='int') nb_nodes = gm_mask_coords.shape[0] # defining return matrices sum_cormat = np.zeros((nb_nodes, nb_nodes), dtype=float) group_cormat = np.zeros((nb_nodes, nb_nodes, nb_cormats), dtype=float) group_vect = np.zeros((nb_nodes, nb_cormats), dtype=float) assert nb_cormats == len(coords_files), \ ("Error, nb_cormats and coords_files are imcompatible {} \ {}".format(nb_cormats, len(coords_files))) for index_file in range(nb_cormats): assert os.path.exists(cor_mat_files[index_file]) assert os.path.exists(coords_files[index_file]) Z_cor_mat = np.load(cor_mat_files[index_file]) print(Z_cor_mat.shape) coords = np.array(np.loadtxt( coords_files[index_file]), dtype='int') print(coords.shape) corres_cor_mat, possible_edge_mat = \ return_corres_correl_mat(Z_cor_mat, coords, gm_mask_coords) corres_cor_mat = corres_cor_mat + np.transpose(corres_cor_mat) sum_cormat += corres_cor_mat group_cormat[:, :, index_file] = corres_cor_mat group_vect[:, index_file] = np.sum(corres_cor_mat, axis=0) return sum_cormat, group_cormat, group_vect def _prep_labels_cormat(self): cor_mat_files = self.inputs.cor_mat_files nb_cormats = len(cor_mat_files) labels_files = self.inputs.labels_files gm_mask_labels_file = self.inputs.gm_mask_labels_file # loading gm mask labels gm_mask_labels = np.array( [line.strip() for line in open(gm_mask_labels_file)], dtype='str') nb_nodes = gm_mask_labels.shape[0] # defining return matrices sum_cormat = np.zeros((nb_nodes, nb_nodes), dtype=float) group_cormat = np.zeros((nb_nodes, nb_nodes, nb_cormats), dtype=float) group_vect = np.zeros((nb_nodes, len(cor_mat_files)), dtype=float) assert nb_cormats == len(labels_files), \ ("Error, length of cor_mat_files, labels_files are imcompatible {}\ {}".format(nb_cormats, len(labels_files))) for index_file in range(len(cor_mat_files)): assert os.path.exists(cor_mat_files[index_file]) assert os.path.exists(labels_files[index_file]) Z_cor_mat = np.load(cor_mat_files[index_file]) labels = np.array([line.strip() for line in open( labels_files[index_file])], dtype='str') corres_cor_mat, possible_edge_mat = \ return_corres_correl_mat_labels(Z_cor_mat, labels, gm_mask_labels) corres_cor_mat = corres_cor_mat + np.transpose(corres_cor_mat) sum_cormat += corres_cor_mat group_cormat[:, :, index_file] = corres_cor_mat group_vect[:, index_file] = np.sum(corres_cor_mat, axis=0) return sum_cormat, group_cormat, group_vect def _run_interface(self, runtime): cor_mat_files = self.inputs.cor_mat_files if isdefined(self.inputs.gm_mask_coords_file) and \ isdefined(self.inputs.coords_files): sum_cormat, group_cormat, group_vect = self._prep_coords_cormat() elif isdefined(self.inputs.gm_mask_labels_file) and \ isdefined(self.inputs.labels_files): sum_cormat, group_cormat, group_vect = self._prep_labels_cormat() else: print("Error, neither coords nor labels are defined properly") group_cormat_file = os.path.abspath('group_cormat.npy') np.save(group_cormat_file, group_cormat) group_vect_file = os.path.abspath('group_vect.npy') np.save(group_vect_file, group_vect) # saving cor_mat matrix avg_cormat_file = os.path.abspath('avg_cormat.npy') if (len(cor_mat_files) != 0): avg_cormat = sum_cormat / len(cor_mat_files) np.save(avg_cormat_file, avg_cormat) return runtime def _list_outputs(self): outputs = self._outputs().get() outputs["group_cormat_file"] = os.path.abspath('group_cormat.npy') outputs["avg_cormat_file"] = os.path.abspath('avg_cormat.npy') outputs["group_vect_file"] = os.path.abspath('group_vect.npy') return outputs
# SwapLists class SwapListsInputSpec(BaseInterfaceInputSpec): list_of_lists = traits.List( traits.List(traits.List(File(exists=True))), desc='list of all correlation matrice files for each subject', mandatory=True) unbalanced = traits.Bool(False, default=True, usedefault=True) seed = traits.Int(-1, desc='value for seed', mandatory=True, usedefault=True) class SwapListsOutputSpec(TraitedSpec): permut_lists_of_lists = traits.List( traits.List(traits.List(File(exists=True))), desc='swapped list of all correlation matrice files for each subject', mandatory=True)
[docs]class SwapLists(BaseInterface): """ Exchange lists of files in a random fashion (based on seed value) Typically, cor_mat, coords -> 2, or Z_list, node_corres and labels -> 3 If seed = -1, no swap is done (keep original values) Inputs: list_of_lists: type = List of List of List of Files, exists=True, desc='list of all correlation matrice files (in npy format) for each subject', mandatory=True seed: type = Int, default = -1, desc='value for seed', mandatory=True, usedefault = True Outputs: permut_lists_of_lists: type = List of List of List of Files ,exists=True, desc='swapped list of all correlation matrice files (in npy format) for each subject', mandatory=True """ input_spec = SwapListsInputSpec output_spec = SwapListsOutputSpec def _run_interface(self, runtime): list_of_lists = self.inputs.list_of_lists seed = self.inputs.seed unbalanced = self.inputs.unbalanced # checking lists homogeneity if unbalanced: nb_files_per_list = [] else: nb_files_per_list = -1 # number of elements tomix from (highest level) nb_set_to_shuffle = len(list_of_lists) # number of files per case to shuffle # (typically cor_mat, coords -> 2, or Z_list, # node_corres and labels -> 3) nb_args = -1 for i in range(nb_set_to_shuffle): if nb_args == -1: nb_args = len(list_of_lists[i]) else: assert nb_args == len(list_of_lists[i]), \ ("Error list length {} != than list {} length {}".format( nb_args, i, len(list_of_lists[i]))) if unbalanced: nb_files_per_list.append(len(list_of_lists[i][0])) else: if nb_files_per_list == -1: nb_files_per_list = len(list_of_lists[i][0]) else: assert nb_files_per_list == len(list_of_lists[i][0]), \ ("Error list length {} != than list {} length \ {}".format(nb_files_per_list, i, len(list_of_lists[i][0]))) print(nb_files_per_list) # generating 0 or 1 for each subj: if seed == -1: self.permut_lists_of_lists = self.inputs.list_of_lists return runtime np.random.seed(seed) if unbalanced: def prod(x, y): return x * y print(sum(nb_files_per_list)) is_permut = np.array(np.random.randint( nb_set_to_shuffle, size=sum(nb_files_per_list)), dtype=int) print(is_permut) else: def prod(x, y): return x * y # print reduce(prod,nb_files_per_list,1) is_permut = np.array(np.random.randint( nb_set_to_shuffle, size=nb_files_per_list), dtype=int) print(is_permut) np.savetxt(os.path.abspath("is_permut.txt"), is_permut, fmt="%d") # shuffling list self.permut_lists_of_lists = [ [[] for i in range(nb_args)] for j in range(nb_set_to_shuffle)] if unbalanced: merged_group_lists = [] for i in range(nb_args): merged_group_list = [] for group in range(nb_set_to_shuffle): merged_group_list = merged_group_list + \ list_of_lists[group][i] merged_group_lists.append(merged_group_list) for index_file, permut in enumerate(is_permut): for i in range(nb_args): self.permut_lists_of_lists[permut][i].append( merged_group_lists[i][index_file]) else: for index_file, permut in enumerate(is_permut): for j in range(nb_set_to_shuffle): shift = j + permut rel_shift = shift % nb_set_to_shuffle for i in range(nb_args): self.permut_lists_of_lists[j][i].append( list_of_lists[rel_shift][i][index_file]) return runtime def _list_outputs(self): outputs = self._outputs().get() outputs["permut_lists_of_lists"] = self.permut_lists_of_lists return outputs
# ShuffleMatrix class ShuffleMatrixInputSpec(BaseInterfaceInputSpec): original_matrix_file = File( exists=True, desc='original matrix in npy format', mandatory=True) seed = traits.Int(-1, desc='value for seed', mandatory=True, usedefault=True) class ShuffleMatrixOutputSpec(TraitedSpec): shuffled_matrix_file = File( exists=True, desc='shuffled matrix in npy format', mandatory=True)
[docs]class ShuffleMatrix(BaseInterface): """ Compute randomisation of matrix, keeping the same distribution If seed = -1, no shuffling is done (keep original values) Inputs: original_matrix_file: type = File, exists=True, desc='original matrix in npy format', mandatory=True seed: type = Int, default = -1, desc='value for seed', mandatory=True, usedefault = True Outputs: shuffled_matrix_file: type = File, exists=True, desc='shuffled matrix in npy format', mandatory=True """ input_spec = ShuffleMatrixInputSpec output_spec = ShuffleMatrixOutputSpec def _run_interface(self, runtime): original_matrix_file = self.inputs.original_matrix_file seed = self.inputs.seed path, fname, ext = split_f(original_matrix_file) orig_mat = np.load(original_matrix_file) if seed == -1: print("keeping original matrix") shuf_mat = orig_mat else: print("randomizing " + str(seed)) np.random.seed(seed) np.fill_diagonal(orig_mat, np.nan) shuf_mat = np.zeros(shape=orig_mat.shape, dtype=orig_mat.dtype) for i, j in iter.combinations(list(range(orig_mat.shape[0])), 2): bool_ok = False while not bool_ok: new_ind = np.random.randint( low=orig_mat.shape[0], size=2) if not np.isnan(orig_mat[new_ind[0], new_ind[1]]): shuf_mat[i, j] = orig_mat[new_ind[0], new_ind[1]] shuf_mat[j, i] = orig_mat[new_ind[0], new_ind[1]] orig_mat[new_ind[0], new_ind[1]] = np.nan orig_mat[new_ind[1], new_ind[0]] = np.nan bool_ok = True shuffled_matrix_file = os.path.abspath("shuffled_matrix.npy") np.save(shuffled_matrix_file, shuf_mat) return runtime def _list_outputs(self): outputs = self._outputs().get() outputs["shuffled_matrix_file"] = os.path.abspath( "shuffled_matrix.npy") return outputs
# ShuffleNetList class ShuffleNetListInputSpec(BaseInterfaceInputSpec): orig_net_list_file = File( exists=True, desc='original net list in txt format', mandatory=True) seed = traits.Int(-1, desc='value for seed', mandatory=True, usedefault=True) class ShuffleNetListOutputSpec(TraitedSpec): shuffled_net_list_file = File( exists=True, desc='shuffled net list in txt format', mandatory=True) class ShuffleNetList(BaseInterface): """ Extract mean time series from a labelled mask in Nifti Format where the voxels of interest have values 1 """ input_spec = ShuffleNetListInputSpec output_spec = ShuffleNetListOutputSpec def _run_interface(self, runtime): orig_net_list_file = self.inputs.orig_net_list_file seed = self.inputs.seed path, fname, ext = split_f(orig_net_list_file) original_net_list = np.loadtxt(orig_net_list_file) if seed == -1: print("keeping original matrix") else: print("randomizing " + str(seed)) np.random.seed(seed) np.random.shuffle(original_net_list[:, 0]) np.random.shuffle(original_net_list[:, 1]) shuffled_net_list_file = os.path.abspath("shuffled_net_list.txt") np.savetxt(shuffled_net_list_file, original_net_list, fmt="%d %d %d") return runtime def _list_outputs(self): outputs = self._outputs().get() outputs["shuffled_net_list_file"] = os.path.abspath( "shuffled_net_list.txt") return outputs