This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 484eb37965 fix fmt
484eb37965 is described below

commit 484eb37965568323fa3f16154207aa0f9ba41262
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Apr 18 03:38:35 2026 -0700

    fix fmt
---
 amber/src/main/python/core/models/state.py         | 13 +++---------
 .../main/python/core/runnables/network_sender.py   |  6 +-----
 .../org/apache/texera/amber/core/state/State.scala | 23 +++++-----------------
 3 files changed, 9 insertions(+), 33 deletions(-)

diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py
index 57c3cb4206..e1c22e3b00 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -25,14 +25,12 @@ from .tuple import Tuple
 State: TypeAlias = Dict[str, Any]
 
 SERIALIZED_STATE_CONTENT = "serialized_state_content"
-LOOP_COUNTER = "loop_counter"
 _TYPE_MARKER = "__texera_type__"
 _PAYLOAD_MARKER = "payload"
 _BYTES_TYPE = "bytes"
 
 STATE_SCHEMA = Schema()
 STATE_SCHEMA.add(SERIALIZED_STATE_CONTENT, AttributeType.STRING)
-STATE_SCHEMA.add(LOOP_COUNTER, AttributeType.LONG)
 
 
 def state_uri_from_result_uri(result_uri: str) -> str:
@@ -40,23 +38,18 @@ def state_uri_from_result_uri(result_uri: str) -> str:
 
 
 def serialize_state(state: State) -> Tuple:
-    payload = dict(state)
-    loop_counter = int(payload.pop(LOOP_COUNTER, 0))
     return Tuple(
         {
             SERIALIZED_STATE_CONTENT: json.dumps(
-                _to_json_value(payload), separators=(",", ":")
-            ),
-            LOOP_COUNTER: loop_counter,
+                _to_json_value(state), separators=(",", ":")
+            )
         },
         schema=STATE_SCHEMA,
     )
 
 
 def deserialize_state(row: Tuple) -> State:
-    state = _from_json_value(json.loads(row[SERIALIZED_STATE_CONTENT] or "{}"))
-    state[LOOP_COUNTER] = int(row[LOOP_COUNTER])
-    return state
+    return _from_json_value(json.loads(row[SERIALIZED_STATE_CONTENT] or "{}"))
 
 
 def _to_json_value(value: Any) -> Any:
diff --git a/amber/src/main/python/core/runnables/network_sender.py b/amber/src/main/python/core/runnables/network_sender.py
index 11824dbe68..0bbac8b110 100644
--- a/amber/src/main/python/core/runnables/network_sender.py
+++ b/amber/src/main/python/core/runnables/network_sender.py
@@ -29,7 +29,6 @@ from core.models.internal_queue import (
 )
 from core.models.state import (
     SERIALIZED_STATE_CONTENT,
-    LOOP_COUNTER,
     STATE_SCHEMA,
     serialize_state,
 )
@@ -110,10 +109,7 @@ class NetworkSender(StoppableQueueBlockingRunnable):
             serialized_state = serialize_state(data_payload.frame)
             table = pa.Table.from_pydict(
                 {
-                    SERIALIZED_STATE_CONTENT: [
-                        serialized_state[SERIALIZED_STATE_CONTENT]
-                    ],
-                    LOOP_COUNTER: [serialized_state[LOOP_COUNTER]],
+                    SERIALIZED_STATE_CONTENT: [serialized_state[SERIALIZED_STATE_CONTENT]],
                 },
                 schema=STATE_SCHEMA.as_arrow_schema(),
             )
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index cabff0e42d..fa361350d7 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -29,33 +29,28 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala
 
 object State {
   private val SerializedStateContentColumn = "serialized_state_content"
-  private val LoopCounterColumn = "loop_counter"
   private val BytesTypeMarker = "__texera_type__"
   private val BytesValue = "bytes"
   private val PayloadMarker = "payload"
 
   val materializationSchema: Schema = new Schema(
-    new Attribute(SerializedStateContentColumn, AttributeType.STRING),
-    new Attribute(LoopCounterColumn, AttributeType.LONG)
+    new Attribute(SerializedStateContentColumn, AttributeType.STRING)
   )
 
   def stateUriFromResultUri(resultUri: URI): URI =
     new URI(resultUri.toString.replace("/result", "/state"))
 
   def serializeState(state: Map[String, Any]): Tuple = {
-    val loopCounter = state.get(LoopCounterColumn).map(toLong).getOrElse(0L)
-    val payloadJson = objectMapper.writeValueAsString(toJsonValue(state.removed(LoopCounterColumn)))
-    Tuple.builder(materializationSchema).addSequentially(Array(payloadJson, loopCounter)).build()
+    val payloadJson = objectMapper.writeValueAsString(toJsonValue(state))
+    Tuple.builder(materializationSchema).addSequentially(Array(payloadJson)).build()
   }
 
   def deserializeState(tuple: Tuple): Map[String, Any] = {
     val payload =
       Option(tuple.getField[String](SerializedStateContentColumn)).getOrElse("{}")
     val root = objectMapper.readTree(payload)
-    val state =
-      if (root == null || !root.isObject) Map.empty[String, Any]
-      else root.fields().asScala.map(entry => entry.getKey -> fromJsonValue(entry.getValue)).toMap
-    state + (LoopCounterColumn -> toLong(tuple.getField[Any](LoopCounterColumn)))
+    if (root == null || !root.isObject) Map.empty[String, Any]
+    else root.fields().asScala.map(entry => entry.getKey -> fromJsonValue(entry.getValue)).toMap
   }
 
   private def toJsonValue(value: Any): Any = value match {
@@ -92,12 +87,4 @@ object State {
       node.asText()
     }
   }
-
-  private def toLong(value: Any): Long = value match {
-    case null                     => 0L
-    case number: java.lang.Number => number.longValue()
-    case text: String             => text.toLong
-    case other                    =>
-      throw new IllegalArgumentException(s"Cannot convert $other to loop counter")
-  }
 }

Reply via email to