This is an automated email from the ASF dual-hosted git repository.
christinadionysio pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 574a816325 [SYSTEMDS-3947] Implement HP-tuning for Scuro Representation DAGs
574a816325 is described below
commit 574a816325b9f1fb80321dc34beea73bd23d88ab
Author: Christina Dionysio <[email protected]>
AuthorDate: Wed Apr 29 07:03:50 2026 +0200
[SYSTEMDS-3947] Implement HP-tuning for Scuro Representation DAGs
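This patch improves the existing hyperparameter (HP) tuner for the new unimodal optimizer, replacing the skopt-based search with exhaustive enumeration of small discrete search spaces and random sampling plus local search otherwise.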
---
.../scuro/drsearch/hyperparameter_tuner.py | 469 +++++++++++++++++----
.../systemds/scuro/drsearch/node_executor.py | 97 +++--
.../systemds/scuro/drsearch/node_scheduler.py | 23 +-
.../systemds/scuro/drsearch/representation_dag.py | 21 +-
.../systemds/scuro/drsearch/unimodal_optimizer.py | 30 +-
.../python/systemds/scuro/modality/transformed.py | 7 +-
.../representations/aggregated_representation.py | 14 +-
.../python/systemds/scuro/representations/bert.py | 2 +-
.../python/systemds/scuro/representations/bow.py | 8 +-
.../scuro/representations/mlp_averaging.py | 13 +-
.../python/systemds/scuro/representations/tfidf.py | 11 +-
.../scuro/representations/window_aggregation.py | 27 +-
src/main/python/tests/scuro/test_hp_tuner.py | 354 +++++++++-------
.../python/tests/scuro/test_unimodal_optimizer.py | 119 ++++--
14 files changed, 811 insertions(+), 384 deletions(-)
diff --git a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
index 64141dc929..0737f18d62 100644
--- a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
+++ b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
@@ -20,16 +20,19 @@
# -------------------------------------------------------------
from typing import Dict, List, Tuple, Any, Optional
import os
-from skopt.space import Real, Integer, Categorical
import numpy as np
import logging
from dataclasses import dataclass
import time
import copy
from joblib import Parallel, delayed
-from skopt import Optimizer
+import itertools
+import math
+import random
from systemds.scuro.drsearch.representation_dag import (
RepresentationDAGBuilder,
+ RepresentationDag,
+ RepresentationNode,
)
from systemds.scuro.modality.modality import Modality
from systemds.scuro.drsearch.task import PerformanceMeasure
@@ -110,13 +113,16 @@ class HyperparameterTuner:
tasks,
optimization_results,
k: int = 2,
- n_jobs: int = -1,
+ n_jobs: int = 1,
scoring_metric: str = "accuracy",
maximize_metric: bool = True,
save_results: bool = False,
debug: bool = False,
checkpoint_every: Optional[int] = None,
resume: bool = True,
+ random_state: int = 42,
+ exhaustive_threshold: int = 256,
+ local_search_patience: int = 3,
):
self.tasks = tasks
self.unimodal_optimization_results = optimization_results
@@ -136,6 +142,10 @@ class HyperparameterTuner:
self.logger = logging.getLogger(__name__)
self.checkpoint_every = checkpoint_every
self.resume = resume
+ self.random_state = random_state
+ self.exhaustive_threshold = max(1, exhaustive_threshold)
+ self.local_search_patience = max(1, local_search_patience)
+ self._rng = random.Random(self.random_state)
self._checkpoint_manager = CheckpointManager(
os.getcwd(),
"hyperparam_checkpoint_",
@@ -209,11 +219,12 @@ class HyperparameterTuner:
self.resume_from_checkpoint()
for task in self.tasks:
reps = self.k_best_representations[task.model.name]
- skip_remaining =
self._checkpoint_manager.skip_remaining_by_key.get(
- task.model.name, 0
- )
- if skip_remaining >= len(reps):
- continue
+ skip_remaining = 0
+ # skip_remaining =
self._checkpoint_manager.skip_remaining_by_key.get(
+ # task.model.name, 0
+ # )
+ # if skip_remaining >= len(reps):
+ # continue
chunk_size = self.checkpoint_every or len(reps)
for start_idx in range(skip_remaining, len(reps), chunk_size):
@@ -258,8 +269,9 @@ class HyperparameterTuner:
visit_node(input_id)
visited.add(node_id)
if node.operation is not None:
- if node.operation().parameters:
- hyperparams[node_id] = node.operation().parameters
+ params = self._get_params_for_node(node)
+ if params:
+ hyperparams[node_id] = params
reps.append(node.operation)
node_order.append(node_id)
if node.modality_id is not None:
@@ -267,85 +279,39 @@ class HyperparameterTuner:
visit_node(root_node_id)
- if not hyperparams:
- return None
-
start_time = time.time()
rep_name = "-".join([rep.__name__ for rep in reps])
-
- search_space = []
- param_names = []
- for op_id, op_params in hyperparams.items():
- for param_name, param_values in op_params.items():
- param_names.append(op_id + "-" + param_name)
- if isinstance(param_values, list):
- search_space.append(
- Categorical(param_values, name=op_id + "-" +
param_name)
- )
- elif isinstance(param_values, tuple) and len(param_values) ==
2:
- if isinstance(param_values[0], int) and isinstance(
- param_values[1], int
- ):
- search_space.append(
- Integer(
- param_values[0],
- param_values[1],
- name=op_id + "-" + param_name,
- )
- )
- else:
- search_space.append(
- Real(
- param_values[0],
- param_values[1],
- name=op_id + "-" + param_name,
- )
- )
- else:
- search_space.append(
- Categorical([param_values], name=op_id + "-" +
param_name)
- )
-
- n_calls = max_evals if max_evals else 50
-
- all_results = []
-
- def evaluate_point(point):
- params = dict(zip(param_names, point))
- result = self.evaluate_dag_config(
+ modalities_override = (
+ self._get_cached_modalities_for_task(task, modality_ids) if mm_opt
else None
+ )
+ if not hyperparams:
+ # TODO: extract the information from the unimodal optimization
results
+ baseline = self.evaluate_dag_config(
dag,
- params,
+ {},
node_order,
modality_ids,
task,
- modalities_override=(
- self._get_cached_modalities_for_task(task, modality_ids)
- if mm_opt
- else None
- ),
+ modalities_override=modalities_override,
+ )
+ all_results = [baseline]
+ else:
+ n_calls = max_evals if max_evals else 50
+ param_specs = self._build_param_specs(hyperparams)
+ default_config = {}
+ all_results = self._search_best_configs(
+ dag=dag,
+ task=task,
+ node_order=node_order,
+ modality_ids=modality_ids,
+ modalities_override=modalities_override,
+ param_specs=param_specs,
+ budget=n_calls,
+ initial_config=None,
)
- score = result[1]
- if isinstance(score, PerformanceMeasure):
- score = score.average_scores[self.scoring_metric]
- if self.maximize_metric:
- objective_value = -score
- else:
- objective_value = score
- return objective_value, result
- opt = Optimizer(
- search_space, random_state=42, n_initial_points=min(10, n_calls //
2)
- )
- self.n_jobs = 2
- n_batch = min(abs(self.n_jobs), n_calls) if self.n_jobs != 0 else 1
- for _ in range(0, n_calls, n_batch):
- points = opt.ask(n_points=n_batch)
- results = Parallel(
- n_jobs=self.n_jobs, max_nbytes=None, mmap_mode=None,
backend="threading"
- )(delayed(evaluate_point)(p) for p in points)
- objective_values = [result[0] for result in results]
- all_results.extend(result[1] for result in results)
- opt.tell(points, objective_values)
+ if not all_results:
+ return None
def get_score(result):
score = result[1]
@@ -359,7 +325,21 @@ class HyperparameterTuner:
best_params, best_score = min(all_results, key=get_score)
tuning_time = time.time() - start_time
-
+ # results =
self.unimodal_optimization_results.results[self.modalities[0].modality_id][task.model.name]
+
+ # default_result = sorted(
+ # results,
+ # key=lambda r: r.val_score[self.scoring_metric],
+ # reverse=True,
+ # )[0]
+ # pm = PerformanceMeasure(name=self.scoring_metric,
metrics=self.scoring_metric, higher_is_better=self.maximize_metric)
+ # pm.add_scores({self.scoring_metric:
default_result.val_score[self.scoring_metric]})
+ # default_params = self._get_default_params(dag)
+ # def_par ={}
+ # for k, v in default_params.items():
+ # for k_v, v_v in v.items():
+ # def_par[k+"-"+k_v] = v_v
+ # all_results.append((def_par, pm))
best_result = HyperparamResult(
representation_name=rep_name,
best_params=best_params,
@@ -374,6 +354,320 @@ class HyperparameterTuner:
return best_result
+ def _get_params_for_node(self, node: RepresentationNode) -> Dict[str, Any]:
+ if not node.operation().parameters:
+ return None
+
+ params = copy.deepcopy(node.operation().parameters)
+ return params
+
+ def _build_param_specs(
+ self, hyperparams: Dict[str, Dict[str, Any]]
+ ) -> List[Dict[str, Any]]:
+ param_specs = []
+ for op_id, op_params in hyperparams.items():
+ for param_name, param_values in op_params.items():
+ full_name = op_id + "-" + param_name
+ if isinstance(param_values, list):
+ param_type = "categorical"
+ domain = list(param_values)
+ elif isinstance(param_values, tuple) and len(param_values) ==
2:
+ lo, hi = param_values
+ if isinstance(lo, int) and isinstance(hi, int):
+ param_type = "integer"
+ else:
+ param_type = "real"
+ domain = (lo, hi)
+ else:
+ param_type = "categorical"
+ domain = [param_values]
+ param_specs.append(
+ {"name": full_name, "type": param_type, "domain": domain}
+ )
+ return param_specs
+
+ def _config_key(self, params: Dict[str, Any]) -> Tuple[Tuple[str, Any],
...]:
+ key_items = []
+ for name, value in sorted(params.items()):
+ if isinstance(value, float):
+ value = round(value, 10)
+ key_items.append((name, value))
+ return tuple(key_items)
+
+ def _score_value(self, score: Any) -> float:
+ if isinstance(score, PerformanceMeasure):
+ return score.average_scores.get(self.scoring_metric, np.nan)
+ return score
+
+ def _is_better(self, candidate_score: float, best_score: float) -> bool:
+ if np.isnan(candidate_score):
+ return False
+ if np.isnan(best_score):
+ return True
+ return (
+ candidate_score > best_score
+ if self.maximize_metric
+ else candidate_score < best_score
+ )
+
+ def _sample_random_config(
+ self, param_specs: List[Dict[str, Any]]
+ ) -> Dict[str, Any]:
+ config = {}
+ for spec in param_specs:
+ name = spec["name"]
+ domain = spec["domain"]
+ if spec["type"] == "categorical":
+ config[name] = self._rng.choice(domain)
+ elif spec["type"] == "integer":
+ config[name] = self._rng.randint(int(domain[0]),
int(domain[1]))
+ else:
+ config[name] = self._rng.uniform(float(domain[0]),
float(domain[1]))
+ return config
+
+ def _estimate_discrete_search_size(
+ self, param_specs: List[Dict[str, Any]]
+ ) -> Optional[int]:
+ size = 1
+ for spec in param_specs:
+ if spec["type"] == "real":
+ return None
+ if spec["type"] == "integer":
+ size *= max(0, int(spec["domain"][1]) - int(spec["domain"][0])
+ 1)
+ else:
+ size *= len(spec["domain"])
+ if size > self.exhaustive_threshold:
+ return size
+ return size
+
+ def _enumerate_configs(
+ self, param_specs: List[Dict[str, Any]]
+ ) -> List[Dict[str, Any]]:
+ domains = []
+ names = []
+ for spec in param_specs:
+ names.append(spec["name"])
+ if spec["type"] == "integer":
+ lo, hi = int(spec["domain"][0]), int(spec["domain"][1])
+ domains.append(list(range(lo, hi + 1)))
+ else:
+ domains.append(list(spec["domain"]))
+ return [dict(zip(names, values)) for values in
itertools.product(*domains)]
+
+ def _generate_neighbor_config(
+ self,
+ base_config: Dict[str, Any],
+ param_specs: List[Dict[str, Any]],
+ step_scale: float,
+ ) -> Dict[str, Any]:
+ candidate = dict(base_config)
+ if not param_specs:
+ return candidate
+
+ n_mutations = 1 if len(param_specs) == 1 else self._rng.randint(1, 2)
+ mutated_specs = self._rng.sample(
+ param_specs, k=min(n_mutations, len(param_specs))
+ )
+ for spec in mutated_specs:
+ name = spec["name"]
+ current = candidate[name]
+ if spec["type"] == "categorical":
+ values = [value for value in spec["domain"] if value !=
current]
+ if values:
+ candidate[name] = self._rng.choice(values)
+ elif spec["type"] == "integer":
+ lo, hi = int(spec["domain"][0]), int(spec["domain"][1])
+ width = max(1, hi - lo)
+ step = max(1, int(math.ceil(width * step_scale)))
+ delta = self._rng.choice([-step, step])
+ candidate[name] = max(lo, min(hi, int(current) + delta))
+ else:
+ lo, hi = float(spec["domain"][0]), float(spec["domain"][1])
+ span = max(1e-9, hi - lo)
+ delta = self._rng.uniform(-span * step_scale, span *
step_scale)
+ candidate[name] = max(lo, min(hi, float(current) + delta))
+ return candidate
+
+ def _evaluate_configs(
+ self,
+ dag,
+ task,
+ node_order,
+ modality_ids,
+ modalities_override,
+ candidate_configs: List[Dict[str, Any]],
+ seen_configs: Dict[Tuple[Tuple[str, Any], ...], Tuple[Dict[str, Any],
Any]],
+ ) -> List[Tuple[Dict[str, Any], Any]]:
+ ordered_unique_configs = []
+ unique_keys_in_order = []
+ unique_keys_set = set()
+ for config in candidate_configs:
+ key = self._config_key(config)
+ if key not in unique_keys_set:
+ unique_keys_set.add(key)
+ unique_keys_in_order.append(key)
+ ordered_unique_configs.append(config)
+
+ pending_configs = []
+ for config in ordered_unique_configs:
+ key = self._config_key(config)
+ if key not in seen_configs:
+ pending_configs.append(config)
+
+ if pending_configs:
+ n_jobs = self.n_jobs if self.n_jobs != 0 else 1
+ evaluated = Parallel(
+ n_jobs=n_jobs, max_nbytes=None, mmap_mode=None,
backend="threading"
+ )(
+ delayed(self.evaluate_dag_config)(
+ dag,
+ config,
+ node_order,
+ modality_ids,
+ task,
+ modalities_override=modalities_override,
+ )
+ for config in pending_configs
+ )
+ for result in evaluated:
+ seen_configs[self._config_key(result[0])] = result
+
+ return [
+ seen_configs[key] for key in unique_keys_in_order if key in
seen_configs
+ ]
+
+ def _search_best_configs(
+ self,
+ dag,
+ task,
+ node_order,
+ modality_ids,
+ modalities_override,
+ param_specs: List[Dict[str, Any]],
+ budget: int,
+ initial_config: Dict[str, Any],
+ ) -> List[Tuple[Dict[str, Any], Any]]:
+ budget = max(1, budget)
+ seen_configs: Dict[Tuple[Tuple[str, Any], ...], Tuple[Dict[str, Any],
Any]] = {}
+ all_results: List[Tuple[Dict[str, Any], Any]] = []
+ best_score = np.nan
+ best_config = None
+ if initial_config is not None and budget > 0:
+ initial_results = self._evaluate_configs(
+ dag,
+ task,
+ node_order,
+ modality_ids,
+ modalities_override,
+ [initial_config],
+ seen_configs,
+ )
+ all_results.extend(initial_results)
+ if initial_results:
+ p, s = initial_results[0]
+ best_config = p
+ best_score = self._score_value(s)
+ budget -= 1
+
+ discrete_size = self._estimate_discrete_search_size(param_specs)
+ if discrete_size is not None and discrete_size <= min(
+ self.exhaustive_threshold, budget
+ ):
+ candidates = self._enumerate_configs(param_specs)
+ self._rng.shuffle(candidates)
+ candidates = candidates[:budget]
+ batch_results = self._evaluate_configs(
+ dag,
+ task,
+ node_order,
+ modality_ids,
+ modalities_override,
+ candidates,
+ seen_configs,
+ )
+ all_results.extend(batch_results)
+ return all_results
+
+ initial_budget = min(budget, max(8, len(param_specs) * 4))
+ initial_candidates = [
+ self._sample_random_config(param_specs) for _ in
range(initial_budget)
+ ]
+ initial_results = self._evaluate_configs(
+ dag,
+ task,
+ node_order,
+ modality_ids,
+ modalities_override,
+ initial_candidates,
+ seen_configs,
+ )
+ all_results.extend(initial_results)
+
+ for params, score in initial_results:
+ numeric_score = self._score_value(score)
+ if self._is_better(numeric_score, best_score):
+ best_score = numeric_score
+ best_config = params
+
+ eval_count = len(seen_configs)
+ no_improvement_rounds = 0
+ step_scale = 0.5
+
+ while eval_count < budget:
+ if best_config is None:
+ candidate_batch = [self._sample_random_config(param_specs)]
+ else:
+ candidate_batch = []
+ batch_size = min(
+ max(2, abs(self.n_jobs) if self.n_jobs != 0 else 1),
+ budget - eval_count,
+ )
+ for _ in range(batch_size):
+ candidate_batch.append(
+ self._generate_neighbor_config(
+ best_config, param_specs, step_scale
+ )
+ )
+
+ if budget - eval_count > 3:
+
candidate_batch.append(self._sample_random_config(param_specs))
+
+ batch_results = self._evaluate_configs(
+ dag,
+ task,
+ node_order,
+ modality_ids,
+ modalities_override,
+ candidate_batch,
+ seen_configs,
+ )
+ if not batch_results:
+ step_scale = max(0.05, step_scale * 0.5)
+ if step_scale <= 0.05:
+ break
+ continue
+
+ improved = False
+ for params, score in batch_results:
+ numeric_score = self._score_value(score)
+ if self._is_better(numeric_score, best_score):
+ best_score = numeric_score
+ best_config = params
+ improved = True
+ all_results.extend(batch_results)
+ eval_count = len(seen_configs)
+
+ if improved:
+ no_improvement_rounds = 0
+ step_scale = min(0.5, step_scale * 1.1)
+ else:
+ no_improvement_rounds += 1
+ step_scale = max(0.05, step_scale * 0.7)
+ if no_improvement_rounds >= self.local_search_patience:
+ break
+
+ return all_results
+
def _get_cached_modalities_for_task(self, task, modality_ids):
if not self.k_best_cache_by_modality:
return self.get_modalities_by_id(modality_ids)
@@ -402,12 +696,13 @@ class HyperparameterTuner:
else self.get_modalities_by_id(modality_ids)
)
modified_modality = dag_copy.execute(modalities, task)
- score = task.run(
- modified_modality[list(modified_modality.keys())[-1]].data
- )[1]
+ score = task.run(modified_modality.data)[1]
return params, score
except Exception as e:
+ import traceback
+
+ traceback.print_exc()
self.logger.error(f"Error evaluating DAG with params {params}:
{e}")
return params, np.nan
@@ -485,11 +780,11 @@ class HyperparameterTuner:
self.optimization_results.add_result([result])
self._checkpoint_manager.increment(task.model.name, 1)
self._checkpoint_manager.checkpoint_if_due(
- self.optimization_results.results, "eval_count_by_task"
+ self.optimization_results.results
)
except Exception:
self._checkpoint_manager.save_checkpoint(
- self.optimization_results.results,
"eval_count_by_task", {}
+ self.optimization_results.results, {}
)
raise
if self.save_results:
diff --git a/src/main/python/systemds/scuro/drsearch/node_executor.py
b/src/main/python/systemds/scuro/drsearch/node_executor.py
index bf51dfde85..418b11fe70 100644
--- a/src/main/python/systemds/scuro/drsearch/node_executor.py
+++ b/src/main/python/systemds/scuro/drsearch/node_executor.py
@@ -271,7 +271,9 @@ class NodeExecutor:
checkpoint_manager: Optional[CheckpointManager] = None,
max_num_workers: int = -1,
result_path: Optional[str] = None,
+ enable_checkpointing: bool = True,
):
+ self.enable_checkpointing = enable_checkpointing
available_total_cpu = (
float(psutil.virtual_memory().available)
- float(psutil.virtual_memory().available) * 0.30
@@ -316,14 +318,17 @@ class NodeExecutor:
def submit_node(node_id: str):
node = self.scheduler.mapping[node_id]
gpu_id = node.gpu_id
- parent_result = None
- parent_node_id = self.scheduler.get_valid_parent(node_id)
- if parent_node_id is not None:
- parent_result = self.result_cache.get(parent_node_id)
+ parent_node_ids = self.scheduler.get_valid_parents(node_id)
+ parent_results = None
+ if parent_node_ids:
+ parent_results = [
+ self.result_cache.get(parent_node_id)
+ for parent_node_id in parent_node_ids
+ ]
if self._is_task_node(node):
task_result = ResultEntry(
dag=self._get_dag_from_node_ids(node_id),
- representation_time=parent_result.transform_time,
+ representation_time=parent_results[0].transform_time,
)
task_results[node_id] = task_result
task_idx = int(node.parameters.get("_task_idx", 0))
@@ -333,8 +338,8 @@ class NodeExecutor:
self.tasks[task_idx],
(
self.modalities[0].data
- if parent_result is None
- else parent_result.data
+ if parent_results is None
+ else parent_results[0].data
),
gpu_id,
)
@@ -342,7 +347,7 @@ class NodeExecutor:
future = executor.submit(
_execute_node_worker,
node,
- self.modalities if parent_result is None else
[parent_result],
+ self.modalities if parent_results is None else
parent_results,
None,
None,
gpu_id,
@@ -393,25 +398,27 @@ class NodeExecutor:
task_results[node_id].test_score = result["scores"][
2
].average_scores
- self.checkpoint_manager.increment(node_id)
- self.checkpoint_manager.checkpoint_if_due(task_results)
- self._checkpoint_memory_usage(
- node_id,
- peak_bytes,
- gpu_peak_bytes,
- "task",
- memory_usage_data,
- None,
- )
+ if self.enable_checkpointing:
+ self.checkpoint_manager.increment(node_id)
+
self.checkpoint_manager.checkpoint_if_due(task_results)
+ self._checkpoint_memory_usage(
+ node_id,
+ peak_bytes,
+ gpu_peak_bytes,
+ "task",
+ memory_usage_data,
+ None,
+ )
- parent_node_id =
self.scheduler.get_valid_parent(node_id)
- if parent_node_id is not None:
- self.result_cache.dec_ref(parent_node_id)
- if (
- parent_node_id in self.result_cache.ref_count
- and
self.result_cache.ref_count[parent_node_id] == 0
- ):
- self.result_cache.clear(parent_node_id)
+ parent_node_ids =
self.scheduler.get_valid_parents(node_id)
+ if len(parent_node_ids) > 0:
+ for parent_node_id in parent_node_ids:
+ self.result_cache.dec_ref(parent_node_id)
+ if (
+ parent_node_id in
self.result_cache.ref_count
+ and
self.result_cache.ref_count[parent_node_id] == 0
+ ):
+ self.result_cache.clear(parent_node_id)
self.scheduler.complete_node(node_id)
else:
@@ -430,14 +437,15 @@ class NodeExecutor:
self.scheduler.update_node_stats_and_reestimate_descendants(
node_id, actual_stats
)
- self._checkpoint_memory_usage(
- node_id,
- peak_bytes,
- gpu_peak_bytes,
- result["operation_name"],
- memory_usage_data,
- transformed_modality.data,
- )
+ if self.enable_checkpointing:
+ self._checkpoint_memory_usage(
+ node_id,
+ peak_bytes,
+ gpu_peak_bytes,
+ result["operation_name"],
+ memory_usage_data,
+ transformed_modality.data,
+ )
before_bytes =
self.result_cache.get_memory_total_memory_usage()
self._manage_result_cache(node_id,
transformed_modality)
after_bytes =
self.result_cache.get_memory_total_memory_usage()
@@ -539,6 +547,9 @@ class NodeExecutor:
assert (
len(result) == node_stats.num_instances
), f"Node {node_id} {operation_name} should have
{node_stats.num_instances} instances, actual: {len(result)}"
+ # assert (
+ # shape == node_stats.output_shape
+ # ), f"Node {node_id} {operation_name} should have shape of
{node_stats.output_shape}, actual shape: {shape}"
return shape
def _infer_actual_output_stats(
@@ -575,20 +586,22 @@ class NodeExecutor:
return None
def _manage_result_cache(self, node_id: str, result: Any):
- parent_node_id = self.scheduler.get_valid_parent(node_id)
- if parent_node_id is not None:
- self.result_cache.dec_ref(parent_node_id)
+ parent_node_ids = self.scheduler.get_valid_parents(node_id)
+ if len(parent_node_ids) > 0:
+ for parent_node_id in parent_node_ids:
+ self.result_cache.dec_ref(parent_node_id)
if self.scheduler.get_children(node_id):
for _ in self.scheduler.get_children(node_id):
self.result_cache.inc_ref(node_id)
self.result_cache.add_result(node_id, result)
- if (
- parent_node_id in self.result_cache.ref_count
- and self.result_cache.ref_count[parent_node_id] == 0
- ):
- self.result_cache.clear(parent_node_id)
+ for parent_node_id in parent_node_ids:
+ if (
+ parent_node_id in self.result_cache.ref_count
+ and self.result_cache.ref_count[parent_node_id] == 0
+ ):
+ self.result_cache.clear(parent_node_id)
def _get_nodes_by_ids(self, nodes_ids: List[str]) ->
List[RepresentationNode]:
return [self.scheduler.mapping[node_id] for node_id in nodes_ids]
diff --git a/src/main/python/systemds/scuro/drsearch/node_scheduler.py
b/src/main/python/systemds/scuro/drsearch/node_scheduler.py
index 4fe3ac1d03..ef3ccc844e 100644
--- a/src/main/python/systemds/scuro/drsearch/node_scheduler.py
+++ b/src/main/python/systemds/scuro/drsearch/node_scheduler.py
@@ -166,11 +166,12 @@ class MemoryAwareNodeScheduler:
return False
- def get_valid_parent(self, node_id: str) -> bool:
+ def get_valid_parents(self, node_id: str) -> bool:
+ parents = []
for parent_id in self.parents[node_id]:
if parent_id not in self.leaves:
- return parent_id
- return None
+ parents.append(parent_id)
+ return parents
def get_children(self, node_id: str) -> List[str]:
return list(self.children[node_id])
@@ -207,8 +208,9 @@ class MemoryAwareNodeScheduler:
params=self.mapping[desc_id].parameters
)
peak_memory = operation.estimate_peak_memory_bytes(input_stats)
+ input_stats_for_overhead =
self._stats_for_overhead(input_stats)
peak_memory["cpu_peak_bytes"] += (
- 64 * 1024 + 512 * input_stats.num_instances
+ 64 * 1024 + 512 * input_stats_for_overhead.num_instances
)
output_stats = operation.get_output_stats(input_stats)
@@ -395,14 +397,17 @@ class MemoryAwareNodeScheduler:
else:
parent_ids = list(self.parents.get(node, set()))
parent_stats = [node_stats[parent_id] for parent_id in
parent_ids]
- input_stats = parent_stats[0] if parent_stats else None
+ input_stats = (
+ parent_stats[0] if len(parent_stats) == 1 else parent_stats
+ )
if node not in self.roots:
operation = self.mapping[node].operation(
params=self.mapping[node].parameters
)
peak_memory =
operation.estimate_peak_memory_bytes(input_stats)
+ input_stats_for_overhead =
self._stats_for_overhead(input_stats)
peak_memory["cpu_peak_bytes"] += (
- 64 * 1024 + 512 * input_stats.num_instances
+ 64 * 1024 + 512 *
input_stats_for_overhead.num_instances
) # Placeholder for transformed modality creation overhead
peak_memory["cpu_peak_bytes"] *= 1
output_stats = operation.get_output_stats(input_stats)
@@ -429,6 +434,12 @@ class MemoryAwareNodeScheduler:
self.node_stats = node_stats
return node_resources
+ @staticmethod
+ def _stats_for_overhead(input_stats: Any) -> Any:
+ if isinstance(input_stats, list) and len(input_stats) > 0:
+ return input_stats[0]
+ return input_stats
+
@staticmethod
def _stats_to_bytes(stats: Optional[Any], dtype_size: int = 4) -> int:
if stats is None:
diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py
b/src/main/python/systemds/scuro/drsearch/representation_dag.py
index b2911dbe7a..099732e46d 100644
--- a/src/main/python/systemds/scuro/drsearch/representation_dag.py
+++ b/src/main/python/systemds/scuro/drsearch/representation_dag.py
@@ -74,7 +74,8 @@ class RepresentationNode:
@dataclass
class RepresentationDag:
- def __init__(self, nodes: List[Any], root_node_id):
+ def __init__(self, nodes: List[Any], root_node_id, dag_id: int = None):
+ self.dag_id = dag_id
self.root_node_id = root_node_id
self.nodes = self.filter_connected_nodes(nodes)
@@ -399,6 +400,7 @@ class RepresentationDAGBuilder:
def __init__(self):
self.nodes = []
self.node_counter = 0
+ self.dag_counter = 0
def create_leaf_node(
self, modality_id: str, representation_index: int = -1, operation=None
@@ -431,10 +433,13 @@ class RepresentationDAGBuilder:
self.nodes.append(node)
return node_id
- def build(self, root_node_id: str) -> RepresentationDag:
+ def build(self, root_node_id: str, dag_id: int = None) ->
RepresentationDag:
dag = RepresentationDag(
- nodes=copy.deepcopy(self.nodes), root_node_id=root_node_id
+ nodes=copy.deepcopy(self.nodes),
+ root_node_id=root_node_id,
+ dag_id=dag_id if dag_id is not None else self.dag_counter + 1,
)
+ self.dag_counter += 1
if not dag.validate():
raise ValueError("Invalid DAG construction")
return dag
@@ -524,6 +529,7 @@ class CSEAwareDAGBuilder:
self.signature_to_node: Dict[Hashable, str] = {}
self.node_to_signature: Dict[str, Hashable] = {}
self.node_counter = 0
+ self.dag_counter = 0
def _compute_node_signature(
self, operation: Any, inputs: List[str], parameters: Dict[str, Any] =
None
@@ -602,8 +608,13 @@ class CSEAwareDAGBuilder:
operation=operation, inputs=inputs, parameters=parameters,
is_leaf=False
)
- def build(self, root_node_id: str) -> RepresentationDag:
- dag = RepresentationDag(nodes=self.global_nodes,
root_node_id=root_node_id)
+ def build(self, root_node_id: str, dag_id: int = None) ->
RepresentationDag:
+ dag = RepresentationDag(
+ nodes=self.global_nodes,
+ root_node_id=root_node_id,
+ dag_id=dag_id if dag_id is not None else self.dag_counter + 1,
+ )
+ self.dag_counter += 1
if not dag.validate():
raise ValueError("Invalid DAG construction")
return dag
diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
index f733dc61ae..215f27929f 100644
--- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
@@ -62,7 +62,9 @@ class UnimodalOptimizer:
checkpoint_every: Optional[int] = 1,
resume: bool = False,
max_num_workers: int = -1,
+ enable_checkpointing: bool = True,
):
+ self.enable_checkpointing = enable_checkpointing
self.modalities = modalities
self.tasks = tasks
self.modality_ids = [modality.modality_id for modality in modalities]
@@ -295,7 +297,9 @@ class UnimodalOptimizer:
)
task_root_dag = RepresentationDag(
- nodes=[*dag.nodes, task_node], root_node_id=task_node_id
+ nodes=[*dag.nodes, task_node],
+ root_node_id=task_node_id,
+ dag_id=dag.dag_id,
)
expanded_dags.append(task_root_dag)
return expanded_dags
@@ -329,25 +333,28 @@ class UnimodalOptimizer:
)
dags =
self.add_aggregation_operator(self.builders[modality.modality_id], dags)
- dags = pushdown_aggregation(dags)
+ dags_with_pushdown = pushdown_aggregation(dags)
if skip_remaining > 0:
dags = dags[skip_remaining:]
- expanded_dags = self._expand_dags_with_task_roots(dags)
+ expanded_dags_with_task_roots = self._expand_dags_with_task_roots(
+ dags_with_pushdown
+ )
node_executor = NodeExecutor(
- expanded_dags,
+ expanded_dags_with_task_roots,
[modality],
self.tasks,
self._checkpoint_manager,
self.max_num_workers,
self.result_path,
+ enable_checkpointing=self.enable_checkpointing,
)
task_results = node_executor.run()
for task_result in task_results:
- local_results.add_task_result(task_result)
+ local_results.add_task_result(task_result, dags)
if self.save_all_results:
timestr = time.strftime("%Y%m%d-%H%M%S")
@@ -515,7 +522,7 @@ class UnimodalOptimizer:
[dag.root_node_id],
agg_op.get_current_parameters(),
)
- aggregated_dags.append(builder.build(agg_node_id))
+ aggregated_dags.append(builder.build(agg_node_id,
dag.dag_id))
else:
aggregated_dags.append(dag)
new_dags = aggregated_dags
@@ -596,10 +603,12 @@ class UnimodalResults:
self.results[modality] = {task_name: [] for task_name in
self.task_names}
self.cache[modality] = {task_name: [] for task_name in
self.task_names}
- def add_task_result(self, task_result: ResultEntry):
+ def add_task_result(self, task_result: ResultEntry, dags:
List[RepresentationDag]):
+ dag_id = task_result.dag.dag_id
task_name = self.task_names[
task_result.dag.nodes[-1].parameters.get("_task_idx", 0)
]
+ task_result.dag = get_dag_by_id(dags, dag_id)
self.results[task_result.dag.nodes[0].modality_id][task_name].append(
task_result
)
@@ -716,3 +725,10 @@ class UnimodalResults:
] = cache
return results, cache
+
+
+def get_dag_by_id(dags: List[RepresentationDag], dag_id: int) ->
RepresentationDag:
+ for dag in dags:
+ if dag.dag_id == dag_id:
+ return dag
+ return None
diff --git a/src/main/python/systemds/scuro/modality/transformed.py
b/src/main/python/systemds/scuro/modality/transformed.py
index eaaa7a2032..e02695166d 100644
--- a/src/main/python/systemds/scuro/modality/transformed.py
+++ b/src/main/python/systemds/scuro/modality/transformed.py
@@ -19,6 +19,7 @@
#
# -------------------------------------------------------------
from typing import Union, List
+import inspect
import numpy as np
from systemds.scuro.modality.type import ModalityType
from systemds.scuro.modality.joined import JoinedModality
@@ -165,7 +166,11 @@ class TransformedModality(Modality):
def apply_representation(self, representation, aggregation=None):
start = time.time()
- new_modality = representation.transform(self, aggregation=aggregation)
+ transform_sig = inspect.signature(representation.transform)
+ if "aggregation" in transform_sig.parameters:
+ new_modality = representation.transform(self,
aggregation=aggregation)
+ else:
+ new_modality = representation.transform(self)
new_modality.update_metadata()
new_modality.transform_time += time.time() - start
new_modality.self_contained = representation.self_contained
diff --git
a/src/main/python/systemds/scuro/representations/aggregated_representation.py
b/src/main/python/systemds/scuro/representations/aggregated_representation.py
index 4caa22762b..85744f9209 100644
---
a/src/main/python/systemds/scuro/representations/aggregated_representation.py
+++
b/src/main/python/systemds/scuro/representations/aggregated_representation.py
@@ -52,9 +52,9 @@ class AggregatedRepresentation(Representation):
def get_output_stats(self, input_stats: RepresentationStats) ->
RepresentationStats:
input_shape = list(copy.deepcopy(input_stats.output_shape))
- input_aggregate_dim = copy.deepcopy(input_stats.aggregate_dim)
- for input_aggregate_dim in reversed(input_aggregate_dim):
- input_shape.pop(input_aggregate_dim)
+ if self.target_dimensions is not None:
+ while len(input_shape) > self.target_dimensions:
+ input_shape.pop()
out_shape = tuple(input_shape)
self.stats = RepresentationStats(
input_stats.num_instances,
@@ -100,11 +100,11 @@ class AggregatedRepresentation(Representation):
if len(input_dimensions) == self.target_dimensions:
return modality
else:
-
- i = 1
- while len(input_dimensions) - 1 > self.target_dimensions:
+ i = len(input_dimensions) - 1
+ aggregate_dim = ()
+ while len(input_dimensions) > self.target_dimensions:
aggregate_dim = aggregate_dim + (i,)
- i += 1
+ i -= 1
input_dimensions = input_dimensions[:-1]
aggregated_data = self.aggregation.execute(modality, aggregate_dim)
diff --git a/src/main/python/systemds/scuro/representations/bert.py
b/src/main/python/systemds/scuro/representations/bert.py
index ab23f46345..fcaed8d493 100644
--- a/src/main/python/systemds/scuro/representations/bert.py
+++ b/src/main/python/systemds/scuro/representations/bert.py
@@ -329,7 +329,7 @@ class Bert(BertFamily):
self.C = 0.3
-# @register_representation(ModalityType.TEXT)
+@register_representation(ModalityType.TEXT)
class RoBERTa(BertFamily):
def __init__(
self,
diff --git a/src/main/python/systemds/scuro/representations/bow.py
b/src/main/python/systemds/scuro/representations/bow.py
index 65e490972a..9e55add5de 100644
--- a/src/main/python/systemds/scuro/representations/bow.py
+++ b/src/main/python/systemds/scuro/representations/bow.py
@@ -43,7 +43,7 @@ class BoW(UnimodalRepresentation):
def get_output_stats(self, input_stats: TextStats) -> RepresentationStats:
vocab_estimate = min(
- 100_000,
+ 100000,
max(
1000,
input_stats.num_instances * input_stats.max_length *
self.ngram_range,
@@ -76,7 +76,11 @@ class BoW(UnimodalRepresentation):
ngram_range=(1, self.ngram_range), min_df=self.min_df
)
- X = vectorizer.fit_transform(modality.data).toarray()
+ X = (
+ vectorizer.fit_transform(modality.data)
+ .toarray()
+ .astype(np.float32, copy=False)
+ )
if self.output_file is not None:
save_embeddings(X, self.output_file)
diff --git a/src/main/python/systemds/scuro/representations/mlp_averaging.py
b/src/main/python/systemds/scuro/representations/mlp_averaging.py
index bedfaf415b..8c8d67a06e 100644
--- a/src/main/python/systemds/scuro/representations/mlp_averaging.py
+++ b/src/main/python/systemds/scuro/representations/mlp_averaging.py
@@ -118,19 +118,18 @@ class MLPAveraging(DimensionalityReduction):
batch_input_bytes = batch * input_dim * elem_size
batch_output_bytes = batch * out_dim * elem_size
-
- num_batches = (n + batch - 1) // batch
- python_overhead = num_batches * 1024
-
+ input_torch_copy_bytes = input_bytes
+ output_accum_transient_bytes = output_bytes
cpu_working = (
input_bytes
- + 2 * output_bytes
+ + input_torch_copy_bytes
+ + output_bytes
+ + output_accum_transient_bytes
+ weight_bytes
+ batch_input_bytes
+ batch_output_bytes
- + python_overhead
)
- cpu_peak = int(cpu_working * 1.20 + 64 * 1024**2)
+ cpu_peak = int(cpu_working * 1.15 + 64 * 1024**2)
gpu_working = weight_bytes + batch_input_bytes + batch_output_bytes
gpu_peak = int(gpu_working * 1.35 + 560 * 1024**2)
diff --git a/src/main/python/systemds/scuro/representations/tfidf.py
b/src/main/python/systemds/scuro/representations/tfidf.py
index 3fd61099f1..0b603f247e 100644
--- a/src/main/python/systemds/scuro/representations/tfidf.py
+++ b/src/main/python/systemds/scuro/representations/tfidf.py
@@ -56,19 +56,16 @@ class TfIdf(UnimodalRepresentation):
)
def estimate_peak_memory_bytes(self, input_stats: TextStats) -> dict:
- output_bytes = self.estimate_output_memory_bytes(input_stats)
- vectorizer_overhead = 640 * 1024
- return {
- "cpu_peak_bytes": output_bytes + vectorizer_overhead,
- "gpu_peak_bytes": 0,
- }
+ dense_bytes = self.estimate_output_memory_bytes(input_stats)
+ cpu_peak = int(dense_bytes * 2.2 + 32 * 1024 * 1024)
+ return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": 0}
def transform(self, modality, aggregation=None):
transformed_modality = TransformedModality(modality, self)
vectorizer = TfidfVectorizer(min_df=self.min_df)
- X = vectorizer.fit_transform(modality.data)
+ X = vectorizer.fit_transform(modality.data).astype(np.float32,
copy=False)
if self.output_file is not None:
save_embeddings(X, self.output_file)
diff --git
a/src/main/python/systemds/scuro/representations/window_aggregation.py
b/src/main/python/systemds/scuro/representations/window_aggregation.py
index a34b6ebe4c..541a7b68fb 100644
--- a/src/main/python/systemds/scuro/representations/window_aggregation.py
+++ b/src/main/python/systemds/scuro/representations/window_aggregation.py
@@ -167,11 +167,18 @@ class WindowAggregation(Window):
in_numel = effective_seq_len * self._rest_numel(in_shape)
output_bytes = self.estimate_output_memory_bytes(input_stats)
one_instance_bytes = in_numel * np.dtype(self.data_type).itemsize
- cpu_peak = (
- output_bytes * 2
- + one_instance_bytes * input_stats.num_instances
- + one_instance_bytes * self.window_size
- + 8 * 1024 * 1024
+ input_bytes = one_instance_bytes * input_stats.num_instances
+
+ output_transient = output_bytes
+
+ pad_overhead = 0
+ if getattr(self, "pad", False):
+ out_seq_len = math.ceil(in_shape[0] / self.window_size)
+ pad_overhead = int(input_stats.num_instances * out_seq_len * 8)
+
+ cpu_peak = int(
+ (input_bytes + output_bytes + output_transient + pad_overhead) *
1.15
+ + 16 * 1024 * 1024
)
return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": 0}
@@ -298,12 +305,12 @@ class StaticWindow(Window):
return {"cpu_peak_bytes": 0, "gpu_peak_bytes": 0}
effective_seq_len = in_shape[0]
in_numel = effective_seq_len * self._rest_numel(in_shape)
- output_bytes = self.estimate_output_memory_bytes(input_stats)
one_instance_bytes = in_numel * np.dtype(self.data_type).itemsize
- cpu_peak = (
- output_bytes * 2
- + one_instance_bytes * input_stats.num_instances
- + 8 * 1024 * 1024
+ input_bytes = one_instance_bytes * input_stats.num_instances
+ output_bytes = self.estimate_output_memory_bytes(input_stats)
+ output_transient = output_bytes
+ cpu_peak = int(
+ (input_bytes + output_bytes + output_transient) * 1.12 + 12 * 1024
* 1024
)
return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": 0}
diff --git a/src/main/python/tests/scuro/test_hp_tuner.py
b/src/main/python/tests/scuro/test_hp_tuner.py
index 72591d74cc..c418cefcae 100644
--- a/src/main/python/tests/scuro/test_hp_tuner.py
+++ b/src/main/python/tests/scuro/test_hp_tuner.py
@@ -20,6 +20,7 @@
# -------------------------------------------------------------
import unittest
+from types import SimpleNamespace
import numpy as np
@@ -54,163 +55,196 @@ from unittest.mock import patch
class TestHPTuner(unittest.TestCase):
- # Note: HPTuner is being refactored and not yet ready for testing
- pass
-
-
-# data_generator = None
-# num_instances = 0
-
-# @classmethod
-# def setUpClass(cls):
-# cls.num_instances = 10
-# cls.mods = [
-# ModalityType.VIDEO,
-# ModalityType.AUDIO,
-# ModalityType.TEXT,
-# ModalityType.IMAGE,
-# ]
-# cls.indices = np.array(range(cls.num_instances))
-# cls.tasks = [
-# TestTask("UnimodalRepresentationTask1", "TestSVM1",
cls.num_instances),
-# TestTask("UnimodalRepresentationTask2", "TestSVM2",
cls.num_instances),
-# ]
-
-# def test_hp_tuner_for_audio_modality(self):
-# audio_data, audio_md =
ModalityRandomDataGenerator().create_audio_data(
-# self.num_instances, 3000
-# )
-# audio = UnimodalModality(
-# TestDataLoader(
-# self.indices, None, ModalityType.AUDIO, audio_data,
np.float32, audio_md
-# )
-# )
-
-# self.run_hp_for_modality([audio])
-
-# def test_multimodal_hp_tuning(self):
-# audio_data, audio_md =
ModalityRandomDataGenerator().create_audio_data(
-# self.num_instances, 3000
-# )
-# audio = UnimodalModality(
-# TestDataLoader(
-# self.indices, None, ModalityType.AUDIO, audio_data,
np.float32, audio_md
-# )
-# )
-
-# text_data, text_md = ModalityRandomDataGenerator().create_text_data(
-# self.num_instances
-# )
-# text = UnimodalModality(
-# TestDataLoader(
-# self.indices, None, ModalityType.TEXT, text_data, str,
text_md
-# )
-# )
-
-# # self.run_hp_for_modality(
-# # [audio, text], multimodal=True,
tune_unimodal_representations=True
-# # )
-# self.run_hp_for_modality(
-# [audio, text], multimodal=True,
tune_unimodal_representations=False
-# )
-
-# def test_hp_tuner_for_text_modality(self):
-# text_data, text_md =
ModalityRandomDataGenerator().create_text_data(
-# self.num_instances
-# )
-# text = UnimodalModality(
-# TestDataLoader(
-# self.indices, None, ModalityType.TEXT, text_data, str,
text_md
-# )
-# )
-# self.run_hp_for_modality([text])
-
-# def test_hp_tuner_for_image_modality(self):
-# image_data, image_md =
ModalityRandomDataGenerator().create_visual_modality(
-# self.num_instances, 1
-# )
-# image = UnimodalModality(
-# TestDataLoader(
-# self.indices, None, ModalityType.IMAGE, image_data,
np.float32, image_md
-# )
-# )
-# self.run_hp_for_modality([image])
-
-# def run_hp_for_modality(
-# self, modalities, multimodal=False,
tune_unimodal_representations=False
-# ):
-# with patch.object(
-# Registry,
-# "_representations",
-# {
-# ModalityType.TEXT: [W2V, BoW],
-# ModalityType.AUDIO: [Spectrogram, ZeroCrossing, Spectral,
Pitch],
-# ModalityType.TIMESERIES: [ResNet],
-# ModalityType.VIDEO: [ResNet],
-# ModalityType.IMAGE: [ResNet, ColorHistogram],
-# ModalityType.EMBEDDING: [],
-# },
-# ):
-# registry = Registry()
-# registry._fusion_operators = [LSTM]
-# unimodal_optimizer = UnimodalOptimizer(modalities, self.tasks,
False)
-# unimodal_optimizer.optimize()
-
-# hp = HyperparameterTuner(
-# modalities,
-# self.tasks,
-# unimodal_optimizer.operator_performance,
-# n_jobs=1,
-# )
-
-# if multimodal:
-# m_o = MultimodalOptimizer(
-# modalities,
-# unimodal_optimizer.operator_performance,
-# self.tasks,
-# debug=False,
-# min_modalities=2,
-# max_modalities=3,
-# )
-# fusion_results = m_o.optimize(20)
-
-# hp.tune_multimodal_representations(
-# fusion_results,
-# k=1,
-# optimize_unimodal=tune_unimodal_representations,
-# max_eval_per_rep=10,
-# )
-
-# else:
-# hp.tune_unimodal_representations(max_eval_per_rep=10)
-
-# assert len(hp.optimization_results.results) == len(self.tasks)
-# if multimodal:
-# if tune_unimodal_representations:
-# assert (
-# len(
-#
hp.optimization_results.results[self.tasks[0].model.name][0]
-# )
-# == 1
-# )
-# else:
-# assert (
-# len(
-#
hp.optimization_results.results[self.tasks[0].model.name][
-# "mm_results"
-# ]
-# )
-# == 1
-# )
-# else:
-# assert (
-#
len(hp.optimization_results.results[self.tasks[0].model.name]) == 1
-# )
-# assert (
-#
len(hp.optimization_results.results[self.tasks[0].model.name][0])
-# == 2
-# )
-
-
-# if __name__ == "__main__":
-# unittest.main()
+ data_generator = None
+ num_instances = 0
+
+ @classmethod
+ def setUpClass(cls):
+ cls.num_instances = 10
+ cls.mods = [
+ ModalityType.VIDEO,
+ ModalityType.AUDIO,
+ ModalityType.TEXT,
+ ModalityType.IMAGE,
+ ]
+ cls.indices = np.array(range(cls.num_instances))
+ cls.tasks = [
+ TestTask("UnimodalRepresentationTask1", "TestSVM1",
cls.num_instances),
+ TestTask("UnimodalRepresentationTask2", "TestSVM2",
cls.num_instances),
+ ]
+
+ def test_hp_tuner_for_text_modality(self):
+ text_data, text_md = ModalityRandomDataGenerator().create_text_data(
+ self.num_instances
+ )
+ text = UnimodalModality(
+ TestDataLoader(
+ self.indices, None, ModalityType.TEXT, text_data, str, text_md
+ )
+ )
+ self.run_hp_for_modality([text])
+
+ # TODO: Add once the final multimodal optimizer is implemented
+ # def test_multimodal_hp_tuning(self):
+ # audio_data, audio_md =
ModalityRandomDataGenerator().create_audio_data(
+ # self.num_instances, 3000
+ # )
+ # audio = UnimodalModality(
+ # TestDataLoader(
+ # self.indices, None, ModalityType.AUDIO, audio_data,
np.float32, audio_md
+ # )
+ # )
+
+ # text_data, text_md = ModalityRandomDataGenerator().create_text_data(
+ # self.num_instances
+ # )
+ # text = UnimodalModality(
+ # TestDataLoader(
+ # self.indices, None, ModalityType.TEXT, text_data, str,
text_md
+ # )
+ # )
+
+ # self.run_hp_for_modality(
+ # [audio, text], multimodal=True,
tune_unimodal_representations=False
+ # )
+
+ def test_hp_tuner_for_image_modality(self):
+ image_data, image_md =
ModalityRandomDataGenerator().create_visual_modality(
+ self.num_instances, 1
+ )
+ image = UnimodalModality(
+ TestDataLoader(
+ self.indices, None, ModalityType.IMAGE, image_data,
np.float32, image_md
+ )
+ )
+ self.run_hp_for_modality([image])
+
+ def run_hp_for_modality(
+ self, modalities, multimodal=False, tune_unimodal_representations=False
+ ):
+ with patch.object(
+ Registry,
+ "_representations",
+ {
+ ModalityType.TEXT: [BoW, W2V],
+ ModalityType.AUDIO: [Spectrogram, ZeroCrossing, Spectral,
Pitch],
+ ModalityType.TIMESERIES: [ResNet],
+ ModalityType.VIDEO: [ResNet],
+ ModalityType.IMAGE: [ResNet, ColorHistogram],
+ ModalityType.EMBEDDING: [],
+ },
+ ):
+ registry = Registry()
+ registry._fusion_operators = [LSTM]
+ unimodal_optimizer = UnimodalOptimizer(modalities, self.tasks,
False)
+ unimodal_optimizer.optimize()
+
+ hp = HyperparameterTuner(
+ modalities,
+ self.tasks,
+ unimodal_optimizer.operator_performance,
+ n_jobs=1,
+ )
+
+ if multimodal:
+ m_o = MultimodalOptimizer(
+ modalities,
+ unimodal_optimizer.operator_performance,
+ self.tasks,
+ debug=False,
+ min_modalities=2,
+ max_modalities=3,
+ )
+ fusion_results = m_o.optimize(20)
+
+ hp.tune_multimodal_representations(
+ fusion_results,
+ k=1,
+ optimize_unimodal=tune_unimodal_representations,
+ max_eval_per_rep=10,
+ )
+
+ else:
+ hp.tune_unimodal_representations(max_eval_per_rep=10)
+
+ assert len(hp.optimization_results.results) == len(self.tasks)
+ if multimodal:
+ if tune_unimodal_representations:
+ assert (
+ len(
+
hp.optimization_results.results[self.tasks[0].model.name][0]
+ )
+ == 1
+ )
+ else:
+ assert (
+ len(
+
hp.optimization_results.results[self.tasks[0].model.name][
+ "mm_results"
+ ]
+ )
+ == 1
+ )
+ else:
+ assert (
+
len(hp.optimization_results.results[self.tasks[0].model.name]) == 1
+ )
+ modality_id = modalities[0].modality_id
+ assert (
+ len(
+
hp.optimization_results.results[self.tasks[0].model.name][
+ modality_id
+ ]
+ )
+ == 2
+ )
+
+ def test_evaluate_configs_deduplicates_candidates(self):
+ class DummyOptimizationResults:
+ def get_k_best_results(self, modality, task,
performance_metric_name):
+ return [], []
+
+ task = SimpleNamespace(model=SimpleNamespace(name="dummy_task"))
+ hp = HyperparameterTuner(
+ modalities=[],
+ tasks=[task],
+ optimization_results=DummyOptimizationResults(),
+ n_jobs=1,
+ )
+
+ eval_calls = {"count": 0}
+
+ def fake_evaluate(
+ dag, params, node_order, modality_ids, task,
modalities_override=None
+ ):
+ eval_calls["count"] += 1
+ return params, float(params["x"])
+
+ hp.evaluate_dag_config = fake_evaluate
+
+ candidate_configs = [
+ {"x": 1},
+ {"x": 1},
+ {"x": 2},
+ {"x": 2},
+ {"x": 1},
+ ]
+ seen_configs = {}
+
+ results = hp._evaluate_configs(
+ dag=None,
+ task=None,
+ node_order=[],
+ modality_ids=[],
+ modalities_override=None,
+ candidate_configs=candidate_configs,
+ seen_configs=seen_configs,
+ )
+
+ self.assertEqual(eval_calls["count"], 2)
+ self.assertEqual(len(seen_configs), 2)
+ self.assertEqual([r[0]["x"] for r in results], [1, 2])
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/main/python/tests/scuro/test_unimodal_optimizer.py
b/src/main/python/tests/scuro/test_unimodal_optimizer.py
index 4fe98800e6..3c5ce2a67f 100644
--- a/src/main/python/tests/scuro/test_unimodal_optimizer.py
+++ b/src/main/python/tests/scuro/test_unimodal_optimizer.py
@@ -24,23 +24,10 @@ import unittest
import numpy as np
from systemds.scuro.representations.clip import CLIPText, CLIPVisual
-from systemds.scuro.representations.bert import ALBERT, ELECTRA, RoBERTa,
DistillBERT
from systemds.scuro.representations.color_histogram import ColorHistogram
-from systemds.scuro.representations.glove import GloVe
-from systemds.scuro.representations.timeseries_representations import (
- Mean,
- ACF,
-)
from systemds.scuro.drsearch.operator_registry import Registry
from systemds.scuro.drsearch.unimodal_optimizer import UnimodalOptimizer
-from systemds.scuro.drsearch.representation_dag import RepresentationNode
-from systemds.scuro.representations.representation import RepresentationStats
-from systemds.scuro.representations.spectrogram import Spectrogram
-from systemds.scuro.representations.covarep_audio_features import (
- ZeroCrossing,
-)
-from systemds.scuro.representations.vgg import VGG19
from systemds.scuro.representations.word2vec import W2V
from systemds.scuro.representations.bow import BoW
from systemds.scuro.representations.bert import Bert
@@ -51,7 +38,17 @@ from tests.scuro.data_generator import (
TestDataLoader,
TestTask,
)
+import copy
+from systemds.scuro.drsearch.representation_dag import (
+ CSEAwareDAGBuilder,
+ RepresentationDag,
+ pushdown_aggregation,
+)
+from systemds.scuro.representations.aggregated_representation import (
+ AggregatedRepresentation,
+)
+from systemds.scuro.representations.bert import Bert
from systemds.scuro.modality.type import ModalityType
from unittest.mock import patch
@@ -73,19 +70,6 @@ class TestUnimodalRepresentationOptimizer(unittest.TestCase):
TestTask("UnimodalRepresentationTask2", "Test2",
cls.num_instances),
]
- # Note: Audio optimizer still needs some work
- # def test_unimodal_optimizer_for_audio_modality(self):
- # audio_data, audio_md =
ModalityRandomDataGenerator().create_audio_data(
- # self.num_instances, 3000
- # )
- # audio = UnimodalModality(
- # TestDataLoader(
- # self.indices, None, ModalityType.AUDIO, audio_data,
np.float32, audio_md
- # )
- # )
-
- # self.optimize_unimodal_representation_for_modality([audio])
-
def test_unimodal_optimizer_for_text_modality(self):
text_data, text_md = ModalityRandomDataGenerator().create_text_data(
self.num_instances, 10
@@ -127,18 +111,6 @@ class
TestUnimodalRepresentationOptimizer(unittest.TestCase):
)
self.optimize_unimodal_representation_for_modality([text, image])
- # Note: Timeseries optimizer still needs some work
- # def test_unimodal_optimizer_for_ts_modality(self):
- # ts_data, ts_md =
ModalityRandomDataGenerator().create_timeseries_data(
- # self.num_instances, 1000
- # )
- # ts = UnimodalModality(
- # TestDataLoader(
- # self.indices, None, ModalityType.TIMESERIES, ts_data,
np.float32, ts_md
- # )
- # )
- # self.optimize_unimodal_representation_for_modality([ts])
-
def test_unimodal_optimizer_for_video_modality(self):
video_data, video_md =
ModalityRandomDataGenerator().create_visual_modality(
self.num_instances, 10, 10
@@ -150,6 +122,67 @@ class
TestUnimodalRepresentationOptimizer(unittest.TestCase):
)
self.optimize_unimodal_representation_for_modality([video])
+ def
test_aggregation_pushdown_preserves_dag_id_and_bert_node_parameters(self):
+ builder = CSEAwareDAGBuilder()
+ modality_id = "test_modality_agg_pushdown"
+ leaf_id = builder.create_leaf_node(modality_id)
+
+ bert = Bert()
+ bert_id = builder.create_operation_node(
+ Bert, [leaf_id], bert.get_current_parameters()
+ )
+
+ agg = AggregatedRepresentation(target_dimensions=1)
+ agg_id = builder.create_operation_node(
+ AggregatedRepresentation,
+ [bert_id],
+ agg.get_current_parameters(),
+ )
+
+ expected_dag_id = 1001
+ dag = RepresentationDag(
+ nodes=copy.deepcopy(builder.global_nodes),
+ root_node_id=agg_id,
+ dag_id=expected_dag_id,
+ )
+
+ by_id = {n.node_id: n for n in dag.nodes}
+ self.assertEqual(len(dag.nodes), 3)
+ self.assertEqual(dag.dag_id, expected_dag_id)
+ self.assertEqual(dag.root_node_id, agg_id)
+
+ self.assertEqual(by_id[leaf_id].inputs, [])
+ self.assertEqual(by_id[bert_id].inputs, [leaf_id])
+ self.assertEqual(by_id[agg_id].inputs, [bert_id])
+ self.assertIs(by_id[bert_id].operation, Bert)
+ self.assertIs(by_id[agg_id].operation, AggregatedRepresentation)
+
+ bert_params_before = copy.deepcopy(by_id[bert_id].parameters)
+ agg_params_snapshot = copy.deepcopy(by_id[agg_id].parameters)
+ self.assertNotIn("_pushdown_aggregation", bert_params_before)
+
+ pushdown_aggregation([dag])
+
+ self.assertEqual(dag.dag_id, expected_dag_id)
+ self.assertEqual(dag.root_node_id, bert_id)
+ self.assertEqual(len(dag.nodes), 2)
+ self.assertIsNone(dag.get_node_by_id(agg_id))
+
+ bert_after = dag.get_node_by_id(bert_id)
+ self.assertIsNotNone(bert_after)
+ self.assertEqual(bert_after.inputs, [leaf_id])
+ self.assertIn("_pushdown_aggregation", bert_after.parameters)
+ self.assertEqual(
+ bert_after.parameters["_pushdown_aggregation"],
+ agg_params_snapshot,
+ )
+ remaining = {
+ k: v
+ for k, v in bert_after.parameters.items()
+ if k != "_pushdown_aggregation"
+ }
+ self.assertEqual(remaining, bert_params_before)
+
def optimize_unimodal_representation_for_modality(self, modalities):
with patch.object(
Registry,
@@ -161,8 +194,6 @@ class
TestUnimodalRepresentationOptimizer(unittest.TestCase):
Bert,
CLIPText,
],
- ModalityType.AUDIO: [Spectrogram, ZeroCrossing],
- ModalityType.TIMESERIES: [Mean, ACF],
ModalityType.VIDEO: [ResNet],
ModalityType.IMAGE: [ColorHistogram, CLIPVisual],
ModalityType.EMBEDDING: [],
@@ -171,7 +202,12 @@ class
TestUnimodalRepresentationOptimizer(unittest.TestCase):
registry = Registry()
unimodal_optimizer = UnimodalOptimizer(
- modalities, self.tasks, False, k=1, max_num_workers=1
+ modalities,
+ self.tasks,
+ False,
+ k=1,
+ max_num_workers=1,
+ enable_checkpointing=False,
)
unimodal_optimizer.optimize()
for modality in modalities:
@@ -185,4 +221,3 @@ class
TestUnimodalRepresentationOptimizer(unittest.TestCase):
modalities[0], self.tasks[0], "accuracy"
)
assert len(result) == 1
- assert len(cached) == 1