This is an automated email from the git hooks/post-receive script. yoh pushed a commit to annotated tag v0.1 in repository python-mne.
commit 906abbbb7ea38863c62aaff6c3558715a2e12076 Author: Alexandre Gramfort <[email protected]> Date: Thu Aug 25 15:12:32 2011 -0400 ENH : new apply_inverse_epochs and refactoring of minimum_norm.inverse + tests --- .../plot_compute_mne_inverse_epochs_in_label.py | 65 +++++ mne/minimum_norm/__init__.py | 2 +- mne/minimum_norm/inverse.py | 311 ++++++++++++--------- mne/minimum_norm/tests/test_inverse.py | 61 +++- 4 files changed, 299 insertions(+), 140 deletions(-) diff --git a/examples/inverse/plot_compute_mne_inverse_epochs_in_label.py b/examples/inverse/plot_compute_mne_inverse_epochs_in_label.py new file mode 100644 index 0000000..3975282 --- /dev/null +++ b/examples/inverse/plot_compute_mne_inverse_epochs_in_label.py @@ -0,0 +1,65 @@ +""" +================================================== +Compute MNE-dSPM inverse solution on single epochs +================================================== + +Compute dSPM inverse solution on single trial epochs restricted +to a brain label. + +""" + +# Author: Alexandre Gramfort <[email protected]> +# +# License: BSD (3-clause) + +print __doc__ + +import pylab as pl +import mne +from mne.datasets import sample +from mne.fiff import Raw, pick_types +from mne.minimum_norm import apply_inverse_epochs, read_inverse_operator + + +data_path = sample.data_path('..') +fname_inv = data_path + '/MEG/sample/sample_audvis-meg-oct-6-meg-inv.fif' +fname_raw = data_path + '/MEG/sample/sample_audvis_filt-0-40_raw.fif' +fname_event = data_path + '/MEG/sample/sample_audvis_filt-0-40_raw-eve.fif' +label_name = 'Aud-lh' +fname_label = data_path + '/MEG/sample/labels/%s.label' % label_name + +event_id, tmin, tmax = 1, -0.2, 0.5 +snr = 3.0 +lambda2 = 1.0 / snr ** 2 +dSPM = True + +# Load data +inverse_operator = read_inverse_operator(fname_inv) +label = mne.read_label(fname_label) +raw = Raw(fname_raw) +events = mne.read_events(fname_event) + +# Set up pick list +include = [] +exclude = raw.info['bads'] + ['EEG 053'] # bads + 1 more + +# pick MEG channels +picks = pick_types(raw.info, meg=True, eeg=False, stim=False, eog=True, + include=include, exclude=exclude) +# Read epochs +epochs = mne.Epochs(raw, events, event_id, tmin, tmax, picks=picks, + baseline=(None, 0), reject=dict(mag=4e-12, grad=4000e-13, + eog=150e-6)) + +# Compute inverse solution and stcs for each epoch +stcs = apply_inverse_epochs(epochs, inverse_operator, lambda2, dSPM, label, + pick_normal=True) + +data = sum(stc.data for stc in stcs) / len(stcs) + +############################################################################### +# View activation time-series +pl.plot(1e3 * stcs[0].times, data.T) +pl.xlabel('time (ms)') +pl.ylabel('dSPM value') +pl.show() diff --git a/mne/minimum_norm/__init__.py b/mne/minimum_norm/__init__.py index 273ffa8..899ec93 100644 --- a/mne/minimum_norm/__init__.py +++ b/mne/minimum_norm/__init__.py @@ -1,3 +1,3 @@ from .inverse import read_inverse_operator, apply_inverse, minimum_norm, \ - apply_inverse_raw + apply_inverse_raw, apply_inverse_epochs from .time_frequency import source_induced_power diff --git a/mne/minimum_norm/inverse.py b/mne/minimum_norm/inverse.py index 7f36d4d..6930e94 100644 --- a/mne/minimum_norm/inverse.py +++ b/mne/minimum_norm/inverse.py @@ -14,7 +14,7 @@ from ..fiff.tag import find_tag from ..fiff.matrix import _read_named_matrix, _transpose_named_matrix from ..fiff.proj import read_proj, make_projector from ..fiff.tree import dir_tree_find -from ..fiff.pick import pick_channels_evoked, pick_channels +from ..fiff.pick import pick_channels from ..cov import read_cov from ..source_space import read_source_spaces_from_tree, find_source_space_hemi @@ -131,8 +131,7 @@ def read_inverse_operator(fname): # inv['eigen_leads'] = _transpose_named_matrix(inv['eigen_leads']) inv['eigen_fields'] = _read_named_matrix(fid, invs, - FIFF.FIFF_MNE_INVERSE_FIELDS) - + FIFF.FIFF_MNE_INVERSE_FIELDS) print '[done]' # # Read the covariance matrices @@ -271,6 +270,20 @@ def combine_xyz(vec, square=False): return comb +def _combine_ori(sol, inverse_operator, pick_normal): + if inverse_operator['source_ori'] == FIFF.FIFFV_MNE_FREE_ORI: + print 'combining the current components...', + if pick_normal: + is_loose = 0 < inverse_operator['orient_prior']['data'][0] < 1 + if not is_loose: + raise ValueError('The pick_normal parameter is only valid ' + 'when working with loose orientations.') + sol = sol[2::3] # take one every 3 sources ie. only the normal + else: + sol = combine_xyz(sol) + return sol + + def prepare_inverse_operator(orig, nave, lambda2, dSPM): """Prepare an inverse operator for actually computing the inverse @@ -346,7 +359,8 @@ def prepare_inverse_operator(orig, nave, lambda2, dSPM): # # No need to omit the zeroes due to projection # - inv['whitener'] = np.diag(1.0 / np.sqrt(inv['noise_cov']['data'].ravel())) + inv['whitener'] = np.diag(1.0 / + np.sqrt(inv['noise_cov']['data'].ravel())) print ('\tCreated the whitener using a diagonal noise covariance ' 'matrix (%d small eigenvalues discarded)' % ncomp) @@ -389,6 +403,94 @@ def prepare_inverse_operator(orig, nave, lambda2, dSPM): return inv +def _assemble_kernel(inv, label, dSPM): + # + # Simple matrix multiplication followed by combination of the + # three current components + # + # This does all the data transformations to compute the weights for the + # eigenleads + # + eigen_leads = inv['eigen_leads']['data'] + source_cov = inv['source_cov']['data'][:, None] + noise_norm = inv['noisenorm'][:, None] + + src = inv['src'] + lh_vertno = src[0]['vertno'] + rh_vertno = src[1]['vertno'] + + if label is not None: + lh_vertno, rh_vertno, src_sel = _get_label_sel(label, inv) + + noise_norm = noise_norm[src_sel] + + if inv['source_ori'] == FIFF.FIFFV_MNE_FREE_ORI: + src_sel = 3 * src_sel + src_sel = np.c_[src_sel, src_sel + 1, src_sel + 2] + src_sel = src_sel.ravel() + + eigen_leads = eigen_leads[src_sel] + source_cov = source_cov[src_sel] + + trans = inv['reginv'][:, None] * reduce(np.dot, + [inv['eigen_fields']['data'], + inv['whitener'], + inv['proj']]) + # + # Transformation into current distributions by weighting the eigenleads + # with the weights computed above + # + if inv['eigen_leads_weighted']: + # + # R^0.5 has been already factored in + # + print '(eigenleads already weighted)...', + K = np.dot(eigen_leads, trans) + else: + # + # R^0.5 has to be factored in + # + print '(eigenleads need to be weighted)...', + K = np.sqrt(source_cov) * np.dot(eigen_leads, trans) + + if not dSPM: + noise_norm = None + + return K, noise_norm, lh_vertno, rh_vertno + + +def _make_stc(sol, tmin, tstep, lh_vertno, rh_vertno): + stc = SourceEstimate(None) + stc.data = sol + stc.tmin = tmin + stc.tstep = tstep + stc.lh_vertno = lh_vertno + stc.rh_vertno = rh_vertno + stc._init_times() + return stc + + +def _get_label_sel(label, inv): + src = inv['src'] + lh_vertno = src[0]['vertno'] + rh_vertno = src[1]['vertno'] + + if label['hemi'] == 'lh': + vertno_sel = np.intersect1d(lh_vertno, label['vertices']) + src_sel = np.searchsorted(lh_vertno, vertno_sel) + lh_vertno = vertno_sel + rh_vertno = np.array([]) + elif label['hemi'] == 'rh': + vertno_sel = np.intersect1d(rh_vertno, label['vertices']) + src_sel = np.searchsorted(rh_vertno, vertno_sel) + len(lh_vertno) + lh_vertno = np.array([]) + rh_vertno = vertno_sel + else: + raise Exception("Unknown hemisphere type") + + return lh_vertno, rh_vertno, src_sel + + def apply_inverse(evoked, inverse_operator, lambda2, dSPM=True, pick_normal=False): """Apply inverse operator to evoked data @@ -417,7 +519,6 @@ def apply_inverse(evoked, inverse_operator, lambda2, dSPM=True, stc: SourceEstimate The source estimates """ - # # Set up the inverse according to the parameters # @@ -427,63 +528,22 @@ def apply_inverse(evoked, inverse_operator, lambda2, dSPM=True, # # Pick the correct channels from the data # - evoked = pick_channels_evoked(evoked, inv['noise_cov']['names']) - print 'Picked %d channels from the data' % evoked.info['nchan'] - print 'Computing inverse...', - # - # Simple matrix multiplication followed by combination of the - # three current components - # - # This does all the data transformations to compute the weights for the - # eigenleads - # - trans = inv['reginv'][:, None] * reduce(np.dot, - [inv['eigen_fields']['data'], - inv['whitener'], - inv['proj'], - evoked.data]) - - # - # Transformation into current distributions by weighting the eigenleads - # with the weights computed above - # - if inv['eigen_leads_weighted']: - # - # R^0.5 has been already factored in - # - print '(eigenleads already weighted)...', - sol = np.dot(inv['eigen_leads']['data'], trans) - else: - # - # R^0.5 has to be factored in - # - print '(eigenleads need to be weighted)...', - sol = np.sqrt(inv['source_cov']['data'])[:, None] * \ - np.dot(inv['eigen_leads']['data'], trans) + sel = pick_channels(evoked.ch_names, include=inv['noise_cov']['names']) + print 'Picked %d channels from the data' % len(sel) - if inv['source_ori'] == FIFF.FIFFV_MNE_FREE_ORI: - print 'combining the current components...', - if pick_normal: - is_loose = 0 < inverse_operator['orient_prior']['data'][0] < 1 - if not is_loose: - raise ValueError('The pick_normal parameter is only valid ' - 'when working with loose orientations.') - sol = sol[2::3] # take one every 3 sources ie. only the normal - else: - sol = combine_xyz(sol) + print 'Computing inverse...', + K, noise_norm, _, _ = _assemble_kernel(inv, None, dSPM) + sol = np.dot(K, evoked.data[sel]) # apply imaging kernel + sol = _combine_ori(sol, inv, pick_normal) - if dSPM: + if noise_norm is not None: print '(dSPM)...', - sol *= inv['noisenorm'][:, None] + sol *= noise_norm + tstep = 1.0 / evoked.info['sfreq'] + tmin = float(evoked.first) / evoked.info['sfreq'] src = inv['src'] - stc = SourceEstimate(None) - stc.data = sol - stc.tmin = float(evoked.first) / evoked.info['sfreq'] - stc.tstep = 1.0 / evoked.info['sfreq'] - stc.lh_vertno = src[0]['vertno'] - stc.rh_vertno = src[1]['vertno'] - stc._init_times() + stc = _make_stc(sol, tmin, tstep, src[0]['vertno'], src[1]['vertno']) print '[done]' return stc @@ -501,7 +561,7 @@ def apply_inverse_raw(raw, inverse_operator, lambda2, dSPM=True, Parameters ---------- raw: Raw object - Evoked data + Raw data inverse_operator: dict Inverse operator read with mne.read_inverse_operator lambda2: float @@ -529,7 +589,6 @@ def apply_inverse_raw(raw, inverse_operator, lambda2, dSPM=True, stc: SourceEstimate The source estimates """ - # # Set up the inverse according to the parameters # @@ -540,13 +599,6 @@ def apply_inverse_raw(raw, inverse_operator, lambda2, dSPM=True, sel = pick_channels(raw.ch_names, include=inv['noise_cov']['names']) print 'Picked %d channels from the data' % len(sel) print 'Computing inverse...', - # - # Simple matrix multiplication followed by combination of the - # three current components - # - # This does all the data transformations to compute the weights for the - # eigenleads - # src = inv['src'] lh_vertno = src[0]['vertno'] @@ -557,81 +609,83 @@ def apply_inverse_raw(raw, inverse_operator, lambda2, dSPM=True, if time_func is not None: data = time_func(data) - trans = inv['reginv'][:, None] * reduce(np.dot, - [inv['eigen_fields']['data'], - inv['whitener'], - inv['proj'], - data]) + K, noise_norm, lh_vertno, rh_vertno = _assemble_kernel(inv, label, dSPM) + sol = np.dot(K, data) + sol = _combine_ori(sol, inv, pick_normal) - eigen_leads = inv['eigen_leads']['data'] - source_cov = inv['source_cov']['data'][:, None] - noise_norm = inv['noisenorm'][:, None] + if noise_norm is not None: + sol *= noise_norm - if label is not None: - if label['hemi'] == 'lh': - vertno_sel = np.intersect1d(lh_vertno, label['vertices']) - src_sel = np.searchsorted(lh_vertno, vertno_sel) - lh_vertno = vertno_sel - rh_vertno = np.array([]) - elif label['hemi'] == 'rh': - vertno_sel = np.intersect1d(rh_vertno, label['vertices']) - src_sel = np.searchsorted(rh_vertno, vertno_sel) + len(lh_vertno) - lh_vertno = np.array([]) - rh_vertno = vertno_sel + tmin = float(times[0]) / raw.info['sfreq'] + tstep = 1.0 / raw.info['sfreq'] + stc = _make_stc(sol, tmin, tstep, lh_vertno, rh_vertno) + print '[done]' - noise_norm = noise_norm[src_sel] + return stc - if inv['source_ori'] == FIFF.FIFFV_MNE_FREE_ORI: - src_sel = 3 * src_sel - src_sel = np.c_[src_sel, src_sel + 1, src_sel + 2] - src_sel = src_sel.ravel() - eigen_leads = eigen_leads[src_sel] - source_cov = source_cov[src_sel] +def apply_inverse_epochs(epochs, inverse_operator, lambda2, dSPM=True, + label=None, nave=1, pick_normal=False): + """Apply inverse operator to Epochs + + Computes a L2-norm inverse solution on each epochs and returns + single trial source estimates. + + Parameters + ---------- + epochs: Epochs object + Single trial epochs + inverse_operator: dict + Inverse operator read with mne.read_inverse_operator + lambda2: float + The regularization parameter + dSPM: bool + do dSPM ? + label: Label + Restricts the source estimates to a given label + nave: int + Number of averages used to regularize the solution. + Set to 1 on single Epoch by default. + pick_normal: bool + If True, rather than pooling the orientations by taking the norm, + only the radial component is kept. This is only implemented + when working with loose orientations. + Returns + ------- + stc: list of SourceEstimate + The source estimates for all epochs + """ # - # Transformation into current distributions by weighting the eigenleads - # with the weights computed above + # Set up the inverse according to the parameters # - if inv['eigen_leads_weighted']: - # - # R^0.5 has been already factored in - # - print '(eigenleads already weighted)...', - sol = np.dot(eigen_leads, trans) - else: - # - # R^0.5 has to be factored in - # - print '(eigenleads need to be weighted)...', - sol = np.sqrt(source_cov) * np.dot(eigen_leads, trans) + inv = prepare_inverse_operator(inverse_operator, nave, lambda2, dSPM) + # + # Pick the correct channels from the data + # + sel = pick_channels(epochs.ch_names, include=inv['noise_cov']['names']) + print 'Picked %d channels from the data' % len(sel) - if inv['source_ori'] == FIFF.FIFFV_MNE_FREE_ORI: - if pick_normal: - print 'Picking only the normal components...', - is_loose = 0 < inverse_operator['orient_prior']['data'][0] < 1 - if not is_loose: - raise ValueError('The pick_normal parameter is only valid ' - 'when working with loose orientations.') - sol = sol[2::3] # take one every 3 sources ie. only the normal - else: - print 'Combining the current components...', - sol = combine_xyz(sol) + print 'Computing inverse...', + K, noise_norm, lh_vertno, rh_vertno = _assemble_kernel(inv, label, dSPM) - if dSPM: - print '(dSPM)...', - sol *= noise_norm + stcs = list() + tstep = 1.0 / epochs.info['sfreq'] + tmin = epochs.times[0] + + for k, e in enumerate(epochs): + print "Processing epoch : %d" % (k + 1) + sol = np.dot(K, e[sel]) # apply imaging kernel + sol = _combine_ori(sol, inv, pick_normal) + + if noise_norm is not None: + sol *= noise_norm + + stcs.append(_make_stc(sol, tmin, tstep, lh_vertno, rh_vertno)) - stc = SourceEstimate(None) - stc.data = sol - stc.tmin = float(times[0]) / raw.info['sfreq'] - stc.tstep = 1.0 / raw.info['sfreq'] - stc.lh_vertno = lh_vertno - stc.rh_vertno = rh_vertno - stc._init_times() print '[done]' - return stc + return stcs def _xyz2lf(Lf_xyz, normals): @@ -889,7 +943,6 @@ def minimum_norm(evoked, forward, whitener, method='dspm', print 'Combining the current components...', sol = combine_xyz(sol) - src = forward['src'] stc = SourceEstimate(None) stc.data = sol diff --git a/mne/minimum_norm/tests/test_inverse.py b/mne/minimum_norm/tests/test_inverse.py index 7ad3ef2..d40f10f 100644 --- a/mne/minimum_norm/tests/test_inverse.py +++ b/mne/minimum_norm/tests/test_inverse.py @@ -4,8 +4,12 @@ import numpy as np # from numpy.testing import assert_array_almost_equal, assert_equal from ...datasets import sample +from ...label import read_label +from ...event import read_events +from ...epochs import Epochs from ... import fiff, Covariance, read_forward_solution -from ..inverse import minimum_norm, apply_inverse, read_inverse_operator +from ..inverse import minimum_norm, apply_inverse, read_inverse_operator, \ + apply_inverse_raw, apply_inverse_epochs examples_folder = op.join(op.dirname(__file__), '..', '..', '..', 'examples') @@ -18,25 +22,62 @@ fname_cov = op.join(data_path, 'MEG', 'sample', 'sample_audvis-cov.fif') fname_fwd = op.join(data_path, 'MEG', 'sample', 'sample_audvis-meg-eeg-oct-6-fwd.fif') +fname_raw = op.join(data_path, 'MEG', 'sample', + 'sample_audvis_filt-0-40_raw.fif') +fname_event = op.join(data_path, 'MEG', 'sample', + 'sample_audvis_filt-0-40_raw-eve.fif') +label = 'Aud-lh' +fname_label = op.join(data_path, 'MEG', 'sample', 'labels', '%s.label' % label) +inverse_operator = read_inverse_operator(fname_inv) +label = read_label(fname_label) +raw = fiff.Raw(fname_raw) +snr = 3.0 +lambda2 = 1.0 / snr ** 2 +dSPM = True -def test_apply_mne_inverse_operator(): - """Test MNE inverse computation with precomputed inverse operator + +def test_apply_mne_inverse(): + """Test MNE with precomputed inverse operator on Evoked """ setno = 0 - snr = 3.0 - lambda2 = 1.0 / snr ** 2 - dSPM = True - - evoked = fiff.Evoked(fname_data, setno=setno, baseline=(None, 0)) - inverse_operator = read_inverse_operator(fname_inv) - + evoked = fiff.Evoked(fname_data, setno=0, baseline=(None, 0)) stc = apply_inverse(evoked, inverse_operator, lambda2, dSPM) assert np.all(stc.data > 0) assert np.all(stc.data < 35) +def test_apply_mne_inverse_raw(): + """Test MNE with precomputed inverse operator on Raw + """ + stc = apply_inverse_raw(raw, inverse_operator, lambda2, dSPM=True, + label=label, start=0, stop=10, nave=1, + pick_normal=False) + assert np.all(stc.data > 0) + + +def test_apply_mne_inverse_epochs(): + """Test MNE with precomputed inverse operator on Epochs + """ + event_id, tmin, tmax = 1, -0.2, 0.5 + + picks = fiff.pick_types(raw.info, meg=True, eeg=False, stim=True, + ecg=True, eog=True, include=['STI 014']) + reject = dict(grad=4000e-13, mag=4e-12, eog=150e-6) + flat = dict(grad=1e-15, mag=1e-15) + + events = read_events(fname_event)[:3] + epochs = Epochs(raw, events, event_id, tmin, tmax, picks=picks, + baseline=(None, 0), reject=reject, flat=flat) + stcs = apply_inverse_epochs(epochs, inverse_operator, lambda2, dSPM, + label=label) + + assert len(stcs) == 1 + assert np.all(stcs[0].data > 0) + assert np.all(stcs[0].data < 42) + + def test_compute_minimum_norm(): """Test MNE inverse computation starting from forward operator """ -- Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git _______________________________________________ debian-med-commit mailing list [email protected] http://lists.alioth.debian.org/cgi-bin/mailman/listinfo/debian-med-commit
