This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 1c3ec67977 test(pyamber): add unit tests for Operator base class
(#4735)
1c3ec67977 is described below
commit 1c3ec67977406fc1340d0f8e90d720ff71e7ae69
Author: Yicong Huang <[email protected]>
AuthorDate: Sat May 2 22:38:51 2026 -0700
test(pyamber): add unit tests for Operator base class (#4735)
### What changes were proposed in this PR?
Adds pytest coverage for
`amber/src/main/python/core/models/operator.py`. The `Operator` abstract
base class had no dedicated spec.
### Any related issues, documentation, discussions?
Closes #4733.
A known bug is pinned in the spec by a documenting test plus a strict
`xfail` (the bug itself is filed separately as a Bug issue): `SourceOperator`
declares `__internal_is_source = True` at class level, but Python
name-mangles that into `_SourceOperator__internal_is_source`, while
`Operator.is_source` reads `self._Operator__internal_is_source`. These are
two different attributes, so a freshly constructed `SourceOperator` subclass
instance reports `is_source=False` until `ExecutorManager.initialize_executor`
invokes the explicit setter. The class-level declaration is therefore
effectively dead code.
### How was this PR tested?
```
cd amber/src/main/python
ruff check core/models/test_operator.py
ruff format --check core/models/test_operator.py
python -m pytest core/models/test_operator.py
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
amber/src/main/python/core/models/test_operator.py | 291 +++++++++++++++++++++
1 file changed, 291 insertions(+)
diff --git a/amber/src/main/python/core/models/test_operator.py b/amber/src/main/python/core/models/test_operator.py
new file mode 100644
index 0000000000..d33077ffde
--- /dev/null
+++ b/amber/src/main/python/core/models/test_operator.py
@@ -0,0 +1,291 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+
+import pytest
+
+from core.models import (
+ BatchOperator,
+ SourceOperator,
+ State,
+ Table,
+ Tuple,
+ TupleOperatorV2,
+)
+from core.models.operator import Operator, TableOperator
+
+
class _ConcreteOperator(TupleOperatorV2):
    """Smallest usable TupleOperatorV2: echoes each input tuple unchanged.

    It exists only to satisfy the abstract ``process_tuple`` so that the
    base-class behaviour can be exercised on a real instance.
    """

    def process_tuple(self, tuple_, port):
        # Pass-through: the incoming tuple is the one and only output.
        yield tuple_
+
+
class _ConcreteSource(SourceOperator):
    """Smallest usable SourceOperator: ``produce`` emits a single ``None``.

    It only implements the abstract ``produce`` so that a SourceOperator
    instance can be constructed for the ``is_source`` tests.
    """

    def produce(self):
        # One placeholder item; the tests never consume the output.
        yield None
+
+
class _ConcreteBatch(BatchOperator):
    """Minimal BatchOperator with a small, valid batch size; echoes batches."""

    # A positive int, which the validation tests below show is accepted.
    BATCH_SIZE = 4

    def process_batch(self, batch, port):
        # Pass-through: emit the received batch as-is.
        yield batch
+
+
class _ConcreteTable(TableOperator):
    """Minimal TableOperator that records every table passed to process_table.

    Tests inspect ``received_tables`` to see what ``on_finish`` delivered.
    """

    def __init__(self):
        super().__init__()
        # One entry per process_table call, kept in call order.
        self.received_tables = []

    def process_table(self, table, port):
        self.received_tables += [table]
        yield None
+
+
class _CountingDecoder:
    """Fake template decoder that counts its ``to_str`` calls.

    Each result is tagged ``d<call number>:<input>``. A test can therefore
    tell a freshly decoded value (new call number, counter incremented) from
    one served by the wrapper's cache (old call number, counter unchanged).
    """

    def __init__(self):
        self.calls = 0

    def to_str(self, data):
        self.calls += 1
        return f"d{self.calls}:{data}"


class TestPythonTemplateDecoder:
    def test_stdlib_decoder_decodes_str_input(self):
        decoder = Operator.PythonTemplateDecoder.StdlibBase64Decoder()
        encoded = base64.b64encode(b"hello").decode("ascii")
        assert decoder.to_str(encoded) == "hello"

    def test_stdlib_decoder_accepts_bytes_input(self):
        decoder = Operator.PythonTemplateDecoder.StdlibBase64Decoder()
        encoded = base64.b64encode("中".encode("utf-8"))  # bytes, not str
        assert decoder.to_str(encoded) == "中"

    def test_stdlib_decoder_rejects_non_utf8_bytes_strictly(self):
        # `errors='strict'` must raise: 0x80 is a UTF-8 continuation byte,
        # so it can never be a valid leading byte.
        decoder = Operator.PythonTemplateDecoder.StdlibBase64Decoder()
        bad = base64.b64encode(b"\x80\x81").decode("ascii")
        with pytest.raises(UnicodeDecodeError):
            decoder.to_str(bad)

    def test_default_decoder_when_none_supplied(self):
        wrapper = Operator.PythonTemplateDecoder()
        encoded = base64.b64encode(b"abc").decode("ascii")
        assert wrapper.decode(encoded) == "abc"

    def test_uses_injected_custom_decoder(self):
        injected = _CountingDecoder()
        wrapper = Operator.PythonTemplateDecoder(decoder=injected)
        assert wrapper.decode("x") == "d1:x"
        assert injected.calls == 1

    def test_lru_cache_reuses_results_for_repeated_inputs(self):
        # Pin: the cache short-circuits the underlying decoder, so identical
        # inputs incur only one decode call. This is what keeps the wrapper
        # cheap when the same template appears in many tuples.
        injected = _CountingDecoder()
        wrapper = Operator.PythonTemplateDecoder(decoder=injected, cache_size=8)
        first = wrapper.decode("same")
        second = wrapper.decode("same")
        assert first == "d1:same"
        assert second == "d1:same"  # same cached result, not re-decoded
        assert injected.calls == 1

    def test_lru_cache_evicts_when_size_exceeded(self):
        injected = _CountingDecoder()
        wrapper = Operator.PythonTemplateDecoder(decoder=injected, cache_size=2)
        wrapper.decode("a")
        wrapper.decode("b")
        wrapper.decode("c")  # cache holds only 2 entries: evicts "a"
        assert injected.calls == 3
        # Cache miss: "a" is decoded again and the fresh value is returned.
        assert wrapper.decode("a") == "d4:a"
        assert injected.calls == 4
+
+
class TestIsSourceProperty:
    def test_default_is_false(self):
        op = _ConcreteOperator()
        assert op.is_source is False

    def test_setter_true_takes_effect(self):
        op = _ConcreteOperator()
        op.is_source = True
        assert op.is_source is True

    def test_setter_can_flip_back_to_false(self):
        op = _ConcreteOperator()
        op.is_source = True
        op.is_source = False
        assert op.is_source is False

    def test_source_operator_class_attr_storage_diverges_from_property_read(
        self,
    ):
        # Documents the underlying defect by inspecting the two mangled
        # attributes directly rather than going through `is_source`.
        # SourceOperator's class attribute is stored under one mangled name,
        # while the property reads another, so the two cannot agree.
        # This keeps passing if the eventual fix only overrides `is_source` on
        # SourceOperator. A fix that renames or re-homes the attribute will
        # break it, so update or delete this test together with the xfail
        # below.
        src = _ConcreteSource()
        # The mangled attribute SourceOperator set: present but unused by
        # is_source.
        assert getattr(src, "_SourceOperator__internal_is_source") is True
        # The attribute Operator.is_source actually reads: still the default.
        assert getattr(src, "_Operator__internal_is_source") is False

    # strict=True turns an unexpected pass (XPASS) into a suite failure.
    # Fixing the bug therefore fails the run until this marker is removed.
    @pytest.mark.xfail(
        strict=True,
        reason=(
            "Known bug: SourceOperator's __internal_is_source class attribute "
            "is name-mangled to _SourceOperator__internal_is_source, while "
            "Operator.is_source reads _Operator__internal_is_source. The "
            "intended contract is is_source=True on SourceOperator instances; "
            "this xfail flips to XPASS when the bug is fixed."
        ),
    )
    def test_source_operator_subclass_should_report_is_source_true(self):
        src = _ConcreteSource()
        assert src.is_source is True
+
+
class TestOperatorDefaultMethods:
    """Operator's overridable hooks default to no-ops or pass-through."""

    def setup_method(self):
        # pytest runs this before each test, so every test gets its own
        # freshly constructed operator.
        self.op = _ConcreteOperator()

    def test_open_is_no_op(self):
        # Nothing else is observable: it must not raise and must return None.
        assert self.op.open() is None

    def test_close_is_no_op(self):
        assert self.op.close() is None

    def test_process_state_returns_input_state_unchanged(self):
        # By default the very same State object is forwarded downstream.
        incoming = State()
        forwarded = self.op.process_state(incoming, port=0)
        assert forwarded is incoming

    def test_produce_state_on_start_returns_none_by_default(self):
        assert self.op.produce_state_on_start(port=0) is None

    def test_produce_state_on_finish_returns_none_by_default(self):
        assert self.op.produce_state_on_finish(port=0) is None
+
+
class TestLazyTemplateDecoder:
    """The per-instance template decoder is built on first use, then reused."""

    def test_first_call_creates_decoder_and_caches_on_instance(self):
        subject = _ConcreteOperator()
        # A fresh operator carries no decoder yet ...
        assert not hasattr(subject, "_python_template_decoder")
        subject._get_template_decoder()
        # ... and the first request stores one on the instance.
        assert hasattr(subject, "_python_template_decoder")

    def test_subsequent_calls_reuse_the_cached_decoder(self):
        subject = _ConcreteOperator()
        decoders = [subject._get_template_decoder() for _ in range(2)]
        assert decoders[0] is decoders[1]

    def test_decode_python_template_delegates_to_lazy_decoder(self):
        subject = _ConcreteOperator()
        template = base64.b64encode(b"payload").decode("ascii")
        assert subject.decode_python_template(template) == "payload"
+
+
class TestBatchOperatorValidation:
    def test_validate_batch_size_rejects_none(self):
        with pytest.raises(ValueError, match="cannot be None"):
            BatchOperator._validate_batch_size(None)

    def test_validate_batch_size_rejects_non_int(self):
        with pytest.raises(ValueError):
            BatchOperator._validate_batch_size("10")

    def test_validate_batch_size_rejects_zero(self):
        with pytest.raises(ValueError, match="positive"):
            BatchOperator._validate_batch_size(0)

    def test_validate_batch_size_rejects_negative(self):
        with pytest.raises(ValueError, match="positive"):
            BatchOperator._validate_batch_size(-3)

    def test_validate_batch_size_accepts_positive_int(self):
        # No raise = pass; the validator returns None implicitly.
        assert BatchOperator._validate_batch_size(1) is None
        assert BatchOperator._validate_batch_size(1024) is None

    def test_concrete_batch_operator_initializes_with_valid_size(self):
        op = _ConcreteBatch()
        assert op.BATCH_SIZE == 4

    def test_invalid_batch_size_is_rejected_at_construction(self):
        # The test above only shows that a valid size does not raise. This
        # pins the other half: a subclass declaring a non-positive BATCH_SIZE
        # must not be constructible.
        # NOTE(review): assumes BatchOperator.__init__ runs
        # _validate_batch_size on BATCH_SIZE; confirm against operator.py.
        class _ZeroBatch(BatchOperator):
            BATCH_SIZE = 0

            def process_batch(self, batch, port):
                yield batch

        with pytest.raises(ValueError, match="positive"):
            _ZeroBatch()
+
+
class TestTableOperator:
    def test_process_tuple_buffers_input_and_yields_none(self):
        # process_tuple is @final on TableOperator: it must record the tuple
        # internally and yield exactly one None. The framework's iterator
        # protocol still sees a value, but no output is produced per tuple.
        op = _ConcreteTable()
        out = list(op.process_tuple(Tuple({"x": 1}), port=0))
        assert out == [None]
        # Nothing has been passed on to process_table yet.
        assert op.received_tables == []

    def test_on_finish_calls_process_table_with_buffered_tuples(self):
        op = _ConcreteTable()
        list(op.process_tuple(Tuple({"x": 1, "y": "a"}), port=0))
        list(op.process_tuple(Tuple({"x": 2, "y": "b"}), port=0))
        # Drain on_finish so the generator actually runs.
        list(op.on_finish(port=0))

        assert len(op.received_tables) == 1
        table = op.received_tables[0]
        assert isinstance(table, Table)
        rows = list(table.as_tuples())
        assert rows == [Tuple({"x": 1, "y": "a"}), Tuple({"x": 2, "y": "b"})]

    def test_on_finish_with_no_buffered_tuples_yields_empty_table(self):
        op = _ConcreteTable()
        list(op.on_finish(port=0))
        assert len(op.received_tables) == 1
        assert list(op.received_tables[0].as_tuples()) == []

    def test_buffers_are_keyed_by_port(self):
        # Each input port has its own tuple buffer. Finishing one port must
        # surface only that port's tuples and leave the other port's buffer
        # intact for its own on_finish.
        op = _ConcreteTable()
        list(op.process_tuple(Tuple({"x": 1}), port=0))
        list(op.process_tuple(Tuple({"x": 99}), port=1))

        list(op.on_finish(port=0))
        assert len(op.received_tables) == 1
        assert list(op.received_tables[0].as_tuples()) == [Tuple({"x": 1})]

        list(op.on_finish(port=1))
        assert len(op.received_tables) == 2
        assert list(op.received_tables[1].as_tuples()) == [Tuple({"x": 99})]