This is an automated email from the ASF dual-hosted git repository.
cdionysio pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new b3c6d289da [SYSTEMDS-3887] Parallelize multimodal optimizer
b3c6d289da is described below
commit b3c6d289daf1f395bf8d97b99bb432e10e7d8df0
Author: Christina Dionysio <[email protected]>
AuthorDate: Thu Nov 13 13:24:37 2025 +0100
[SYSTEMDS-3887] Parallelize multimodal optimizer
This patch adds a new multimodal optimization method that runs the
optimization in parallel using Python multiprocessing.
Additionally, it adds a test that checks that the results of the parallel
run equal those of the single-threaded run.
---
.../scuro/drsearch/multimodal_optimizer.py | 216 +++++++++++++++++++--
.../systemds/scuro/drsearch/operator_registry.py | 6 +
.../systemds/scuro/drsearch/representation_dag.py | 2 +-
src/main/python/systemds/scuro/drsearch/task.py | 24 ++-
.../systemds/scuro/drsearch/unimodal_optimizer.py | 48 +++--
.../python/tests/scuro/test_multimodal_fusion.py | 69 +++++++
6 files changed, 319 insertions(+), 46 deletions(-)
diff --git a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
index fab3da1adc..bb44703d5c 100644
--- a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
@@ -18,18 +18,13 @@
# under the License.
#
# -------------------------------------------------------------
-
-
+import os
+import multiprocessing as mp
import itertools
-import pickle
-import time
+import threading
from dataclasses import dataclass
from typing import List, Dict, Any, Generator
-import copy
-import traceback
-from itertools import chain
from systemds.scuro.drsearch.task import Task
-from systemds.scuro.modality.type import ModalityType
from systemds.scuro.drsearch.representation_dag import (
RepresentationDag,
RepresentationDAGBuilder,
@@ -41,6 +36,64 @@ from systemds.scuro.representations.aggregate import
Aggregation
from systemds.scuro.drsearch.operator_registry import Registry
from systemds.scuro.utils.schema_helpers import get_shape
+from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED
+import pickle
+import copy
+import time
+import traceback
+from itertools import chain
+
+
+def _evaluate_dag_worker(dag_pickle, task_pickle, modalities_pickle,
debug=False):
+ try:
+ dag = pickle.loads(dag_pickle)
+ task = pickle.loads(task_pickle)
+ modalities_for_dag = pickle.loads(modalities_pickle)
+
+ start_time = time.time()
+ if debug:
+ print(
+ f"[DEBUG][worker] pid={os.getpid()} evaluating
dag_root={getattr(dag, 'root_node_id', None)} task={getattr(task.model, 'name',
None)}"
+ )
+
+ dag_copy = copy.deepcopy(dag)
+ task_copy = copy.deepcopy(task)
+
+ fused_representation = dag_copy.execute(modalities_for_dag, task_copy)
+ if fused_representation is None:
+ return None
+
+ final_representation = fused_representation[
+ list(fused_representation.keys())[-1]
+ ]
+ from systemds.scuro.utils.schema_helpers import get_shape
+ from systemds.scuro.representations.aggregated_representation import (
+ AggregatedRepresentation,
+ )
+ from systemds.scuro.representations.aggregate import Aggregation
+
+ if task_copy.expected_dim == 1 and
get_shape(final_representation.metadata) > 1:
+ agg_operator = AggregatedRepresentation(Aggregation())
+ final_representation = agg_operator.transform(final_representation)
+
+ eval_start = time.time()
+ scores = task_copy.run(final_representation.data)
+ eval_time = time.time() - eval_start
+ total_time = time.time() - start_time
+
+ return OptimizationResult(
+ dag=dag_copy,
+ train_score=scores[0],
+ val_score=scores[1],
+ runtime=total_time,
+ task_name=task_copy.model.name,
+ evaluation_time=eval_time,
+ )
+ except Exception:
+ if debug:
+ traceback.print_exc()
+ return None
+
class MultimodalOptimizer:
def __init__(
@@ -68,6 +121,115 @@ class MultimodalOptimizer:
)
self.optimization_results = []
+ def optimize_parallel(
+ self, max_combinations: int = None, max_workers: int = 2, batch_size:
int = 4
+ ) -> Dict[str, List["OptimizationResult"]]:
+ all_results = {}
+
+ for task in self.tasks:
+ task_copy = copy.deepcopy(task)
+ if self.debug:
+ print(
+ f"[DEBUG] Optimizing multimodal fusion for task:
{task.model.name}"
+ )
+ all_results[task.model.name] = []
+ evaluated_count = 0
+ outstanding = set()
+ stop_generation = False
+
+ modalities_for_task = list(
+ chain.from_iterable(
+ self.k_best_representations[task.model.name].values()
+ )
+ )
+ task_pickle = pickle.dumps(task_copy)
+ modalities_pickle = pickle.dumps(modalities_for_task)
+ ctx = mp.get_context("spawn")
+ start = time.time()
+ with ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx)
as ex:
+ for modality_subset in self._generate_modality_combinations():
+ if stop_generation:
+ break
+ if self.debug:
+ print(f"[DEBUG] Evaluating modality subset:
{modality_subset}")
+
+ for repr_combo in
self._generate_representation_combinations(
+ modality_subset, task.model.name
+ ):
+ if stop_generation:
+ break
+
+ for dag in self._generate_fusion_dags(
+ modality_subset, repr_combo
+ ):
+ if max_combinations and evaluated_count >=
max_combinations:
+ stop_generation = True
+ break
+
+ dag_pickle = pickle.dumps(dag)
+ fut = ex.submit(
+ _evaluate_dag_worker,
+ dag_pickle,
+ task_pickle,
+ modalities_pickle,
+ self.debug,
+ )
+ outstanding.add(fut)
+
+ if len(outstanding) >= batch_size:
+ done, not_done = wait(
+ outstanding, return_when=FIRST_COMPLETED
+ )
+ for fut_done in done:
+ try:
+ result = fut_done.result()
+ if result is not None:
+
all_results[task.model.name].append(result)
+ except Exception:
+ if self.debug:
+ traceback.print_exc()
+ evaluated_count += 1
+ if self.debug and evaluated_count % 100 ==
0:
+ print(
+ f"[DEBUG] Evaluated
{evaluated_count} combinations..."
+ )
+ else:
+ print(".", end="")
+ outstanding = set(not_done)
+
+ break
+
+ if outstanding:
+ done, not_done = wait(outstanding)
+ for fut_done in done:
+ try:
+ result = fut_done.result()
+ if result is not None:
+ all_results[task.model.name].append(result)
+ except Exception:
+ if self.debug:
+ traceback.print_exc()
+ evaluated_count += 1
+ if self.debug and evaluated_count % 100 == 0:
+ print(
+ f"[DEBUG] Evaluated {evaluated_count}
combinations..."
+ )
+ else:
+ print(".", end="")
+ end = time.time()
+ if self.debug:
+ print(f"\n[DEBUG] Total optimization time: {end-start}")
+ print(
+ f"[DEBUG] Task completed:
{len(all_results[task.model.name])} valid combinations evaluated"
+ )
+
+ self.optimization_results = all_results
+
+ if self.debug:
+ print(f"[DEBUG] Optimization completed")
+
+ return all_results
+
def _extract_k_best_representations(
self, unimodal_optimization_results: Any
) -> Dict[str, Dict[str, List[Any]]]:
@@ -181,20 +343,27 @@ class MultimodalOptimizer:
yield builder_variant.build(root_id)
except ValueError:
if self.debug:
- print(f"Skipping invalid DAG for root {root_id}")
+ print(f"[DEBUG] Skipping invalid DAG for root
{root_id}")
continue
def _evaluate_dag(self, dag: RepresentationDag, task: Task) ->
"OptimizationResult":
start_time = time.time()
-
try:
- fused_representation = dag.execute(
+ tid = threading.get_ident()
+ tname = threading.current_thread().name
+
+ dag_copy = copy.deepcopy(dag)
+ modalities_for_dag = copy.deepcopy(
list(
chain.from_iterable(
self.k_best_representations[task.model.name].values()
)
- ),
- task,
+ )
+ )
+ task_copy = copy.deepcopy(task)
+ fused_representation = dag_copy.execute(
+ modalities_for_dag,
+ task_copy,
)
if fused_representation is None:
@@ -203,22 +372,25 @@ class MultimodalOptimizer:
final_representation = fused_representation[
list(fused_representation.keys())[-1]
]
- if task.expected_dim == 1 and
get_shape(final_representation.metadata) > 1:
+ if (
+ task_copy.expected_dim == 1
+ and get_shape(final_representation.metadata) > 1
+ ):
agg_operator = AggregatedRepresentation(Aggregation())
final_representation =
agg_operator.transform(final_representation)
eval_start = time.time()
- scores = task.run(final_representation.data)
+ scores = task_copy.run(final_representation.data)
eval_time = time.time() - eval_start
total_time = time.time() - start_time
return OptimizationResult(
- dag=dag,
+ dag=dag_copy,
train_score=scores[0],
val_score=scores[1],
runtime=total_time,
- task_name=task.model.name,
+ task_name=task_copy.model.name,
evaluation_time=eval_time,
)
@@ -244,13 +416,15 @@ class MultimodalOptimizer:
for task in self.tasks:
if self.debug:
- print(f"Optimizing multimodal fusion for task:
{task.model.name}")
+ print(
+ f"[DEBUG] Optimizing multimodal fusion for task:
{task.model.name}"
+ )
all_results[task.model.name] = []
evaluated_count = 0
for modality_subset in self._generate_modality_combinations():
if self.debug:
- print(f" Evaluating modality subset: {modality_subset}")
+ print(f"[DEBUG] Evaluating modality subset:
{modality_subset}")
for repr_combo in self._generate_representation_combinations(
modality_subset, task.model.name
@@ -277,13 +451,13 @@ class MultimodalOptimizer:
if self.debug:
print(
- f" Task completed: {len(all_results[task.model.name])}
valid combinations evaluated"
+ f"[DEBUG] Task completed:
{len(all_results[task.model.name])} valid combinations evaluated"
)
self.optimization_results = all_results
if self.debug:
- print(f"\nOptimization completed")
+ print(f"[DEBUG] Optimization completed")
return all_results
diff --git a/src/main/python/systemds/scuro/drsearch/operator_registry.py
b/src/main/python/systemds/scuro/drsearch/operator_registry.py
index 9bc90720f8..3b20245956 100644
--- a/src/main/python/systemds/scuro/drsearch/operator_registry.py
+++ b/src/main/python/systemds/scuro/drsearch/operator_registry.py
@@ -49,6 +49,12 @@ class Registry:
else:
self._fusion_operators = [fusion_operators]
+ def set_representations(self, modality_type, representations):
+ if isinstance(representations, list):
+ self._representations[modality_type] = representations
+ else:
+ self._representations[modality_type] = [representations]
+
def add_representation(
self, representation: Representation, modality: ModalityType
):
diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py
b/src/main/python/systemds/scuro/drsearch/representation_dag.py
index 1d5f512eb8..5543da32dd 100644
--- a/src/main/python/systemds/scuro/drsearch/representation_dag.py
+++ b/src/main/python/systemds/scuro/drsearch/representation_dag.py
@@ -139,7 +139,7 @@ class RepresentationDag:
input_mods = [execute_node(input_id, task) for input_id in
node.inputs]
- node_operation = node.operation()
+ node_operation = copy.deepcopy(node.operation())
if len(input_mods) == 1:
# It's a unimodal operation
if isinstance(node_operation, Context):
diff --git a/src/main/python/systemds/scuro/drsearch/task.py
b/src/main/python/systemds/scuro/drsearch/task.py
index d08844c7bb..0dedc7ede3 100644
--- a/src/main/python/systemds/scuro/drsearch/task.py
+++ b/src/main/python/systemds/scuro/drsearch/task.py
@@ -18,9 +18,9 @@
# under the License.
#
# -------------------------------------------------------------
+import copy
import time
from typing import List, Union
-
from systemds.scuro.modality.modality import Modality
from systemds.scuro.representations.representation import Representation
from systemds.scuro.models.model import Model
@@ -62,6 +62,15 @@ class Task:
self.train_scores = []
self.val_scores = []
+ def create_model(self):
+ """
+ Return a fresh, unfitted model instance.
+ """
+ if self.model is None:
+ return None
+
+ return copy.deepcopy(self.model)
+
def get_train_test_split(self, data):
X_train = [data[i] for i in self.train_indices]
y_train = [self.labels[i] for i in self.train_indices]
@@ -78,6 +87,7 @@ class Task:
:return: the validation accuracy
"""
self._reset_params()
+ model = self.create_model()
skf = KFold(n_splits=self.kfold, shuffle=True, random_state=11)
fold = 0
@@ -88,13 +98,9 @@ class Task:
train_y = np.array(y)[train]
test_X = np.array(X)[test]
test_y = np.array(y)[test]
- self._run_fold(train_X, train_y, test_X, test_y)
+ self._run_fold(model, train_X, train_y, test_X, test_y)
fold += 1
- if self.measure_performance:
- self.inference_time = np.mean(self.inference_time)
- self.training_time = np.mean(self.training_time)
-
return [np.mean(self.train_scores), np.mean(self.val_scores)]
def _reset_params(self):
@@ -103,14 +109,14 @@ class Task:
self.train_scores = []
self.val_scores = []
- def _run_fold(self, train_X, train_y, test_X, test_y):
+ def _run_fold(self, model, train_X, train_y, test_X, test_y):
train_start = time.time()
- train_score = self.model.fit(train_X, train_y, test_X, test_y)
+ train_score = model.fit(train_X, train_y, test_X, test_y)
train_end = time.time()
self.training_time.append(train_end - train_start)
self.train_scores.append(train_score)
test_start = time.time()
- test_score = self.model.test(np.array(test_X), test_y)
+ test_score = model.test(np.array(test_X), test_y)
test_end = time.time()
self.inference_time.append(test_end - test_start)
self.val_scores.append(test_score)
diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
index f678700bdc..91d72dd35a 100644
--- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
@@ -98,7 +98,21 @@ class UnimodalOptimizer:
def load_results(self, file_name):
with open(file_name, "rb") as f:
self.operator_performance.results = pickle.load(f)
- self.operator_performance.cache = None
+
+ def load_cache(self):
+ for modality in self.modalities:
+ for task in self.tasks:
+ self.operator_performance.cache[modality.modality_id][
+ task.model.name
+ ] = []
+ with open(
+ f"{modality.modality_id}_{task.model.name}_cache.pkl", "rb"
+ ) as f:
+ cache = pickle.load(f)
+ for c in cache:
+ self.operator_performance.cache[modality.modality_id][
+ task.model.name
+ ].append(c)
def optimize_parallel(self, n_workers=None):
if n_workers is None:
@@ -269,22 +283,22 @@ class UnimodalOptimizer:
)
dags.append(builder.build(combine_id))
current_node_id = combine_id
- if modality.modality_type in [
- ModalityType.EMBEDDING,
- ModalityType.IMAGE,
- ModalityType.AUDIO,
- ]:
- dags.extend(
- self.default_context_operators(
- modality, builder, leaf_id, current_node_id
+ if modality.modality_type in [
+ ModalityType.EMBEDDING,
+ ModalityType.IMAGE,
+ ModalityType.AUDIO,
+ ]:
+ dags.extend(
+ self.default_context_operators(
+ modality, builder, leaf_id, current_node_id
+ )
)
- )
- elif modality.modality_type == ModalityType.TIMESERIES:
- dags.extend(
- self.temporal_context_operators(
- modality, builder, leaf_id, current_node_id
+ elif modality.modality_type == ModalityType.TIMESERIES:
+ dags.extend(
+ self.temporal_context_operators(
+ modality, builder, leaf_id, current_node_id
+ )
)
- )
return dags
def default_context_operators(self, modality, builder, leaf_id,
current_node_id):
@@ -388,6 +402,10 @@ class UnimodalResults:
list(task_results[i].dag.execute([modality]).values())[-1]
for i in sorted_indices
]
+ elif isinstance(self.cache[modality.modality_id][task.model.name],
list):
+ cache = self.cache[modality.modality_id][
+ task.model.name
+ ] # used for precomputed cache
else:
cache_items = (
list(self.cache[modality.modality_id][task.model.name].items())
diff --git a/src/main/python/tests/scuro/test_multimodal_fusion.py
b/src/main/python/tests/scuro/test_multimodal_fusion.py
index 0925e47cf2..0f9c08d216 100644
--- a/src/main/python/tests/scuro/test_multimodal_fusion.py
+++ b/src/main/python/tests/scuro/test_multimodal_fusion.py
@@ -39,6 +39,7 @@ from systemds.scuro.representations.spectrogram import
Spectrogram
from systemds.scuro.representations.word2vec import W2V
from systemds.scuro.modality.unimodal_modality import UnimodalModality
from systemds.scuro.representations.resnet import ResNet
+from systemds.scuro.representations.timeseries_representations import Min, Max
from tests.scuro.data_generator import (
TestDataLoader,
ModalityRandomDataGenerator,
@@ -182,6 +183,74 @@ class
TestMultimodalRepresentationOptimizer(unittest.TestCase):
assert best_results[0].val_score >= best_results[1].val_score
+ def test_parallel_multimodal_fusion(self):
+ task = Task(
+ "MM_Fusion_Task1",
+ TestSVM(),
+ self.labels,
+ self.train_indizes,
+ self.val_indizes,
+ )
+
+ audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data(
+ self.num_instances, 1000
+ )
+ text_data, text_md = ModalityRandomDataGenerator().create_text_data(
+ self.num_instances
+ )
+
+ audio = UnimodalModality(
+ TestDataLoader(
+ self.indices, None, ModalityType.AUDIO, audio_data,
np.float32, audio_md
+ )
+ )
+ text = UnimodalModality(
+ TestDataLoader(
+ self.indices, None, ModalityType.TEXT, text_data, str, text_md
+ )
+ )
+
+ with patch.object(
+ Registry,
+ "_representations",
+ {
+ ModalityType.TEXT: [W2V],
+ ModalityType.AUDIO: [Spectrogram],
+ ModalityType.TIMESERIES: [Max, Min],
+ ModalityType.VIDEO: [ResNet],
+ ModalityType.EMBEDDING: [],
+ },
+ ):
+ registry = Registry()
+ registry._fusion_operators = [Average, Concatenation, LSTM]
+ unimodal_optimizer = UnimodalOptimizer([audio, text], [task],
debug=False)
+ unimodal_optimizer.optimize()
+ unimodal_optimizer.operator_performance.get_k_best_results(audio,
2, task)
+ m_o = MultimodalOptimizer(
+ [audio, text],
+ unimodal_optimizer.operator_performance,
+ [task],
+ debug=False,
+ min_modalities=2,
+ max_modalities=3,
+ )
+ fusion_results = m_o.optimize()
+ parallel_fusion_results = m_o.optimize_parallel(max_workers=4,
batch_size=8)
+
+ best_results = sorted(
+ fusion_results[task.model.name], key=lambda x: x.val_score,
reverse=True
+ )
+
+ best_results_parallel = sorted(
+ parallel_fusion_results[task.model.name],
+ key=lambda x: x.val_score,
+ reverse=True,
+ )
+
+ assert len(best_results) == len(best_results_parallel)
+ for i in range(len(best_results)):
+ assert best_results[i].val_score ==
best_results_parallel[i].val_score
+
if __name__ == "__main__":
unittest.main()