This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-state-materialization 
by this push:
     new e7a0f15f3d test: cover state materialization round trip
e7a0f15f3d is described below

commit e7a0f15f3d88ecadd871408372c91bb78f4438aa
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 23 01:08:30 2026 -0700

    test: cover state materialization round trip
---
 .../core/storage/iceberg/test_iceberg_document.py  | 40 ++++++++++++++++++++++
 .../result/iceberg/IcebergDocumentSpec.scala       | 28 +++++++++++++++
 2 files changed, 68 insertions(+)

diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
index 9b374f7d5c..29e43f249c 100644
--- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
@@ -23,6 +23,12 @@ from concurrent.futures import as_completed
 from concurrent.futures.thread import ThreadPoolExecutor
 
 from core.models import Schema, Tuple
+from core.models.state import (
+    STATE_SCHEMA,
+    deserialize_state,
+    serialize_state,
+    state_uri_from_result_uri,
+)
 from core.storage.document_factory import DocumentFactory
 from core.storage.storage_config import StorageConfig
 from core.storage.vfs_uri_factory import VFSURIFactory
@@ -317,3 +323,37 @@ class TestIcebergDocument:
         assert iceberg_document.get_count() == len(sample_items), (
             "get_count should return the same number as the length of 
sample_items"
         )
+
+    def test_state_materialization_round_trip(self):  # Writes one serialized state dict to a freshly created state document and checks it reads back equal.
+        operator_uuid = str(uuid.uuid4()).replace("-", "")  # Fresh per run so this test targets its own result/state URI; dashes stripped, presumably to keep the id identifier-safe (confirm).
+        result_uri = VFSURIFactory.create_result_uri(
+            WorkflowIdentity(id=0),
+            ExecutionIdentity(id=0),
+            GlobalPortIdentity(
+                op_id=PhysicalOpIdentity(
+                    
logical_op_id=OperatorIdentity(id=f"test_state_{operator_uuid}"),
+                    layer_name="main",
+                ),
+                port_id=PortIdentity(id=0),
+                input=False,  # Port 0 on the output side (not an input port).
+            ),
+        )
+        state_uri = state_uri_from_result_uri(result_uri)  # Derive the state document's URI from the port's result URI.
+        DocumentFactory.create_document(state_uri, STATE_SCHEMA)
+        document, _ = DocumentFactory.open_document(state_uri)  # open_document returns a pair; only the document is kept.
+
+        state = {  # Exercises int, str, bytes, and a nested dict holding a bool and a list.
+            "loop_counter": 3,
+            "name": "outer-loop",
+            "payload": b"\x00\x01state-bytes",
+            "nested": {"enabled": True, "values": [1, 2, 3]},
+        }
+
+        writer = document.writer(str(uuid.uuid4()))  # Writer keyed by a random string.
+        writer.open()
+        writer.put_one(serialize_state(state))  # One put_one call; the assertions below expect exactly one stored row.
+        writer.close()  # Presumably flushes/commits the write; rows are only read back after this.
+
+        stored_rows = list(document.get())  # Materialize every row currently in the document.
+        assert len(stored_rows) == 1
+        assert deserialize_state(stored_rows[0]) == state  # The round trip must reproduce the original dict exactly, bytes and nested values included.
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 8fdf039f3e..761efe6341 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -20,6 +20,7 @@
 package org.apache.texera.amber.storage.result.iceberg
 
 import org.apache.texera.amber.config.StorageConfig
+import org.apache.texera.amber.core.state.State
 import org.apache.texera.amber.core.storage.model.{VirtualDocument, 
VirtualDocumentSpec}
 import org.apache.texera.amber.core.storage.{DocumentFactory, 
IcebergCatalogInstance, VFSURIFactory}
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
@@ -141,6 +142,33 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
     }
   }
 
+  it should "round trip materialized state documents" in {  // Writes one serialized State to a state document derived from `uri` and checks the values read back.
+    val stateUri = State.stateUriFromResultUri(uri)  // `uri` comes from the enclosing suite (not shown here); presumably the result document's URI, so confirm.
+    DocumentFactory.createDocument(stateUri, State.schema)
+    val stateDocument = // openDocument returns a pair; `._1` is taken as the document and cast to a Tuple document.
+      
DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
+    val state: State = Map(  // Exercises Int, String, Array[Byte], and a nested Map holding a Boolean and a List.
+      "loop_counter" -> 3,
+      "name" -> "outer-loop",
+      "payload" -> Array[Byte](0, 1, 2, 3),
+      "nested" -> Map("enabled" -> true, "values" -> List(1, 2, 3))
+    )
+
+    val writer = stateDocument.writer(UUID.randomUUID().toString)  // Writer keyed by a random UUID string.
+    writer.open()
+    writer.putOne(State.serialize(state))
+    writer.close()  // Presumably commits the write before it is read back below.
+
+    val storedRows = stateDocument.get().toList  // Read every stored row back into memory.
+    assert(storedRows.length == 1)  // Exactly the one row written above.
+    val deserialized = State.deserialize(storedRows.head)
+    assert(deserialized("loop_counter") == 3L)  // NOTE(review): Scala's `==` on boxed numbers is cooperative (an Int 3 also equals 3L), so this does not by itself prove the value comes back as a Long.
+    assert(deserialized("name") == "outer-loop")
+    
assert(deserialized("payload").asInstanceOf[Array[Byte]].sameElements(Array[Byte](0,
 1, 2, 3)))  // Arrays compare by reference in Scala, hence element-wise sameElements.
+    assert(deserialized("nested").asInstanceOf[Map[String, Any]]("enabled") == 
true)  // The cast checks only `Map`; the type arguments are erased.
+    assert(deserialized("nested").asInstanceOf[Map[String, Any]]("values") == 
List(1L, 2L, 3L))  // NOTE(review): as with loop_counter, List equality uses cooperative numeric `==`, so List(1, 2, 3) would also pass.
+  }
+
   /** Returns a dynamic proxy for `realTable` that increments `counter` on 
every `refresh()` call. */
   private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): 
Table =
     Proxy

Reply via email to