This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new 484eb37965 fix fmt
484eb37965 is described below
commit 484eb37965568323fa3f16154207aa0f9ba41262
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Apr 18 03:38:35 2026 -0700
fix fmt
---
amber/src/main/python/core/models/state.py | 13 +++---------
.../main/python/core/runnables/network_sender.py | 6 +-----
.../org/apache/texera/amber/core/state/State.scala | 23 +++++-----------------
3 files changed, 9 insertions(+), 33 deletions(-)
diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py
index 57c3cb4206..e1c22e3b00 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -25,14 +25,12 @@ from .tuple import Tuple
State: TypeAlias = Dict[str, Any]
SERIALIZED_STATE_CONTENT = "serialized_state_content"
-LOOP_COUNTER = "loop_counter"
_TYPE_MARKER = "__texera_type__"
_PAYLOAD_MARKER = "payload"
_BYTES_TYPE = "bytes"
STATE_SCHEMA = Schema()
STATE_SCHEMA.add(SERIALIZED_STATE_CONTENT, AttributeType.STRING)
-STATE_SCHEMA.add(LOOP_COUNTER, AttributeType.LONG)
def state_uri_from_result_uri(result_uri: str) -> str:
@@ -40,23 +38,18 @@ def state_uri_from_result_uri(result_uri: str) -> str:
def serialize_state(state: State) -> Tuple:
- payload = dict(state)
- loop_counter = int(payload.pop(LOOP_COUNTER, 0))
return Tuple(
{
SERIALIZED_STATE_CONTENT: json.dumps(
- _to_json_value(payload), separators=(",", ":")
- ),
- LOOP_COUNTER: loop_counter,
+ _to_json_value(state), separators=(",", ":")
+ )
},
schema=STATE_SCHEMA,
)
def deserialize_state(row: Tuple) -> State:
- state = _from_json_value(json.loads(row[SERIALIZED_STATE_CONTENT] or "{}"))
- state[LOOP_COUNTER] = int(row[LOOP_COUNTER])
- return state
+ return _from_json_value(json.loads(row[SERIALIZED_STATE_CONTENT] or "{}"))
def _to_json_value(value: Any) -> Any:
diff --git a/amber/src/main/python/core/runnables/network_sender.py b/amber/src/main/python/core/runnables/network_sender.py
index 11824dbe68..0bbac8b110 100644
--- a/amber/src/main/python/core/runnables/network_sender.py
+++ b/amber/src/main/python/core/runnables/network_sender.py
@@ -29,7 +29,6 @@ from core.models.internal_queue import (
)
from core.models.state import (
SERIALIZED_STATE_CONTENT,
- LOOP_COUNTER,
STATE_SCHEMA,
serialize_state,
)
@@ -110,10 +109,7 @@ class NetworkSender(StoppableQueueBlockingRunnable):
serialized_state = serialize_state(data_payload.frame)
table = pa.Table.from_pydict(
{
- SERIALIZED_STATE_CONTENT: [
- serialized_state[SERIALIZED_STATE_CONTENT]
- ],
- LOOP_COUNTER: [serialized_state[LOOP_COUNTER]],
+ SERIALIZED_STATE_CONTENT:
[serialized_state[SERIALIZED_STATE_CONTENT]],
},
schema=STATE_SCHEMA.as_arrow_schema(),
)
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index cabff0e42d..fa361350d7 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -29,33 +29,28 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala
object State {
private val SerializedStateContentColumn = "serialized_state_content"
- private val LoopCounterColumn = "loop_counter"
private val BytesTypeMarker = "__texera_type__"
private val BytesValue = "bytes"
private val PayloadMarker = "payload"
val materializationSchema: Schema = new Schema(
- new Attribute(SerializedStateContentColumn, AttributeType.STRING),
- new Attribute(LoopCounterColumn, AttributeType.LONG)
+ new Attribute(SerializedStateContentColumn, AttributeType.STRING)
)
def stateUriFromResultUri(resultUri: URI): URI =
new URI(resultUri.toString.replace("/result", "/state"))
def serializeState(state: Map[String, Any]): Tuple = {
- val loopCounter = state.get(LoopCounterColumn).map(toLong).getOrElse(0L)
- val payloadJson =
objectMapper.writeValueAsString(toJsonValue(state.removed(LoopCounterColumn)))
- Tuple.builder(materializationSchema).addSequentially(Array(payloadJson,
loopCounter)).build()
+ val payloadJson = objectMapper.writeValueAsString(toJsonValue(state))
+
Tuple.builder(materializationSchema).addSequentially(Array(payloadJson)).build()
}
def deserializeState(tuple: Tuple): Map[String, Any] = {
val payload =
Option(tuple.getField[String](SerializedStateContentColumn)).getOrElse("{}")
val root = objectMapper.readTree(payload)
- val state =
- if (root == null || !root.isObject) Map.empty[String, Any]
- else root.fields().asScala.map(entry => entry.getKey ->
fromJsonValue(entry.getValue)).toMap
- state + (LoopCounterColumn ->
toLong(tuple.getField[Any](LoopCounterColumn)))
+ if (root == null || !root.isObject) Map.empty[String, Any]
+ else root.fields().asScala.map(entry => entry.getKey ->
fromJsonValue(entry.getValue)).toMap
}
private def toJsonValue(value: Any): Any = value match {
@@ -92,12 +87,4 @@ object State {
node.asText()
}
}
-
- private def toLong(value: Any): Long = value match {
- case null => 0L
- case number: java.lang.Number => number.longValue()
- case text: String => text.toLong
- case other =>
- throw new IllegalArgumentException(s"Cannot convert $other to loop
counter")
- }
}