This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-only
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-only by this
push:
new d92ed51b1c fix fmt
d92ed51b1c is described below
commit d92ed51b1cdb65effe962e65f6cb1138a97a383f
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 30 18:01:53 2026 -0700
fix fmt
---
amber/src/main/python/core/models/state.py | 28 ++++++++++++----------
.../main/python/core/runnables/network_receiver.py | 15 +++---------
.../main/python/core/runnables/network_sender.py | 14 +++--------
3 files changed, 21 insertions(+), 36 deletions(-)
diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py
index 8603c2124f..003aaa212a 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -24,25 +24,27 @@ from .tuple import Tuple
class State(dict):
- pass
+ CONTENT = "content"
+ SCHEMA = Schema(raw_schema={CONTENT: "STRING"})
-STATE_CONTENT = "content"
-_TYPE_MARKER = "__texera_type__"
-_PAYLOAD_MARKER = "payload"
-_BYTES_TYPE = "bytes"
+ def to_json(self) -> str:
+ return json.dumps(_to_json_value(self), separators=(",", ":"))
-STATE_SCHEMA = Schema(raw_schema={STATE_CONTENT: "STRING"})
+ def to_tuple(self) -> Tuple:
+ return Tuple({State.CONTENT: self.to_json()}, schema=State.SCHEMA)
+ @classmethod
+ def from_json(cls, payload: str) -> "State":
+ return cls(_from_json_value(json.loads(payload)))
-def serialize_state(state: State) -> Tuple:
- return Tuple(
- {STATE_CONTENT: json.dumps(_to_json_value(state), separators=(",", ":"))},
- schema=STATE_SCHEMA,
- )
+ @classmethod
+ def from_tuple(cls, row: Tuple) -> "State":
+ return cls.from_json(row[cls.CONTENT])
-def deserialize_state(row: Tuple) -> State:
- return State(_from_json_value(json.loads(row[STATE_CONTENT])))
+_TYPE_MARKER = "__texera_type__"
+_PAYLOAD_MARKER = "payload"
+_BYTES_TYPE = "bytes"
def _to_json_value(value: Any) -> Any:
diff --git a/amber/src/main/python/core/runnables/network_receiver.py b/amber/src/main/python/core/runnables/network_receiver.py
index 8cd8a0d537..739cf0788e 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -32,7 +32,7 @@ from core.architecture.handlers.actorcommand.credit_update_handler import (
)
from core.models import (
DataFrame,
- Tuple,
+ State,
StateFrame,
)
from core.models.internal_queue import (
@@ -42,7 +42,6 @@ from core.models.internal_queue import (
ECMElement,
)
from core.proxy import ProxyServer
-from core.models.state import STATE_SCHEMA, deserialize_state
from core.util import Stoppable, get_one_of
from core.util.runnable.runnable import Runnable
from proto.org.apache.texera.amber.engine.architecture.rpc import EmbeddedControlMessage
@@ -146,20 +145,12 @@ class NetworkReceiver(Runnable, Stoppable):
self._proxy_server.register_actor_message_handler(actor_message_handler)
@staticmethod
- def _deserialize_state_payload(table: Table) -> dict:
+ def _deserialize_state_payload(table: Table) -> State:
# Each network State message carries exactly one serialized state row.
# Multiple states are sent as multiple State messages, not as multiple
# rows inside one network payload.
assert len(table) == 1
- return deserialize_state(
- Tuple(
- {
- name: table[name][0].as_py()
- for name in STATE_SCHEMA.get_attr_names()
- },
- schema=STATE_SCHEMA,
- )
- )
+ return State.from_json(table[State.CONTENT][0].as_py())
def register_shutdown(self, shutdown: callable) -> None:
self._proxy_server.register(
diff --git a/amber/src/main/python/core/runnables/network_sender.py b/amber/src/main/python/core/runnables/network_sender.py
index 52d799d6f1..d8e3889ac1 100644
--- a/amber/src/main/python/core/runnables/network_sender.py
+++ b/amber/src/main/python/core/runnables/network_sender.py
@@ -20,18 +20,13 @@ from loguru import logger
from overrides import overrides
from typing import Optional
-from core.models import DataPayload, InternalQueue, DataFrame, StateFrame
+from core.models import DataPayload, InternalQueue, DataFrame, State, StateFrame
from core.models.internal_queue import (
InternalQueueElement,
DataElement,
DCMElement,
ECMElement,
)
-from core.models.state import (
- STATE_CONTENT,
- STATE_SCHEMA,
- serialize_state,
-)
from core.proxy import ProxyClient
from core.util import StoppableQueueBlockingRunnable
from proto.org.apache.texera.amber.core import ChannelIdentity
@@ -104,12 +99,9 @@ class NetworkSender(StoppableQueueBlockingRunnable):
self._proxy_client.send_data(bytes(data_header), data_payload.frame)
elif isinstance(data_payload, StateFrame):
data_header = PythonDataHeader(tag=to, payload_type="State")
- serialized_state = serialize_state(data_payload.frame)
table = pa.Table.from_pydict(
- {
- STATE_CONTENT: [serialized_state[STATE_CONTENT]],
- },
- schema=STATE_SCHEMA.as_arrow_schema(),
+ {State.CONTENT: [data_payload.frame.to_json()]},
+ schema=State.SCHEMA.as_arrow_schema(),
)
self._proxy_client.send_data(bytes(data_header), table)
else: