This is an automated email from the ASF dual-hosted git repository.

cdionysio pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new d3ffc705a3 [SYSTEMDS-3887] Multimodal HP-tuning fix
d3ffc705a3 is described below

commit d3ffc705a32857fce65e6d2071d4000ce11f3c99
Author: Christina Dionysio <[email protected]>
AuthorDate: Wed Jan 28 19:31:43 2026 +0100

    [SYSTEMDS-3887] Multimodal HP-tuning fix
    
    This patch fixes an issue in multimodal hyperparameter tuning. It
    introduces the correct functionality to retrieve the TransformedModalities
    with the latest representations already applied.
---
 .../scuro/drsearch/hyperparameter_tuner.py         | 283 ++++++++++++++-------
 .../scuro/representations/window_aggregation.py    |  12 +-
 src/main/python/tests/scuro/data_generator.py      |   1 +
 src/main/python/tests/scuro/test_hp_tuner.py       |  80 ++++--
 4 files changed, 251 insertions(+), 125 deletions(-)
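
The core of the fix is a per-node parameter namespace: search-space entries
are keyed as "<node_id>-<param_name>" so that two DAG nodes exposing the same
parameter name no longer collide. A minimal sketch of the helper introduced
below (the node ids and parameter names here are made up for illustration):

    # Hypothetical ids/params; two nodes share a parameter name without clashing.
    def get_params_for_node(node_id, params):
        return {
            k.split("-")[-1]: v
            for k, v in params.items()
            if k.startswith(node_id + "-")
        }

    params = {"n1-window_size": 10, "n2-window_size": 25}
    assert get_params_for_node("n1", params) == {"window_size": 10}
    assert get_params_for_node("n2", params) == {"window_size": 25}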

diff --git a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
index ed0eb5abde..0a129c8eb4 100644
--- a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
+++ b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
@@ -19,27 +19,85 @@
 #
 # -------------------------------------------------------------
 from typing import Dict, List, Tuple, Any, Optional
-from skopt import gp_minimize
 from skopt.space import Real, Integer, Categorical
-from skopt.utils import use_named_args
-import json
+import numpy as np
 import logging
 from dataclasses import dataclass
 import time
 import copy
-
+from joblib import Parallel, delayed
+from skopt import Optimizer
+from systemds.scuro.drsearch.representation_dag import (
+    RepresentationDAGBuilder,
+)
 from systemds.scuro.modality.modality import Modality
+from systemds.scuro.drsearch.task import PerformanceMeasure
+import pickle
+
+
+def get_params_for_node(node_id, params):
+    return {
+        k.split("-")[-1]: v for k, v in params.items() if k.startswith(node_id + "-")
+    }
 
 
 @dataclass
 class HyperparamResult:
-
     representation_name: str
     best_params: Dict[str, Any]
     best_score: float
     all_results: List[Tuple[Dict[str, Any], float]]
     tuning_time: float
     modality_id: int
+    task_name: str
+    dag: Any
+    mm_opt: bool = False
+
+
+class HyperparamResults:
+    def __init__(self, tasks, modalities):
+        self.tasks = tasks
+        self.modalities = modalities
+        self.results = {}
+        for task in tasks:
+            self.results[task.model.name] = {
+                modality.modality_id: [] for modality in modalities
+            }
+
+    def add_result(self, results):
+        # TODO: Check if order of best results matters (deterministic)
+        for result in results:
+            if result.mm_opt:
+                self.results[result.task_name]["mm_results"].append(result)
+            else:
+                self.results[result.task_name][result.modality_id].append(result)
+
+    def setup_mm(self, optimize_unimodal):
+        if not optimize_unimodal:
+            self.results = {}
+            for task in self.tasks:
+                self.results[task.model.name] = {"mm_results": []}
+
+    def get_k_best_results(self, modality, task, performance_metric_name):
+        results = self.results[task.model.name][modality.modality_id]
+        dags = []
+        for result in results:
+            dag_with_best_params = RepresentationDAGBuilder()
+            prev_node_id = None
+            for node in result.dag.nodes:
+                if node.operation is not None and node.parameters:
+                    params = get_params_for_node(node.node_id, result.best_params)
+                    prev_node_id = dag_with_best_params.create_operation_node(
+                        node.operation, [prev_node_id], params
+                    )
+                else:  # it is a leaf node
+                    prev_node_id = dag_with_best_params.create_leaf_node(
+                        node.modality_id
+                    )
+
+            dags.append(dag_with_best_params.build(prev_node_id))
+        representations = [list(dag.execute([modality]).values())[-1] for dag in dags]
+        return results, representations
 
 
 class HyperparameterTuner:
@@ -57,16 +115,17 @@ class HyperparameterTuner:
         debug: bool = False,
     ):
         self.tasks = tasks
-        self.optimization_results = optimization_results
+        self.unimodal_optimization_results = optimization_results
+        self.optimization_results = HyperparamResults(tasks, modalities)
         self.n_jobs = n_jobs
         self.scoring_metric = scoring_metric
         self.maximize_metric = maximize_metric
         self.save_results = save_results
-        self.results = {}
         self.k = k
         self.modalities = modalities
         self.representations = None
         self.k_best_cache = None
+        self.k_best_cache_by_modality = None
         self.k_best_representations = None
         self.extract_k_best_modalities_per_task()
         self.debug = debug
@@ -96,43 +155,45 @@ class HyperparameterTuner:
     def extract_k_best_modalities_per_task(self):
         self.k_best_representations = {}
         self.k_best_cache = {}
+        self.k_best_cache_by_modality = {}
         representations = {}
         for task in self.tasks:
             self.k_best_representations[task.model.name] = []
             self.k_best_cache[task.model.name] = []
+            self.k_best_cache_by_modality[task.model.name] = {}
             representations[task.model.name] = {}
             for modality in self.modalities:
                 k_best_results, cached_data = (
-                    self.optimization_results.get_k_best_results(
+                    self.unimodal_optimization_results.get_k_best_results(
                         modality, task, self.scoring_metric
                     )
                 )
                 representations[task.model.name][modality.modality_id] = k_best_results
+                self.k_best_cache_by_modality[task.model.name][
+                    modality.modality_id
+                ] = cached_data
                 self.k_best_representations[task.model.name].extend(k_best_results)
                 self.k_best_cache[task.model.name].extend(cached_data)
         self.representations = representations
 
     def tune_unimodal_representations(self, max_eval_per_rep: Optional[int] = None):
-        results = {}
         for task in self.tasks:
-            results[task.model.name] = []
-            for representation in self.k_best_representations[task.model.name]:
-                result = self.tune_dag_representation(
-                    representation.dag,
-                    representation.dag.root_node_id,
-                    task,
-                    max_eval_per_rep,
+            reps = self.k_best_representations[task.model.name]
+            self.optimization_results.add_result(
+                Parallel(n_jobs=self.n_jobs)(
+                    delayed(self.tune_dag_representation)(
+                        rep.dag, rep.dag.root_node_id, task, max_eval_per_rep
+                    )
+                    for rep in reps
                 )
-                results[task.model.name].append(result)
-
-        self.results = results
+            )
 
         if self.save_results:
             self.save_tuning_results()
 
-        return results
-
-    def tune_dag_representation(self, dag, root_node_id, task, max_evals=None):
+    def tune_dag_representation(
+        self, dag, root_node_id, task, max_evals=None, mm_opt=False
+    ):
         hyperparams = {}
         reps = []
         modality_ids = []
@@ -149,7 +210,7 @@ class HyperparameterTuner:
             visited.add(node_id)
             if node.operation is not None:
                 if node.operation().parameters:
-                    hyperparams.update(node.operation().parameters)
+                    hyperparams[node_id] = node.operation().parameters
                 reps.append(node.operation)
                 node_order.append(node_id)
             if node.modality_id is not None:
@@ -161,99 +222,136 @@ class HyperparameterTuner:
             return None
 
         start_time = time.time()
-        rep_name = "_".join([rep.__name__ for rep in reps])
+        rep_name = "-".join([rep.__name__ for rep in reps])
 
         search_space = []
         param_names = []
-        for param_name, param_values in hyperparams.items():
-            param_names.append(param_name)
-            if isinstance(param_values, list):
-                if all(isinstance(v, (int, float)) for v in param_values):
-                    if all(isinstance(v, int) for v in param_values):
+        for op_id, op_params in hyperparams.items():
+            for param_name, param_values in op_params.items():
+                param_names.append(op_id + "-" + param_name)
+                if isinstance(param_values, list):
+                    search_space.append(
+                        Categorical(param_values, name=op_id + "-" + param_name)
+                    )
+                elif isinstance(param_values, tuple) and len(param_values) == 2:
+                    if isinstance(param_values[0], int) and isinstance(
+                        param_values[1], int
+                    ):
                         search_space.append(
                             Integer(
-                                min(param_values), max(param_values), name=param_name
+                                param_values[0],
+                                param_values[1],
+                                name=op_id + "-" + param_name,
                             )
                         )
                     else:
                         search_space.append(
-                            Real(min(param_values), max(param_values), name=param_name)
+                            Real(
+                                param_values[0],
+                                param_values[1],
+                                name=op_id + "-" + param_name,
+                            )
                         )
                 else:
-                    search_space.append(Categorical(param_values, name=param_name))
-            elif isinstance(param_values, tuple) and len(param_values) == 2:
-                if isinstance(param_values[0], int) and isinstance(
-                    param_values[1], int
-                ):
                     search_space.append(
-                        Integer(param_values[0], param_values[1], name=param_name)
+                        Categorical([param_values], name=op_id + "-" + param_name)
                     )
-                else:
-                    search_space.append(
-                        Real(param_values[0], param_values[1], name=param_name)
-                    )
-            else:
-                search_space.append(Categorical([param_values], name=param_name))
 
         n_calls = max_evals if max_evals else 50
 
         all_results = []
 
-        @use_named_args(search_space)
-        def objective(**params):
+        def evaluate_point(point):
+            params = dict(zip(param_names, point))
             result = self.evaluate_dag_config(
-                dag, params, node_order, modality_ids, task
+                dag,
+                params,
+                node_order,
+                modality_ids,
+                task,
+                modalities_override=(
+                    self._get_cached_modalities_for_task(task, modality_ids)
+                    if mm_opt
+                    else None
+                ),
             )
-            all_results.append(result)
-
-            score = result[1].average_scores[self.scoring_metric]
+            score = result[1]
+            if isinstance(score, PerformanceMeasure):
+                score = score.average_scores[self.scoring_metric]
             if self.maximize_metric:
-                return -score
+                objective_value = -score
             else:
-                return score
+                objective_value = score
+            return objective_value, result
 
-        result = gp_minimize(
-            objective,
-            search_space,
-            n_calls=n_calls,
-            random_state=42,
-            verbose=self.debug,
-            n_initial_points=min(10, n_calls // 2),
+        opt = Optimizer(
+            search_space, random_state=42, n_initial_points=min(10, n_calls // 2)
         )
 
-        if self.maximize_metric:
-            best_params, best_score = max(
-                all_results, key=lambda x: x[1].average_scores[self.scoring_metric]
+        n_batch = min(abs(self.n_jobs), n_calls) if self.n_jobs != 0 else 1
+        for _ in range(0, n_calls, n_batch):
+            points = opt.ask(n_points=n_batch)
+            results = Parallel(n_jobs=self.n_jobs)(
+                delayed(evaluate_point)(p) for p in points
             )
+            objective_values = [result[0] for result in results]
+            all_results.extend(result[1] for result in results)
+            opt.tell(points, objective_values)
+
+        def get_score(result):
+            score = result[1]
+            if isinstance(score, PerformanceMeasure):
+                return score.average_scores[self.scoring_metric]
+            return score
+
+        if self.maximize_metric:
+            best_params, best_score = max(all_results, key=get_score)
         else:
-            best_params, best_score = min(
-                all_results, key=lambda x: x[1].average_scores[self.scoring_metric]
-            )
+            best_params, best_score = min(all_results, key=get_score)
 
         tuning_time = time.time() - start_time
 
-        return HyperparamResult(
+        best_result = HyperparamResult(
             representation_name=rep_name,
             best_params=best_params,
             best_score=best_score,
             all_results=all_results,
             tuning_time=tuning_time,
             modality_id=modality_ids[0] if modality_ids else None,
+            task_name=task.model.name,
+            dag=dag,
+            mm_opt=mm_opt,
         )
 
-    def evaluate_dag_config(self, dag, params, node_order, modality_ids, task):
+        return best_result
+
+    def _get_cached_modalities_for_task(self, task, modality_ids):
+        if not self.k_best_cache_by_modality:
+            return self.get_modalities_by_id(modality_ids)
+        unique_modality_ids = list(dict.fromkeys(modality_ids))
+        cached_modalities = []
+        for modality_id in unique_modality_ids:
+            cached_modalities.extend(
+                self.k_best_cache_by_modality[task.model.name].get(modality_id, [])
+            )
+        return cached_modalities
+
+    def evaluate_dag_config(
+        self, dag, params, node_order, modality_ids, task, modalities_override=None
+    ):
         try:
             dag_copy = copy.deepcopy(dag)
 
             for node_id in node_order:
                 node = dag_copy.get_node_by_id(node_id)
                 if node.operation is not None and node.parameters:
-                    node_params = {
-                        k: v for k, v in params.items() if k in node.parameters
-                    }
-                    node.parameters = node_params
+                    node.parameters = get_params_for_node(node_id, params)
 
-            modalities = self.get_modalities_by_id(modality_ids)
+            modalities = (
+                modalities_override
+                if modalities_override is not None
+                else self.get_modalities_by_id(modality_ids)
+            )
             modified_modality = dag_copy.execute(modalities, task)
             score = task.run(
                 modified_modality[list(modified_modality.keys())[-1]].data
@@ -262,7 +360,7 @@ class HyperparameterTuner:
             return params, score
         except Exception as e:
             self.logger.error(f"Error evaluating DAG with params {params}: {e}")
-            return params, float("-inf") if self.maximize_metric else float("inf")
+            return params, np.nan
 
     def tune_multimodal_representations(
         self,
@@ -271,14 +369,25 @@ class HyperparameterTuner:
         optimize_unimodal: bool = True,
         max_eval_per_rep: Optional[int] = None,
     ):
-        results = {}
+        self.optimization_results.setup_mm(optimize_unimodal)
         for task in self.tasks:
+
+            def _get_metric_value(result):
+                score = result.val_score
+                if isinstance(score, PerformanceMeasure):
+                    score = score.average_scores
+                if isinstance(score, dict):
+                    return score.get(
+                        self.scoring_metric,
+                        float("-inf") if self.maximize_metric else float("inf"),
+                    )
+                return score
+
             best_results = sorted(
                 optimization_results[task.model.name],
-                key=lambda x: x.val_score,
-                reverse=True,
+                key=_get_metric_value,
+                reverse=self.maximize_metric,
             )[:k]
-            results[task.model.name] = []
             best_optimization_results = best_results
 
             for representation in best_optimization_results:
@@ -314,32 +423,18 @@ class HyperparameterTuner:
                         representation.dag.root_node_id,
                         task,
                         max_eval_per_rep,
+                        mm_opt=True,
                     )
-                results[task.model.name].append(result)
-
-        self.results = results
-
+                self.optimization_results.add_result([result])
         if self.save_results:
             self.save_tuning_results()
 
-        return results
-
     def save_tuning_results(self, filepath: str = None):
         if not filepath:
             filepath = f"hyperparameter_results_{int(time.time())}.json"
 
-        json_results = {}
-        for task in self.results.keys():
-            for result in self.results[task]:
-                json_results[result.representation_name] = {
-                    "best_params": result.best_params,
-                    "best_score": result.best_score,
-                    "tuning_time": result.tuning_time,
-                    "num_evaluations": len(result.all_results),
-                }
-
-        with open(filepath, "w") as f:
-            json.dump(json_results, f, indent=2)
+        with open(filepath, "wb") as f:
+            pickle.dump(self.optimization_results.results, f)
 
         if self.debug:
             self.logger.info(f"Results saved to {filepath}")
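
The tuner above replaces gp_minimize with skopt's ask/tell Optimizer so that
batches of candidate configurations can be evaluated in parallel via joblib.
A self-contained sketch of that pattern with a toy objective (not the tuner's
actual DAG evaluation):

    from joblib import Parallel, delayed
    from skopt import Optimizer
    from skopt.space import Integer, Real

    search_space = [Integer(2, 50, name="window_size"), Real(0.0, 1.0, name="alpha")]
    opt = Optimizer(search_space, random_state=42, n_initial_points=5)

    def objective(point):
        window_size, alpha = point
        return (window_size - 10) ** 2 + alpha  # stand-in for a validation loss

    n_calls, n_batch = 20, 4
    for _ in range(0, n_calls, n_batch):
        points = opt.ask(n_points=n_batch)   # propose a batch of configurations
        scores = Parallel(n_jobs=n_batch)(delayed(objective)(p) for p in points)
        opt.tell(points, scores)             # update the surrogate with results

    best = min(zip(opt.yi, opt.Xi))          # lowest objective value and its point
    print(best)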
diff --git a/src/main/python/systemds/scuro/representations/window_aggregation.py b/src/main/python/systemds/scuro/representations/window_aggregation.py
index 4d4ec19c5b..039387eb01 100644
--- a/src/main/python/systemds/scuro/representations/window_aggregation.py
+++ b/src/main/python/systemds/scuro/representations/window_aggregation.py
@@ -59,7 +59,9 @@ class Window(Context):
             self._aggregation_function = Aggregation(value)
 
 
-@register_context_operator([ModalityType.TIMESERIES, ModalityType.AUDIO])
+@register_context_operator(
+    [ModalityType.TIMESERIES, ModalityType.AUDIO, ModalityType.EMBEDDING]
+)
 class WindowAggregation(Window):
     def __init__(self, aggregation_function="mean", window_size=10, pad=False):
         super().__init__("WindowAggregation", aggregation_function)
@@ -167,7 +169,9 @@ class WindowAggregation(Window):
         return np.array(result)
 
 
-@register_context_operator([ModalityType.TIMESERIES, ModalityType.AUDIO])
+@register_context_operator(
+    [ModalityType.TIMESERIES, ModalityType.AUDIO, ModalityType.EMBEDDING]
+)
 class StaticWindow(Window):
     def __init__(self, aggregation_function="mean", num_windows=100):
         super().__init__("StaticWindow", aggregation_function)
@@ -198,7 +202,9 @@ class StaticWindow(Window):
         return np.array(windowed_data)
 
 
-@register_context_operator([ModalityType.TIMESERIES, ModalityType.AUDIO])
+@register_context_operator(
+    [ModalityType.TIMESERIES, ModalityType.AUDIO, ModalityType.EMBEDDING]
+)
 class DynamicWindow(Window):
     def __init__(self, aggregation_function="mean", num_windows=100):
         super().__init__("DynamicWindow", aggregation_function)
diff --git a/src/main/python/tests/scuro/data_generator.py b/src/main/python/tests/scuro/data_generator.py
index ae78c50b8a..cfa77b1dd6 100644
--- a/src/main/python/tests/scuro/data_generator.py
+++ b/src/main/python/tests/scuro/data_generator.py
@@ -62,6 +62,7 @@ class TestDataLoader(BaseLoader):
 class ModalityRandomDataGenerator:
 
     def __init__(self):
+        np.random.seed(4)
         self.modality_id = 0
         self.modality_type = None
         self.metadata = {}
diff --git a/src/main/python/tests/scuro/test_hp_tuner.py b/src/main/python/tests/scuro/test_hp_tuner.py
index 73c498e236..de7c8f0217 100644
--- a/src/main/python/tests/scuro/test_hp_tuner.py
+++ b/src/main/python/tests/scuro/test_hp_tuner.py
@@ -78,31 +78,31 @@ class TestHPTuner(unittest.TestCase):
 
         self.run_hp_for_modality([audio])
 
-    # def test_multimodal_hp_tuning(self):
-    #     audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data(
-    #         self.num_instances, 3000
-    #     )
-    #     audio = UnimodalModality(
-    #         TestDataLoader(
-    #             self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md
-    #         )
-    #     )
-    #
-    #     text_data, text_md = ModalityRandomDataGenerator().create_text_data(
-    #         self.num_instances
-    #     )
-    #     text = UnimodalModality(
-    #         TestDataLoader(
-    #             self.indices, None, ModalityType.TEXT, text_data, str, text_md
-    #         )
-    #     )
-    #
-    #     self.run_hp_for_modality(
-    #         [audio, text], multimodal=True, tune_unimodal_representations=True
-    #     )
-    #     self.run_hp_for_modality(
-    #         [audio, text], multimodal=True, tune_unimodal_representations=False
-    #     )
+    def test_multimodal_hp_tuning(self):
+        audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data(
+            self.num_instances, 3000
+        )
+        audio = UnimodalModality(
+            TestDataLoader(
+                self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md
+            )
+        )
+
+        text_data, text_md = ModalityRandomDataGenerator().create_text_data(
+            self.num_instances
+        )
+        text = UnimodalModality(
+            TestDataLoader(
+                self.indices, None, ModalityType.TEXT, text_data, str, text_md
+            )
+        )
+
+        # self.run_hp_for_modality(
+        #     [audio, text], multimodal=True, tune_unimodal_representations=True
+        # )
+        self.run_hp_for_modality(
+            [audio, text], multimodal=True, tune_unimodal_representations=False
+        )
 
     def test_hp_tuner_for_text_modality(self):
         text_data, text_md = ModalityRandomDataGenerator().create_text_data(
@@ -130,7 +130,7 @@ class TestHPTuner(unittest.TestCase):
             },
         ):
             registry = Registry()
-            registry._fusion_operators = [Average, Concatenation, LSTM]
+            registry._fusion_operators = [LSTM]
             unimodal_optimizer = UnimodalOptimizer(modalities, self.tasks, False)
             unimodal_optimizer.optimize()
 
@@ -159,8 +159,32 @@ class TestHPTuner(unittest.TestCase):
             else:
                 hp.tune_unimodal_representations(max_eval_per_rep=10)
 
-            assert len(hp.results) == len(self.tasks)
-            assert len(hp.results[self.tasks[0].model.name]) == 2
+            assert len(hp.optimization_results.results) == len(self.tasks)
+            if multimodal:
+                if tune_unimodal_representations:
+                    assert (
+                        len(
+                            hp.optimization_results.results[self.tasks[0].model.name][0]
+                        )
+                        == 1
+                    )
+                else:
+                    assert (
+                        len(
+                            hp.optimization_results.results[self.tasks[0].model.name][
+                                "mm_results"
+                            ]
+                        )
+                        == 1
+                    )
+            else:
+                assert (
+                    len(hp.optimization_results.results[self.tasks[0].model.name]) == 1
+                )
+                assert (
+                    len(hp.optimization_results.results[self.tasks[0].model.name][0])
+                    == 2
+                )
 
 
 if __name__ == "__main__":
