This is an automated email from the ASF dual-hosted git repository.
cdionysio pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 61a82f267f [SYSTEMDS-3940] Add chunked execution to unimodal optimizer
61a82f267f is described below
commit 61a82f267ff9cdb2f364a50ef895f74fecf3034f
Author: Christina Dionysio <[email protected]>
AuthorDate: Wed Jan 21 14:49:59 2026 +0100
[SYSTEMDS-3940] Add chunked execution to unimodal optimizer
This patch enables the unimodal optimizer to apply a list of
representations to each raw data chunk while it is loaded in memory,
instead of reloading the same data once per representation.
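For illustration, a minimal sketch of the chunked loop this patch introduces in
UnimodalModality.apply_representations (simplified; the helper name
apply_representations_chunked and the plain dict return are illustrative only):
each raw chunk is extracted once and every representation is applied to it
before the next chunk is loaded.

    # Simplified sketch (not the exact patch code): reuse each loaded chunk
    # for all representations rather than reloading the raw data per operator.
    def apply_representations_chunked(modality, representations):
        outputs = {r.name: [] for r in representations}
        modality.data_loader.reset()
        while modality.data_loader.next_chunk < modality.data_loader.num_chunks:
            modality.extract_raw_data()            # load the current chunk once
            for r in representations:              # apply every representation to it
                chunk = r.transform(modality)
                outputs[r.name].extend(chunk.data)
        return outputs

The full implementation below additionally tracks per-representation metadata,
original sequence lengths, and padding, and the result is consumed by the
unimodal optimizer as a rep_cache during DAG execution.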
---
.../scuro/drsearch/hyperparameter_tuner.py | 2 +-
.../scuro/drsearch/multimodal_optimizer.py | 14 ++-
.../systemds/scuro/drsearch/representation_dag.py | 22 +++--
.../systemds/scuro/drsearch/unimodal_optimizer.py | 65 ++++++++++----
.../python/systemds/scuro/modality/modality.py | 98 ++++++++++++++++----
.../python/systemds/scuro/modality/transformed.py | 44 ++++++++-
.../systemds/scuro/modality/unimodal_modality.py | 100 ++++++++++++++-------
.../systemds/scuro/representations/fusion.py | 2 +-
.../scuro/representations/mlp_averaging.py | 11 ---
.../systemds/scuro/representations/unimodal.py | 1 +
src/main/python/systemds/utils/converters.py | 12 +--
src/main/python/tests/algorithms/test_cov.py | 1 -
src/main/python/tests/algorithms/test_solve.py | 1 -
src/main/python/tests/matrix/test_unique.py | 1 -
14 files changed, 267 insertions(+), 107 deletions(-)
diff --git a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
index 1605b7b87d..ed0eb5abde 100644
--- a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
+++ b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
@@ -104,7 +104,7 @@ class HyperparameterTuner:
for modality in self.modalities:
k_best_results, cached_data = (
self.optimization_results.get_k_best_results(
- modality, self.k, task, self.scoring_metric
+ modality, task, self.scoring_metric
)
)
representations[task.model.name][modality.modality_id] = k_best_results
diff --git a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
index 93a78e2cc2..7c17353663 100644
--- a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py
@@ -243,7 +243,7 @@ class MultimodalOptimizer:
for modality in self.modalities:
k_best_results, cached_data = (
unimodal_optimization_results.get_k_best_results(
- modality, self.k, task, self.metric_name
+ modality, task, self.metric_name
)
)
@@ -359,6 +359,7 @@ class MultimodalOptimizer:
)
),
task,
+ enable_cache=False,
)
torch.cuda.empty_cache()
@@ -366,19 +367,16 @@ class MultimodalOptimizer:
if fused_representation is None:
return None
- final_representation = fused_representation[
- list(fused_representation.keys())[-1]
- ]
- if task.expected_dim == 1 and get_shape(final_representation.metadata) > 1:
+ if task.expected_dim == 1 and get_shape(fused_representation.metadata) > 1:
agg_operator = AggregatedRepresentation(Aggregation())
- final_representation = agg_operator.transform(final_representation)
+ fused_representation = agg_operator.transform(fused_representation)
eval_start = time.time()
- scores = task.run(final_representation.data)
+ scores = task.run(fused_representation.data)
eval_time = time.time() - eval_start
total_time = time.time() - start_time
-
+ del fused_representation
return OptimizationResult(
dag=dag,
train_score=scores[0].average_scores,
diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py b/src/main/python/systemds/scuro/drsearch/representation_dag.py
index f9e8b8a2c0..01020546a0 100644
--- a/src/main/python/systemds/scuro/drsearch/representation_dag.py
+++ b/src/main/python/systemds/scuro/drsearch/representation_dag.py
@@ -20,7 +20,7 @@
# -------------------------------------------------------------
import copy
from dataclasses import dataclass, field
-from typing import List, Dict, Any
+from typing import List, Dict, Union, Any, Hashable, Optional
from systemds.scuro.modality.modality import Modality
from systemds.scuro.modality.transformed import TransformedModality
from systemds.scuro.representations.representation import (
@@ -34,9 +34,7 @@ from systemds.scuro.representations.dimensionality_reduction import (
DimensionalityReduction,
)
from systemds.scuro.utils.identifier import get_op_id, get_node_id
-
from collections import OrderedDict
-from typing import Any, Hashable, Optional
class LRUCache:
@@ -161,7 +159,9 @@ class RepresentationDag:
modalities: List[Modality],
task=None,
external_cache: Optional[LRUCache] = None,
- ) -> Dict[str, TransformedModality]:
+ enable_cache=True,
+ rep_cache: Dict[Any, TransformedModality] = None,
+ ) -> Union[Dict[str, TransformedModality], TransformedModality]:
cache: Dict[str, TransformedModality] = {}
node_signatures: Dict[str, Hashable] = {}
@@ -175,7 +175,8 @@ class RepresentationDag:
modality = get_modality_by_id_and_instance_id(
modalities, node.modality_id, node.representation_index
)
- cache[node_id] = modality
+ if enable_cache:
+ cache[node_id] = modality
node_signatures[node_id] = self._compute_leaf_signature(node)
return modality
@@ -203,7 +204,9 @@ class RepresentationDag:
elif isinstance(node_operation, AggregatedRepresentation):
result = node_operation.transform(input_mods[0])
elif isinstance(node_operation, UnimodalRepresentation):
- if (
+ if rep_cache is not None:
+ result = rep_cache[node_operation.name]
+ elif (
isinstance(input_mods[0], TransformedModality)
and input_mods[0].transformation[0].__class__
== node.operation
@@ -228,13 +231,14 @@ class RepresentationDag:
if external_cache and is_unimodal:
external_cache.put(node_signature, result)
- cache[node_id] = result
+ if enable_cache:
+ cache[node_id] = result
node_signatures[node_id] = node_signature
return result
- execute_node(self.root_node_id, task)
+ result = execute_node(self.root_node_id, task)
- return cache
+ return cache if enable_cache else result
def get_modality_by_id_and_instance_id(
diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
index ae467fedd9..c555c2b677 100644
--- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
@@ -50,14 +50,22 @@ from systemds.scuro.drsearch.representation_dag import LRUCache
class UnimodalOptimizer:
def __init__(
- self, modalities, tasks, debug=True, save_all_results=False, result_path=None
+ self,
+ modalities,
+ tasks,
+ debug=True,
+ save_all_results=False,
+ result_path=None,
+ k=2,
+ metric_name="accuracy",
):
self.modalities = modalities
self.tasks = tasks
self.modality_ids = [modality.modality_id for modality in modalities]
self.save_all_results = save_all_results
self.result_path = result_path
-
+ self.k = k
+ self.metric_name = metric_name
self.builders = {
modality.modality_id: RepresentationDAGBuilder() for modality in modalities
}
@@ -65,7 +73,9 @@ class UnimodalOptimizer:
self.debug = debug
self.operator_registry = Registry()
- self.operator_performance = UnimodalResults(modalities, tasks, debug, True)
+ self.operator_performance = UnimodalResults(
+ modalities, tasks, debug, True, k, metric_name
+ )
self._tasks_require_same_dims = True
self.expected_dimensions = tasks[0].expected_dim
@@ -185,12 +195,20 @@ class UnimodalOptimizer:
modality.modality_type
)
dags = []
+ operators = []
for operator in modality_specific_operators:
dags.extend(self._build_modality_dag(modality, operator()))
+ operators.append(operator())
external_cache = LRUCache(max_size=32)
+ rep_cache = None
+ if hasattr(modality, "data_loader") and
modality.data_loader.chunk_size:
+ rep_cache = modality.apply_representations(operators)
+
for dag in dags:
- representations = dag.execute([modality], external_cache=external_cache)
+ representations = dag.execute(
+ [modality], external_cache=external_cache, rep_cache=rep_cache
+ )
node_id = list(representations.keys())[-1]
node = dag.get_node_by_id(node_id)
if node.operation is None:
@@ -466,17 +484,26 @@ class UnimodalOptimizer:
class UnimodalResults:
- def __init__(self, modalities, tasks, debug=False, store_cache=True):
+ def __init__(
+ self,
+ modalities,
+ tasks,
+ debug=False,
+ store_cache=True,
+ k=-1,
+ metric_name="accuracy",
+ ):
self.modality_ids = [modality.modality_id for modality in modalities]
self.task_names = [task.model.name for task in tasks]
self.results = {}
self.debug = debug
self.cache = {}
self.store_cache = store_cache
-
+ self.k = k
+ self.metric_name = metric_name
for modality in self.modality_ids:
self.results[modality] = {task_name: [] for task_name in self.task_names}
- self.cache[modality] = {task_name: {} for task_name in self.task_names}
+ self.cache[modality] = {task_name: [] for task_name in self.task_names}
def add_result(self, scores, modality, task_name, task_time, combination, dag):
entry = ResultEntry(
@@ -491,12 +518,20 @@ class UnimodalResults:
self.results[modality.modality_id][task_name].append(entry)
if self.store_cache:
- cache_key = (
- id(dag),
- scores[1],
- modality.transform_time,
+ self.cache[modality.modality_id][task_name].append(modality)
+
+ results = self.results[modality.modality_id][task_name]
+ if self.k != -1 and len(results) > self.k:
+ ranked, sorted_indices = rank_by_tradeoff(
+ results, performance_metric_name=self.metric_name
)
- self.cache[modality.modality_id][task_name][cache_key] = modality
+ keep = set(sorted_indices[: self.k])
+
+ self.cache[modality.modality_id][task_name] = [
+ m
+ for i, m in enumerate(self.cache[modality.modality_id][task_name])
+ if i in keep
+ ]
if self.debug:
print(f"{modality.modality_id}_{task_name}: {entry}")
@@ -508,7 +543,7 @@ class UnimodalResults:
print(f"{modality}_{task_name}: {entry}")
def get_k_best_results(
- self, modality, k, task, performance_metric_name, prune_cache=False
+ self, modality, task, performance_metric_name, prune_cache=False
):
"""
Get the k best results for the given modality
@@ -524,8 +559,8 @@ class UnimodalResults:
task_results, performance_metric_name=performance_metric_name
)
- results = results[:k]
- sorted_indices = sorted_indices[:k]
+ results = results[: self.k]
+ sorted_indices = sorted_indices[: self.k]
task_cache = self.cache.get(modality.modality_id, {}).get(task.model.name, None)
if not task_cache:
diff --git a/src/main/python/systemds/scuro/modality/modality.py b/src/main/python/systemds/scuro/modality/modality.py
index d0cb148b20..8162662b7f 100644
--- a/src/main/python/systemds/scuro/modality/modality.py
+++ b/src/main/python/systemds/scuro/modality/modality.py
@@ -141,24 +141,86 @@ class Modality:
else:
raise "Needs padding to max_len"
except:
- maxlen = (
- max([len(seq) for seq in self.data]) if max_len is None else max_len
- )
-
- result = np.full((len(self.data), maxlen), value, dtype=self.data_type)
-
- for i, seq in enumerate(self.data):
- data = seq[:maxlen]
- result[i, : len(data)] = data
-
- if self.has_metadata():
- attention_mask = np.zeros(result.shape[1], dtype=np.int8)
- attention_mask[: len(seq[:maxlen])] = 1
- md_key = list(self.metadata.keys())[i]
- if "attention_mask" in self.metadata[md_key]:
- self.metadata[md_key]["attention_mask"] =
attention_mask
- else:
- self.metadata[md_key].update({"attention_mask":
attention_mask})
+ first = self.data[0]
+ if isinstance(first, np.ndarray) and first.ndim == 3:
+ maxlen = (
+ max([seq.shape[0] for seq in self.data])
+ if max_len is None
+ else max_len
+ )
+ tail_shape = first.shape[1:]
+ result = np.full(
+ (len(self.data), maxlen, *tail_shape),
+ value,
+ dtype=self.data_type or first.dtype,
+ )
+ for i, seq in enumerate(self.data):
+ data = seq[:maxlen]
+ result[i, : len(data), ...] = data
+ if self.has_metadata():
+ attention_mask = np.zeros(maxlen, dtype=np.int8)
+ attention_mask[: len(data)] = 1
+ md_key = list(self.metadata.keys())[i]
+ if "attention_mask" in self.metadata[md_key]:
+ self.metadata[md_key]["attention_mask"] =
attention_mask
+ else:
+ self.metadata[md_key].update(
+ {"attention_mask": attention_mask}
+ )
+ elif (
+ isinstance(first, list)
+ and len(first) > 0
+ and isinstance(first[0], np.ndarray)
+ and first[0].ndim == 2
+ ):
+ maxlen = (
+ max([len(seq) for seq in self.data]) if max_len is None else max_len
+ )
+ row_dim, col_dim = first[0].shape
+ result = np.full(
+ (len(self.data), maxlen, row_dim, col_dim),
+ value,
+ dtype=self.data_type or first[0].dtype,
+ )
+ for i, seq in enumerate(self.data):
+ data = seq[:maxlen]
+ # stack list of 2D arrays into 3D then assign
+ if len(data) > 0:
+ result[i, : len(data), :, :] = np.stack(data, axis=0)
+ if self.has_metadata():
+ attention_mask = np.zeros(maxlen, dtype=np.int8)
+ attention_mask[: len(data)] = 1
+ md_key = list(self.metadata.keys())[i]
+ if "attention_mask" in self.metadata[md_key]:
+ self.metadata[md_key]["attention_mask"] =
attention_mask
+ else:
+ self.metadata[md_key].update(
+ {"attention_mask": attention_mask}
+ )
+ else:
+ maxlen = (
+ max([len(seq) for seq in self.data]) if max_len is None else max_len
+ )
+ result = np.full((len(self.data), maxlen), value, dtype=self.data_type)
+ for i, seq in enumerate(self.data):
+ data = seq[:maxlen]
+ try:
+ result[i, : len(data)] = data
+ except:
+ print(f"Error padding data for modality
{self.modality_id}")
+ print(f"Data shape: {data.shape}")
+ print(f"Result shape: {result.shape}")
+ raise Exception("Error padding data")
+ if self.has_metadata():
+ attention_mask = np.zeros(result.shape[1], dtype=np.int8)
+ attention_mask[: len(data)] = 1
+ md_key = list(self.metadata.keys())[i]
+ if "attention_mask" in self.metadata[md_key]:
+ self.metadata[md_key]["attention_mask"] =
attention_mask
+ else:
+ self.metadata[md_key].update(
+ {"attention_mask": attention_mask}
+ )
# TODO: this might need to be a new modality (otherwise we loose the original data)
self.data = result
diff --git a/src/main/python/systemds/scuro/modality/transformed.py b/src/main/python/systemds/scuro/modality/transformed.py
index 8180950a10..078b65f0bc 100644
--- a/src/main/python/systemds/scuro/modality/transformed.py
+++ b/src/main/python/systemds/scuro/modality/transformed.py
@@ -19,7 +19,7 @@
#
# -------------------------------------------------------------
from typing import Union, List
-
+import numpy as np
from systemds.scuro.modality.type import ModalityType
from systemds.scuro.modality.joined import JoinedModality
from systemds.scuro.modality.modality import Modality
@@ -127,7 +127,21 @@ class TransformedModality(Modality):
self, dimensionality_reduction_operator, self_contained=self.self_contained
)
start = time.time()
- transformed_modality.data = dimensionality_reduction_operator.execute(self.data)
+ if len(self.data[0].shape) >= 3:
+ return self
+ else:
+ try:
+ data = np.array(self.data)
+ if len(data.shape) >= 3:
+ data = data.reshape(data.shape[0], -1)
+ transformed_modality.data = dimensionality_reduction_operator.execute(
+ data
+ )
+ except:
+ transformed_modality.data = self._padded_dimensionality_reduction(
+ dimensionality_reduction_operator
+ )
+
transformed_modality.transform_time += time.time() - start
return transformed_modality
@@ -178,3 +192,29 @@ class TransformedModality(Modality):
modalities.append(other)
return modalities
+
+ def _padded_dimensionality_reduction(self, dimensionality_reduction_operator):
+ all_outputs = []
+ batch_size = 1024 if len(self.data[0].shape) >= 3 else len(self.data)
+ ndim = self.data[0].ndim
+ start = 0
+ while start < len(self.data):
+ end = min(start + batch_size, len(self.data))
+ max_shape = tuple(
+ max(a.shape[i] for a in self.data[start:end]) for i in range(ndim)
+ )
+
+ padded = []
+ for a in self.data[start:end]:
+ pad_width = tuple((0, max_shape[i] - a.shape[i]) for i in range(ndim))
+ padded.append(np.pad(a, pad_width=pad_width, mode="constant"))
+ padded = np.array(padded)
+ end = min(start + batch_size, len(self.data))
+
+ if len(padded.shape) >= 3:
+ padded = padded.reshape(padded.shape[0], -1)
+
+ out = dimensionality_reduction_operator.execute(padded)
+ all_outputs.append(out)
+ start = end
+ return np.concatenate(all_outputs, axis=0)
diff --git a/src/main/python/systemds/scuro/modality/unimodal_modality.py b/src/main/python/systemds/scuro/modality/unimodal_modality.py
index 22a40db16c..4efaa7d733 100644
--- a/src/main/python/systemds/scuro/modality/unimodal_modality.py
+++ b/src/main/python/systemds/scuro/modality/unimodal_modality.py
@@ -18,8 +18,7 @@
# under the License.
#
# -------------------------------------------------------------
-from functools import reduce
-from operator import or_
+import gc
import time
import numpy as np
from systemds.scuro import ModalityType
@@ -103,29 +102,45 @@ class UnimodalModality(Modality):
raise Exception("Data is None")
def apply_representations(self, representations):
- # TODO
- pass
-
- def apply_representation(self, representation):
- new_modality = TransformedModality(
- self,
- representation,
- )
-
- pad_dim_one = False
+ """
+ Applies a list of representations to the modality. Specifically, it applies the representations to the modality in a chunked manner.
+ :param representations: List of representations to apply
+ :return: List of transformed modalities
+ """
+ transformed_modalities_per_representation = {}
+ padding_per_representation = {}
+ original_lengths_per_representation = {}
+
+ # Initialize dictionaries for each representation
+ for representation in representations:
+ transformed_modality = TransformedModality(self, representation.name)
+ transformed_modality.data = []
+ transformed_modalities_per_representation[representation.name] = (
+ transformed_modality
+ )
+ padding_per_representation[representation.name] = False
+ original_lengths_per_representation[representation.name] = []
- new_modality.data = []
- start = time.time()
- original_lengths = []
+ start = (
+ time.time()
+ )  # TODO: should be replaced in unimodal_representation.transform
if self.data_loader.chunk_size:
self.data_loader.reset()
while self.data_loader.next_chunk < self.data_loader.num_chunks:
self.extract_raw_data()
- transformed_chunk = representation.transform(self)
- new_modality.data.extend(transformed_chunk.data)
- for d in transformed_chunk.data:
- original_lengths.append(d.shape[0])
- new_modality.metadata.update(transformed_chunk.metadata)
+ for representation in representations:
+ transformed_chunk = representation.transform(self)
+ transformed_modalities_per_representation[
+ representation.name
+ ].data.extend(transformed_chunk.data)
+ transformed_modalities_per_representation[
+ representation.name
+ ].metadata.update(transformed_chunk.metadata)
+ for d in transformed_chunk.data:
+ original_lengths_per_representation[representation.name].append(
+ d.shape[0]
+ )
+
else:
if not self.has_data():
self.extract_raw_data()
@@ -141,15 +156,41 @@ class UnimodalModality(Modality):
):
for d in new_modality.data:
if d.shape[0] == 1 and d.ndim == 2:
- pad_dim_one = True
- original_lengths.append(d.shape[1])
+ padding_per_representation[representation.name] = True
+ original_lengths_per_representation[representation.name].append(
+ d.shape[1]
+ )
else:
- original_lengths.append(d.shape[0])
+ original_lengths_per_representation[representation.name].append(
+ d.shape[0]
+ )
+ transformed_modalities_per_representation[representation.name] = (
+ new_modality
+ )
+
+ for representation in representations:
+ self._apply_padding(
+ transformed_modalities_per_representation[representation.name],
+ original_lengths_per_representation[representation.name],
+ padding_per_representation[representation.name],
+ )
+ transformed_modalities_per_representation[
+ representation.name
+ ].transform_time += (time.time() - start)
+ transformed_modalities_per_representation[
+ representation.name
+ ].self_contained = representation.self_contained
+ gc.collect()
+ return transformed_modalities_per_representation
+
+ def apply_representation(self, representation):
+ return self.apply_representations([representation])[representation.name]
+ def _apply_padding(self, modality, original_lengths, pad_dim_one):
if len(original_lengths) > 0 and min(original_lengths) < max(original_lengths):
target_length = max(original_lengths)
padded_embeddings = []
- for embeddings in new_modality.data:
+ for embeddings in modality.data:
current_length = (
embeddings.shape[0] if not pad_dim_one else embeddings.shape[1]
)
@@ -182,15 +223,12 @@ class UnimodalModality(Modality):
else:
padded_embeddings.append(embeddings)
- attention_masks = np.zeros((len(new_modality.data), target_length))
+ attention_masks = np.zeros((len(modality.data), target_length))
for i, length in enumerate(original_lengths):
attention_masks[i, :length] = 1
ModalityType(self.modality_type).add_field_for_instances(
- new_modality.metadata, "attention_masks", attention_masks
+ modality.metadata, "attention_masks", attention_masks
)
- new_modality.data = padded_embeddings
- new_modality.update_metadata()
- new_modality.transform_time += time.time() - start
- new_modality.self_contained = representation.self_contained
- return new_modality
+ modality.data = padded_embeddings
+ modality.update_metadata()
diff --git a/src/main/python/systemds/scuro/representations/fusion.py b/src/main/python/systemds/scuro/representations/fusion.py
index 3f4257e64a..1426797f00 100644
--- a/src/main/python/systemds/scuro/representations/fusion.py
+++ b/src/main/python/systemds/scuro/representations/fusion.py
@@ -79,7 +79,7 @@ class Fusion(Representation):
d for i, d in enumerate(modality.data) if i in fusion_train_indices
]
train_modality = TransformedModality(modality, self)
- train_modality.data = copy.deepcopy(train_data)
+ train_modality.data = list(train_data)
train_modalities.append(train_modality)
transformed_train = self.execute(
diff --git a/src/main/python/systemds/scuro/representations/mlp_averaging.py b/src/main/python/systemds/scuro/representations/mlp_averaging.py
index a782802444..6935ab3721 100644
--- a/src/main/python/systemds/scuro/representations/mlp_averaging.py
+++ b/src/main/python/systemds/scuro/representations/mlp_averaging.py
@@ -53,17 +53,6 @@ class MLPAveraging(DimensionalityReduction):
self.batch_size = batch_size
def execute(self, data):
- # Make sure the data is a numpy array
- try:
- data = np.array(data)
- except Exception as e:
- raise ValueError(f"Data must be a numpy array: {e}")
-
- # Note: if the data is a 3D array this indicates that we are dealing with a context operation
- # and we need to conacatenate the dimensions along the first axis
- if len(data.shape) == 3:
- data = data.reshape(data.shape[0], -1)
-
set_random_seeds(42)
input_dim = data.shape[1]
diff --git a/src/main/python/systemds/scuro/representations/unimodal.py b/src/main/python/systemds/scuro/representations/unimodal.py
index 2bb34733e2..a1a1632c26 100644
--- a/src/main/python/systemds/scuro/representations/unimodal.py
+++ b/src/main/python/systemds/scuro/representations/unimodal.py
@@ -43,6 +43,7 @@ class UnimodalRepresentation(Representation):
@abc.abstractmethod
def transform(self, data):
+ # TODO: check if there is a way to time the transformation in here (needed for chunked execution)
raise f"Not implemented for {self.name}"
diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py
index ab7b7ffd8d..5f4619a8bb 100644
--- a/src/main/python/systemds/utils/converters.py
+++ b/src/main/python/systemds/utils/converters.py
@@ -600,8 +600,7 @@ def _transfer_string_column_to_pipe(
py_total, py_encoding, py_packing, py_io, num_strings = py_timing
total_time = t1 - t0
- sds._log.debug(
- f"""
+ sds._log.debug(f"""
=== TO FrameBlock - Timing Breakdown (Strings) ===
Column: {col_name}
Total time: {total_time:.3f}s
@@ -612,8 +611,7 @@ def _transfer_string_column_to_pipe(
I/O writes: {py_io:.3f}s ({100*py_io/py_total:.1f}%)
Other: {py_total - py_encoding - py_packing - py_io:.3f}s
Strings processed: {num_strings:,}
- """
- )
+ """)
def _transfer_numeric_column_to_pipe(
@@ -798,8 +796,7 @@ def _receive_string_column_from_pipe(
_pipe_receive_strings(pipe, num_rows, batch_size_bytes, pipe_id, sds._log)
)
- sds._log.debug(
- f"""
+ sds._log.debug(f"""
=== FROM FrameBlock - Timing Breakdown (Strings) ===
Column: {col_name}
Total time: {py_total:.3f}s
@@ -809,8 +806,7 @@ def _receive_string_column_from_pipe(
I/O reads: {py_io:.3f}s ({100*py_io/py_total:.1f}%)
Other: {py_total - py_decode - py_io:.3f}s
Strings processed: {num_strings:,}
- """
- )
+ """)
if not header_received:
_pipe_receive_header(pipe, pipe_id, sds._log)
diff --git a/src/main/python/tests/algorithms/test_cov.py b/src/main/python/tests/algorithms/test_cov.py
index a20c0741f1..c498337ced 100644
--- a/src/main/python/tests/algorithms/test_cov.py
+++ b/src/main/python/tests/algorithms/test_cov.py
@@ -26,7 +26,6 @@ import numpy as np
from systemds.context import SystemDSContext
from systemds.operator.algorithm import cov
-
A = np.array([2, 4, 4, 2])
B = np.array([2, 4, 2, 4])
W = np.array([7, 1, 1, 1])
diff --git a/src/main/python/tests/algorithms/test_solve.py b/src/main/python/tests/algorithms/test_solve.py
index 4491491718..2187dfe2fa 100644
--- a/src/main/python/tests/algorithms/test_solve.py
+++ b/src/main/python/tests/algorithms/test_solve.py
@@ -26,7 +26,6 @@ import numpy as np
from systemds.context import SystemDSContext
from systemds.operator.algorithm import solve
-
np.random.seed(7)
A = np.random.random((10, 10))
B = np.random.random(10)
diff --git a/src/main/python/tests/matrix/test_unique.py b/src/main/python/tests/matrix/test_unique.py
index 66d1f19a9d..810d9977fa 100644
--- a/src/main/python/tests/matrix/test_unique.py
+++ b/src/main/python/tests/matrix/test_unique.py
@@ -23,7 +23,6 @@ import unittest
import numpy as np
from systemds.context import SystemDSContext
-
np.random.seed(7)