This is an automated email from the ASF dual-hosted git repository.

cdionysio pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 38d348ea11 [SYSTEMDS-3937] Improve timing and memory efficiency (#2396)
38d348ea11 is described below

commit 38d348ea11a571b3ec25e60d5e32a0ada0874a5e
Author: Christina Dionysio <[email protected]>
AuthorDate: Fri Jan 9 15:54:13 2026 +0100

    [SYSTEMDS-3937] Improve timing and memory efficiency (#2396)
    
    This patch fixes some issues with the timing of representations and updates
    the caching mechanism for memory efficiency in the optimizer.
---
 .../scuro/drsearch/multimodal_optimizer.py         |  37 +++----
 .../systemds/scuro/drsearch/representation_dag.py  | 114 ++++++++++++++++-----
 .../systemds/scuro/drsearch/unimodal_optimizer.py  |  77 +++++++++-----
 .../python/systemds/scuro/modality/modality.py     |  11 +-
 .../python/systemds/scuro/modality/transformed.py  |  12 ++-
 .../systemds/scuro/modality/unimodal_modality.py   |   4 +-
 .../representations/aggregated_representation.py   |   4 +
 .../systemds/scuro/representations/fusion.py       |   3 +
 src/main/python/tests/scuro/data_generator.py      |   1 +
 9 files changed, 180 insertions(+), 83 deletions(-)

diff --git a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py 
b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
index 76831f6aae..93a78e2cc2 100644
--- a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
@@ -57,10 +57,7 @@ def _evaluate_dag_worker(dag_pickle, task_pickle, 
modalities_pickle, debug=False
                 f"[DEBUG][worker] pid={os.getpid()} evaluating 
dag_root={getattr(dag, 'root_node_id', None)} task={getattr(task.model, 'name', 
None)}"
             )
 
-        dag_copy = copy.deepcopy(dag)
-        task_copy = copy.deepcopy(task)
-
-        fused_representation = dag_copy.execute(modalities_for_dag, task_copy)
+        fused_representation = dag.execute(modalities_for_dag, task)
         if fused_representation is None:
             return None
 
@@ -73,22 +70,22 @@ def _evaluate_dag_worker(dag_pickle, task_pickle, 
modalities_pickle, debug=False
         )
         from systemds.scuro.representations.aggregate import Aggregation
 
-        if task_copy.expected_dim == 1 and 
get_shape(final_representation.metadata) > 1:
+        if task.expected_dim == 1 and get_shape(final_representation.metadata) 
> 1:
             agg_operator = AggregatedRepresentation(Aggregation())
             final_representation = agg_operator.transform(final_representation)
 
         eval_start = time.time()
-        scores = task_copy.run(final_representation.data)
+        scores = task.run(final_representation.data)
         eval_time = time.time() - eval_start
         total_time = time.time() - start_time
 
         return OptimizationResult(
-            dag=dag_copy,
+            dag=dag,
             train_score=scores[0].average_scores,
             val_score=scores[1].average_scores,
             test_score=scores[2].average_scores,
             runtime=total_time,
-            task_name=task_copy.model.name,
+            task_name=task.model.name,
             task_time=eval_time,
             representation_time=total_time - eval_time,
         )
@@ -354,21 +351,14 @@ class MultimodalOptimizer:
     def _evaluate_dag(self, dag: RepresentationDag, task: Task) -> 
"OptimizationResult":
         start_time = time.time()
         try:
-            tid = threading.get_ident()
-            tname = threading.current_thread().name
 
-            dag_copy = copy.deepcopy(dag)
-            modalities_for_dag = copy.deepcopy(
+            fused_representation = dag.execute(
                 list(
                     chain.from_iterable(
                         self.k_best_representations[task.model.name].values()
                     )
-                )
-            )
-            task_copy = copy.deepcopy(task)
-            fused_representation = dag_copy.execute(
-                modalities_for_dag,
-                task_copy,
+                ),
+                task,
             )
 
             torch.cuda.empty_cache()
@@ -379,27 +369,24 @@ class MultimodalOptimizer:
             final_representation = fused_representation[
                 list(fused_representation.keys())[-1]
             ]
-            if (
-                task_copy.expected_dim == 1
-                and get_shape(final_representation.metadata) > 1
-            ):
+            if task.expected_dim == 1 and 
get_shape(final_representation.metadata) > 1:
                 agg_operator = AggregatedRepresentation(Aggregation())
                 final_representation = 
agg_operator.transform(final_representation)
 
             eval_start = time.time()
-            scores = task_copy.run(final_representation.data)
+            scores = task.run(final_representation.data)
             eval_time = time.time() - eval_start
 
             total_time = time.time() - start_time
 
             return OptimizationResult(
-                dag=dag_copy,
+                dag=dag,
                 train_score=scores[0].average_scores,
                 val_score=scores[1].average_scores,
                 test_score=scores[2].average_scores,
                 runtime=total_time,
                 representation_time=total_time - eval_time,
-                task_name=task_copy.model.name,
+                task_name=task.model.name,
                 task_time=eval_time,
             )
 
diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py 
b/src/main/python/systemds/scuro/drsearch/representation_dag.py
index 5543da32dd..ff46d1db95 100644
--- a/src/main/python/systemds/scuro/drsearch/representation_dag.py
+++ b/src/main/python/systemds/scuro/drsearch/representation_dag.py
@@ -32,6 +32,32 @@ from 
systemds.scuro.representations.aggregated_representation import (
 from systemds.scuro.representations.context import Context
 from systemds.scuro.utils.identifier import get_op_id, get_node_id
 
+from collections import OrderedDict
+from typing import Any, Hashable, Optional
+
+
+class LRUCache:
+    def __init__(self, max_size: int = 256):
+        self.max_size = max_size
+        self._cache: "OrderedDict[Hashable, Any]" = OrderedDict()
+
+    def get(self, key: Hashable) -> Optional[Any]:
+        if key not in self._cache:
+            return None
+        value = self._cache.pop(key)
+        self._cache[key] = value
+        return value
+
+    def put(self, key: Hashable, value: Any) -> None:
+        if key in self._cache:
+            self._cache.pop(key)
+        elif len(self._cache) >= self.max_size:
+            self._cache.popitem(last=False)
+        self._cache[key] = value
+
+    def __len__(self) -> int:
+        return len(self._cache)
+
 
 @dataclass
 class RepresentationNode:
@@ -119,10 +145,22 @@ class RepresentationDag:
 
         return not has_cycle(self.root_node_id, set())
 
+    def _compute_leaf_signature(self, node) -> Hashable:
+        return ("leaf", node.modality_id, node.representation_index)
+
+    def _compute_node_signature(self, node, input_sig_tuple) -> Hashable:
+        op_cls = node.operation
+        params_items = tuple(sorted((node.parameters or {}).items()))
+        return ("op", op_cls, params_items, input_sig_tuple)
+
     def execute(
-        self, modalities: List[Modality], task=None
+        self,
+        modalities: List[Modality],
+        task=None,
+        external_cache: Optional[LRUCache] = None,
     ) -> Dict[str, TransformedModality]:
-        cache = {}
+        cache: Dict[str, TransformedModality] = {}
+        node_signatures: Dict[str, Hashable] = {}
 
         def execute_node(node_id: str, task) -> TransformedModality:
             if node_id in cache:
@@ -135,38 +173,58 @@ class RepresentationDag:
                     modalities, node.modality_id, node.representation_index
                 )
                 cache[node_id] = modality
+                node_signatures[node_id] = self._compute_leaf_signature(node)
                 return modality
 
             input_mods = [execute_node(input_id, task) for input_id in 
node.inputs]
+            input_signatures = tuple(
+                node_signatures[input_id] for input_id in node.inputs
+            )
+            node_signature = self._compute_node_signature(node, 
input_signatures)
+            is_unimodal = len(input_mods) == 1
+
+            cached_result = None
+            if external_cache and is_unimodal:
+                cached_result = external_cache.get(node_signature)
+            if cached_result is not None:
+                result = cached_result
 
-            node_operation = copy.deepcopy(node.operation())
-            if len(input_mods) == 1:
-                # It's a unimodal operation
-                if isinstance(node_operation, Context):
-                    result = input_mods[0].context(node_operation)
-                elif isinstance(node_operation, AggregatedRepresentation):
-                    result = node_operation.transform(input_mods[0])
-                elif isinstance(node_operation, UnimodalRepresentation):
+            else:
+                node_operation = copy.deepcopy(node.operation())
+                if len(input_mods) == 1:
+                    # It's a unimodal operation
+                    if isinstance(node_operation, Context):
+                        result = input_mods[0].context(node_operation)
+                    elif isinstance(node_operation, AggregatedRepresentation):
+                        result = node_operation.transform(input_mods[0])
+                    elif isinstance(node_operation, UnimodalRepresentation):
+                        if (
+                            isinstance(input_mods[0], TransformedModality)
+                            and input_mods[0].transformation[0].__class__
+                            == node.operation
+                        ):
+                            # Avoid duplicate transformations
+                            result = input_mods[0]
+                        else:
+                            # Compute the representation
+                            result = 
input_mods[0].apply_representation(node_operation)
+                else:
+                    # It's a fusion operation
+                    fusion_op = node_operation
                     if (
-                        isinstance(input_mods[0], TransformedModality)
-                        and input_mods[0].transformation[0].__class__ == 
node.operation
+                        hasattr(fusion_op, "needs_training")
+                        and fusion_op.needs_training
                     ):
-                        # Avoid duplicate transformations
-                        result = input_mods[0]
+                        result = input_mods[0].combine_with_training(
+                            input_mods[1:], fusion_op, task
+                        )
                     else:
-                        # Compute the representation
-                        result = 
input_mods[0].apply_representation(node_operation)
-            else:
-                # It's a fusion operation
-                fusion_op = node_operation
-                if hasattr(fusion_op, "needs_training") and 
fusion_op.needs_training:
-                    result = input_mods[0].combine_with_training(
-                        input_mods[1:], fusion_op, task
-                    )
-                else:
-                    result = input_mods[0].combine(input_mods[1:], fusion_op)
+                        result = input_mods[0].combine(input_mods[1:], 
fusion_op)
+                if external_cache and is_unimodal:
+                    external_cache.put(node_signature, result)
 
             cache[node_id] = result
+            node_signatures[node_id] = node_signature
             return result
 
         execute_node(self.root_node_id, task)
@@ -230,3 +288,9 @@ class RepresentationDAGBuilder:
         if not dag.validate():
             raise ValueError("Invalid DAG construction")
         return dag
+
+    def get_node(self, node_id: str) -> Optional[RepresentationNode]:
+        for node in self.nodes:
+            if node.node_id == node_id:
+                return node
+        return None
diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py 
b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
index 4cde294b17..e9029d63ee 100644
--- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
@@ -25,7 +25,7 @@ from dataclasses import dataclass
 import multiprocessing as mp
 from typing import List, Any
 from functools import lru_cache
-
+from systemds.scuro.drsearch.task import Task
 from systemds.scuro import ModalityType
 from systemds.scuro.drsearch.ranking import rank_by_tradeoff
 from systemds.scuro.drsearch.task import PerformanceMeasure
@@ -46,6 +46,7 @@ from systemds.scuro.drsearch.representation_dag import (
     RepresentationDAGBuilder,
 )
 from systemds.scuro.drsearch.representation_dag_visualizer import visualize_dag
+from systemds.scuro.drsearch.representation_dag import LRUCache
 
 
 class UnimodalOptimizer:
@@ -54,6 +55,7 @@ class UnimodalOptimizer:
     ):
         self.modalities = modalities
         self.tasks = tasks
+        self.modality_ids = [modality.modality_id for modality in modalities]
         self.save_all_results = save_all_results
         self.result_path = result_path
 
@@ -177,24 +179,27 @@ class UnimodalOptimizer:
         modality_specific_operators = self._get_modality_operators(
             modality.modality_type
         )
-
+        dags = []
         for operator in modality_specific_operators:
-            dags = self._build_modality_dag(modality, operator())
-
-            for dag in dags:
-                representations = dag.execute([modality])
-                node_id = list(representations.keys())[-1]
-                node = dag.get_node_by_id(node_id)
-                if node.operation is None:
-                    continue
-
-                reps = self._get_representation_chain(node, dag)
-                combination = next((op for op in reps if isinstance(op, 
Fusion)), None)
-                self._evaluate_local(
-                    representations[node_id], local_results, dag, combination
-                )
-                if self.debug:
-                    visualize_dag(dag)
+            dags.extend(self._build_modality_dag(modality, operator()))
+
+        external_cache = LRUCache(max_size=32)
+        for dag in dags:
+            representations = dag.execute(
+                [modality], task=self.tasks[0], external_cache=external_cache
+            )  # TODO: dynamic task selection
+            node_id = list(representations.keys())[-1]
+            node = dag.get_node_by_id(node_id)
+            if node.operation is None:
+                continue
+
+            reps = self._get_representation_chain(node, dag)
+            combination = next((op for op in reps if isinstance(op, Fusion)), 
None)
+            self._evaluate_local(
+                representations[node_id], local_results, dag, combination
+            )
+            if self.debug:
+                visualize_dag(dag)
 
         if self.save_all_results:
             timestr = time.strftime("%Y%m%d-%H%M%S")
@@ -242,15 +247,21 @@ class UnimodalOptimizer:
                     agg_operator.get_current_parameters(),
                 )
                 dag = builder.build(rep_node_id)
-                representations = dag.execute([modality])
-                node_id = list(representations.keys())[-1]
+
+                aggregated_modality = agg_operator.transform(modality)
+
                 for task in self.tasks:
                     start = time.perf_counter()
-                    scores = task.run(representations[node_id].data)
+                    scores = task.run(aggregated_modality.data)
                     end = time.perf_counter()
 
                     local_results.add_result(
-                        scores, modality, task.model.name, end - start, 
combination, dag
+                        scores,
+                        aggregated_modality,
+                        task.model.name,
+                        end - start,
+                        combination,
+                        dag,
                     )
             else:
                 modality.pad()
@@ -272,7 +283,10 @@ class UnimodalOptimizer:
                         agg_operator.get_current_parameters(),
                     )
                     dag = builder.build(rep_node_id)
+                    start_rep = time.perf_counter()
                     representations = dag.execute([modality])
+                    end_rep = time.perf_counter()
+                    modality.transform_time += end_rep - start_rep
                     node_id = list(representations.keys())[-1]
 
                     start = time.perf_counter()
@@ -458,7 +472,9 @@ class UnimodalResults:
                 for entry in self.results[modality][task_name]:
                     print(f"{modality}_{task_name}: {entry}")
 
-    def get_k_best_results(self, modality, k, task, performance_metric_name):
+    def get_k_best_results(
+        self, modality, k, task, performance_metric_name, prune_cache=False
+    ):
         """
         Get the k best results for the given modality
         :param modality: modality to get the best results for
@@ -488,6 +504,21 @@ class UnimodalResults:
             cache_items = list(task_cache.items()) if task_cache else []
             cache = [cache_items[i][1] for i in sorted_indices if i < 
len(cache_items)]
 
+        if prune_cache:
+            # Note: in case the unimodal results are loaded from a file, we 
need to initialize the cache for the modality and task
+            if modality.modality_id not in self.operator_performance.cache:
+                self.operator_performance.cache[modality.modality_id] = {}
+            if (
+                task.model.name
+                not in self.operator_performance.cache[modality.modality_id]
+            ):
+                self.operator_performance.cache[modality.modality_id][
+                    task.model.name
+                ] = {}
+            self.operator_performance.cache[modality.modality_id][
+                task.model.name
+            ] = cache
+
         return results, cache
 
 
diff --git a/src/main/python/systemds/scuro/modality/modality.py 
b/src/main/python/systemds/scuro/modality/modality.py
index f6e0320469..d0cb148b20 100644
--- a/src/main/python/systemds/scuro/modality/modality.py
+++ b/src/main/python/systemds/scuro/modality/modality.py
@@ -18,11 +18,9 @@
 # under the License.
 #
 # -------------------------------------------------------------
-from copy import deepcopy
 from typing import List
 
 import numpy as np
-from numpy.f2py.auxfuncs import throw_error
 
 from systemds.scuro.modality.type import ModalityType
 from systemds.scuro.representations import utils
@@ -31,7 +29,12 @@ from systemds.scuro.representations import utils
 class Modality:
 
     def __init__(
-        self, modalityType: ModalityType, modality_id=-1, metadata={}, 
data_type=None
+        self,
+        modalityType: ModalityType,
+        modality_id=-1,
+        metadata={},
+        data_type=None,
+        transform_time=0,
     ):
         """
         Parent class of the different Modalities (unimodal & multimodal)
@@ -45,7 +48,7 @@ class Modality:
         self.cost = None
         self.shape = None
         self.modality_id = modality_id
-        self.transform_time = None
+        self.transform_time = transform_time if transform_time else 0
 
     @property
     def data(self):
diff --git a/src/main/python/systemds/scuro/modality/transformed.py 
b/src/main/python/systemds/scuro/modality/transformed.py
index 3b01465302..c19c90adaa 100644
--- a/src/main/python/systemds/scuro/modality/transformed.py
+++ b/src/main/python/systemds/scuro/modality/transformed.py
@@ -43,7 +43,11 @@ class TransformedModality(Modality):
 
         metadata = modality.metadata.copy() if modality.metadata is not None 
else None
         super().__init__(
-            new_modality_type, modality.modality_id, metadata, 
modality.data_type
+            new_modality_type,
+            modality.modality_id,
+            metadata,
+            modality.data_type,
+            modality.transform_time,
         )
         self.transformation = None
         self.self_contained = (
@@ -106,7 +110,7 @@ class TransformedModality(Modality):
         )
         start = time.time()
         transformed_modality.data = w.execute(self)
-        transformed_modality.transform_time = time.time() - start
+        transformed_modality.transform_time += time.time() - start
         return transformed_modality
 
     def context(self, context_operator):
@@ -115,14 +119,14 @@ class TransformedModality(Modality):
         )
         start = time.time()
         transformed_modality.data = context_operator.execute(self)
-        transformed_modality.transform_time = time.time() - start
+        transformed_modality.transform_time += time.time() - start
         return transformed_modality
 
     def apply_representation(self, representation):
         start = time.time()
         new_modality = representation.transform(self)
         new_modality.update_metadata()
-        new_modality.transform_time = time.time() - start
+        new_modality.transform_time += time.time() - start
         new_modality.self_contained = representation.self_contained
         return new_modality
 
diff --git a/src/main/python/systemds/scuro/modality/unimodal_modality.py 
b/src/main/python/systemds/scuro/modality/unimodal_modality.py
index e4ed85cce3..22a40db16c 100644
--- a/src/main/python/systemds/scuro/modality/unimodal_modality.py
+++ b/src/main/python/systemds/scuro/modality/unimodal_modality.py
@@ -95,7 +95,7 @@ class UnimodalModality(Modality):
         transformed_modality = TransformedModality(self, context_operator)
 
         transformed_modality.data = context_operator.execute(self)
-        transformed_modality.transform_time = time.time() - start
+        transformed_modality.transform_time += time.time() - start
         return transformed_modality
 
     def aggregate(self, aggregation_function):
@@ -191,6 +191,6 @@ class UnimodalModality(Modality):
             )
             new_modality.data = padded_embeddings
         new_modality.update_metadata()
-        new_modality.transform_time = time.time() - start
+        new_modality.transform_time += time.time() - start
         new_modality.self_contained = representation.self_contained
         return new_modality
diff --git 
a/src/main/python/systemds/scuro/representations/aggregated_representation.py 
b/src/main/python/systemds/scuro/representations/aggregated_representation.py
index 1e98d2f92a..bcc36f4621 100644
--- 
a/src/main/python/systemds/scuro/representations/aggregated_representation.py
+++ 
b/src/main/python/systemds/scuro/representations/aggregated_representation.py
@@ -21,6 +21,7 @@
 from systemds.scuro.modality.transformed import TransformedModality
 from systemds.scuro.representations.representation import Representation
 from systemds.scuro.representations.aggregate import Aggregation
+import time
 
 
 class AggregatedRepresentation(Representation):
@@ -33,8 +34,11 @@ class AggregatedRepresentation(Representation):
         self.self_contained = True
 
     def transform(self, modality):
+        start = time.perf_counter()
         aggregated_modality = TransformedModality(
             modality, self, self_contained=modality.self_contained
         )
+        end = time.perf_counter()
+        aggregated_modality.transform_time += end - start
         aggregated_modality.data = self.aggregation.execute(modality)
         return aggregated_modality
diff --git a/src/main/python/systemds/scuro/representations/fusion.py 
b/src/main/python/systemds/scuro/representations/fusion.py
index 7ac0200819..3f4257e64a 100644
--- a/src/main/python/systemds/scuro/representations/fusion.py
+++ b/src/main/python/systemds/scuro/representations/fusion.py
@@ -22,6 +22,8 @@ import copy
 from typing import List
 
 import numpy as np
+
+from systemds.scuro.modality.type import ModalityType
 from systemds.scuro.representations.aggregated_representation import (
     AggregatedRepresentation,
 )
@@ -44,6 +46,7 @@ class Fusion(Representation):
         self.needs_alignment = False
         self.needs_training = False
         self.needs_instance_alignment = False
+        self.output_modality_type = ModalityType.EMBEDDING
 
     def transform(self, modalities: List[Modality]):
         """
diff --git a/src/main/python/tests/scuro/data_generator.py 
b/src/main/python/tests/scuro/data_generator.py
index 9da0aa82c0..ae78c50b8a 100644
--- a/src/main/python/tests/scuro/data_generator.py
+++ b/src/main/python/tests/scuro/data_generator.py
@@ -66,6 +66,7 @@ class ModalityRandomDataGenerator:
         self.modality_type = None
         self.metadata = {}
         self.data_type = np.float32
+        self.transform_time = None
 
     def create1DModality(
         self,

Reply via email to