# Source code for laion_fmri.subject

"""Subject class for accessing per-subject data files.

Every accessor maps to exactly one file in the bucket layout: no
averaging, concatenation, or rebinning across sessions, with one
exception -- :attr:`Subject.metadata` aggregates the per-session trial
TSVs into one trial table for convenience.
"""

import numpy as np
import pandas as pd

from laion_fmri._constants import resolve_subject_id
from laion_fmri._errors import (
    DataNotDownloadedError,
    StimuliNotDownloadedError,
)
from laion_fmri._paths import (
    anatomical_file_path,
    anatomical_subject_dir,
    betas_path,
    freesurfer_subject_dir,
    glmsingle_subject_dir,
    parse_roi_label,
    r2mean_path,
    roi_freesurfer_label_path,
    roi_mask_path,
    roi_surface_path,
    rois_subject_dir,
    session_noise_ceiling_path,
    stimuli_h5_path,
    stimuli_metadata_path,
    subject_noise_ceiling_path,
    trialinfo_path,
)
from laion_fmri.config import get_data_dir
from laion_fmri.io import (
    load_freesurfer_label,
    load_gifti_mask,
    load_nifti_4d,
    load_nifti_data,
    load_nifti_mask,
    load_nifti_with_affine,
    load_tsv,
)


_VALID_FORMATS = {
    "all", "volume", "nii.gz", "gii", "func.gii", "label",
}
_VALID_HEMIS = {"all", "L", "R"}


def load_subject(subject):
    """Build a :class:`Subject` from a BIDS ID or an integer index.

    Parameters
    ----------
    subject : int or str
        Subject identifier; normalised via ``resolve_subject_id``.

    Returns
    -------
    Subject

    Raises
    ------
    SubjectNotFoundError
        If the subject identifier is invalid.
    DataNotDownloadedError
        If the subject's GLMsingle directory is not present locally.
    """
    subject_id = resolve_subject_id(subject)
    data_dir = get_data_dir()
    glm_dir = glmsingle_subject_dir(data_dir, subject_id)

    # Happy path first: the GLMsingle directory is the marker for
    # "this subject has been downloaded".
    if glm_dir.is_dir():
        return Subject(subject_id, data_dir)

    message = (
        f"Data for {subject_id} not found at {glm_dir}. "
        "Run: from laion_fmri.download import download; "
        f"download(subject='{subject_id}')"
    )
    raise DataNotDownloadedError(message)
class Subject:
    """Access loaded data files for a single subject.

    Parameters
    ----------
    subject_id : str
        BIDS subject ID.
    data_dir : str
        Path to the local data directory.
    """

    def __init__(self, subject_id, data_dir):
        self._subject_id = subject_id
        self._data_dir = data_dir
        # Lazily-built handles / caches.
        self._stim_handle = None
        self._stim_metadata_cache = None
        self._trial_table_cache = None
        # Proxy namespaces (instantiated eagerly; their work is lazy).
        self._images_ns = _SubjectImages(self)
        self._embeddings_ns = _SubjectEmbeddings(self)
        self._segmentations_ns = _SubjectSegmentations(self)
        self._captions_ns = _SubjectCaptions(self)

    @property
    def subject_id(self):
        """Return the BIDS subject ID (e.g. ``"sub-03"``)."""
        return self._subject_id

    # ── Discovery ───────────────────────────────────────────────

    def get_sessions(self):
        """Return sorted list of available session IDs.

        A session is any ``ses-*`` subdirectory of the subject's
        GLMsingle directory.
        """
        glm_dir = glmsingle_subject_dir(
            self._data_dir, self._subject_id,
        )
        return [
            d.name
            for d in sorted(glm_dir.iterdir())
            if d.is_dir() and d.name.startswith("ses-")
        ]

    def get_available_rois(self, category=None):
        """Return sorted list of ROI names available on disk.

        Parameters
        ----------
        category : str or None
            Restrict to ROIs in this category subdirectory. An unknown
            category yields an empty list.

        Returns
        -------
        list[str]
            Sorted, de-duplicated ROI names (BIDS-clean form).
        """
        by_cat = self._rois_by_category()
        if category is not None:
            # One ROI may be backed by several files in a category
            # directory; de-duplicate exactly like the all-categories
            # branch below does.
            return sorted(set(by_cat.get(category, [])))
        all_rois = set()
        for rois in by_cat.values():
            all_rois.update(rois)
        return sorted(all_rois)

    def get_available_categories(self):
        """Return sorted list of ROI category directory names."""
        return sorted(self._rois_by_category().keys())

    def _rois_by_category(self):
        """Walk local rois tree -> dict[category, list[roi]].

        Categories with no parseable ROI file are omitted. Returns an
        empty dict when the subject's rois directory does not exist.
        """
        root = rois_subject_dir(self._data_dir, self._subject_id)
        if not root.is_dir():
            return {}
        out = {}
        for cat_dir in sorted(root.iterdir()):
            if not cat_dir.is_dir():
                continue
            # parse_roi_label returns None for files that are not ROI
            # files of this subject; those are skipped.
            rois = [
                roi
                for f in sorted(cat_dir.iterdir())
                if (roi := parse_roi_label(
                    f.name, self._subject_id,
                )) is not None
            ]
            if rois:
                out[cat_dir.name] = rois
        return out

    def _resolve_rois_query(self, query):
        """Expand specific / category / "all" / list into flat ROI names.

        See ``get_roi_mask`` for the user-visible grammar. Resolution
        order per item is ``"all"``, then category name, then ROI name.
        The result keeps first-seen order with duplicates removed.

        Raises
        ------
        ValueError
            If an item is neither ``"all"``, a category, nor an ROI.
        """
        if isinstance(query, str):
            query = [query]
        by_cat = self._rois_by_category()
        all_rois = sorted({r for rs in by_cat.values() for r in rs})
        all_categories = sorted(by_cat.keys())
        resolved = []
        for item in query:
            if item == "all":
                resolved.extend(all_rois)
            elif item in by_cat:
                resolved.extend(by_cat[item])
            elif item in all_rois:
                resolved.append(item)
            else:
                raise ValueError(
                    f"Unknown ROI/category: {item!r}. "
                    f"Available ROIs: {all_rois}. "
                    f"Available categories: {all_categories}."
                )
        # dict preserves insertion order, so this is an ordered dedupe.
        return list(dict.fromkeys(resolved))

    def get_n_stimuli(self, stimuli=None):
        """Return number of stimuli described in the metadata CSV.

        Parameters
        ----------
        stimuli : "shared", "unique", or None
            ``None`` counts every row; otherwise only rows whose
            ``unique_or_shared`` column equals the given value.

        Raises
        ------
        ValueError
            If ``stimuli`` is not one of the accepted values.
        """
        meta = self._stim_metadata()
        if stimuli is None:
            return len(meta)
        if stimuli == "shared":
            return int((meta["unique_or_shared"] == "shared").sum())
        if stimuli == "unique":
            return int((meta["unique_or_shared"] == "unique").sum())
        raise ValueError(
            f"stimuli must be 'shared', 'unique', or None; got {stimuli!r}"
        )

    def get_n_voxels(self, source="anatomical", res="1pt8"):
        """Number of voxels in the subject's brain mask.

        ``source`` and ``res`` mirror :meth:`get_brain_mask`; see its
        docstring for the available values.
        """
        return int(
            self.get_brain_mask(source=source, res=res).sum()
        )

    # ── Brain mask ──────────────────────────────────────────────

    def get_brain_mask(self, source="anatomical", res="1pt8"):
        """Load the subject's brain mask as a flat boolean array.

        Parameters
        ----------
        source : ``"anatomical"`` (default) | ``"rsquare"``
            ``"anatomical"`` uses the brain mask in
            ``derivatives/anatomical/.../desc-brain_mask.nii.gz`` -- a
            wider, anatomically-derived mask. Pull it with
            ``download(include_anatomical=True)``.
            ``"rsquare"`` derives the mask from the subject-level
            mean-R^2 map (voxels with any non-zero GLMsingle fit; the
            bucket ships R2mean as
            ``..._stat-rsquare_desc-R2mean_statmap.nii.gz``).
        res : ``"1pt8"`` (default) | ``None``
            Anatomical-mask resolution. ``"1pt8"`` matches the
            functional grid, so the returned mask aligns with the voxel
            axis of ``get_betas`` / ``get_noise_ceiling`` and with the
            rsquare-derived mask. ``None`` loads the full-resolution
            anatomical mask; the returned 1-D array is larger and will
            not align with the loader cascade. Ignored for
            ``source="rsquare"`` (the rsquare-derived mask is published
            at one resolution only).

        Returns
        -------
        np.ndarray
            1-D boolean array over the full image grid.
        """
        return load_nifti_mask(self._brain_mask_path(source, res))

    def _brain_mask_path(self, source, res="1pt8"):
        """Resolve the on-disk file backing ``get_brain_mask(source)``.

        Shared by :meth:`get_brain_mask`, :meth:`get_n_voxels`,
        :meth:`get_betas`, :meth:`get_noise_ceiling`, :meth:`to_nifti`,
        and :meth:`get_voxel_coordinates`. The loader cascade
        (everything but ``get_brain_mask`` and ``get_n_voxels``) calls
        this with the default ``res`` so the returned mask aligns with
        the functional grid.

        Raises
        ------
        ValueError
            If ``source`` is neither ``"rsquare"`` nor ``"anatomical"``.
        """
        if source == "rsquare":
            return r2mean_path(self._data_dir, self._subject_id)
        if source == "anatomical":
            return anatomical_file_path(
                self._data_dir, self._subject_id,
                suffix="mask", res=res, desc="brain",
            )
        raise ValueError(
            f"source must be 'rsquare' or 'anatomical'; "
            f"got {source!r}."
        )

    # ── Betas (single-trial NIfTI per session) ─────────────────

    def get_betas(
        self,
        session,
        roi=None,
        mask=None,
        nc_threshold=None,
        stimuli=None,
        streaming=False,
        mask_source="anatomical",
    ):
        """Load single-trial betas for one or more sessions.

        Parameters
        ----------
        session : str, list of str
            BIDS session ID. A list returns a dict keyed by session ID,
            since trial counts may differ per session. Single-trial
            betas live per session in the bucket, so the caller must
            pick which sessions to load.
        roi : str, list[str], or None
            Named ROI(s) for voxel selection (union if list).
        mask : np.ndarray[bool] or None
            Custom boolean mask over brain-mask voxels.
        nc_threshold : float or None
            Minimum per-session noise ceiling to keep a voxel.
        stimuli : "shared", "unique", or None
            Trial-level filter using the stimulus-metadata ``shared``
            flag.
        mask_source : ``"anatomical"`` (default) | ``"rsquare"``
            Which brain mask to filter the voxel axis on; see
            :meth:`get_brain_mask` for the difference.
        streaming : bool
            If False (default), the full 4-D NIfTI is materialized in
            RAM and then masked. Decompresses any ``.nii.gz`` once;
            peak memory is the full file (~12 GB for a real session)
            plus the masked output. Best when you have plenty of RAM.
            If True, the file is streamed volume-by-volume and the
            combined brain + ROI + NC mask is applied inline: peak
            memory is one volume (~10-50 MB) plus the masked output.
            Use this on memory-constrained machines like Colab. Works
            on both ``.nii`` (nibabel-managed per-volume reads) and
            ``.nii.gz`` (a custom gzip pipeline that never
            re-decompresses).

        Returns
        -------
        np.ndarray or dict[str, np.ndarray]
            ``(n_trials, n_selected_voxels)`` for a single session; a
            ``{session: array}`` dict for a list. Values are GLMsingle
            single-trial β estimates in percent-signal-change units.
            Voxels that GLMsingle did not model (failed fit) arrive as
            ``NaN`` -- that's the signal for "no estimate available",
            distinct from "estimate is 0". Handle them at the caller's
            analysis layer.

        Raises
        ------
        ValueError
            If ``session`` is empty, if both ``roi`` and ``mask`` are
            given, or if ``stimuli`` is not an accepted value.
        """
        if isinstance(session, (list, tuple)):
            return {
                s: self.get_betas(
                    session=s,
                    roi=roi,
                    mask=mask,
                    nc_threshold=nc_threshold,
                    stimuli=stimuli,
                    streaming=streaming,
                    mask_source=mask_source,
                )
                for s in session
            }
        if not session:
            raise ValueError(
                "session is required: only single-trial betas are "
                "available, and they live per session in the bucket."
            )
        if roi is not None and mask is not None:
            raise ValueError(
                "roi and mask are mutually exclusive."
            )
        # Fail fast: reject a bad `stimuli` value before the (possibly
        # multi-GB) beta file is read, not after.
        if stimuli is not None and stimuli not in ("shared", "unique"):
            raise ValueError(
                f"stimuli must be 'shared' or 'unique', "
                f"got {stimuli!r}"
            )
        path = betas_path(
            self._data_dir, self._subject_id, session,
        )
        mask_path = self._brain_mask_path(mask_source)
        # Build a single full-volume voxel mask before reading the
        # betas: the streaming path then applies brain + ROI + NC
        # inline, avoiding a brain-only intermediate (~1 GB on a
        # real session).
        brain_mask = load_nifti_mask(mask_path)
        voxel_filter = self._build_voxel_mask(
            roi, mask, nc_threshold, session, mask_source,
        )
        if voxel_filter is None:
            combined_mask = brain_mask
        else:
            # voxel_filter is indexed within the brain mask; scatter it
            # back onto the full image grid.
            combined_mask = brain_mask.copy()
            combined_mask[brain_mask] = voxel_filter
        betas = load_nifti_4d(
            path, combined_mask, streaming=streaming,
        )
        if stimuli is not None:
            trial_mask = self._stimulus_trial_filter(stimuli, session)
            betas = betas[trial_mask]
        return betas

    def _build_voxel_mask(
        self, roi, mask, nc_threshold, session, mask_source,
    ):
        """Combine ROI/custom-mask/NC-threshold into one boolean mask.

        Returns ``None`` when no voxel-level filter was requested.
        """
        combined = None
        if roi is not None:
            combined = self.get_roi_mask(roi, mask_source=mask_source)
        if mask is not None:
            combined = mask
        if nc_threshold is not None:
            nc = self.get_noise_ceiling(
                session=session, mask_source=mask_source,
            )
            nc_mask = nc >= nc_threshold
            combined = (
                nc_mask if combined is None
                else combined & nc_mask
            )
        return combined

    def _stimulus_trial_filter(self, stimuli, session):
        """Boolean trial mask for ``shared`` / ``unique`` subsets.

        Two events.tsv schemas are supported:

        * Real bucket: a ``label`` column whose values start with
          ``shared_`` or ``unique_``. The prefix is parsed directly --
          no stimulus-metadata table required.
        * Synthetic / future schema: a ``stimulus_id`` column, joined
          against the stimulus metadata CSV's ``unique_or_shared``
          column via ``image_name``.

        ``unique`` is computed as the complement of ``shared``.

        Raises
        ------
        ValueError
            If ``stimuli`` is invalid or the events TSV carries neither
            supported column.
        """
        if stimuli not in ("shared", "unique"):
            raise ValueError(
                f"stimuli must be 'shared' or 'unique', "
                f"got {stimuli!r}"
            )
        trials = self.get_trial_info(session=session)
        if "label" in trials.columns:
            flags = (
                trials["label"].str.startswith("shared_").to_numpy()
            )
        elif "stimulus_id" in trials.columns:
            meta = self._stim_metadata()
            is_shared = dict(zip(
                meta["image_name"],
                meta["unique_or_shared"] == "shared",
            ))
            flags = np.array([
                bool(is_shared[sid])
                for sid in trials["stimulus_id"]
            ])
        else:
            raise ValueError(
                "Events TSV has neither 'label' nor "
                "'stimulus_id' -- cannot derive shared/unique."
            )
        return flags if stimuli == "shared" else ~flags

    # ── ROI masks ───────────────────────────────────────────────

    def get_roi_mask(self, query, mask_source="anatomical"):
        """Load one or more ROI masks, restricted to brain-mask voxels.

        ``query`` accepts the multi-level grammar:

        * a specific ROI name (``"FFA1"``);
        * a category name (``"face"``) -- expands to every ROI in
          that category;
        * ``"all"`` -- expands to every ROI on disk;
        * a list mixing any of the above.

        Multi-element resolutions are unioned voxel-wise. Always
        returns one 1-D bool array within the brain mask.

        ``mask_source`` selects which brain mask the result is indexed
        within; see :meth:`get_brain_mask`.
        """
        rois = self._resolve_rois_query(query)
        # Load the brain mask once and reuse it for the voxel count and
        # for every ROI, instead of re-reading the file per ROI.
        brain = self.get_brain_mask(source=mask_source)
        union = np.zeros(int(brain.sum()), dtype=bool)
        for roi in rois:
            union |= self._load_roi_volume_mask(
                roi, mask_source=mask_source, brain=brain,
            )
        return union

    def get_roi_masks(self, queries, mask_source="anatomical"):
        """Load several ROI masks at once.

        ``queries`` is a list (or single string). Each element is
        passed verbatim to ``get_roi_mask``; the returned dict is keyed
        by the user's strings, so categories and "all" appear as their
        original keys with a union mask as value.

        ``mask_source`` is forwarded to :meth:`get_roi_mask`.
        """
        if isinstance(queries, str):
            queries = [queries]
        return {
            q: self.get_roi_mask(q, mask_source=mask_source)
            for q in queries
        }

    def get_roi_data(
        self, query, format=None, hemi=None,
        mask_source="anatomical",
    ):
        """Load multi-format ROI data: volume, surface, FreeSurfer label.

        Parameters
        ----------
        query : str or list[str]
            Multi-level ROI query (see ``get_roi_mask``).
        format : str or None
            One of ``"all"``, ``"volume"`` / ``"nii.gz"`` (synonyms),
            ``"gii"`` (per-hemi func.gii + label), ``"func.gii"``
            (per-hemi surface mask only), ``"label"`` (per-hemi
            FreeSurfer label only). ``None`` means ``"all"``.
        hemi : str or None
            One of ``"L"``, ``"R"``, or ``"all"`` (default). Ignored
            when ``format`` resolves to volume only.
        mask_source : ``"anatomical"`` (default) | ``"rsquare"``
            Brain mask the ``"volume"`` entry is indexed within; see
            :meth:`get_brain_mask`.

        Returns
        -------
        dict
            Top-level dict keyed by ROI name. Each value is a nested
            dict shaped::

                {
                    "volume": <1-D bool ndarray>,
                    "gii": {
                        "hemi-L": {"func.gii": ..., "label": ...},
                        "hemi-R": {...},
                    },
                }

            Format/hemi filters prune this tree.

        Raises
        ------
        ValueError
            If ``format`` or ``hemi`` is not an accepted value.
        """
        format = format or "all"
        hemi = hemi or "all"
        if format not in _VALID_FORMATS:
            raise ValueError(
                f"format must be one of {sorted(_VALID_FORMATS)}, "
                f"got {format!r}"
            )
        if hemi not in _VALID_HEMIS:
            raise ValueError(
                f"hemi must be one of {sorted(_VALID_HEMIS)}, "
                f"got {hemi!r}"
            )
        rois = self._resolve_rois_query(query)
        return {
            roi: self._build_roi_data(
                roi, format, hemi, mask_source,
            )
            for roi in rois
        }

    def _load_roi_volume_mask(
        self, roi, mask_source="anatomical", brain=None,
    ):
        """Load a single volumetric ROI mask within the brain mask.

        ``brain`` lets a caller that already holds the flat brain mask
        pass it in, so it is not re-read from disk; when ``None`` it is
        loaded via :meth:`get_brain_mask` for ``mask_source``.
        """
        roi_vol = load_nifti_mask(
            roi_mask_path(self._data_dir, self._subject_id, roi),
        )
        if brain is None:
            brain = self.get_brain_mask(source=mask_source)
        return roi_vol[brain]

    def _build_roi_data(self, roi, format, hemi, mask_source):
        """Assemble the nested per-ROI dict, pruned by format/hemi."""
        out = {}
        want_volume = format in ("all", "volume", "nii.gz")
        want_gii = format in ("all", "gii", "func.gii", "label")
        if want_volume:
            out["volume"] = self._load_roi_volume_mask(
                roi, mask_source=mask_source,
            )
        if want_gii:
            hemis = ("L", "R") if hemi == "all" else (hemi,)
            gii = {}
            for h in hemis:
                hemi_data = {}
                if format in ("all", "gii", "func.gii"):
                    hemi_data["func.gii"] = load_gifti_mask(
                        roi_surface_path(
                            self._data_dir, self._subject_id, roi, h,
                        ),
                    )
                if format in ("all", "gii", "label"):
                    hemi_data["label"] = load_freesurfer_label(
                        roi_freesurfer_label_path(
                            self._data_dir, self._subject_id, roi, h,
                        ),
                    )
                gii[f"hemi-{h}"] = hemi_data
            out["gii"] = gii
        return out

    # ── Noise ceiling ───────────────────────────────────────────

    def get_noise_ceiling(
        self,
        session=None,
        desc=None,
        roi=None,
        mask=None,
        mask_source="anatomical",
    ):
        """Load a noise-ceiling map.

        Exactly one of ``session`` or ``desc`` must be set:

        * ``session="ses-01"`` -> per-session NC NIfTI.
        * ``desc="noiseceiling33ses"`` -> the subject-level aggregate
          NC NIfTI with the given ``desc-...`` token.

        Either argument also accepts a list, in which case the return
        value is a dict keyed by session ID / desc token.

        Parameters
        ----------
        session : str, list of str, or None
        desc : str, list of str, or None
        roi : str, list[str], or None
            ROI query forwarded to :meth:`get_roi_mask`. Takes
            precedence over ``mask`` when both are given.
        mask : np.ndarray[bool] or None
            Custom boolean mask over brain-mask voxels.
        mask_source : ``"anatomical"`` (default) | ``"rsquare"``
            Brain-mask choice; see :meth:`get_brain_mask`.

        Returns
        -------
        np.ndarray or dict[str, np.ndarray]
            Noise ceiling in percent variance explained (0-100,
            GLMsingle convention). Threshold near 10-20 % keeps
            reliably driven voxels.

        Raises
        ------
        ValueError
            If both or neither of ``session`` / ``desc`` are set.
        DataNotDownloadedError
            If the requested noise-ceiling file is not on disk.
        """
        # Enforce the contract before the list dispatch; otherwise
        # `session=[...], desc=...` would silently drop `desc`.
        if (session is None) == (desc is None):
            raise ValueError(
                "Exactly one of `session` or `desc` must be set."
            )
        if isinstance(session, (list, tuple)):
            return {
                s: self.get_noise_ceiling(
                    session=s, roi=roi, mask=mask,
                    mask_source=mask_source,
                )
                for s in session
            }
        if isinstance(desc, (list, tuple)):
            return {
                d: self.get_noise_ceiling(
                    desc=d, roi=roi, mask=mask,
                    mask_source=mask_source,
                )
                for d in desc
            }
        if session is not None:
            nc_file = session_noise_ceiling_path(
                self._data_dir, self._subject_id, session,
            )
        else:
            nc_file = subject_noise_ceiling_path(
                self._data_dir, self._subject_id, desc,
            )
        if not nc_file.exists():
            if session is not None:
                command = (
                    f"laion-fmri download --subject {self._subject_id} "
                    f"--ses {session} --desc Noiseceiling "
                    "--suffix statmap --extension nii.gz"
                )
            else:
                command = (
                    f"laion-fmri download --subject {self._subject_id} "
                    f"--desc {desc} --suffix statmap --extension nii.gz"
                )
            raise DataNotDownloadedError(
                f"Noise-ceiling file not found at {nc_file}. "
                f"Run: {command}"
            )
        mask_file = self._brain_mask_path(mask_source)
        nc = load_nifti_data(nc_file, mask_file)
        if roi is not None:
            nc = nc[self.get_roi_mask(roi, mask_source=mask_source)]
        elif mask is not None:
            nc = nc[mask]
        return nc

    # ── Trial info (events.tsv per session) ────────────────────

    def get_trial_info(self, session):
        """Load the events TSV for one or more sessions.

        Parameters
        ----------
        session : str or list of str
            Required -- events live per session in the bucket. A list
            returns a dict keyed by session ID.

        Returns
        -------
        pd.DataFrame or dict[str, pd.DataFrame]

        Raises
        ------
        ValueError
            If ``session`` is empty.
        DataNotDownloadedError
            If the session's trial-info TSV is not on disk.
        """
        if isinstance(session, (list, tuple)):
            return {
                s: self.get_trial_info(session=s) for s in session
            }
        if not session:
            raise ValueError(
                "session is required: events are stored per session."
            )
        path = trialinfo_path(
            self._data_dir, self._subject_id, session,
        )
        if not path.exists():
            raise DataNotDownloadedError(
                f"Trial-info TSV for {self._subject_id} {session} "
                f"not found at {path}. "
                "Run: "
                f"laion-fmri download --subject {self._subject_id} "
                f"--ses {session} --suffix trials --extension tsv"
            )
        return load_tsv(path)

    # ── Stimulus-side data: images, embeddings, segmentations, captions ──

    def has_stimuli(self):
        """Return True if the stimuli (HDF5 + CSV) are on disk.

        Useful as a guard before touching stimulus-side data
        (:attr:`metadata`, :attr:`images`, :attr:`embeddings`,
        :attr:`segmentations`, :attr:`captions`,
        :meth:`to_torch_dataset`) when the archive hasn't been
        downloaded yet.
        """
        return (
            stimuli_metadata_path(self._data_dir).exists()
            and stimuli_h5_path(self._data_dir).exists()
        )

    def has_freesurfer(self):
        """Return True if the per-subject FreeSurfer recon is on disk.

        The recon ships under ``derivatives/freesurfer/{subject}/``;
        pull it with ``download(..., include_freesurfer=True)``.
        Required by :meth:`to_template` to project T1w-volume data onto
        fsaverage / fsLR / MNI templates.
        """
        return freesurfer_subject_dir(
            self._data_dir, self._subject_id,
        ).is_dir()

    def get_freesurfer_dir(self):
        """Return the path to this subject's FreeSurfer recon.

        Raises
        ------
        DataNotDownloadedError
            If the recon directory does not exist on disk.
        """
        fs_dir = freesurfer_subject_dir(
            self._data_dir, self._subject_id,
        )
        if not fs_dir.is_dir():
            raise DataNotDownloadedError(
                f"FreeSurfer recon for {self._subject_id} not "
                f"found at {fs_dir}. Run: "
                "from laion_fmri.download import download; "
                f"download(subject='{self._subject_id}', "
                "include_freesurfer=True)"
            )
        return fs_dir

    def has_anatomical(self):
        """Return True if this subject's anatomical derivatives are on disk.

        Anatomical files live under
        ``derivatives/anatomical/{subject}/ses-PrismaAnat/anat/`` and
        ship T1w / T2w volumes plus a brain mask at two resolutions
        (full and ``res-1pt8``).
        """
        return anatomical_subject_dir(
            self._data_dir, self._subject_id,
        ).is_dir()

    def get_anatomical_dir(self):
        """Return the path to this subject's anatomical derivatives.

        Raises
        ------
        DataNotDownloadedError
            If the anatomical directory does not exist on disk.
        """
        anat_dir = anatomical_subject_dir(
            self._data_dir, self._subject_id,
        )
        if not anat_dir.is_dir():
            raise DataNotDownloadedError(
                f"Anatomical derivatives for {self._subject_id} "
                f"not found at {anat_dir}. Run: "
                "from laion_fmri.download import download; "
                f"download(subject='{self._subject_id}', "
                "include_anatomical=True)"
            )
        return anat_dir

    def get_t1w(self, *, res=None):
        """Return the path to this subject's anatomical T1w volume.

        ``res=None`` returns the full-resolution image; ``res="1pt8"``
        returns the variant on the functional grid.

        Raises
        ------
        DataNotDownloadedError
            If the file is not on disk.
        """
        return self._anatomical_file(suffix="T1w", res=res)

    def get_t2w(self, *, res=None):
        """Return the path to this subject's anatomical T2w volume.

        ``res`` is interpreted as in :meth:`get_t1w`.

        Raises
        ------
        DataNotDownloadedError
            If the file is not on disk.
        """
        return self._anatomical_file(suffix="T2w", res=res)

    def get_anatomical_brain_mask(self, *, res=None):
        """Return the path to the anatomically-derived brain mask.

        Distinct from :meth:`get_brain_mask`, which *loads* a brain
        mask (anatomical by default, or rsquare-derived) into a flat
        boolean array; this method only resolves the on-disk file.
        Note the different default: ``res=None`` here selects the
        full-resolution mask, whereas :meth:`get_brain_mask` defaults
        to ``res="1pt8"``.

        Raises
        ------
        DataNotDownloadedError
            If the file is not on disk.
        """
        return self._anatomical_file(
            suffix="mask", res=res, desc="brain",
        )

    def _anatomical_file(self, *, suffix, res=None, desc=None):
        """Resolve one file under this subject's anatomical dir.

        Raises
        ------
        DataNotDownloadedError
            If the resolved path is not an existing file.
        """
        path = anatomical_file_path(
            self._data_dir, self._subject_id,
            suffix=suffix, res=res, desc=desc,
        )
        if not path.is_file():
            raise DataNotDownloadedError(
                f"Anatomical file not found at {path}. Run: "
                "from laion_fmri.download import download; "
                f"download(subject='{self._subject_id}', "
                "include_anatomical=True)"
            )
        return path

    @property
    def metadata(self):
        """Trial table for this subject, concatenated across all sessions.

        One row per single-trial beta. Columns include everything from
        the per-session events TSV plus the derived columns
        ``session``, ``session_trial``, ``image_name``, ``stim_idx``,
        ``unique_or_shared``, and ``dataset``.

        The table is built on first access and cached on the instance.

        Returns
        -------
        pandas.DataFrame
            Indexed 0..n_total_trials-1. Each row's index is the
            "global trial index" used by :attr:`images`,
            :attr:`embeddings`, :attr:`segmentations`, and
            :attr:`captions`.
        """
        if self._trial_table_cache is None:
            self._trial_table_cache = self._build_trial_table()
        return self._trial_table_cache

    @property
    def images(self):
        """Per-trial stimulus images (PIL + raw bytes)."""
        return self._images_ns

    @property
    def embeddings(self):
        """Per-trial pretrained embeddings (CLIP, DINOv2, ...)."""
        return self._embeddings_ns

    @property
    def segmentations(self):
        """Per-trial object-segmentation masks (shared images only)."""
        return self._segmentations_ns

    @property
    def captions(self):
        """Per-trial human captions, plus shared non-OOD AI captions."""
        return self._captions_ns

    # ── Stimulus-side internals ─────────────────────────────────

    def _stim(self):
        """Cached :class:`Stimuli` handle.

        Built on first access; reused for all stimulus-side reads.
        """
        if self._stim_handle is None:
            # Imported lazily so the stimuli module is only loaded when
            # stimulus-side data is actually touched.
            from laion_fmri.stimuli import Stimuli
            self._stim_handle = Stimuli(data_dir=self._data_dir)
        return self._stim_handle

    def _stim_metadata(self):
        """Dataset-wide stimulus metadata CSV (cached).

        Raises
        ------
        StimuliNotDownloadedError
            If the metadata CSV is not on disk.
        """
        if self._stim_metadata_cache is None:
            path = stimuli_metadata_path(self._data_dir)
            if not path.exists():
                raise StimuliNotDownloadedError(
                    f"Stimulus metadata not found at {path}. Run "
                    "`laion-fmri download-stimuli` "
                    "(or check has_stimuli() first)."
                )
            self._stim_metadata_cache = pd.read_csv(path)
        return self._stim_metadata_cache

    def _build_trial_table(self):
        """Concatenate every session's events TSV into one trial table.

        The stimulus name of each trial is taken from the ``label``
        column if present, else ``stimulus_id``. Names missing from the
        stimulus metadata map to NaN in the derived columns.

        Raises
        ------
        ValueError
            If a session's events TSV has neither column.
        """
        stim_meta = self._stim_metadata()
        # Row position in the metadata CSV doubles as the stimulus index.
        name_to_stim_idx = {
            n: i for i, n in enumerate(stim_meta["image_name"])
        }
        name_to_us = dict(zip(
            stim_meta["image_name"], stim_meta["unique_or_shared"],
        ))
        name_to_dataset = dict(zip(
            stim_meta["image_name"], stim_meta["dataset"],
        ))
        parts = []
        for ses in self.get_sessions():
            trials = self.get_trial_info(session=ses).copy()
            if "label" in trials.columns:
                names = trials["label"].astype(str)
            elif "stimulus_id" in trials.columns:
                names = trials["stimulus_id"].astype(str)
            else:
                raise ValueError(
                    f"Trial info for {ses} has neither 'label' nor "
                    "'stimulus_id'; cannot map trials to stimuli."
                )
            trials["session"] = ses
            trials["session_trial"] = np.arange(len(trials))
            trials["image_name"] = names.values
            trials["stim_idx"] = names.map(name_to_stim_idx).values
            trials["unique_or_shared"] = names.map(name_to_us).values
            trials["dataset"] = names.map(name_to_dataset).values
            parts.append(trials)
        if not parts:
            # No sessions on disk: return an empty table that still
            # carries the derived columns.
            return pd.DataFrame(
                columns=[
                    "session", "session_trial", "image_name",
                    "stim_idx", "unique_or_shared", "dataset",
                ]
            )
        return pd.concat(parts, ignore_index=True)

    # ── Brain space ─────────────────────────────────────────────

    def to_nifti(
        self, values, output_path, roi=None, mask=None,
        mask_source="anatomical",
    ):
        """Write a per-voxel array to a 3-D NIfTI volume.

        ``values`` is sized to the brain mask selected by
        ``mask_source`` (default anatomical-derived). ``roi`` is
        resolved through :meth:`get_roi_mask`; ``mask`` is forwarded
        as the custom mask.
        """
        from laion_fmri.brain import to_nifti

        mask_file = self._brain_mask_path(mask_source)
        _, affine = load_nifti_with_affine(mask_file)
        roi_mask_arr = (
            self.get_roi_mask(roi, mask_source=mask_source)
            if roi is not None else None
        )
        to_nifti(
            values, output_path, str(mask_file), affine,
            roi_mask=roi_mask_arr, custom_mask=mask,
        )

    def to_template(self, values, target, **kwargs):
        """Project T1w-space values into a template / reference space.

        Forwards to :func:`laion_fmri.templates.to_template`; see that
        function's docstring for the full kwargs surface (``hemi``,
        ``route``, ``surface``, ``fsaverage_density``,
        ``interpolation``, ``output_dir``, ``desc``, ``session``).
        Requires the optional ``[template]`` extra; ``ImportError`` is
        raised at call time if any of nilearn / nitransforms /
        templateflow is missing.
        """
        from laion_fmri.templates import to_template
        return to_template(self, values, target, **kwargs)

    def volume_to_surface(self, values, target="fsaverage", **kwargs):
        """Volume input → surface target (currently ``"fsaverage"``)."""
        from laion_fmri.templates import volume_to_surface
        return volume_to_surface(self, values, target, **kwargs)

    def volume_to_template(self, values, target, **kwargs):
        """Volume input → volume target (MNI variants)."""
        from laion_fmri.templates import volume_to_template
        return volume_to_template(self, values, target, **kwargs)

    def surface_to_template(self, values, target="fsaverage", **kwargs):
        """fsnative-surface input → surface target.

        Accepts a single hemi array (with ``hemi="L"``/``"R"``) or a
        ``{"L": ..., "R": ...}`` dict; returns the same shape.
        """
        from laion_fmri.templates import surface_to_template
        return surface_to_template(self, values, target, **kwargs)

    def get_voxel_coordinates(
        self, roi=None, mask=None, mask_source="anatomical",
    ):
        """Return MNI/T1w coordinates for the selected voxels.

        ``mask_source`` picks which brain mask defines "selected
        voxels"; see :meth:`get_brain_mask`.
        """
        from laion_fmri.brain import get_voxel_coordinates

        mask_file = self._brain_mask_path(mask_source)
        _, affine = load_nifti_with_affine(mask_file)
        roi_mask_arr = (
            self.get_roi_mask(roi, mask_source=mask_source)
            if roi is not None else None
        )
        return get_voxel_coordinates(
            str(mask_file), affine,
            roi_mask=roi_mask_arr, custom_mask=mask,
        )

    # ── PyTorch ─────────────────────────────────────────────────

    def to_torch_dataset(self, **kwargs):
        """Wrap this subject as a ``torch.utils.data.Dataset``."""
        from laion_fmri.torch_data import LaionFMRIDataset
        return LaionFMRIDataset(self, **kwargs)
# ── Per-trial namespace proxies on Subject ───────────────────────


def _filter_metadata(meta, session):
    """Restrict the trial table to one session, keeping the global index.

    ``session=None`` returns ``meta`` unchanged.
    """
    if session is not None:
        return meta[meta["session"] == session]
    return meta


def _trial_image_name(subject, trial_idx):
    """Name of the image shown on global trial ``trial_idx``.

    Positional lookup into :attr:`Subject.metadata`; ``trial_idx`` is
    coerced with ``int``.
    """
    row = subject.metadata.iloc[int(trial_idx)]
    return row["image_name"]


class _SubjectImages:
    """``sub.images`` namespace: per-trial image access.

    Trial indices are global (rows of :attr:`Subject.metadata`).
    """

    def __init__(self, subject):
        self._subject = subject

    def __len__(self):
        return len(self._subject.metadata)

    def __getitem__(self, trial_idx):
        """Raw JPEG bytes for the image shown on trial ``trial_idx``."""
        image_name = _trial_image_name(self._subject, trial_idx)
        return self._subject._stim().images[image_name]

    def get(self, trial_idx):
        """Decoded :class:`PIL.Image.Image` for trial ``trial_idx``."""
        image_name = _trial_image_name(self._subject, trial_idx)
        return self._subject._stim().images.get(image_name)

    def all(self, session=None):
        """Iterator yielding PIL images in trial order.

        Parameters
        ----------
        session : str, optional
            Restrict to one session ID (e.g. ``"ses-01"``).
        """
        table = _filter_metadata(self._subject.metadata, session)
        store = self._subject._stim().images
        for image_name in table["image_name"]:
            yield store.get(image_name)

    def array(self, session=None):
        """``(n_trials, H, W, 3)`` uint8 stack of images in trial order."""
        frames = [np.array(img) for img in self.all(session=session)]
        stacked = np.stack(frames)
        return stacked.astype(np.uint8)


class _SubjectEmbeddings:
    """``sub.embeddings`` namespace: per-trial pretrained features."""

    def __init__(self, subject):
        self._subject = subject

    @property
    def models(self):
        """Models available on disk for this subject's data dir."""
        return self._subject._stim().embeddings.models

    def get(self, model, trial_idx):
        """Embedding row ``(D,)`` for one trial."""
        image_name = _trial_image_name(self._subject, trial_idx)
        return self._subject._stim().embeddings.get(model, image_name)

    def all(self, model, session=None):
        """``(n_trials, D)`` array in trial order.

        Parameters
        ----------
        model : str
            One of :data:`laion_fmri.embeddings.AVAILABLE_MODELS`.
        session : str, optional
            Restrict to one session ID.
        """
        table = _filter_metadata(self._subject.metadata, session)
        image_names = table["image_name"].tolist()
        return self._subject._stim().embeddings.get(model, image_names)


class _SubjectSegmentations:
    """``sub.segmentations`` namespace: per-trial object masks.

    Note that masks ship only for the **shared** stimulus set; for
    unique-image trials all methods behave as if no masks exist
    (``nouns`` returns ``[]``, ``has_image`` returns ``False``,
    ``get`` raises :class:`KeyError`).
    """

    def __init__(self, subject):
        self._subject = subject

    def has_image(self, trial_idx):
        """True if the image shown on this trial has any masks."""
        image_name = _trial_image_name(self._subject, trial_idx)
        return self._subject._stim().segmentations.has_image(image_name)

    def nouns(self, trial_idx, localized_only=True):
        """Nouns present in the image shown on this trial.

        Returns ``[]`` (not an error) when the trial showed a
        subject-unique image, since masks ship only for the shared set.
        """
        image_name = _trial_image_name(self._subject, trial_idx)
        return self._subject._stim().segmentations.nouns(
            image_name, localized_only=localized_only,
        )

    def for_image(self, trial_idx):
        """Metadata slice for the image shown on this trial (may be empty)."""
        image_name = _trial_image_name(self._subject, trial_idx)
        return self._subject._stim().segmentations.for_image(image_name)

    def get(self, trial_idx, noun, instance=0):
        """Mask for ``(this trial's image, noun, instance)``.

        Raises
        ------
        KeyError
            If the trial's image is uncovered (unique stimulus) or the
            requested ``(noun, instance)`` doesn't exist.
        """
        image_name = _trial_image_name(self._subject, trial_idx)
        return self._subject._stim().segmentations.get(
            image_name, noun, instance,
        )


class _SubjectCaptions:
    """``sub.captions`` namespace: per-trial stimulus captions.

    Human captions are present for every stimulus image.
    AI captions are present for shared non-OOD images only.
    """

    def __init__(self, subject):
        self._subject = subject

    def list(self, trial_idx, source=None):
        """Captions for the image shown on ``trial_idx``.

        Parameters
        ----------
        trial_idx : int
            Global trial index (row of :attr:`Subject.metadata`).
        source : {"human", "ai"}, optional
            Restrict to one source. ``None`` returns all available
            captions in ``caption_idx`` order.
        """
        image_name = _trial_image_name(self._subject, trial_idx)
        return self._subject._stim().captions.list(
            image_name, source=source,
        )

    def human(self, trial_idx, limit=None):
        """Human captions for the image shown on ``trial_idx``."""
        image_name = _trial_image_name(self._subject, trial_idx)
        return self._subject._stim().captions.human(
            image_name, limit=limit,
        )

    def ai(self, trial_idx):
        """AI caption for this trial's image, or ``None`` if absent."""
        image_name = _trial_image_name(self._subject, trial_idx)
        return self._subject._stim().captions.ai(image_name)

    def for_image(self, trial_idx):
        """Caption table slice for the image shown on ``trial_idx``."""
        image_name = _trial_image_name(self._subject, trial_idx)
        return self._subject._stim().captions.get(image_name)