This is an automated email from the ASF dual-hosted git repository.
cdionysio pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 38d348ea11 [SYSTEMDS-3937] Improve timing and memory efficiency (#2396)
38d348ea11 is described below
commit 38d348ea11a571b3ec25e60d5e32a0ada0874a5e
Author: Christina Dionysio <[email protected]>
AuthorDate: Fri Jan 9 15:54:13 2026 +0100
[SYSTEMDS-3937] Improve timing and memory efficiency (#2396)
This patch fixes some issues with the timing of representations and updates
the caching mechanism for memory efficiency in the optimizer.
---
.../scuro/drsearch/multimodal_optimizer.py | 37 +++----
.../systemds/scuro/drsearch/representation_dag.py | 114 ++++++++++++++++-----
.../systemds/scuro/drsearch/unimodal_optimizer.py | 77 +++++++++-----
.../python/systemds/scuro/modality/modality.py | 11 +-
.../python/systemds/scuro/modality/transformed.py | 12 ++-
.../systemds/scuro/modality/unimodal_modality.py | 4 +-
.../representations/aggregated_representation.py | 4 +
.../systemds/scuro/representations/fusion.py | 3 +
src/main/python/tests/scuro/data_generator.py | 1 +
9 files changed, 180 insertions(+), 83 deletions(-)
diff --git a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
index 76831f6aae..93a78e2cc2 100644
--- a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
@@ -57,10 +57,7 @@ def _evaluate_dag_worker(dag_pickle, task_pickle, modalities_pickle, debug=False
f"[DEBUG][worker] pid={os.getpid()} evaluating
dag_root={getattr(dag, 'root_node_id', None)} task={getattr(task.model, 'name',
None)}"
)
- dag_copy = copy.deepcopy(dag)
- task_copy = copy.deepcopy(task)
-
- fused_representation = dag_copy.execute(modalities_for_dag, task_copy)
+ fused_representation = dag.execute(modalities_for_dag, task)
if fused_representation is None:
return None
@@ -73,22 +70,22 @@ def _evaluate_dag_worker(dag_pickle, task_pickle, modalities_pickle, debug=False
)
from systemds.scuro.representations.aggregate import Aggregation
- if task_copy.expected_dim == 1 and get_shape(final_representation.metadata) > 1:
+ if task.expected_dim == 1 and get_shape(final_representation.metadata) > 1:
agg_operator = AggregatedRepresentation(Aggregation())
final_representation = agg_operator.transform(final_representation)
eval_start = time.time()
- scores = task_copy.run(final_representation.data)
+ scores = task.run(final_representation.data)
eval_time = time.time() - eval_start
total_time = time.time() - start_time
return OptimizationResult(
- dag=dag_copy,
+ dag=dag,
train_score=scores[0].average_scores,
val_score=scores[1].average_scores,
test_score=scores[2].average_scores,
runtime=total_time,
- task_name=task_copy.model.name,
+ task_name=task.model.name,
task_time=eval_time,
representation_time=total_time - eval_time,
)
@@ -354,21 +351,14 @@ class MultimodalOptimizer:
def _evaluate_dag(self, dag: RepresentationDag, task: Task) -> "OptimizationResult":
start_time = time.time()
try:
- tid = threading.get_ident()
- tname = threading.current_thread().name
- dag_copy = copy.deepcopy(dag)
- modalities_for_dag = copy.deepcopy(
+ fused_representation = dag.execute(
list(
chain.from_iterable(
self.k_best_representations[task.model.name].values()
)
- )
- )
- task_copy = copy.deepcopy(task)
- fused_representation = dag_copy.execute(
- modalities_for_dag,
- task_copy,
+ ),
+ task,
)
torch.cuda.empty_cache()
@@ -379,27 +369,24 @@ class MultimodalOptimizer:
final_representation = fused_representation[
list(fused_representation.keys())[-1]
]
- if (
- task_copy.expected_dim == 1
- and get_shape(final_representation.metadata) > 1
- ):
+ if task.expected_dim == 1 and get_shape(final_representation.metadata) > 1:
agg_operator = AggregatedRepresentation(Aggregation())
final_representation = agg_operator.transform(final_representation)
eval_start = time.time()
- scores = task_copy.run(final_representation.data)
+ scores = task.run(final_representation.data)
eval_time = time.time() - eval_start
total_time = time.time() - start_time
return OptimizationResult(
- dag=dag_copy,
+ dag=dag,
train_score=scores[0].average_scores,
val_score=scores[1].average_scores,
test_score=scores[2].average_scores,
runtime=total_time,
representation_time=total_time - eval_time,
- task_name=task_copy.model.name,
+ task_name=task.model.name,
task_time=eval_time,
)
diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py b/src/main/python/systemds/scuro/drsearch/representation_dag.py
index 5543da32dd..ff46d1db95 100644
--- a/src/main/python/systemds/scuro/drsearch/representation_dag.py
+++ b/src/main/python/systemds/scuro/drsearch/representation_dag.py
@@ -32,6 +32,32 @@ from systemds.scuro.representations.aggregated_representation import (
from systemds.scuro.representations.context import Context
from systemds.scuro.utils.identifier import get_op_id, get_node_id
+from collections import OrderedDict
+from typing import Any, Hashable, Optional
+
+
+class LRUCache:
+ def __init__(self, max_size: int = 256):
+ self.max_size = max_size
+ self._cache: "OrderedDict[Hashable, Any]" = OrderedDict()
+
+ def get(self, key: Hashable) -> Optional[Any]:
+ if key not in self._cache:
+ return None
+ value = self._cache.pop(key)
+ self._cache[key] = value
+ return value
+
+ def put(self, key: Hashable, value: Any) -> None:
+ if key in self._cache:
+ self._cache.pop(key)
+ elif len(self._cache) >= self.max_size:
+ self._cache.popitem(last=False)
+ self._cache[key] = value
+
+ def __len__(self) -> int:
+ return len(self._cache)
+
@dataclass
class RepresentationNode:
@@ -119,10 +145,22 @@ class RepresentationDag:
return not has_cycle(self.root_node_id, set())
+ def _compute_leaf_signature(self, node) -> Hashable:
+ return ("leaf", node.modality_id, node.representation_index)
+
+ def _compute_node_signature(self, node, input_sig_tuple) -> Hashable:
+ op_cls = node.operation
+ params_items = tuple(sorted((node.parameters or {}).items()))
+ return ("op", op_cls, params_items, input_sig_tuple)
+
def execute(
- self, modalities: List[Modality], task=None
+ self,
+ modalities: List[Modality],
+ task=None,
+ external_cache: Optional[LRUCache] = None,
) -> Dict[str, TransformedModality]:
- cache = {}
+ cache: Dict[str, TransformedModality] = {}
+ node_signatures: Dict[str, Hashable] = {}
def execute_node(node_id: str, task) -> TransformedModality:
if node_id in cache:
@@ -135,38 +173,58 @@ class RepresentationDag:
modalities, node.modality_id, node.representation_index
)
cache[node_id] = modality
+ node_signatures[node_id] = self._compute_leaf_signature(node)
return modality
input_mods = [execute_node(input_id, task) for input_id in node.inputs]
+ input_signatures = tuple(
+ node_signatures[input_id] for input_id in node.inputs
+ )
+ node_signature = self._compute_node_signature(node, input_signatures)
+ is_unimodal = len(input_mods) == 1
+
+ cached_result = None
+ if external_cache and is_unimodal:
+ cached_result = external_cache.get(node_signature)
+ if cached_result is not None:
+ result = cached_result
- node_operation = copy.deepcopy(node.operation())
- if len(input_mods) == 1:
- # It's a unimodal operation
- if isinstance(node_operation, Context):
- result = input_mods[0].context(node_operation)
- elif isinstance(node_operation, AggregatedRepresentation):
- result = node_operation.transform(input_mods[0])
- elif isinstance(node_operation, UnimodalRepresentation):
+ else:
+ node_operation = copy.deepcopy(node.operation())
+ if len(input_mods) == 1:
+ # It's a unimodal operation
+ if isinstance(node_operation, Context):
+ result = input_mods[0].context(node_operation)
+ elif isinstance(node_operation, AggregatedRepresentation):
+ result = node_operation.transform(input_mods[0])
+ elif isinstance(node_operation, UnimodalRepresentation):
+ if (
+ isinstance(input_mods[0], TransformedModality)
+ and input_mods[0].transformation[0].__class__
+ == node.operation
+ ):
+ # Avoid duplicate transformations
+ result = input_mods[0]
+ else:
+ # Compute the representation
+ result = input_mods[0].apply_representation(node_operation)
+ else:
+ # It's a fusion operation
+ fusion_op = node_operation
if (
- isinstance(input_mods[0], TransformedModality)
- and input_mods[0].transformation[0].__class__ == node.operation
+ hasattr(fusion_op, "needs_training")
+ and fusion_op.needs_training
):
- # Avoid duplicate transformations
- result = input_mods[0]
+ result = input_mods[0].combine_with_training(
+ input_mods[1:], fusion_op, task
+ )
else:
- # Compute the representation
- result = input_mods[0].apply_representation(node_operation)
- else:
- # It's a fusion operation
- fusion_op = node_operation
- if hasattr(fusion_op, "needs_training") and fusion_op.needs_training:
- result = input_mods[0].combine_with_training(
- input_mods[1:], fusion_op, task
- )
- else:
- result = input_mods[0].combine(input_mods[1:], fusion_op)
+ result = input_mods[0].combine(input_mods[1:], fusion_op)
+ if external_cache and is_unimodal:
+ external_cache.put(node_signature, result)
cache[node_id] = result
+ node_signatures[node_id] = node_signature
return result
execute_node(self.root_node_id, task)
@@ -230,3 +288,9 @@ class RepresentationDAGBuilder:
if not dag.validate():
raise ValueError("Invalid DAG construction")
return dag
+
+ def get_node(self, node_id: str) -> Optional[RepresentationNode]:
+ for node in self.nodes:
+ if node.node_id == node_id:
+ return node
+ return None
diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
index 4cde294b17..e9029d63ee 100644
--- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
@@ -25,7 +25,7 @@ from dataclasses import dataclass
import multiprocessing as mp
from typing import List, Any
from functools import lru_cache
-
+from systemds.scuro.drsearch.task import Task
from systemds.scuro import ModalityType
from systemds.scuro.drsearch.ranking import rank_by_tradeoff
from systemds.scuro.drsearch.task import PerformanceMeasure
@@ -46,6 +46,7 @@ from systemds.scuro.drsearch.representation_dag import (
RepresentationDAGBuilder,
)
from systemds.scuro.drsearch.representation_dag_visualizer import visualize_dag
+from systemds.scuro.drsearch.representation_dag import LRUCache
class UnimodalOptimizer:
@@ -54,6 +55,7 @@ class UnimodalOptimizer:
):
self.modalities = modalities
self.tasks = tasks
+ self.modality_ids = [modality.modality_id for modality in modalities]
self.save_all_results = save_all_results
self.result_path = result_path
@@ -177,24 +179,27 @@ class UnimodalOptimizer:
modality_specific_operators = self._get_modality_operators(
modality.modality_type
)
-
+ dags = []
for operator in modality_specific_operators:
- dags = self._build_modality_dag(modality, operator())
-
- for dag in dags:
- representations = dag.execute([modality])
- node_id = list(representations.keys())[-1]
- node = dag.get_node_by_id(node_id)
- if node.operation is None:
- continue
-
- reps = self._get_representation_chain(node, dag)
- combination = next((op for op in reps if isinstance(op, Fusion)), None)
- self._evaluate_local(
- representations[node_id], local_results, dag, combination
- )
- if self.debug:
- visualize_dag(dag)
+ dags.extend(self._build_modality_dag(modality, operator()))
+
+ external_cache = LRUCache(max_size=32)
+ for dag in dags:
+ representations = dag.execute(
+ [modality], task=self.tasks[0], external_cache=external_cache
+ ) # TODO: dynamic task selection
+ node_id = list(representations.keys())[-1]
+ node = dag.get_node_by_id(node_id)
+ if node.operation is None:
+ continue
+
+ reps = self._get_representation_chain(node, dag)
+ combination = next((op for op in reps if isinstance(op, Fusion)), None)
+ self._evaluate_local(
+ representations[node_id], local_results, dag, combination
+ )
+ if self.debug:
+ visualize_dag(dag)
if self.save_all_results:
timestr = time.strftime("%Y%m%d-%H%M%S")
@@ -242,15 +247,21 @@ class UnimodalOptimizer:
agg_operator.get_current_parameters(),
)
dag = builder.build(rep_node_id)
- representations = dag.execute([modality])
- node_id = list(representations.keys())[-1]
+
+ aggregated_modality = agg_operator.transform(modality)
+
for task in self.tasks:
start = time.perf_counter()
- scores = task.run(representations[node_id].data)
+ scores = task.run(aggregated_modality.data)
end = time.perf_counter()
local_results.add_result(
- scores, modality, task.model.name, end - start, combination, dag
+ scores,
+ aggregated_modality,
+ task.model.name,
+ end - start,
+ combination,
+ dag,
)
else:
modality.pad()
@@ -272,7 +283,10 @@ class UnimodalOptimizer:
agg_operator.get_current_parameters(),
)
dag = builder.build(rep_node_id)
+ start_rep = time.perf_counter()
representations = dag.execute([modality])
+ end_rep = time.perf_counter()
+ modality.transform_time += end_rep - start_rep
node_id = list(representations.keys())[-1]
start = time.perf_counter()
@@ -458,7 +472,9 @@ class UnimodalResults:
for entry in self.results[modality][task_name]:
print(f"{modality}_{task_name}: {entry}")
- def get_k_best_results(self, modality, k, task, performance_metric_name):
+ def get_k_best_results(
+ self, modality, k, task, performance_metric_name, prune_cache=False
+ ):
"""
Get the k best results for the given modality
:param modality: modality to get the best results for
@@ -488,6 +504,21 @@ class UnimodalResults:
cache_items = list(task_cache.items()) if task_cache else []
cache = [cache_items[i][1] for i in sorted_indices if i < len(cache_items)]
+ if prune_cache:
+ # Note: in case the unimodal results are loaded from a file, we need to initialize the cache for the modality and task
+ if modality.modality_id not in self.operator_performance.cache:
+ self.operator_performance.cache[modality.modality_id] = {}
+ if (
+ task.model.name
+ not in self.operator_performance.cache[modality.modality_id]
+ ):
+ self.operator_performance.cache[modality.modality_id][
+ task.model.name
+ ] = {}
+ self.operator_performance.cache[modality.modality_id][
+ task.model.name
+ ] = cache
+
return results, cache
diff --git a/src/main/python/systemds/scuro/modality/modality.py b/src/main/python/systemds/scuro/modality/modality.py
index f6e0320469..d0cb148b20 100644
--- a/src/main/python/systemds/scuro/modality/modality.py
+++ b/src/main/python/systemds/scuro/modality/modality.py
@@ -18,11 +18,9 @@
# under the License.
#
# -------------------------------------------------------------
-from copy import deepcopy
from typing import List
import numpy as np
-from numpy.f2py.auxfuncs import throw_error
from systemds.scuro.modality.type import ModalityType
from systemds.scuro.representations import utils
@@ -31,7 +29,12 @@ from systemds.scuro.representations import utils
class Modality:
def __init__(
- self, modalityType: ModalityType, modality_id=-1, metadata={}, data_type=None
+ self,
+ modalityType: ModalityType,
+ modality_id=-1,
+ metadata={},
+ data_type=None,
+ transform_time=0,
):
"""
Parent class of the different Modalities (unimodal & multimodal)
@@ -45,7 +48,7 @@ class Modality:
self.cost = None
self.shape = None
self.modality_id = modality_id
- self.transform_time = None
+ self.transform_time = transform_time if transform_time else 0
@property
def data(self):
diff --git a/src/main/python/systemds/scuro/modality/transformed.py b/src/main/python/systemds/scuro/modality/transformed.py
index 3b01465302..c19c90adaa 100644
--- a/src/main/python/systemds/scuro/modality/transformed.py
+++ b/src/main/python/systemds/scuro/modality/transformed.py
@@ -43,7 +43,11 @@ class TransformedModality(Modality):
metadata = modality.metadata.copy() if modality.metadata is not None else None
super().__init__(
- new_modality_type, modality.modality_id, metadata, modality.data_type
+ new_modality_type,
+ modality.modality_id,
+ metadata,
+ modality.data_type,
+ modality.transform_time,
)
self.transformation = None
self.self_contained = (
@@ -106,7 +110,7 @@ class TransformedModality(Modality):
)
start = time.time()
transformed_modality.data = w.execute(self)
- transformed_modality.transform_time = time.time() - start
+ transformed_modality.transform_time += time.time() - start
return transformed_modality
def context(self, context_operator):
@@ -115,14 +119,14 @@ class TransformedModality(Modality):
)
start = time.time()
transformed_modality.data = context_operator.execute(self)
- transformed_modality.transform_time = time.time() - start
+ transformed_modality.transform_time += time.time() - start
return transformed_modality
def apply_representation(self, representation):
start = time.time()
new_modality = representation.transform(self)
new_modality.update_metadata()
- new_modality.transform_time = time.time() - start
+ new_modality.transform_time += time.time() - start
new_modality.self_contained = representation.self_contained
return new_modality
diff --git a/src/main/python/systemds/scuro/modality/unimodal_modality.py b/src/main/python/systemds/scuro/modality/unimodal_modality.py
index e4ed85cce3..22a40db16c 100644
--- a/src/main/python/systemds/scuro/modality/unimodal_modality.py
+++ b/src/main/python/systemds/scuro/modality/unimodal_modality.py
@@ -95,7 +95,7 @@ class UnimodalModality(Modality):
transformed_modality = TransformedModality(self, context_operator)
transformed_modality.data = context_operator.execute(self)
- transformed_modality.transform_time = time.time() - start
+ transformed_modality.transform_time += time.time() - start
return transformed_modality
def aggregate(self, aggregation_function):
@@ -191,6 +191,6 @@ class UnimodalModality(Modality):
)
new_modality.data = padded_embeddings
new_modality.update_metadata()
- new_modality.transform_time = time.time() - start
+ new_modality.transform_time += time.time() - start
new_modality.self_contained = representation.self_contained
return new_modality
diff --git a/src/main/python/systemds/scuro/representations/aggregated_representation.py b/src/main/python/systemds/scuro/representations/aggregated_representation.py
index 1e98d2f92a..bcc36f4621 100644
--- a/src/main/python/systemds/scuro/representations/aggregated_representation.py
+++ b/src/main/python/systemds/scuro/representations/aggregated_representation.py
@@ -21,6 +21,7 @@
from systemds.scuro.modality.transformed import TransformedModality
from systemds.scuro.representations.representation import Representation
from systemds.scuro.representations.aggregate import Aggregation
+import time
class AggregatedRepresentation(Representation):
@@ -33,8 +34,11 @@ class AggregatedRepresentation(Representation):
self.self_contained = True
def transform(self, modality):
+ start = time.perf_counter()
aggregated_modality = TransformedModality(
modality, self, self_contained=modality.self_contained
)
+ end = time.perf_counter()
+ aggregated_modality.transform_time += end - start
aggregated_modality.data = self.aggregation.execute(modality)
return aggregated_modality
diff --git a/src/main/python/systemds/scuro/representations/fusion.py b/src/main/python/systemds/scuro/representations/fusion.py
index 7ac0200819..3f4257e64a 100644
--- a/src/main/python/systemds/scuro/representations/fusion.py
+++ b/src/main/python/systemds/scuro/representations/fusion.py
@@ -22,6 +22,8 @@ import copy
from typing import List
import numpy as np
+
+from systemds.scuro.modality.type import ModalityType
from systemds.scuro.representations.aggregated_representation import (
AggregatedRepresentation,
)
@@ -44,6 +46,7 @@ class Fusion(Representation):
self.needs_alignment = False
self.needs_training = False
self.needs_instance_alignment = False
+ self.output_modality_type = ModalityType.EMBEDDING
def transform(self, modalities: List[Modality]):
"""
diff --git a/src/main/python/tests/scuro/data_generator.py b/src/main/python/tests/scuro/data_generator.py
index 9da0aa82c0..ae78c50b8a 100644
--- a/src/main/python/tests/scuro/data_generator.py
+++ b/src/main/python/tests/scuro/data_generator.py
@@ -66,6 +66,7 @@ class ModalityRandomDataGenerator:
self.modality_type = None
self.metadata = {}
self.data_type = np.float32
+ self.transform_time = None
def create1DModality(
self,