This is an automated email from the ASF dual-hosted git repository.

christinadionysio pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 574a816325 [SYSTEMDS-3947] Implement HP-tuning for Scuro Representation DAGs
574a816325 is described below

commit 574a816325b9f1fb80321dc34beea73bd23d88ab
Author: Christina Dionysio <[email protected]>
AuthorDate: Wed Apr 29 07:03:50 2026 +0200

    [SYSTEMDS-3947] Implement HP-tuning for Scuro Representation DAGs
    
    This patch improves the existing HP Tuner for the new unimodal optimizer.
---
 .../scuro/drsearch/hyperparameter_tuner.py         | 469 +++++++++++++++++----
 .../systemds/scuro/drsearch/node_executor.py       |  97 +++--
 .../systemds/scuro/drsearch/node_scheduler.py      |  23 +-
 .../systemds/scuro/drsearch/representation_dag.py  |  21 +-
 .../systemds/scuro/drsearch/unimodal_optimizer.py  |  30 +-
 .../python/systemds/scuro/modality/transformed.py  |   7 +-
 .../representations/aggregated_representation.py   |  14 +-
 .../python/systemds/scuro/representations/bert.py  |   2 +-
 .../python/systemds/scuro/representations/bow.py   |   8 +-
 .../scuro/representations/mlp_averaging.py         |  13 +-
 .../python/systemds/scuro/representations/tfidf.py |  11 +-
 .../scuro/representations/window_aggregation.py    |  27 +-
 src/main/python/tests/scuro/test_hp_tuner.py       | 354 +++++++++-------
 .../python/tests/scuro/test_unimodal_optimizer.py  | 119 ++++--
 14 files changed, 811 insertions(+), 384 deletions(-)

diff --git a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
index 64141dc929..0737f18d62 100644
--- a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
+++ b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
@@ -20,16 +20,19 @@
 # -------------------------------------------------------------
 from typing import Dict, List, Tuple, Any, Optional
 import os
-from skopt.space import Real, Integer, Categorical
 import numpy as np
 import logging
 from dataclasses import dataclass
 import time
 import copy
 from joblib import Parallel, delayed
-from skopt import Optimizer
+import itertools
+import math
+import random
 from systemds.scuro.drsearch.representation_dag import (
     RepresentationDAGBuilder,
+    RepresentationDag,
+    RepresentationNode,
 )
 from systemds.scuro.modality.modality import Modality
 from systemds.scuro.drsearch.task import PerformanceMeasure
@@ -110,13 +113,16 @@ class HyperparameterTuner:
         tasks,
         optimization_results,
         k: int = 2,
-        n_jobs: int = -1,
+        n_jobs: int = 1,
         scoring_metric: str = "accuracy",
         maximize_metric: bool = True,
         save_results: bool = False,
         debug: bool = False,
         checkpoint_every: Optional[int] = None,
         resume: bool = True,
+        random_state: int = 42,
+        exhaustive_threshold: int = 256,
+        local_search_patience: int = 3,
     ):
         self.tasks = tasks
         self.unimodal_optimization_results = optimization_results
@@ -136,6 +142,10 @@ class HyperparameterTuner:
         self.logger = logging.getLogger(__name__)
         self.checkpoint_every = checkpoint_every
         self.resume = resume
+        self.random_state = random_state
+        self.exhaustive_threshold = max(1, exhaustive_threshold)
+        self.local_search_patience = max(1, local_search_patience)
+        self._rng = random.Random(self.random_state)
         self._checkpoint_manager = CheckpointManager(
             os.getcwd(),
             "hyperparam_checkpoint_",
@@ -209,11 +219,12 @@ class HyperparameterTuner:
         self.resume_from_checkpoint()
         for task in self.tasks:
             reps = self.k_best_representations[task.model.name]
-            skip_remaining = self._checkpoint_manager.skip_remaining_by_key.get(
-                task.model.name, 0
-            )
-            if skip_remaining >= len(reps):
-                continue
+            skip_remaining = 0
+            # skip_remaining = self._checkpoint_manager.skip_remaining_by_key.get(
+            #     task.model.name, 0
+            # )
+            # if skip_remaining >= len(reps):
+            #     continue
 
             chunk_size = self.checkpoint_every or len(reps)
             for start_idx in range(skip_remaining, len(reps), chunk_size):
@@ -258,8 +269,9 @@ class HyperparameterTuner:
                 visit_node(input_id)
             visited.add(node_id)
             if node.operation is not None:
-                if node.operation().parameters:
-                    hyperparams[node_id] = node.operation().parameters
+                params = self._get_params_for_node(node)
+                if params:
+                    hyperparams[node_id] = params
                 reps.append(node.operation)
                 node_order.append(node_id)
             if node.modality_id is not None:
@@ -267,85 +279,39 @@ class HyperparameterTuner:
 
         visit_node(root_node_id)
 
-        if not hyperparams:
-            return None
-
         start_time = time.time()
         rep_name = "-".join([rep.__name__ for rep in reps])
-
-        search_space = []
-        param_names = []
-        for op_id, op_params in hyperparams.items():
-            for param_name, param_values in op_params.items():
-                param_names.append(op_id + "-" + param_name)
-                if isinstance(param_values, list):
-                    search_space.append(
-                        Categorical(param_values, name=op_id + "-" + param_name)
-                    )
-                elif isinstance(param_values, tuple) and len(param_values) == 2:
-                    if isinstance(param_values[0], int) and isinstance(
-                        param_values[1], int
-                    ):
-                        search_space.append(
-                            Integer(
-                                param_values[0],
-                                param_values[1],
-                                name=op_id + "-" + param_name,
-                            )
-                        )
-                    else:
-                        search_space.append(
-                            Real(
-                                param_values[0],
-                                param_values[1],
-                                name=op_id + "-" + param_name,
-                            )
-                        )
-                else:
-                    search_space.append(
-                        Categorical([param_values], name=op_id + "-" + param_name)
-                    )
-
-        n_calls = max_evals if max_evals else 50
-
-        all_results = []
-
-        def evaluate_point(point):
-            params = dict(zip(param_names, point))
-            result = self.evaluate_dag_config(
+        modalities_override = (
+            self._get_cached_modalities_for_task(task, modality_ids) if mm_opt else None
+        )
+        if not hyperparams:
+            # TODO: extract the information from the unimodal optimization results
+            baseline = self.evaluate_dag_config(
                 dag,
-                params,
+                {},
                 node_order,
                 modality_ids,
                 task,
-                modalities_override=(
-                    self._get_cached_modalities_for_task(task, modality_ids)
-                    if mm_opt
-                    else None
-                ),
+                modalities_override=modalities_override,
+            )
+            all_results = [baseline]
+        else:
+            n_calls = max_evals if max_evals else 50
+            param_specs = self._build_param_specs(hyperparams)
+            default_config = {}
+            all_results = self._search_best_configs(
+                dag=dag,
+                task=task,
+                node_order=node_order,
+                modality_ids=modality_ids,
+                modalities_override=modalities_override,
+                param_specs=param_specs,
+                budget=n_calls,
+                initial_config=None,
             )
-            score = result[1]
-            if isinstance(score, PerformanceMeasure):
-                score = score.average_scores[self.scoring_metric]
-            if self.maximize_metric:
-                objective_value = -score
-            else:
-                objective_value = score
-            return objective_value, result
 
-        opt = Optimizer(
-            search_space, random_state=42, n_initial_points=min(10, n_calls // 2)
-        )
-        self.n_jobs = 2
-        n_batch = min(abs(self.n_jobs), n_calls) if self.n_jobs != 0 else 1
-        for _ in range(0, n_calls, n_batch):
-            points = opt.ask(n_points=n_batch)
-            results = Parallel(
-                n_jobs=self.n_jobs, max_nbytes=None, mmap_mode=None, backend="threading"
-            )(delayed(evaluate_point)(p) for p in points)
-            objective_values = [result[0] for result in results]
-            all_results.extend(result[1] for result in results)
-            opt.tell(points, objective_values)
+        if not all_results:
+            return None
 
         def get_score(result):
             score = result[1]
@@ -359,7 +325,21 @@ class HyperparameterTuner:
             best_params, best_score = min(all_results, key=get_score)
 
         tuning_time = time.time() - start_time
-
+        # results = self.unimodal_optimization_results.results[self.modalities[0].modality_id][task.model.name]
+
+        # default_result = sorted(
+        #     results,
+        #     key=lambda r: r.val_score[self.scoring_metric],
+        #     reverse=True,
+        # )[0]
+        # pm = PerformanceMeasure(name=self.scoring_metric, metrics=self.scoring_metric, higher_is_better=self.maximize_metric)
+        # pm.add_scores({self.scoring_metric: default_result.val_score[self.scoring_metric]})
+        # default_params = self._get_default_params(dag)
+        # def_par ={}
+        # for k, v in default_params.items():
+        #     for k_v, v_v in v.items():
+        #         def_par[k+"-"+k_v] = v_v
+        # all_results.append((def_par, pm))
         best_result = HyperparamResult(
             representation_name=rep_name,
             best_params=best_params,
@@ -374,6 +354,320 @@ class HyperparameterTuner:
 
         return best_result
 
+    def _get_params_for_node(self, node: RepresentationNode) -> Dict[str, Any]:
+        if not node.operation().parameters:
+            return None
+
+        params = copy.deepcopy(node.operation().parameters)
+        return params
+
+    def _build_param_specs(
+        self, hyperparams: Dict[str, Dict[str, Any]]
+    ) -> List[Dict[str, Any]]:
+        param_specs = []
+        for op_id, op_params in hyperparams.items():
+            for param_name, param_values in op_params.items():
+                full_name = op_id + "-" + param_name
+                if isinstance(param_values, list):
+                    param_type = "categorical"
+                    domain = list(param_values)
+                elif isinstance(param_values, tuple) and len(param_values) == 2:
+                    lo, hi = param_values
+                    if isinstance(lo, int) and isinstance(hi, int):
+                        param_type = "integer"
+                    else:
+                        param_type = "real"
+                    domain = (lo, hi)
+                else:
+                    param_type = "categorical"
+                    domain = [param_values]
+                param_specs.append(
+                    {"name": full_name, "type": param_type, "domain": domain}
+                )
+        return param_specs
+
+    def _config_key(self, params: Dict[str, Any]) -> Tuple[Tuple[str, Any], ...]:
+        key_items = []
+        for name, value in sorted(params.items()):
+            if isinstance(value, float):
+                value = round(value, 10)
+            key_items.append((name, value))
+        return tuple(key_items)
+
+    def _score_value(self, score: Any) -> float:
+        if isinstance(score, PerformanceMeasure):
+            return score.average_scores.get(self.scoring_metric, np.nan)
+        return score
+
+    def _is_better(self, candidate_score: float, best_score: float) -> bool:
+        if np.isnan(candidate_score):
+            return False
+        if np.isnan(best_score):
+            return True
+        return (
+            candidate_score > best_score
+            if self.maximize_metric
+            else candidate_score < best_score
+        )
+
+    def _sample_random_config(
+        self, param_specs: List[Dict[str, Any]]
+    ) -> Dict[str, Any]:
+        config = {}
+        for spec in param_specs:
+            name = spec["name"]
+            domain = spec["domain"]
+            if spec["type"] == "categorical":
+                config[name] = self._rng.choice(domain)
+            elif spec["type"] == "integer":
+                config[name] = self._rng.randint(int(domain[0]), int(domain[1]))
+            else:
+                config[name] = self._rng.uniform(float(domain[0]), float(domain[1]))
+        return config
+
+    def _estimate_discrete_search_size(
+        self, param_specs: List[Dict[str, Any]]
+    ) -> Optional[int]:
+        size = 1
+        for spec in param_specs:
+            if spec["type"] == "real":
+                return None
+            if spec["type"] == "integer":
+                size *= max(0, int(spec["domain"][1]) - int(spec["domain"][0]) + 1)
+            else:
+                size *= len(spec["domain"])
+            if size > self.exhaustive_threshold:
+                return size
+        return size
+
+    def _enumerate_configs(
+        self, param_specs: List[Dict[str, Any]]
+    ) -> List[Dict[str, Any]]:
+        domains = []
+        names = []
+        for spec in param_specs:
+            names.append(spec["name"])
+            if spec["type"] == "integer":
+                lo, hi = int(spec["domain"][0]), int(spec["domain"][1])
+                domains.append(list(range(lo, hi + 1)))
+            else:
+                domains.append(list(spec["domain"]))
+        return [dict(zip(names, values)) for values in itertools.product(*domains)]
+
+    def _generate_neighbor_config(
+        self,
+        base_config: Dict[str, Any],
+        param_specs: List[Dict[str, Any]],
+        step_scale: float,
+    ) -> Dict[str, Any]:
+        candidate = dict(base_config)
+        if not param_specs:
+            return candidate
+
+        n_mutations = 1 if len(param_specs) == 1 else self._rng.randint(1, 2)
+        mutated_specs = self._rng.sample(
+            param_specs, k=min(n_mutations, len(param_specs))
+        )
+        for spec in mutated_specs:
+            name = spec["name"]
+            current = candidate[name]
+            if spec["type"] == "categorical":
+                values = [value for value in spec["domain"] if value != current]
+                if values:
+                    candidate[name] = self._rng.choice(values)
+            elif spec["type"] == "integer":
+                lo, hi = int(spec["domain"][0]), int(spec["domain"][1])
+                width = max(1, hi - lo)
+                step = max(1, int(math.ceil(width * step_scale)))
+                delta = self._rng.choice([-step, step])
+                candidate[name] = max(lo, min(hi, int(current) + delta))
+            else:
+                lo, hi = float(spec["domain"][0]), float(spec["domain"][1])
+                span = max(1e-9, hi - lo)
+                delta = self._rng.uniform(-span * step_scale, span * step_scale)
+                candidate[name] = max(lo, min(hi, float(current) + delta))
+        return candidate
+
+    def _evaluate_configs(
+        self,
+        dag,
+        task,
+        node_order,
+        modality_ids,
+        modalities_override,
+        candidate_configs: List[Dict[str, Any]],
+        seen_configs: Dict[Tuple[Tuple[str, Any], ...], Tuple[Dict[str, Any], Any]],
+    ) -> List[Tuple[Dict[str, Any], Any]]:
+        ordered_unique_configs = []
+        unique_keys_in_order = []
+        unique_keys_set = set()
+        for config in candidate_configs:
+            key = self._config_key(config)
+            if key not in unique_keys_set:
+                unique_keys_set.add(key)
+                unique_keys_in_order.append(key)
+                ordered_unique_configs.append(config)
+
+        pending_configs = []
+        for config in ordered_unique_configs:
+            key = self._config_key(config)
+            if key not in seen_configs:
+                pending_configs.append(config)
+
+        if pending_configs:
+            n_jobs = self.n_jobs if self.n_jobs != 0 else 1
+            evaluated = Parallel(
+                n_jobs=n_jobs, max_nbytes=None, mmap_mode=None, backend="threading"
+            )(
+                delayed(self.evaluate_dag_config)(
+                    dag,
+                    config,
+                    node_order,
+                    modality_ids,
+                    task,
+                    modalities_override=modalities_override,
+                )
+                for config in pending_configs
+            )
+            for result in evaluated:
+                seen_configs[self._config_key(result[0])] = result
+
+        return [
+            seen_configs[key] for key in unique_keys_in_order if key in seen_configs
+        ]
+
+    def _search_best_configs(
+        self,
+        dag,
+        task,
+        node_order,
+        modality_ids,
+        modalities_override,
+        param_specs: List[Dict[str, Any]],
+        budget: int,
+        initial_config: Dict[str, Any],
+    ) -> List[Tuple[Dict[str, Any], Any]]:
+        budget = max(1, budget)
+        seen_configs: Dict[Tuple[Tuple[str, Any], ...], Tuple[Dict[str, Any], Any]] = {}
+        all_results: List[Tuple[Dict[str, Any], Any]] = []
+        best_score = np.nan
+        best_config = None
+        if initial_config is not None and budget > 0:
+            initial_results = self._evaluate_configs(
+                dag,
+                task,
+                node_order,
+                modality_ids,
+                modalities_override,
+                [initial_config],
+                seen_configs,
+            )
+            all_results.extend(initial_results)
+            if initial_results:
+                p, s = initial_results[0]
+                best_config = p
+                best_score = self._score_value(s)
+            budget -= 1
+
+        discrete_size = self._estimate_discrete_search_size(param_specs)
+        if discrete_size is not None and discrete_size <= min(
+            self.exhaustive_threshold, budget
+        ):
+            candidates = self._enumerate_configs(param_specs)
+            self._rng.shuffle(candidates)
+            candidates = candidates[:budget]
+            batch_results = self._evaluate_configs(
+                dag,
+                task,
+                node_order,
+                modality_ids,
+                modalities_override,
+                candidates,
+                seen_configs,
+            )
+            all_results.extend(batch_results)
+            return all_results
+
+        initial_budget = min(budget, max(8, len(param_specs) * 4))
+        initial_candidates = [
+            self._sample_random_config(param_specs) for _ in range(initial_budget)
+        ]
+        initial_results = self._evaluate_configs(
+            dag,
+            task,
+            node_order,
+            modality_ids,
+            modalities_override,
+            initial_candidates,
+            seen_configs,
+        )
+        all_results.extend(initial_results)
+
+        for params, score in initial_results:
+            numeric_score = self._score_value(score)
+            if self._is_better(numeric_score, best_score):
+                best_score = numeric_score
+                best_config = params
+
+        eval_count = len(seen_configs)
+        no_improvement_rounds = 0
+        step_scale = 0.5
+
+        while eval_count < budget:
+            if best_config is None:
+                candidate_batch = [self._sample_random_config(param_specs)]
+            else:
+                candidate_batch = []
+                batch_size = min(
+                    max(2, abs(self.n_jobs) if self.n_jobs != 0 else 1),
+                    budget - eval_count,
+                )
+                for _ in range(batch_size):
+                    candidate_batch.append(
+                        self._generate_neighbor_config(
+                            best_config, param_specs, step_scale
+                        )
+                    )
+
+                if budget - eval_count > 3:
+                    candidate_batch.append(self._sample_random_config(param_specs))
+
+            batch_results = self._evaluate_configs(
+                dag,
+                task,
+                node_order,
+                modality_ids,
+                modalities_override,
+                candidate_batch,
+                seen_configs,
+            )
+            if not batch_results:
+                step_scale = max(0.05, step_scale * 0.5)
+                if step_scale <= 0.05:
+                    break
+                continue
+
+            improved = False
+            for params, score in batch_results:
+                numeric_score = self._score_value(score)
+                if self._is_better(numeric_score, best_score):
+                    best_score = numeric_score
+                    best_config = params
+                    improved = True
+            all_results.extend(batch_results)
+            eval_count = len(seen_configs)
+
+            if improved:
+                no_improvement_rounds = 0
+                step_scale = min(0.5, step_scale * 1.1)
+            else:
+                no_improvement_rounds += 1
+                step_scale = max(0.05, step_scale * 0.7)
+                if no_improvement_rounds >= self.local_search_patience:
+                    break
+
+        return all_results
+
     def _get_cached_modalities_for_task(self, task, modality_ids):
         if not self.k_best_cache_by_modality:
             return self.get_modalities_by_id(modality_ids)
@@ -402,12 +696,13 @@ class HyperparameterTuner:
                 else self.get_modalities_by_id(modality_ids)
             )
             modified_modality = dag_copy.execute(modalities, task)
-            score = task.run(
-                modified_modality[list(modified_modality.keys())[-1]].data
-            )[1]
+            score = task.run(modified_modality.data)[1]
 
             return params, score
         except Exception as e:
+            import traceback
+
+            traceback.print_exc()
             self.logger.error(f"Error evaluating DAG with params {params}: {e}")
             return params, np.nan
 
@@ -485,11 +780,11 @@ class HyperparameterTuner:
                     self.optimization_results.add_result([result])
                     self._checkpoint_manager.increment(task.model.name, 1)
                     self._checkpoint_manager.checkpoint_if_due(
-                        self.optimization_results.results, "eval_count_by_task"
+                        self.optimization_results.results
                     )
                 except Exception:
                     self._checkpoint_manager.save_checkpoint(
-                        self.optimization_results.results, "eval_count_by_task", {}
+                        self.optimization_results.results, {}
                     )
                     raise
         if self.save_results:
diff --git a/src/main/python/systemds/scuro/drsearch/node_executor.py b/src/main/python/systemds/scuro/drsearch/node_executor.py
index bf51dfde85..418b11fe70 100644
--- a/src/main/python/systemds/scuro/drsearch/node_executor.py
+++ b/src/main/python/systemds/scuro/drsearch/node_executor.py
@@ -271,7 +271,9 @@ class NodeExecutor:
         checkpoint_manager: Optional[CheckpointManager] = None,
         max_num_workers: int = -1,
         result_path: Optional[str] = None,
+        enable_checkpointing: bool = True,
     ):
+        self.enable_checkpointing = enable_checkpointing
         available_total_cpu = (
             float(psutil.virtual_memory().available)
             - float(psutil.virtual_memory().available) * 0.30
@@ -316,14 +318,17 @@ class NodeExecutor:
             def submit_node(node_id: str):
                 node = self.scheduler.mapping[node_id]
                 gpu_id = node.gpu_id
-                parent_result = None
-                parent_node_id = self.scheduler.get_valid_parent(node_id)
-                if parent_node_id is not None:
-                    parent_result = self.result_cache.get(parent_node_id)
+                parent_node_ids = self.scheduler.get_valid_parents(node_id)
+                parent_results = None
+                if parent_node_ids:
+                    parent_results = [
+                        self.result_cache.get(parent_node_id)
+                        for parent_node_id in parent_node_ids
+                    ]
                 if self._is_task_node(node):
                     task_result = ResultEntry(
                         dag=self._get_dag_from_node_ids(node_id),
-                        representation_time=parent_result.transform_time,
+                        representation_time=parent_results[0].transform_time,
                     )
                     task_results[node_id] = task_result
                     task_idx = int(node.parameters.get("_task_idx", 0))
@@ -333,8 +338,8 @@ class NodeExecutor:
                         self.tasks[task_idx],
                         (
                             self.modalities[0].data
-                            if parent_result is None
-                            else parent_result.data
+                            if parent_results is None
+                            else parent_results[0].data
                         ),
                         gpu_id,
                     )
@@ -342,7 +347,7 @@ class NodeExecutor:
                     future = executor.submit(
                         _execute_node_worker,
                         node,
-                        self.modalities if parent_result is None else [parent_result],
+                        self.modalities if parent_results is None else parent_results,
                         None,
                         None,
                         gpu_id,
@@ -393,25 +398,27 @@ class NodeExecutor:
                         task_results[node_id].test_score = result["scores"][
                             2
                         ].average_scores
-                        self.checkpoint_manager.increment(node_id)
-                        self.checkpoint_manager.checkpoint_if_due(task_results)
-                        self._checkpoint_memory_usage(
-                            node_id,
-                            peak_bytes,
-                            gpu_peak_bytes,
-                            "task",
-                            memory_usage_data,
-                            None,
-                        )
+                        if self.enable_checkpointing:
+                            self.checkpoint_manager.increment(node_id)
+                            self.checkpoint_manager.checkpoint_if_due(task_results)
+                            self._checkpoint_memory_usage(
+                                node_id,
+                                peak_bytes,
+                                gpu_peak_bytes,
+                                "task",
+                                memory_usage_data,
+                                None,
+                            )
 
-                        parent_node_id = self.scheduler.get_valid_parent(node_id)
-                        if parent_node_id is not None:
-                            self.result_cache.dec_ref(parent_node_id)
-                            if (
-                                parent_node_id in self.result_cache.ref_count
-                                and self.result_cache.ref_count[parent_node_id] == 0
-                            ):
-                                self.result_cache.clear(parent_node_id)
+                        parent_node_ids = self.scheduler.get_valid_parents(node_id)
+                        if len(parent_node_ids) > 0:
+                            for parent_node_id in parent_node_ids:
+                                self.result_cache.dec_ref(parent_node_id)
+                                if (
+                                    parent_node_id in self.result_cache.ref_count
+                                    and self.result_cache.ref_count[parent_node_id] == 0
+                                ):
+                                    self.result_cache.clear(parent_node_id)
                         self.scheduler.complete_node(node_id)
 
                     else:
@@ -430,14 +437,15 @@ class NodeExecutor:
                             self.scheduler.update_node_stats_and_reestimate_descendants(
                                 node_id, actual_stats
                             )
-                        self._checkpoint_memory_usage(
-                            node_id,
-                            peak_bytes,
-                            gpu_peak_bytes,
-                            result["operation_name"],
-                            memory_usage_data,
-                            transformed_modality.data,
-                        )
+                        if self.enable_checkpointing:
+                            self._checkpoint_memory_usage(
+                                node_id,
+                                peak_bytes,
+                                gpu_peak_bytes,
+                                result["operation_name"],
+                                memory_usage_data,
+                                transformed_modality.data,
+                            )
                         before_bytes = self.result_cache.get_memory_total_memory_usage()
                         self._manage_result_cache(node_id, transformed_modality)
                         after_bytes = self.result_cache.get_memory_total_memory_usage()
@@ -539,6 +547,9 @@ class NodeExecutor:
                 assert (
                     len(result) == node_stats.num_instances
                 ), f"Node {node_id} {operation_name} should have {node_stats.num_instances} instances, actual: {len(result)}"
+                # assert (
+                #     shape == node_stats.output_shape
+                # ), f"Node {node_id} {operation_name} should have shape of {node_stats.output_shape}, actual shape: {shape}"
             return shape
 
     def _infer_actual_output_stats(
@@ -575,20 +586,22 @@ class NodeExecutor:
         return None
 
     def _manage_result_cache(self, node_id: str, result: Any):
-        parent_node_id = self.scheduler.get_valid_parent(node_id)
-        if parent_node_id is not None:
-            self.result_cache.dec_ref(parent_node_id)
+        parent_node_ids = self.scheduler.get_valid_parents(node_id)
+        if len(parent_node_ids) > 0:
+            for parent_node_id in parent_node_ids:
+                self.result_cache.dec_ref(parent_node_id)
 
         if self.scheduler.get_children(node_id):
             for _ in self.scheduler.get_children(node_id):
                 self.result_cache.inc_ref(node_id)
             self.result_cache.add_result(node_id, result)
 
-        if (
-            parent_node_id in self.result_cache.ref_count
-            and self.result_cache.ref_count[parent_node_id] == 0
-        ):
-            self.result_cache.clear(parent_node_id)
+        for parent_node_id in parent_node_ids:
+            if (
+                parent_node_id in self.result_cache.ref_count
+                and self.result_cache.ref_count[parent_node_id] == 0
+            ):
+                self.result_cache.clear(parent_node_id)
 
     def _get_nodes_by_ids(self, nodes_ids: List[str]) -> List[RepresentationNode]:
         return [self.scheduler.mapping[node_id] for node_id in nodes_ids]
diff --git a/src/main/python/systemds/scuro/drsearch/node_scheduler.py b/src/main/python/systemds/scuro/drsearch/node_scheduler.py
index 4fe3ac1d03..ef3ccc844e 100644
--- a/src/main/python/systemds/scuro/drsearch/node_scheduler.py
+++ b/src/main/python/systemds/scuro/drsearch/node_scheduler.py
@@ -166,11 +166,12 @@ class MemoryAwareNodeScheduler:
 
         return False
 
-    def get_valid_parent(self, node_id: str) -> bool:
-        for parent_id in self.parents[node_id]:
-            if parent_id not in self.leaves:
-                return parent_id
-        return None
+    def get_valid_parents(self, node_id: str) -> List[str]:
+        """Return the ids of all parents of ``node_id`` that are not leaves.
+
+        The list is empty when every parent is a leaf or there are none.
+        """
+        return [p for p in self.parents[node_id] if p not in self.leaves]
 
     def get_children(self, node_id: str) -> List[str]:
         return list(self.children[node_id])
@@ -207,8 +208,9 @@ class MemoryAwareNodeScheduler:
                     params=self.mapping[desc_id].parameters
                 )
                 peak_memory = operation.estimate_peak_memory_bytes(input_stats)
+                input_stats_for_overhead = 
self._stats_for_overhead(input_stats)
                 peak_memory["cpu_peak_bytes"] += (
-                    64 * 1024 + 512 * input_stats.num_instances
+                    64 * 1024 + 512 * input_stats_for_overhead.num_instances
                 )
                 output_stats = operation.get_output_stats(input_stats)
 
@@ -395,14 +397,17 @@ class MemoryAwareNodeScheduler:
             else:
                 parent_ids = list(self.parents.get(node, set()))
                 parent_stats = [node_stats[parent_id] for parent_id in 
parent_ids]
-                input_stats = parent_stats[0] if parent_stats else None
+                input_stats = (
+                    parent_stats[0] if len(parent_stats) == 1 else parent_stats
+                )
                 if node not in self.roots:
                     operation = self.mapping[node].operation(
                         params=self.mapping[node].parameters
                     )
                     peak_memory = 
operation.estimate_peak_memory_bytes(input_stats)
+                    input_stats_for_overhead = 
self._stats_for_overhead(input_stats)
                     peak_memory["cpu_peak_bytes"] += (
-                        64 * 1024 + 512 * input_stats.num_instances
+                        64 * 1024 + 512 * 
input_stats_for_overhead.num_instances
                     )  # Placeholder for transformed modality creation overhead
                     peak_memory["cpu_peak_bytes"] *= 1
                     output_stats = operation.get_output_stats(input_stats)
@@ -429,6 +434,12 @@ class MemoryAwareNodeScheduler:
         self.node_stats = node_stats
         return node_resources
 
+    @staticmethod
+    def _stats_for_overhead(input_stats: Any) -> Any:
+        """Return the first entry of a non-empty stats list, else the input."""
+        is_nonempty_list = isinstance(input_stats, list) and len(input_stats) > 0
+        return input_stats[0] if is_nonempty_list else input_stats
+
     @staticmethod
     def _stats_to_bytes(stats: Optional[Any], dtype_size: int = 4) -> int:
         if stats is None:
diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py 
b/src/main/python/systemds/scuro/drsearch/representation_dag.py
index b2911dbe7a..099732e46d 100644
--- a/src/main/python/systemds/scuro/drsearch/representation_dag.py
+++ b/src/main/python/systemds/scuro/drsearch/representation_dag.py
@@ -74,7 +74,8 @@ class RepresentationNode:
 @dataclass
 class RepresentationDag:
 
-    def __init__(self, nodes: List[Any], root_node_id):
+    def __init__(self, nodes: List[Any], root_node_id, dag_id: int = None):
+        self.dag_id = dag_id
         self.root_node_id = root_node_id
         self.nodes = self.filter_connected_nodes(nodes)
 
@@ -399,6 +400,7 @@ class RepresentationDAGBuilder:
     def __init__(self):
         self.nodes = []
         self.node_counter = 0
+        self.dag_counter = 0
 
     def create_leaf_node(
         self, modality_id: str, representation_index: int = -1, operation=None
@@ -431,10 +433,13 @@ class RepresentationDAGBuilder:
         self.nodes.append(node)
         return node_id
 
-    def build(self, root_node_id: str) -> RepresentationDag:
+    def build(self, root_node_id: str, dag_id: int = None) -> 
RepresentationDag:
         dag = RepresentationDag(
-            nodes=copy.deepcopy(self.nodes), root_node_id=root_node_id
+            nodes=copy.deepcopy(self.nodes),
+            root_node_id=root_node_id,
+            dag_id=dag_id if dag_id is not None else self.dag_counter + 1,
         )
+        self.dag_counter += 1
         if not dag.validate():
             raise ValueError("Invalid DAG construction")
         return dag
@@ -524,6 +529,7 @@ class CSEAwareDAGBuilder:
         self.signature_to_node: Dict[Hashable, str] = {}
         self.node_to_signature: Dict[str, Hashable] = {}
         self.node_counter = 0
+        self.dag_counter = 0
 
     def _compute_node_signature(
         self, operation: Any, inputs: List[str], parameters: Dict[str, Any] = 
None
@@ -602,8 +608,13 @@ class CSEAwareDAGBuilder:
             operation=operation, inputs=inputs, parameters=parameters, 
is_leaf=False
         )
 
-    def build(self, root_node_id: str) -> RepresentationDag:
-        dag = RepresentationDag(nodes=self.global_nodes, 
root_node_id=root_node_id)
+    def build(self, root_node_id: str, dag_id: int = None) -> 
RepresentationDag:
+        dag = RepresentationDag(
+            nodes=self.global_nodes,
+            root_node_id=root_node_id,
+            dag_id=dag_id if dag_id is not None else self.dag_counter + 1,
+        )
+        self.dag_counter += 1
         if not dag.validate():
             raise ValueError("Invalid DAG construction")
         return dag
diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py 
b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
index f733dc61ae..215f27929f 100644
--- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
@@ -62,7 +62,9 @@ class UnimodalOptimizer:
         checkpoint_every: Optional[int] = 1,
         resume: bool = False,
         max_num_workers: int = -1,
+        enable_checkpointing: bool = True,
     ):
+        self.enable_checkpointing = enable_checkpointing
         self.modalities = modalities
         self.tasks = tasks
         self.modality_ids = [modality.modality_id for modality in modalities]
@@ -295,7 +297,9 @@ class UnimodalOptimizer:
                 )
 
                 task_root_dag = RepresentationDag(
-                    nodes=[*dag.nodes, task_node], root_node_id=task_node_id
+                    nodes=[*dag.nodes, task_node],
+                    root_node_id=task_node_id,
+                    dag_id=dag.dag_id,
                 )
                 expanded_dags.append(task_root_dag)
         return expanded_dags
@@ -329,25 +333,28 @@ class UnimodalOptimizer:
             )
 
         dags = 
self.add_aggregation_operator(self.builders[modality.modality_id], dags)
-        dags = pushdown_aggregation(dags)
+        dags_with_pushdown = pushdown_aggregation(dags)
 
         if skip_remaining > 0:
             dags = dags[skip_remaining:]
 
-        expanded_dags = self._expand_dags_with_task_roots(dags)
+        expanded_dags_with_task_roots = self._expand_dags_with_task_roots(
+            dags_with_pushdown
+        )
 
         node_executor = NodeExecutor(
-            expanded_dags,
+            expanded_dags_with_task_roots,
             [modality],
             self.tasks,
             self._checkpoint_manager,
             self.max_num_workers,
             self.result_path,
+            enable_checkpointing=self.enable_checkpointing,
         )
         task_results = node_executor.run()
 
         for task_result in task_results:
-            local_results.add_task_result(task_result)
+            local_results.add_task_result(task_result, dags)
 
         if self.save_all_results:
             timestr = time.strftime("%Y%m%d-%H%M%S")
@@ -515,7 +522,7 @@ class UnimodalOptimizer:
                         [dag.root_node_id],
                         agg_op.get_current_parameters(),
                     )
-                    aggregated_dags.append(builder.build(agg_node_id))
+                    aggregated_dags.append(builder.build(agg_node_id, 
dag.dag_id))
                 else:
                     aggregated_dags.append(dag)
             new_dags = aggregated_dags
@@ -596,10 +603,12 @@ class UnimodalResults:
             self.results[modality] = {task_name: [] for task_name in 
self.task_names}
             self.cache[modality] = {task_name: [] for task_name in 
self.task_names}
 
-    def add_task_result(self, task_result: ResultEntry):
+    def add_task_result(self, task_result: ResultEntry, dags: 
List[RepresentationDag]):
+        dag_id = task_result.dag.dag_id
         task_name = self.task_names[
             task_result.dag.nodes[-1].parameters.get("_task_idx", 0)
         ]
+        task_result.dag = get_dag_by_id(dags, dag_id)
         self.results[task_result.dag.nodes[0].modality_id][task_name].append(
             task_result
         )
@@ -716,3 +725,10 @@ class UnimodalResults:
             ] = cache
 
         return results, cache
+
+
+def get_dag_by_id(
+    dags: List[RepresentationDag], dag_id: int
+) -> Optional[RepresentationDag]:
+    """Return the first DAG in ``dags`` with the given id, or ``None``."""
+    return next((dag for dag in dags if dag.dag_id == dag_id), None)
diff --git a/src/main/python/systemds/scuro/modality/transformed.py 
b/src/main/python/systemds/scuro/modality/transformed.py
index eaaa7a2032..e02695166d 100644
--- a/src/main/python/systemds/scuro/modality/transformed.py
+++ b/src/main/python/systemds/scuro/modality/transformed.py
@@ -19,6 +19,7 @@
 #
 # -------------------------------------------------------------
 from typing import Union, List
+import inspect
 import numpy as np
 from systemds.scuro.modality.type import ModalityType
 from systemds.scuro.modality.joined import JoinedModality
@@ -165,7 +166,11 @@ class TransformedModality(Modality):
 
     def apply_representation(self, representation, aggregation=None):
         start = time.time()
-        new_modality = representation.transform(self, aggregation=aggregation)
+        transform_sig = inspect.signature(representation.transform)
+        if "aggregation" in transform_sig.parameters:
+            new_modality = representation.transform(self, 
aggregation=aggregation)
+        else:
+            new_modality = representation.transform(self)
         new_modality.update_metadata()
         new_modality.transform_time += time.time() - start
         new_modality.self_contained = representation.self_contained
diff --git 
a/src/main/python/systemds/scuro/representations/aggregated_representation.py 
b/src/main/python/systemds/scuro/representations/aggregated_representation.py
index 4caa22762b..85744f9209 100644
--- 
a/src/main/python/systemds/scuro/representations/aggregated_representation.py
+++ 
b/src/main/python/systemds/scuro/representations/aggregated_representation.py
@@ -52,9 +52,9 @@ class AggregatedRepresentation(Representation):
 
     def get_output_stats(self, input_stats: RepresentationStats) -> 
RepresentationStats:
         input_shape = list(copy.deepcopy(input_stats.output_shape))
-        input_aggregate_dim = copy.deepcopy(input_stats.aggregate_dim)
-        for input_aggregate_dim in reversed(input_aggregate_dim):
-            input_shape.pop(input_aggregate_dim)
+        if self.target_dimensions is not None:
+            while len(input_shape) > self.target_dimensions:
+                input_shape.pop()
         out_shape = tuple(input_shape)
         self.stats = RepresentationStats(
             input_stats.num_instances,
@@ -100,11 +100,11 @@ class AggregatedRepresentation(Representation):
             if len(input_dimensions) == self.target_dimensions:
                 return modality
             else:
-
-                i = 1
-                while len(input_dimensions) - 1 > self.target_dimensions:
+                i = len(input_dimensions) - 1
+                aggregate_dim = ()
+                while len(input_dimensions) > self.target_dimensions:
                     aggregate_dim = aggregate_dim + (i,)
-                    i += 1
+                    i -= 1
                     input_dimensions = input_dimensions[:-1]
 
         aggregated_data = self.aggregation.execute(modality, aggregate_dim)
diff --git a/src/main/python/systemds/scuro/representations/bert.py 
b/src/main/python/systemds/scuro/representations/bert.py
index ab23f46345..fcaed8d493 100644
--- a/src/main/python/systemds/scuro/representations/bert.py
+++ b/src/main/python/systemds/scuro/representations/bert.py
@@ -329,7 +329,7 @@ class Bert(BertFamily):
         self.C = 0.3
 
 
-# @register_representation(ModalityType.TEXT)
+@register_representation(ModalityType.TEXT)
 class RoBERTa(BertFamily):
     def __init__(
         self,
diff --git a/src/main/python/systemds/scuro/representations/bow.py 
b/src/main/python/systemds/scuro/representations/bow.py
index 65e490972a..9e55add5de 100644
--- a/src/main/python/systemds/scuro/representations/bow.py
+++ b/src/main/python/systemds/scuro/representations/bow.py
@@ -43,7 +43,7 @@ class BoW(UnimodalRepresentation):
 
     def get_output_stats(self, input_stats: TextStats) -> RepresentationStats:
         vocab_estimate = min(
-            100_000,
+            100000,
             max(
                 1000,
                 input_stats.num_instances * input_stats.max_length * 
self.ngram_range,
@@ -76,7 +76,11 @@ class BoW(UnimodalRepresentation):
             ngram_range=(1, self.ngram_range), min_df=self.min_df
         )
 
-        X = vectorizer.fit_transform(modality.data).toarray()
+        X = (
+            vectorizer.fit_transform(modality.data)
+            .toarray()
+            .astype(np.float32, copy=False)
+        )
 
         if self.output_file is not None:
             save_embeddings(X, self.output_file)
diff --git a/src/main/python/systemds/scuro/representations/mlp_averaging.py 
b/src/main/python/systemds/scuro/representations/mlp_averaging.py
index bedfaf415b..8c8d67a06e 100644
--- a/src/main/python/systemds/scuro/representations/mlp_averaging.py
+++ b/src/main/python/systemds/scuro/representations/mlp_averaging.py
@@ -118,19 +118,18 @@ class MLPAveraging(DimensionalityReduction):
 
         batch_input_bytes = batch * input_dim * elem_size
         batch_output_bytes = batch * out_dim * elem_size
-
-        num_batches = (n + batch - 1) // batch
-        python_overhead = num_batches * 1024
-
+        input_torch_copy_bytes = input_bytes
+        output_accum_transient_bytes = output_bytes
         cpu_working = (
             input_bytes
-            + 2 * output_bytes
+            + input_torch_copy_bytes
+            + output_bytes
+            + output_accum_transient_bytes
             + weight_bytes
             + batch_input_bytes
             + batch_output_bytes
-            + python_overhead
         )
-        cpu_peak = int(cpu_working * 1.20 + 64 * 1024**2)
+        cpu_peak = int(cpu_working * 1.15 + 64 * 1024**2)
 
         gpu_working = weight_bytes + batch_input_bytes + batch_output_bytes
         gpu_peak = int(gpu_working * 1.35 + 560 * 1024**2)
diff --git a/src/main/python/systemds/scuro/representations/tfidf.py 
b/src/main/python/systemds/scuro/representations/tfidf.py
index 3fd61099f1..0b603f247e 100644
--- a/src/main/python/systemds/scuro/representations/tfidf.py
+++ b/src/main/python/systemds/scuro/representations/tfidf.py
@@ -56,19 +56,16 @@ class TfIdf(UnimodalRepresentation):
         )
 
     def estimate_peak_memory_bytes(self, input_stats: TextStats) -> dict:
-        output_bytes = self.estimate_output_memory_bytes(input_stats)
-        vectorizer_overhead = 640 * 1024
-        return {
-            "cpu_peak_bytes": output_bytes + vectorizer_overhead,
-            "gpu_peak_bytes": 0,
-        }
+        dense_bytes = self.estimate_output_memory_bytes(input_stats)
+        cpu_peak = int(dense_bytes * 2.2 + 32 * 1024 * 1024)
+        return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": 0}
 
     def transform(self, modality, aggregation=None):
         transformed_modality = TransformedModality(modality, self)
 
         vectorizer = TfidfVectorizer(min_df=self.min_df)
 
-        X = vectorizer.fit_transform(modality.data)
+        X = vectorizer.fit_transform(modality.data).astype(np.float32, 
copy=False)
         if self.output_file is not None:
             save_embeddings(X, self.output_file)
 
diff --git 
a/src/main/python/systemds/scuro/representations/window_aggregation.py 
b/src/main/python/systemds/scuro/representations/window_aggregation.py
index a34b6ebe4c..541a7b68fb 100644
--- a/src/main/python/systemds/scuro/representations/window_aggregation.py
+++ b/src/main/python/systemds/scuro/representations/window_aggregation.py
@@ -167,11 +167,18 @@ class WindowAggregation(Window):
         in_numel = effective_seq_len * self._rest_numel(in_shape)
         output_bytes = self.estimate_output_memory_bytes(input_stats)
         one_instance_bytes = in_numel * np.dtype(self.data_type).itemsize
-        cpu_peak = (
-            output_bytes * 2
-            + one_instance_bytes * input_stats.num_instances
-            + one_instance_bytes * self.window_size
-            + 8 * 1024 * 1024
+        input_bytes = one_instance_bytes * input_stats.num_instances
+
+        output_transient = output_bytes
+
+        pad_overhead = 0
+        if getattr(self, "pad", False):
+            out_seq_len = math.ceil(in_shape[0] / self.window_size)
+            pad_overhead = int(input_stats.num_instances * out_seq_len * 8)
+
+        cpu_peak = int(
+            (input_bytes + output_bytes + output_transient + pad_overhead) * 
1.15
+            + 16 * 1024 * 1024
         )
         return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": 0}
 
@@ -298,12 +305,12 @@ class StaticWindow(Window):
             return {"cpu_peak_bytes": 0, "gpu_peak_bytes": 0}
         effective_seq_len = in_shape[0]
         in_numel = effective_seq_len * self._rest_numel(in_shape)
-        output_bytes = self.estimate_output_memory_bytes(input_stats)
         one_instance_bytes = in_numel * np.dtype(self.data_type).itemsize
-        cpu_peak = (
-            output_bytes * 2
-            + one_instance_bytes * input_stats.num_instances
-            + 8 * 1024 * 1024
+        input_bytes = one_instance_bytes * input_stats.num_instances
+        output_bytes = self.estimate_output_memory_bytes(input_stats)
+        output_transient = output_bytes
+        cpu_peak = int(
+            (input_bytes + output_bytes + output_transient) * 1.12 + 12 * 1024 
* 1024
         )
         return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": 0}
 
diff --git a/src/main/python/tests/scuro/test_hp_tuner.py 
b/src/main/python/tests/scuro/test_hp_tuner.py
index 72591d74cc..c418cefcae 100644
--- a/src/main/python/tests/scuro/test_hp_tuner.py
+++ b/src/main/python/tests/scuro/test_hp_tuner.py
@@ -20,6 +20,7 @@
 # -------------------------------------------------------------
 
 import unittest
+from types import SimpleNamespace
 
 import numpy as np
 
@@ -54,163 +55,196 @@ from unittest.mock import patch
 
 
 class TestHPTuner(unittest.TestCase):
-    # Note: HPTuner is being refactored and not yet ready for testing
-    pass
-
-
-#     data_generator = None
-#     num_instances = 0
-
-#     @classmethod
-#     def setUpClass(cls):
-#         cls.num_instances = 10
-#         cls.mods = [
-#             ModalityType.VIDEO,
-#             ModalityType.AUDIO,
-#             ModalityType.TEXT,
-#             ModalityType.IMAGE,
-#         ]
-#         cls.indices = np.array(range(cls.num_instances))
-#         cls.tasks = [
-#             TestTask("UnimodalRepresentationTask1", "TestSVM1", 
cls.num_instances),
-#             TestTask("UnimodalRepresentationTask2", "TestSVM2", 
cls.num_instances),
-#         ]
-
-#     def test_hp_tuner_for_audio_modality(self):
-#         audio_data, audio_md = 
ModalityRandomDataGenerator().create_audio_data(
-#             self.num_instances, 3000
-#         )
-#         audio = UnimodalModality(
-#             TestDataLoader(
-#                 self.indices, None, ModalityType.AUDIO, audio_data, 
np.float32, audio_md
-#             )
-#         )
-
-#         self.run_hp_for_modality([audio])
-
-#     def test_multimodal_hp_tuning(self):
-#         audio_data, audio_md = 
ModalityRandomDataGenerator().create_audio_data(
-#             self.num_instances, 3000
-#         )
-#         audio = UnimodalModality(
-#             TestDataLoader(
-#                 self.indices, None, ModalityType.AUDIO, audio_data, 
np.float32, audio_md
-#             )
-#         )
-
-#         text_data, text_md = ModalityRandomDataGenerator().create_text_data(
-#             self.num_instances
-#         )
-#         text = UnimodalModality(
-#             TestDataLoader(
-#                 self.indices, None, ModalityType.TEXT, text_data, str, 
text_md
-#             )
-#         )
-
-#         # self.run_hp_for_modality(
-#         #     [audio, text], multimodal=True, 
tune_unimodal_representations=True
-#         # )
-#         self.run_hp_for_modality(
-#             [audio, text], multimodal=True, 
tune_unimodal_representations=False
-#         )
-
-#         def test_hp_tuner_for_text_modality(self):
-#             text_data, text_md = 
ModalityRandomDataGenerator().create_text_data(
-#                 self.num_instances
-#             )
-#             text = UnimodalModality(
-#                 TestDataLoader(
-#                     self.indices, None, ModalityType.TEXT, text_data, str, 
text_md
-#                 )
-#             )
-#             self.run_hp_for_modality([text])
-
-#     def test_hp_tuner_for_image_modality(self):
-#         image_data, image_md = 
ModalityRandomDataGenerator().create_visual_modality(
-#             self.num_instances, 1
-#         )
-#         image = UnimodalModality(
-#             TestDataLoader(
-#                 self.indices, None, ModalityType.IMAGE, image_data, 
np.float32, image_md
-#             )
-#         )
-#         self.run_hp_for_modality([image])
-
-#     def run_hp_for_modality(
-#         self, modalities, multimodal=False, 
tune_unimodal_representations=False
-#     ):
-#         with patch.object(
-#             Registry,
-#             "_representations",
-#             {
-#                 ModalityType.TEXT: [W2V, BoW],
-#                 ModalityType.AUDIO: [Spectrogram, ZeroCrossing, Spectral, 
Pitch],
-#                 ModalityType.TIMESERIES: [ResNet],
-#                 ModalityType.VIDEO: [ResNet],
-#                 ModalityType.IMAGE: [ResNet, ColorHistogram],
-#                 ModalityType.EMBEDDING: [],
-#             },
-#         ):
-#             registry = Registry()
-#             registry._fusion_operators = [LSTM]
-#             unimodal_optimizer = UnimodalOptimizer(modalities, self.tasks, 
False)
-#             unimodal_optimizer.optimize()
-
-#             hp = HyperparameterTuner(
-#                 modalities,
-#                 self.tasks,
-#                 unimodal_optimizer.operator_performance,
-#                 n_jobs=1,
-#             )
-
-#             if multimodal:
-#                 m_o = MultimodalOptimizer(
-#                     modalities,
-#                     unimodal_optimizer.operator_performance,
-#                     self.tasks,
-#                     debug=False,
-#                     min_modalities=2,
-#                     max_modalities=3,
-#                 )
-#                 fusion_results = m_o.optimize(20)
-
-#                 hp.tune_multimodal_representations(
-#                     fusion_results,
-#                     k=1,
-#                     optimize_unimodal=tune_unimodal_representations,
-#                     max_eval_per_rep=10,
-#                 )
-
-#             else:
-#                 hp.tune_unimodal_representations(max_eval_per_rep=10)
-
-#             assert len(hp.optimization_results.results) == len(self.tasks)
-#             if multimodal:
-#                 if tune_unimodal_representations:
-#                     assert (
-#                         len(
-#                             
hp.optimization_results.results[self.tasks[0].model.name][0]
-#                         )
-#                         == 1
-#                     )
-#                 else:
-#                     assert (
-#                         len(
-#                             
hp.optimization_results.results[self.tasks[0].model.name][
-#                                 "mm_results"
-#                             ]
-#                         )
-#                         == 1
-#                     )
-#             else:
-#                 assert (
-#                     
len(hp.optimization_results.results[self.tasks[0].model.name]) == 1
-#                 )
-#                 assert (
-#                     
len(hp.optimization_results.results[self.tasks[0].model.name][0])
-#                     == 2
-#                 )
-
-
-# if __name__ == "__main__":
-#     unittest.main()
+    data_generator = None
+    num_instances = 0
+
+    @classmethod
+    def setUpClass(cls):
+        cls.num_instances = 10
+        cls.mods = [
+            ModalityType.VIDEO,
+            ModalityType.AUDIO,
+            ModalityType.TEXT,
+            ModalityType.IMAGE,
+        ]
+        cls.indices = np.array(range(cls.num_instances))
+        cls.tasks = [
+            TestTask("UnimodalRepresentationTask1", "TestSVM1", 
cls.num_instances),
+            TestTask("UnimodalRepresentationTask2", "TestSVM2", 
cls.num_instances),
+        ]
+
+    def test_hp_tuner_for_text_modality(self):
+        text_data, text_md = ModalityRandomDataGenerator().create_text_data(
+            self.num_instances
+        )
+        text = UnimodalModality(
+            TestDataLoader(
+                self.indices, None, ModalityType.TEXT, text_data, str, text_md
+            )
+        )
+        self.run_hp_for_modality([text])
+
+    # TODO: Add once the final multimodal optimizer is implemented
+    # def test_multimodal_hp_tuning(self):
+    #     audio_data, audio_md = 
ModalityRandomDataGenerator().create_audio_data(
+    #         self.num_instances, 3000
+    #     )
+    #     audio = UnimodalModality(
+    #         TestDataLoader(
+    #             self.indices, None, ModalityType.AUDIO, audio_data, 
np.float32, audio_md
+    #         )
+    #     )
+
+    #     text_data, text_md = ModalityRandomDataGenerator().create_text_data(
+    #         self.num_instances
+    #     )
+    #     text = UnimodalModality(
+    #         TestDataLoader(
+    #             self.indices, None, ModalityType.TEXT, text_data, str, 
text_md
+    #         )
+    #     )
+
+    #     self.run_hp_for_modality(
+    #         [audio, text], multimodal=True, 
tune_unimodal_representations=False
+    #     )
+
+    def test_hp_tuner_for_image_modality(self):
+        image_data, image_md = 
ModalityRandomDataGenerator().create_visual_modality(
+            self.num_instances, 1
+        )
+        image = UnimodalModality(
+            TestDataLoader(
+                self.indices, None, ModalityType.IMAGE, image_data, 
np.float32, image_md
+            )
+        )
+        self.run_hp_for_modality([image])
+
+    def run_hp_for_modality(
+        self, modalities, multimodal=False, tune_unimodal_representations=False
+    ):
+        with patch.object(
+            Registry,
+            "_representations",
+            {
+                ModalityType.TEXT: [BoW, W2V],
+                ModalityType.AUDIO: [Spectrogram, ZeroCrossing, Spectral, 
Pitch],
+                ModalityType.TIMESERIES: [ResNet],
+                ModalityType.VIDEO: [ResNet],
+                ModalityType.IMAGE: [ResNet, ColorHistogram],
+                ModalityType.EMBEDDING: [],
+            },
+        ):
+            registry = Registry()
+            registry._fusion_operators = [LSTM]
+            unimodal_optimizer = UnimodalOptimizer(modalities, self.tasks, 
False)
+            unimodal_optimizer.optimize()
+
+            hp = HyperparameterTuner(
+                modalities,
+                self.tasks,
+                unimodal_optimizer.operator_performance,
+                n_jobs=1,
+            )
+
+            if multimodal:
+                m_o = MultimodalOptimizer(
+                    modalities,
+                    unimodal_optimizer.operator_performance,
+                    self.tasks,
+                    debug=False,
+                    min_modalities=2,
+                    max_modalities=3,
+                )
+                fusion_results = m_o.optimize(20)
+
+                hp.tune_multimodal_representations(
+                    fusion_results,
+                    k=1,
+                    optimize_unimodal=tune_unimodal_representations,
+                    max_eval_per_rep=10,
+                )
+
+            else:
+                hp.tune_unimodal_representations(max_eval_per_rep=10)
+
+            assert len(hp.optimization_results.results) == len(self.tasks)
+            if multimodal:
+                if tune_unimodal_representations:
+                    assert (
+                        len(
+                            
hp.optimization_results.results[self.tasks[0].model.name][0]
+                        )
+                        == 1
+                    )
+                else:
+                    assert (
+                        len(
+                            
hp.optimization_results.results[self.tasks[0].model.name][
+                                "mm_results"
+                            ]
+                        )
+                        == 1
+                    )
+            else:
+                assert (
+                    
len(hp.optimization_results.results[self.tasks[0].model.name]) == 1
+                )
+                modality_id = modalities[0].modality_id
+                assert (
+                    len(
+                        
hp.optimization_results.results[self.tasks[0].model.name][
+                            modality_id
+                        ]
+                    )
+                    == 2
+                )
+
+    def test_evaluate_configs_deduplicates_candidates(self):
+        class DummyOptimizationResults:
+            def get_k_best_results(self, modality, task, 
performance_metric_name):
+                return [], []
+
+        task = SimpleNamespace(model=SimpleNamespace(name="dummy_task"))
+        hp = HyperparameterTuner(
+            modalities=[],
+            tasks=[task],
+            optimization_results=DummyOptimizationResults(),
+            n_jobs=1,
+        )
+
+        eval_calls = {"count": 0}
+
+        def fake_evaluate(
+            dag, params, node_order, modality_ids, task, 
modalities_override=None
+        ):
+            eval_calls["count"] += 1
+            return params, float(params["x"])
+
+        hp.evaluate_dag_config = fake_evaluate
+
+        candidate_configs = [
+            {"x": 1},
+            {"x": 1},
+            {"x": 2},
+            {"x": 2},
+            {"x": 1},
+        ]
+        seen_configs = {}
+
+        results = hp._evaluate_configs(
+            dag=None,
+            task=None,
+            node_order=[],
+            modality_ids=[],
+            modalities_override=None,
+            candidate_configs=candidate_configs,
+            seen_configs=seen_configs,
+        )
+
+        self.assertEqual(eval_calls["count"], 2)
+        self.assertEqual(len(seen_configs), 2)
+        self.assertEqual([r[0]["x"] for r in results], [1, 2])
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/src/main/python/tests/scuro/test_unimodal_optimizer.py 
b/src/main/python/tests/scuro/test_unimodal_optimizer.py
index 4fe98800e6..3c5ce2a67f 100644
--- a/src/main/python/tests/scuro/test_unimodal_optimizer.py
+++ b/src/main/python/tests/scuro/test_unimodal_optimizer.py
@@ -24,23 +24,10 @@ import unittest
 
 import numpy as np
 from systemds.scuro.representations.clip import CLIPText, CLIPVisual
-from systemds.scuro.representations.bert import ALBERT, ELECTRA, RoBERTa, DistillBERT
 from systemds.scuro.representations.color_histogram import ColorHistogram
-from systemds.scuro.representations.glove import GloVe
-from systemds.scuro.representations.timeseries_representations import (
-    Mean,
-    ACF,
-)
 from systemds.scuro.drsearch.operator_registry import Registry
 from systemds.scuro.drsearch.unimodal_optimizer import UnimodalOptimizer
-from systemds.scuro.drsearch.representation_dag import RepresentationNode
-from systemds.scuro.representations.representation import RepresentationStats
 
-from systemds.scuro.representations.spectrogram import Spectrogram
-from systemds.scuro.representations.covarep_audio_features import (
-    ZeroCrossing,
-)
-from systemds.scuro.representations.vgg import VGG19
 from systemds.scuro.representations.word2vec import W2V
 from systemds.scuro.representations.bow import BoW
 from systemds.scuro.representations.bert import Bert
@@ -51,7 +38,17 @@ from tests.scuro.data_generator import (
     TestDataLoader,
     TestTask,
 )
+import copy
 
+from systemds.scuro.drsearch.representation_dag import (
+    CSEAwareDAGBuilder,
+    RepresentationDag,
+    pushdown_aggregation,
+)
+from systemds.scuro.representations.aggregated_representation import (
+    AggregatedRepresentation,
+)
+from systemds.scuro.representations.bert import Bert
 from systemds.scuro.modality.type import ModalityType
 
 from unittest.mock import patch
@@ -73,19 +70,6 @@ class TestUnimodalRepresentationOptimizer(unittest.TestCase):
             TestTask("UnimodalRepresentationTask2", "Test2", cls.num_instances),
         ]
 
-    # Note: Audio optimizer still needs some work
-    # def test_unimodal_optimizer_for_audio_modality(self):
-    #     audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data(
-    #         self.num_instances, 3000
-    #     )
-    #     audio = UnimodalModality(
-    #         TestDataLoader(
-    #             self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md
-    #         )
-    #     )
-
-    #     self.optimize_unimodal_representation_for_modality([audio])
-
     def test_unimodal_optimizer_for_text_modality(self):
         text_data, text_md = ModalityRandomDataGenerator().create_text_data(
             self.num_instances, 10
@@ -127,18 +111,6 @@ class TestUnimodalRepresentationOptimizer(unittest.TestCase):
         )
         self.optimize_unimodal_representation_for_modality([text, image])
 
-    # Note: Timeseries optimizer still needs some work
-    # def test_unimodal_optimizer_for_ts_modality(self):
-    #     ts_data, ts_md = ModalityRandomDataGenerator().create_timeseries_data(
-    #         self.num_instances, 1000
-    #     )
-    #     ts = UnimodalModality(
-    #         TestDataLoader(
-    #             self.indices, None, ModalityType.TIMESERIES, ts_data, np.float32, ts_md
-    #         )
-    #     )
-    #     self.optimize_unimodal_representation_for_modality([ts])
-
     def test_unimodal_optimizer_for_video_modality(self):
         video_data, video_md = ModalityRandomDataGenerator().create_visual_modality(
             self.num_instances, 10, 10
@@ -150,6 +122,67 @@ class TestUnimodalRepresentationOptimizer(unittest.TestCase):
         )
         self.optimize_unimodal_representation_for_modality([video])
 
+    def test_aggregation_pushdown_preserves_dag_id_and_bert_node_parameters(self):
+        builder = CSEAwareDAGBuilder()
+        modality_id = "test_modality_agg_pushdown"
+        leaf_id = builder.create_leaf_node(modality_id)
+
+        bert = Bert()
+        bert_id = builder.create_operation_node(
+            Bert, [leaf_id], bert.get_current_parameters()
+        )
+
+        agg = AggregatedRepresentation(target_dimensions=1)
+        agg_id = builder.create_operation_node(
+            AggregatedRepresentation,
+            [bert_id],
+            agg.get_current_parameters(),
+        )
+
+        expected_dag_id = 1001
+        dag = RepresentationDag(
+            nodes=copy.deepcopy(builder.global_nodes),
+            root_node_id=agg_id,
+            dag_id=expected_dag_id,
+        )
+
+        by_id = {n.node_id: n for n in dag.nodes}
+        self.assertEqual(len(dag.nodes), 3)
+        self.assertEqual(dag.dag_id, expected_dag_id)
+        self.assertEqual(dag.root_node_id, agg_id)
+
+        self.assertEqual(by_id[leaf_id].inputs, [])
+        self.assertEqual(by_id[bert_id].inputs, [leaf_id])
+        self.assertEqual(by_id[agg_id].inputs, [bert_id])
+        self.assertIs(by_id[bert_id].operation, Bert)
+        self.assertIs(by_id[agg_id].operation, AggregatedRepresentation)
+
+        bert_params_before = copy.deepcopy(by_id[bert_id].parameters)
+        agg_params_snapshot = copy.deepcopy(by_id[agg_id].parameters)
+        self.assertNotIn("_pushdown_aggregation", bert_params_before)
+
+        pushdown_aggregation([dag])
+
+        self.assertEqual(dag.dag_id, expected_dag_id)
+        self.assertEqual(dag.root_node_id, bert_id)
+        self.assertEqual(len(dag.nodes), 2)
+        self.assertIsNone(dag.get_node_by_id(agg_id))
+
+        bert_after = dag.get_node_by_id(bert_id)
+        self.assertIsNotNone(bert_after)
+        self.assertEqual(bert_after.inputs, [leaf_id])
+        self.assertIn("_pushdown_aggregation", bert_after.parameters)
+        self.assertEqual(
+            bert_after.parameters["_pushdown_aggregation"],
+            agg_params_snapshot,
+        )
+        remaining = {
+            k: v
+            for k, v in bert_after.parameters.items()
+            if k != "_pushdown_aggregation"
+        }
+        self.assertEqual(remaining, bert_params_before)
+
     def optimize_unimodal_representation_for_modality(self, modalities):
         with patch.object(
             Registry,
@@ -161,8 +194,6 @@ class TestUnimodalRepresentationOptimizer(unittest.TestCase):
                     Bert,
                     CLIPText,
                 ],
-                ModalityType.AUDIO: [Spectrogram, ZeroCrossing],
-                ModalityType.TIMESERIES: [Mean, ACF],
                 ModalityType.VIDEO: [ResNet],
                 ModalityType.IMAGE: [ColorHistogram, CLIPVisual],
                 ModalityType.EMBEDDING: [],
@@ -171,7 +202,12 @@ class TestUnimodalRepresentationOptimizer(unittest.TestCase):
             registry = Registry()
 
             unimodal_optimizer = UnimodalOptimizer(
-                modalities, self.tasks, False, k=1, max_num_workers=1
+                modalities,
+                self.tasks,
+                False,
+                k=1,
+                max_num_workers=1,
+                enable_checkpointing=False,
             )
             unimodal_optimizer.optimize()
             for modality in modalities:
@@ -185,4 +221,3 @@ class TestUnimodalRepresentationOptimizer(unittest.TestCase):
                 modalities[0], self.tasks[0], "accuracy"
             )
             assert len(result) == 1
-            assert len(cached) == 1

Reply via email to