This is an automated email from the ASF dual-hosted git repository.

cdionysio pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new b3c6d289da [SYSTEMDS-3887] Parallelize multimodal optimizer
b3c6d289da is described below

commit b3c6d289daf1f395bf8d97b99bb432e10e7d8df0
Author: Christina Dionysio <[email protected]>
AuthorDate: Thu Nov 13 13:24:37 2025 +0100

    [SYSTEMDS-3887] Parallelize multimodal optimizer
    
    This patch adds a new multimodal optimization method that runs the 
optimization in parallel using python multiprocessing.
    Additionally, it adds a test that checks that the results for the parallel 
run equal those of the single threaded run.
---
 .../scuro/drsearch/multimodal_optimizer.py         | 216 +++++++++++++++++++--
 .../systemds/scuro/drsearch/operator_registry.py   |   6 +
 .../systemds/scuro/drsearch/representation_dag.py  |   2 +-
 src/main/python/systemds/scuro/drsearch/task.py    |  24 ++-
 .../systemds/scuro/drsearch/unimodal_optimizer.py  |  48 +++--
 .../python/tests/scuro/test_multimodal_fusion.py   |  69 +++++++
 6 files changed, 319 insertions(+), 46 deletions(-)

diff --git a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py 
b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
index fab3da1adc..bb44703d5c 100644
--- a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
@@ -18,18 +18,13 @@
 # under the License.
 #
 # -------------------------------------------------------------
-
-
+import os
+import multiprocessing as mp
 import itertools
-import pickle
-import time
+import threading
 from dataclasses import dataclass
 from typing import List, Dict, Any, Generator
-import copy
-import traceback
-from itertools import chain
 from systemds.scuro.drsearch.task import Task
-from systemds.scuro.modality.type import ModalityType
 from systemds.scuro.drsearch.representation_dag import (
     RepresentationDag,
     RepresentationDAGBuilder,
@@ -41,6 +36,64 @@ from systemds.scuro.representations.aggregate import 
Aggregation
 from systemds.scuro.drsearch.operator_registry import Registry
 from systemds.scuro.utils.schema_helpers import get_shape
 
+from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED
+import pickle
+import copy
+import time
+import traceback
+from itertools import chain
+
+
+def _evaluate_dag_worker(dag_pickle, task_pickle, modalities_pickle, 
debug=False):
+    try:
+        dag = pickle.loads(dag_pickle)
+        task = pickle.loads(task_pickle)
+        modalities_for_dag = pickle.loads(modalities_pickle)
+
+        start_time = time.time()
+        if debug:
+            print(
+                f"[DEBUG][worker] pid={os.getpid()} evaluating 
dag_root={getattr(dag, 'root_node_id', None)} task={getattr(task.model, 'name', 
None)}"
+            )
+
+        dag_copy = copy.deepcopy(dag)
+        task_copy = copy.deepcopy(task)
+
+        fused_representation = dag_copy.execute(modalities_for_dag, task_copy)
+        if fused_representation is None:
+            return None
+
+        final_representation = fused_representation[
+            list(fused_representation.keys())[-1]
+        ]
+        from systemds.scuro.utils.schema_helpers import get_shape
+        from systemds.scuro.representations.aggregated_representation import (
+            AggregatedRepresentation,
+        )
+        from systemds.scuro.representations.aggregate import Aggregation
+
+        if task_copy.expected_dim == 1 and 
get_shape(final_representation.metadata) > 1:
+            agg_operator = AggregatedRepresentation(Aggregation())
+            final_representation = agg_operator.transform(final_representation)
+
+        eval_start = time.time()
+        scores = task_copy.run(final_representation.data)
+        eval_time = time.time() - eval_start
+        total_time = time.time() - start_time
+
+        return OptimizationResult(
+            dag=dag_copy,
+            train_score=scores[0],
+            val_score=scores[1],
+            runtime=total_time,
+            task_name=task_copy.model.name,
+            evaluation_time=eval_time,
+        )
+    except Exception:
+        if debug:
+            traceback.print_exc()
+        return None
+
 
 class MultimodalOptimizer:
     def __init__(
@@ -68,6 +121,115 @@ class MultimodalOptimizer:
         )
         self.optimization_results = []
 
+    def optimize_parallel(
+        self, max_combinations: int = None, max_workers: int = 2, batch_size: 
int = 4
+    ) -> Dict[str, List["OptimizationResult"]]:
+        all_results = {}
+
+        for task in self.tasks:
+            task_copy = copy.deepcopy(task)
+            if self.debug:
+                print(
+                    f"[DEBUG] Optimizing multimodal fusion for task: 
{task.model.name}"
+                )
+            all_results[task.model.name] = []
+            evaluated_count = 0
+            outstanding = set()
+            stop_generation = False
+
+            modalities_for_task = list(
+                chain.from_iterable(
+                    self.k_best_representations[task.model.name].values()
+                )
+            )
+            task_pickle = pickle.dumps(task_copy)
+            modalities_pickle = pickle.dumps(modalities_for_task)
+            ctx = mp.get_context("spawn")
+            start = time.time()
+            with ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx) 
as ex:
+                for modality_subset in self._generate_modality_combinations():
+                    if stop_generation:
+                        break
+                    if self.debug:
+                        print(f"[DEBUG] Evaluating modality subset: 
{modality_subset}")
+
+                    for repr_combo in 
self._generate_representation_combinations(
+                        modality_subset, task.model.name
+                    ):
+                        if stop_generation:
+                            break
+
+                        for dag in self._generate_fusion_dags(
+                            modality_subset, repr_combo
+                        ):
+                            if max_combinations and evaluated_count >= 
max_combinations:
+                                stop_generation = True
+                                break
+
+                            dag_pickle = pickle.dumps(dag)
+                            fut = ex.submit(
+                                _evaluate_dag_worker,
+                                dag_pickle,
+                                task_pickle,
+                                modalities_pickle,
+                                self.debug,
+                            )
+                            outstanding.add(fut)
+
+                            if len(outstanding) >= batch_size:
+                                done, not_done = wait(
+                                    outstanding, return_when=FIRST_COMPLETED
+                                )
+                                for fut_done in done:
+                                    try:
+                                        result = fut_done.result()
+                                        if result is not None:
+                                            
all_results[task.model.name].append(result)
+                                    except Exception:
+                                        if self.debug:
+                                            traceback.print_exc()
+                                    evaluated_count += 1
+                                    if self.debug and evaluated_count % 100 == 
0:
+                                        print(
+                                            f"[DEBUG] Evaluated 
{evaluated_count} combinations..."
+                                        )
+                                    else:
+                                        print(".", end="")
+                                outstanding = set(not_done)
+
+                    break
+
+                if outstanding:
+                    done, not_done = wait(outstanding)
+                    for fut_done in done:
+                        try:
+                            result = fut_done.result()
+                            if result is not None:
+                                all_results[task.model.name].append(result)
+                        except Exception:
+                            if self.debug:
+                                traceback.print_exc()
+                        evaluated_count += 1
+                        if self.debug and evaluated_count % 100 == 0:
+                            print(
+                                f"[DEBUG] Evaluated {evaluated_count} 
combinations..."
+                            )
+                        else:
+                            print(".", end="")
+            end = time.time()
+            if self.debug:
+                print(f"\n[DEBUG] Total optimization time: {end-start}")
+                print(
+                    f"[DEBUG] Task completed: 
{len(all_results[task.model.name])} valid combinations evaluated"
+                )
+
+        self.optimization_results = all_results
+
+        if self.debug:
+            print(f"[DEBUG] Optimization completed")
+
+        return all_results
+
     def _extract_k_best_representations(
         self, unimodal_optimization_results: Any
     ) -> Dict[str, Dict[str, List[Any]]]:
@@ -181,20 +343,27 @@ class MultimodalOptimizer:
                         yield builder_variant.build(root_id)
                     except ValueError:
                         if self.debug:
-                            print(f"Skipping invalid DAG for root {root_id}")
+                            print(f"[DEBUG] Skipping invalid DAG for root 
{root_id}")
                         continue
 
     def _evaluate_dag(self, dag: RepresentationDag, task: Task) -> 
"OptimizationResult":
         start_time = time.time()
-
         try:
-            fused_representation = dag.execute(
+            tid = threading.get_ident()
+            tname = threading.current_thread().name
+
+            dag_copy = copy.deepcopy(dag)
+            modalities_for_dag = copy.deepcopy(
                 list(
                     chain.from_iterable(
                         self.k_best_representations[task.model.name].values()
                     )
-                ),
-                task,
+                )
+            )
+            task_copy = copy.deepcopy(task)
+            fused_representation = dag_copy.execute(
+                modalities_for_dag,
+                task_copy,
             )
 
             if fused_representation is None:
@@ -203,22 +372,25 @@ class MultimodalOptimizer:
             final_representation = fused_representation[
                 list(fused_representation.keys())[-1]
             ]
-            if task.expected_dim == 1 and 
get_shape(final_representation.metadata) > 1:
+            if (
+                task_copy.expected_dim == 1
+                and get_shape(final_representation.metadata) > 1
+            ):
                 agg_operator = AggregatedRepresentation(Aggregation())
                 final_representation = 
agg_operator.transform(final_representation)
 
             eval_start = time.time()
-            scores = task.run(final_representation.data)
+            scores = task_copy.run(final_representation.data)
             eval_time = time.time() - eval_start
 
             total_time = time.time() - start_time
 
             return OptimizationResult(
-                dag=dag,
+                dag=dag_copy,
                 train_score=scores[0],
                 val_score=scores[1],
                 runtime=total_time,
-                task_name=task.model.name,
+                task_name=task_copy.model.name,
                 evaluation_time=eval_time,
             )
 
@@ -244,13 +416,15 @@ class MultimodalOptimizer:
 
         for task in self.tasks:
             if self.debug:
-                print(f"Optimizing multimodal fusion for task: 
{task.model.name}")
+                print(
+                    f"[DEBUG] Optimizing multimodal fusion for task: 
{task.model.name}"
+                )
             all_results[task.model.name] = []
             evaluated_count = 0
 
             for modality_subset in self._generate_modality_combinations():
                 if self.debug:
-                    print(f"  Evaluating modality subset: {modality_subset}")
+                    print(f"[DEBUG] Evaluating modality subset: 
{modality_subset}")
 
                 for repr_combo in self._generate_representation_combinations(
                     modality_subset, task.model.name
@@ -277,13 +451,13 @@ class MultimodalOptimizer:
 
             if self.debug:
                 print(
-                    f"  Task completed: {len(all_results[task.model.name])} 
valid combinations evaluated"
+                    f"[DEBUG] Task completed: 
{len(all_results[task.model.name])} valid combinations evaluated"
                 )
 
         self.optimization_results = all_results
 
         if self.debug:
-            print(f"\nOptimization completed")
+            print(f"[DEBUG] Optimization completed")
 
         return all_results
 
diff --git a/src/main/python/systemds/scuro/drsearch/operator_registry.py 
b/src/main/python/systemds/scuro/drsearch/operator_registry.py
index 9bc90720f8..3b20245956 100644
--- a/src/main/python/systemds/scuro/drsearch/operator_registry.py
+++ b/src/main/python/systemds/scuro/drsearch/operator_registry.py
@@ -49,6 +49,12 @@ class Registry:
         else:
             self._fusion_operators = [fusion_operators]
 
+    def set_representations(self, modality_type, representations):
+        if isinstance(representations, list):
+            self._representations[modality_type] = representations
+        else:
+            self._representations[modality_type] = [representations]
+
     def add_representation(
         self, representation: Representation, modality: ModalityType
     ):
diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py 
b/src/main/python/systemds/scuro/drsearch/representation_dag.py
index 1d5f512eb8..5543da32dd 100644
--- a/src/main/python/systemds/scuro/drsearch/representation_dag.py
+++ b/src/main/python/systemds/scuro/drsearch/representation_dag.py
@@ -139,7 +139,7 @@ class RepresentationDag:
 
             input_mods = [execute_node(input_id, task) for input_id in 
node.inputs]
 
-            node_operation = node.operation()
+            node_operation = copy.deepcopy(node.operation())
             if len(input_mods) == 1:
                 # It's a unimodal operation
                 if isinstance(node_operation, Context):
diff --git a/src/main/python/systemds/scuro/drsearch/task.py 
b/src/main/python/systemds/scuro/drsearch/task.py
index d08844c7bb..0dedc7ede3 100644
--- a/src/main/python/systemds/scuro/drsearch/task.py
+++ b/src/main/python/systemds/scuro/drsearch/task.py
@@ -18,9 +18,9 @@
 # under the License.
 #
 # -------------------------------------------------------------
+import copy
 import time
 from typing import List, Union
-
 from systemds.scuro.modality.modality import Modality
 from systemds.scuro.representations.representation import Representation
 from systemds.scuro.models.model import Model
@@ -62,6 +62,15 @@ class Task:
         self.train_scores = []
         self.val_scores = []
 
+    def create_model(self):
+        """
+        Return a fresh, unfitted model instance.
+        """
+        if self.model is None:
+            return None
+
+        return copy.deepcopy(self.model)
+
     def get_train_test_split(self, data):
         X_train = [data[i] for i in self.train_indices]
         y_train = [self.labels[i] for i in self.train_indices]
@@ -78,6 +87,7 @@ class Task:
          :return: the validation accuracy
         """
         self._reset_params()
+        model = self.create_model()
         skf = KFold(n_splits=self.kfold, shuffle=True, random_state=11)
 
         fold = 0
@@ -88,13 +98,9 @@ class Task:
             train_y = np.array(y)[train]
             test_X = np.array(X)[test]
             test_y = np.array(y)[test]
-            self._run_fold(train_X, train_y, test_X, test_y)
+            self._run_fold(model, train_X, train_y, test_X, test_y)
             fold += 1
 
-        if self.measure_performance:
-            self.inference_time = np.mean(self.inference_time)
-            self.training_time = np.mean(self.training_time)
-
         return [np.mean(self.train_scores), np.mean(self.val_scores)]
 
     def _reset_params(self):
@@ -103,14 +109,14 @@ class Task:
         self.train_scores = []
         self.val_scores = []
 
-    def _run_fold(self, train_X, train_y, test_X, test_y):
+    def _run_fold(self, model, train_X, train_y, test_X, test_y):
         train_start = time.time()
-        train_score = self.model.fit(train_X, train_y, test_X, test_y)
+        train_score = model.fit(train_X, train_y, test_X, test_y)
         train_end = time.time()
         self.training_time.append(train_end - train_start)
         self.train_scores.append(train_score)
         test_start = time.time()
-        test_score = self.model.test(np.array(test_X), test_y)
+        test_score = model.test(np.array(test_X), test_y)
         test_end = time.time()
         self.inference_time.append(test_end - test_start)
         self.val_scores.append(test_score)
diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py 
b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
index f678700bdc..91d72dd35a 100644
--- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
@@ -98,7 +98,21 @@ class UnimodalOptimizer:
     def load_results(self, file_name):
         with open(file_name, "rb") as f:
             self.operator_performance.results = pickle.load(f)
-            self.operator_performance.cache = None
+
+    def load_cache(self):
+        for modality in self.modalities:
+            for task in self.tasks:
+                self.operator_performance.cache[modality.modality_id][
+                    task.model.name
+                ] = []
+                with open(
+                    f"{modality.modality_id}_{task.model.name}_cache.pkl", "rb"
+                ) as f:
+                    cache = pickle.load(f)
+                    for c in cache:
+                        self.operator_performance.cache[modality.modality_id][
+                            task.model.name
+                        ].append(c)
 
     def optimize_parallel(self, n_workers=None):
         if n_workers is None:
@@ -269,22 +283,22 @@ class UnimodalOptimizer:
                     )
                     dags.append(builder.build(combine_id))
                     current_node_id = combine_id
-            if modality.modality_type in [
-                ModalityType.EMBEDDING,
-                ModalityType.IMAGE,
-                ModalityType.AUDIO,
-            ]:
-                dags.extend(
-                    self.default_context_operators(
-                        modality, builder, leaf_id, current_node_id
+                if modality.modality_type in [
+                    ModalityType.EMBEDDING,
+                    ModalityType.IMAGE,
+                    ModalityType.AUDIO,
+                ]:
+                    dags.extend(
+                        self.default_context_operators(
+                            modality, builder, leaf_id, current_node_id
+                        )
                     )
-                )
-            elif modality.modality_type == ModalityType.TIMESERIES:
-                dags.extend(
-                    self.temporal_context_operators(
-                        modality, builder, leaf_id, current_node_id
+                elif modality.modality_type == ModalityType.TIMESERIES:
+                    dags.extend(
+                        self.temporal_context_operators(
+                            modality, builder, leaf_id, current_node_id
+                        )
                     )
-                )
         return dags
 
     def default_context_operators(self, modality, builder, leaf_id, 
current_node_id):
@@ -388,6 +402,10 @@ class UnimodalResults:
                 list(task_results[i].dag.execute([modality]).values())[-1]
                 for i in sorted_indices
             ]
+        elif isinstance(self.cache[modality.modality_id][task.model.name], 
list):
+            cache = self.cache[modality.modality_id][
+                task.model.name
+            ]  # used for precomputed cache
         else:
             cache_items = (
                 list(self.cache[modality.modality_id][task.model.name].items())
diff --git a/src/main/python/tests/scuro/test_multimodal_fusion.py 
b/src/main/python/tests/scuro/test_multimodal_fusion.py
index 0925e47cf2..0f9c08d216 100644
--- a/src/main/python/tests/scuro/test_multimodal_fusion.py
+++ b/src/main/python/tests/scuro/test_multimodal_fusion.py
@@ -39,6 +39,7 @@ from systemds.scuro.representations.spectrogram import 
Spectrogram
 from systemds.scuro.representations.word2vec import W2V
 from systemds.scuro.modality.unimodal_modality import UnimodalModality
 from systemds.scuro.representations.resnet import ResNet
+from systemds.scuro.representations.timeseries_representations import Min, Max
 from tests.scuro.data_generator import (
     TestDataLoader,
     ModalityRandomDataGenerator,
@@ -182,6 +183,74 @@ class 
TestMultimodalRepresentationOptimizer(unittest.TestCase):
 
             assert best_results[0].val_score >= best_results[1].val_score
 
+    def test_parallel_multimodal_fusion(self):
+        task = Task(
+            "MM_Fusion_Task1",
+            TestSVM(),
+            self.labels,
+            self.train_indizes,
+            self.val_indizes,
+        )
+
+        audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data(
+            self.num_instances, 1000
+        )
+        text_data, text_md = ModalityRandomDataGenerator().create_text_data(
+            self.num_instances
+        )
+
+        audio = UnimodalModality(
+            TestDataLoader(
+                self.indices, None, ModalityType.AUDIO, audio_data, 
np.float32, audio_md
+            )
+        )
+        text = UnimodalModality(
+            TestDataLoader(
+                self.indices, None, ModalityType.TEXT, text_data, str, text_md
+            )
+        )
+
+        with patch.object(
+            Registry,
+            "_representations",
+            {
+                ModalityType.TEXT: [W2V],
+                ModalityType.AUDIO: [Spectrogram],
+                ModalityType.TIMESERIES: [Max, Min],
+                ModalityType.VIDEO: [ResNet],
+                ModalityType.EMBEDDING: [],
+            },
+        ):
+            registry = Registry()
+            registry._fusion_operators = [Average, Concatenation, LSTM]
+            unimodal_optimizer = UnimodalOptimizer([audio, text], [task], 
debug=False)
+            unimodal_optimizer.optimize()
+            unimodal_optimizer.operator_performance.get_k_best_results(audio, 
2, task)
+            m_o = MultimodalOptimizer(
+                [audio, text],
+                unimodal_optimizer.operator_performance,
+                [task],
+                debug=False,
+                min_modalities=2,
+                max_modalities=3,
+            )
+            fusion_results = m_o.optimize()
+            parallel_fusion_results = m_o.optimize_parallel(max_workers=4, 
batch_size=8)
+
+            best_results = sorted(
+                fusion_results[task.model.name], key=lambda x: x.val_score, 
reverse=True
+            )
+
+            best_results_parallel = sorted(
+                parallel_fusion_results[task.model.name],
+                key=lambda x: x.val_score,
+                reverse=True,
+            )
+
+            assert len(best_results) == len(best_results_parallel)
+            for i in range(len(best_results)):
+                assert best_results[i].val_score == 
best_results_parallel[i].val_score
+
 
 if __name__ == "__main__":
     unittest.main()

Reply via email to