ashb commented on code in PR #51699:
URL: https://github.com/apache/airflow/pull/51699#discussion_r2150427838
##########
task-sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -80,20 +90,152 @@
)
from airflow.sdk.exceptions import ErrorType
+if TYPE_CHECKING:
+ from structlog.typing import FilteringBoundLogger as Logger
+
+SendMsgType = TypeVar("SendMsgType", bound=BaseModel)
+ReceiveMsgType = TypeVar("ReceiveMsgType", bound=BaseModel)
+
+
+def _msgpack_enc_hook(obj: Any) -> Any:
+ import pendulum
+
+ if isinstance(obj, pendulum.DateTime):
+ # Convert the pendulum.DateTime to a plain stdlib datetime, keeping its tzinfo
+ return datetime(
+ obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second,
obj.microsecond, tzinfo=obj.tzinfo
+ )
+ if isinstance(obj, Path):
+ return str(obj)
+ if isinstance(obj, BaseModel):
+ return obj.model_dump(exclude_unset=True)
+
+ # Raise a NotImplementedError for other types
+ raise NotImplementedError(f"Objects of type {type(obj)} are not supported")
+
+
+def _new_encoder() -> msgspec.msgpack.Encoder:
+ return msgspec.msgpack.Encoder(enc_hook=_msgpack_enc_hook)
+
+
+class _RequestFrame(msgspec.Struct, array_like=True, frozen=True,
omit_defaults=True):
+ id: int
+ """
+ The request id, set by the sender.
+
+ This is used to allow "pipelining" of requests and to be able to tie responses to requests, which is
+ particularly useful in the Triggerer where multiple async tasks can send requests concurrently.
+ """
+ body: dict[str, Any] | None
+
+ req_encoder: ClassVar[msgspec.msgpack.Encoder] = _new_encoder()
+
+ def as_bytes(self) -> bytearray:
+ # See https://jcristharif.com/msgspec/perf-tips.html#length-prefix-framing for inspiration
+ buffer = bytearray(256)
+
+ self.req_encoder.encode_into(self, buffer, 4)
+
+ n = len(buffer) - 4
+ if n > 2**32:
Review Comment:
Probably, yes
##########
task-sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -80,20 +90,152 @@
)
from airflow.sdk.exceptions import ErrorType
+if TYPE_CHECKING:
+ from structlog.typing import FilteringBoundLogger as Logger
+
+SendMsgType = TypeVar("SendMsgType", bound=BaseModel)
+ReceiveMsgType = TypeVar("ReceiveMsgType", bound=BaseModel)
+
+
+def _msgpack_enc_hook(obj: Any) -> Any:
+ import pendulum
+
+ if isinstance(obj, pendulum.DateTime):
+ # Convert the pendulum.DateTime to a plain stdlib datetime, keeping its tzinfo
+ return datetime(
+ obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second,
obj.microsecond, tzinfo=obj.tzinfo
+ )
+ if isinstance(obj, Path):
+ return str(obj)
+ if isinstance(obj, BaseModel):
+ return obj.model_dump(exclude_unset=True)
+
+ # Raise a NotImplementedError for other types
+ raise NotImplementedError(f"Objects of type {type(obj)} are not supported")
+
+
+def _new_encoder() -> msgspec.msgpack.Encoder:
+ return msgspec.msgpack.Encoder(enc_hook=_msgpack_enc_hook)
+
+
+class _RequestFrame(msgspec.Struct, array_like=True, frozen=True,
omit_defaults=True):
+ id: int
+ """
+ The request id, set by the sender.
+
+ This is used to allow "pipelining" of requests and to be able to tie responses to requests, which is
+ particularly useful in the Triggerer where multiple async tasks can send requests concurrently.
+ """
+ body: dict[str, Any] | None
+
+ req_encoder: ClassVar[msgspec.msgpack.Encoder] = _new_encoder()
+
+ def as_bytes(self) -> bytearray:
+ # See https://jcristharif.com/msgspec/perf-tips.html#length-prefix-framing for inspiration
+ buffer = bytearray(256)
+
+ self.req_encoder.encode_into(self, buffer, 4)
+
+ n = len(buffer) - 4
+ if n > 2**32:
+ raise OverflowError("Cannot send messages larger than 4GiB")
+ buffer[:4] = n.to_bytes(4, byteorder="big")
+
+ return buffer
+
+
+class _ResponseFrame(_RequestFrame, msgspec.Struct, array_like=True,
frozen=True, omit_defaults=True):
Review Comment:
Might be. I'll see.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]