This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-materialization
by this push:
new e7a0f15f3d test: cover state materialization round trip
e7a0f15f3d is described below
commit e7a0f15f3d88ecadd871408372c91bb78f4438aa
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 23 01:08:30 2026 -0700
test: cover state materialization round trip
---
.../core/storage/iceberg/test_iceberg_document.py | 40 ++++++++++++++++++++++
.../result/iceberg/IcebergDocumentSpec.scala | 28 +++++++++++++++
2 files changed, 68 insertions(+)
diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
index 9b374f7d5c..29e43f249c 100644
--- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
@@ -23,6 +23,12 @@ from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from core.models import Schema, Tuple
+from core.models.state import (
+ STATE_SCHEMA,
+ deserialize_state,
+ serialize_state,
+ state_uri_from_result_uri,
+)
from core.storage.document_factory import DocumentFactory
from core.storage.storage_config import StorageConfig
from core.storage.vfs_uri_factory import VFSURIFactory
@@ -317,3 +323,37 @@ class TestIcebergDocument:
assert iceberg_document.get_count() == len(sample_items), (
"get_count should return the same number as the length of
sample_items"
)
+
+ def test_state_materialization_round_trip(self):
+ operator_uuid = str(uuid.uuid4()).replace("-", "")
+ result_uri = VFSURIFactory.create_result_uri(
+ WorkflowIdentity(id=0),
+ ExecutionIdentity(id=0),
+ GlobalPortIdentity(
+ op_id=PhysicalOpIdentity(
+
logical_op_id=OperatorIdentity(id=f"test_state_{operator_uuid}"),
+ layer_name="main",
+ ),
+ port_id=PortIdentity(id=0),
+ input=False,
+ ),
+ )
+ state_uri = state_uri_from_result_uri(result_uri)
+ DocumentFactory.create_document(state_uri, STATE_SCHEMA)
+ document, _ = DocumentFactory.open_document(state_uri)
+
+ state = {
+ "loop_counter": 3,
+ "name": "outer-loop",
+ "payload": b"\x00\x01state-bytes",
+ "nested": {"enabled": True, "values": [1, 2, 3]},
+ }
+
+ writer = document.writer(str(uuid.uuid4()))
+ writer.open()
+ writer.put_one(serialize_state(state))
+ writer.close()
+
+ stored_rows = list(document.get())
+ assert len(stored_rows) == 1
+ assert deserialize_state(stored_rows[0]) == state
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 8fdf039f3e..761efe6341 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -20,6 +20,7 @@
package org.apache.texera.amber.storage.result.iceberg
import org.apache.texera.amber.config.StorageConfig
+import org.apache.texera.amber.core.state.State
import org.apache.texera.amber.core.storage.model.{VirtualDocument,
VirtualDocumentSpec}
import org.apache.texera.amber.core.storage.{DocumentFactory,
IcebergCatalogInstance, VFSURIFactory}
import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema,
Tuple}
@@ -141,6 +142,33 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
}
}
+ it should "round trip materialized state documents" in {
+ val stateUri = State.stateUriFromResultUri(uri)
+ DocumentFactory.createDocument(stateUri, State.schema)
+ val stateDocument =
+
DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
+ val state: State = Map(
+ "loop_counter" -> 3,
+ "name" -> "outer-loop",
+ "payload" -> Array[Byte](0, 1, 2, 3),
+ "nested" -> Map("enabled" -> true, "values" -> List(1, 2, 3))
+ )
+
+ val writer = stateDocument.writer(UUID.randomUUID().toString)
+ writer.open()
+ writer.putOne(State.serialize(state))
+ writer.close()
+
+ val storedRows = stateDocument.get().toList
+ assert(storedRows.length == 1)
+ val deserialized = State.deserialize(storedRows.head)
+ assert(deserialized("loop_counter") == 3L)
+ assert(deserialized("name") == "outer-loop")
+
assert(deserialized("payload").asInstanceOf[Array[Byte]].sameElements(Array[Byte](0,
1, 2, 3)))
+ assert(deserialized("nested").asInstanceOf[Map[String, Any]]("enabled") ==
true)
+ assert(deserialized("nested").asInstanceOf[Map[String, Any]]("values") ==
List(1L, 2L, 3L))
+ }
+
/** Returns a dynamic proxy for `realTable` that increments `counter` on
every `refresh()` call. */
private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger):
Table =
Proxy