This is an automated email from the ASF dual-hosted git repository.
cdionysio pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 5dfa26f235 [SYSTEMDS-3835] Add additional context operators
5dfa26f235 is described below
commit 5dfa26f23583f399187e5b3d785bf653a25ee8d3
Author: Christina Dionysio <[email protected]>
AuthorDate: Wed Sep 3 10:00:13 2025 +0200
[SYSTEMDS-3835] Add additional context operators
This patch adds two new context operators to Scuro. The first is a
StaticWindow operator that, given the desired number of windows, determines a
suitable window size and aggregates a sequence into num_windows features. The
second is a DynamicWindow operator, which also aggregates a sequence into
num_windows features, but uses smaller windows for more recent data points
than for older data points in a time series.
---
src/main/python/systemds/scuro/__init__.py | 8 +-
.../systemds/scuro/drsearch/unimodal_optimizer.py | 4 +-
.../systemds/scuro/representations/fusion.py | 2 +
.../scuro/representations/window_aggregation.py | 107 +++++++++++++++++----
.../python/tests/scuro/test_operator_registry.py | 12 ++-
.../python/tests/scuro/test_unimodal_optimizer.py | 2 +-
.../python/tests/scuro/test_window_operations.py | 53 ++++++++--
7 files changed, 159 insertions(+), 29 deletions(-)
diff --git a/src/main/python/systemds/scuro/__init__.py
b/src/main/python/systemds/scuro/__init__.py
index b2a5e9df37..8e83c865a2 100644
--- a/src/main/python/systemds/scuro/__init__.py
+++ b/src/main/python/systemds/scuro/__init__.py
@@ -55,7 +55,11 @@ from systemds.scuro.representations.swin_video_transformer
import SwinVideoTrans
from systemds.scuro.representations.tfidf import TfIdf
from systemds.scuro.representations.unimodal import UnimodalRepresentation
from systemds.scuro.representations.wav2vec import Wav2Vec
-from systemds.scuro.representations.window_aggregation import WindowAggregation
+from systemds.scuro.representations.window_aggregation import (
+ WindowAggregation,
+ DynamicWindow,
+ StaticWindow,
+)
from systemds.scuro.representations.word2vec import W2V
from systemds.scuro.representations.x3d import X3D
from systemds.scuro.models.model import Model
@@ -145,4 +149,6 @@ __all__ = [
"RMSE",
"Spectral",
"AttentionFusion",
+ "DynamicWindow",
+ "StaticWindow",
]
diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
index b84d86d94d..86c7ce1e63 100644
--- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
+++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
@@ -122,8 +122,8 @@ class UnimodalOptimizer:
for context_operator_after in context_operators:
con_op_after = context_operator_after()
- mod = mod.context(con_op_after)
- self._evaluate_local(mod, [mod_op, con_op_after],
local_results)
+ mod_con = mod.context(con_op_after)
+ self._evaluate_local(mod_con, [mod_op, con_op_after],
local_results)
return local_results
diff --git a/src/main/python/systemds/scuro/representations/fusion.py
b/src/main/python/systemds/scuro/representations/fusion.py
index 61988abba2..ea614ac095 100644
--- a/src/main/python/systemds/scuro/representations/fusion.py
+++ b/src/main/python/systemds/scuro/representations/fusion.py
@@ -105,6 +105,8 @@ class Fusion(Representation):
curr_shape = modalities[idx].data[0].shape
if len(modalities[idx - 1].data) != len(modalities[idx].data):
raise f"Modality sizes don't match!"
+ elif len(curr_shape) == 1:
+ continue
elif curr_shape[1] > max_size:
max_size = curr_shape[1]
diff --git
a/src/main/python/systemds/scuro/representations/window_aggregation.py
b/src/main/python/systemds/scuro/representations/window_aggregation.py
index 167f4adafe..b3ad9e1b93 100644
--- a/src/main/python/systemds/scuro/representations/window_aggregation.py
+++ b/src/main/python/systemds/scuro/representations/window_aggregation.py
@@ -18,6 +18,8 @@
# under the License.
#
# -------------------------------------------------------------
+import copy
+
import numpy as np
import math
@@ -28,17 +30,13 @@ from systemds.scuro.representations.aggregate import
Aggregation
from systemds.scuro.representations.context import Context
-@register_context_operator()
-class WindowAggregation(Context):
- def __init__(self, window_size=10, aggregation_function="mean", pad=True):
+class Window(Context):
+ def __init__(self, name, aggregation_function):
parameters = {
- "window_size": [window_size],
"aggregation_function":
list(Aggregation().get_aggregation_functions()),
- } # TODO: window_size should be dynamic and adapted to the shape of
the data
- super().__init__("WindowAggregation", parameters)
- self.window_size = window_size
+ }
+ super().__init__(name, parameters)
self.aggregation_function = aggregation_function
- self.pad = pad
@property
def aggregation_function(self):
@@ -48,6 +46,15 @@ class WindowAggregation(Context):
def aggregation_function(self, value):
self._aggregation_function = Aggregation(value)
+
+@register_context_operator()
+class WindowAggregation(Window):
+ def __init__(self, window_size=10, aggregation_function="mean", pad=False):
+ super().__init__("WindowAggregation", aggregation_function)
+ self.parameters["window_size"] = [window_size]
+ self.window_size = window_size
+ self.pad = pad
+
def execute(self, modality):
windowed_data = []
original_lengths = []
@@ -107,24 +114,90 @@ class WindowAggregation(Context):
def window_aggregate_single_level(self, instance, new_length):
if isinstance(instance, str):
return instance
- instance = np.array(instance)
- num_cols = instance.shape[1] if instance.ndim > 1 else 1
- result = np.empty((new_length, num_cols))
+ instance = np.array(copy.deepcopy(instance))
+
+ result = []
for i in range(0, new_length):
- result[i] = self.aggregation_function.aggregate_instance(
- instance[i * self.window_size : i * self.window_size +
self.window_size]
+ result.append(
+ self.aggregation_function.aggregate_instance(
+ instance[
+ i * self.window_size : i * self.window_size +
self.window_size
+ ]
+ )
)
- if num_cols == 1:
- result = result.reshape(-1)
- return result
+ return np.array(result)
def window_aggregate_nested_level(self, instance, new_length):
result = [[] for _ in range(0, new_length)]
- data = np.stack(instance)
+ data = np.stack(copy.deepcopy(instance))
for i in range(0, new_length):
result[i] = self.aggregation_function.aggregate_instance(
data[i * self.window_size : i * self.window_size +
self.window_size]
)
return np.array(result)
+
+
+@register_context_operator()
+class StaticWindow(Window):
+ def __init__(self, num_windows=100, aggregation_function="mean"):
+ super().__init__("StaticWindow", aggregation_function)
+ self.parameters["num_windows"] = [num_windows]
+ self.num_windows = num_windows
+
+ def execute(self, modality):
+ windowed_data = []
+
+ for instance in modality.data:
+ window_size = len(instance) // self.num_windows
+ remainder = len(instance) % self.num_windows
+ output = []
+ start = 0
+ for i in range(0, self.num_windows):
+ extra = 1 if i < remainder else 0
+ end = start + window_size + extra
+ window = copy.deepcopy(instance[start:end])
+ val = (
+ self.aggregation_function.aggregate_instance(window)
+ if len(window) > 0
+ else np.zeros_like(output[i - 1])
+ )
+ output.append(val)
+ start = end
+
+ windowed_data.append(output)
+ return np.array(windowed_data)
+
+
+@register_context_operator()
+class DynamicWindow(Window):
+ def __init__(self, num_windows=100, aggregation_function="mean"):
+ super().__init__("DynamicWindow", aggregation_function)
+ self.parameters["num_windows"] = [num_windows]
+ self.num_windows = num_windows
+
+ def execute(self, modality):
+ windowed_data = []
+
+ for instance in modality.data:
+ N = len(instance)
+ weights = np.geomspace(4, 256, num=self.num_windows)
+ weights = weights / np.sum(weights)
+ window_sizes = (weights * N).astype(int)
+ window_sizes[-1] += N - np.sum(window_sizes)
+ indices = np.cumsum(window_sizes)
+ output = []
+ start = 0
+ for end in indices:
+ window = copy.deepcopy(instance[start:end])
+ val = (
+ self.aggregation_function.aggregate_instance(window)
+ if len(window) > 0
+ else np.zeros_like(instance[0])
+ )
+ output.append(val)
+ start = end
+ windowed_data.append(output)
+
+ return np.array(windowed_data)
diff --git a/src/main/python/tests/scuro/test_operator_registry.py
b/src/main/python/tests/scuro/test_operator_registry.py
index a6941fe618..0d83d83bda 100644
--- a/src/main/python/tests/scuro/test_operator_registry.py
+++ b/src/main/python/tests/scuro/test_operator_registry.py
@@ -30,7 +30,11 @@ from systemds.scuro.representations.covarep_audio_features
import (
from systemds.scuro.representations.mfcc import MFCC
from systemds.scuro.representations.swin_video_transformer import
SwinVideoTransformer
from systemds.scuro.representations.wav2vec import Wav2Vec
-from systemds.scuro.representations.window_aggregation import WindowAggregation
+from systemds.scuro.representations.window_aggregation import (
+ WindowAggregation,
+ StaticWindow,
+ DynamicWindow,
+)
from systemds.scuro.representations.bow import BoW
from systemds.scuro.representations.word2vec import W2V
from systemds.scuro.representations.tfidf import TfIdf
@@ -83,7 +87,11 @@ class TestOperatorRegistry(unittest.TestCase):
def test_context_operator_in_registry(self):
registry = Registry()
- assert registry.get_context_operators() == [WindowAggregation]
+ assert registry.get_context_operators() == [
+ WindowAggregation,
+ StaticWindow,
+ DynamicWindow,
+ ]
# def test_fusion_operator_in_registry(self):
# registry = Registry()
diff --git a/src/main/python/tests/scuro/test_unimodal_optimizer.py
b/src/main/python/tests/scuro/test_unimodal_optimizer.py
index b5d2b266f6..a4952d29f9 100644
--- a/src/main/python/tests/scuro/test_unimodal_optimizer.py
+++ b/src/main/python/tests/scuro/test_unimodal_optimizer.py
@@ -141,7 +141,7 @@ class
TestUnimodalRepresentationOptimizer(unittest.TestCase):
def test_unimodal_optimizer_for_audio_modality(self):
audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data(
- self.num_instances, 100
+ self.num_instances, 3000
)
audio = UnimodalModality(
TestDataLoader(
diff --git a/src/main/python/tests/scuro/test_window_operations.py
b/src/main/python/tests/scuro/test_window_operations.py
index ea1b0f46f2..9aab25a814 100644
--- a/src/main/python/tests/scuro/test_window_operations.py
+++ b/src/main/python/tests/scuro/test_window_operations.py
@@ -24,8 +24,13 @@ import math
import numpy as np
-from tests.scuro.data_generator import ModalityRandomDataGenerator
+from tests.scuro.data_generator import ModalityRandomDataGenerator,
TestDataLoader
from systemds.scuro.modality.type import ModalityType
+from systemds.scuro.modality.unimodal_modality import UnimodalModality
+from systemds.scuro.representations.window_aggregation import (
+ StaticWindow,
+ DynamicWindow,
+)
class TestWindowOperations(unittest.TestCase):
@@ -35,20 +40,56 @@ class TestWindowOperations(unittest.TestCase):
cls.data_generator = ModalityRandomDataGenerator()
cls.aggregations = ["mean", "sum", "max", "min"]
- def test_window_operations_on_audio_representations(self):
+ def test_static_window(self):
+ num_windows = 5
+ data, md =
self.data_generator.create_visual_modality(self.num_instances, 50)
+ modality = UnimodalModality(
+ TestDataLoader(
+ [i for i in range(0, self.num_instances)],
+ None,
+ ModalityType.VIDEO,
+ data,
+ np.float32,
+ md,
+ )
+ )
+ aggregated_window = modality.context(StaticWindow(num_windows))
+
+ for i in range(0, self.num_instances):
+ assert len(aggregated_window.data[i]) == num_windows
+
+ def test_dynamic_window(self):
+ num_windows = 5
+ data, md =
self.data_generator.create_visual_modality(self.num_instances, 50)
+ modality = UnimodalModality(
+ TestDataLoader(
+ [i for i in range(0, self.num_instances)],
+ None,
+ ModalityType.VIDEO,
+ data,
+ np.float32,
+ md,
+ )
+ )
+ aggregated_window = modality.context(DynamicWindow(num_windows))
+
+ for i in range(0, self.num_instances):
+ assert len(aggregated_window.data[i]) == num_windows
+
+ def test_window_aggregation_on_audio_representations(self):
window_size = 10
- self.run_window_operations_for_modality(ModalityType.AUDIO,
window_size)
+ self.run_window_aggregation_for_modality(ModalityType.AUDIO,
window_size)
def test_window_operations_on_video_representations(self):
window_size = 10
- self.run_window_operations_for_modality(ModalityType.VIDEO,
window_size)
+ self.run_window_aggregation_for_modality(ModalityType.VIDEO,
window_size)
def test_window_operations_on_text_representations(self):
window_size = 10
- self.run_window_operations_for_modality(ModalityType.TEXT, window_size)
+ self.run_window_aggregation_for_modality(ModalityType.TEXT,
window_size)
- def run_window_operations_for_modality(self, modality_type, window_size):
+ def run_window_aggregation_for_modality(self, modality_type, window_size):
r = self.data_generator.create1DModality(40, 100, modality_type)
for aggregation in self.aggregations:
windowed_modality = r.window_aggregation(window_size, aggregation)