wenjin272 commented on PR #63:
URL: https://github.com/apache/flink-agents/pull/63#issuecomment-3086539825
> It seems we are always trying the pydantic serialization first, and
falling back to json serialization with a custom serializer if the first try fails.
>
> This approach leads to a few issues.
>
> * Inefficiency, from trying to serialize the object twice.
> * Potential inconsistency. Adding one row-type field in the nested object
will entirely change the serializer.
>
> I think the most elegant approach might be making the pyflink `Row` a
pydantic base model and overriding `model_dump_json()` for it. However, even if we
make the change in Flink now, it will only be available to future Flink
versions. Moreover, we'd better not affect Flink for Flink Agents until the
latter is stabilized. Alternatively, we may consider modifying `Row` in Flink
Agents with some monkey patches, as a temporary solution.
>
> WDYT? @Kavishankarks @wenjin272
I think it makes sense to avoid these issues.
First, I tried monkey patching, but it raised an exception:
```
TypeError: __bases__ assignment: 'BaseModel' deallocator differs from
'object'
```
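For reference, this error does not seem to be specific to pydantic. CPython rejects `__bases__` reassignment on a class that derives directly from `object` when the new base uses a different deallocator. A minimal sketch with stand-in classes (`PlainRow` and `NewBase` are hypothetical names, assuming `Row` derives directly from `object`):

```python
class NewBase:
    """Stand-in for the base class we would like to inject (hypothetical)."""


class PlainRow:
    """Stand-in for a class that derives directly from object (hypothetical)."""


def try_rebase() -> str:
    """Try to swap the base class at runtime and report what happens."""
    try:
        PlainRow.__bases__ = (NewBase,)
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "ok"


result = try_rebase()
print(result)
```

So patching the base class of an existing plain class at runtime is likely a dead end, regardless of which base is injected.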
Then I looked for a way to inject a custom serializer into BaseModel's
`model_dump_json()`. Fortunately, `model_dump_json()` accepts a `fallback`
parameter for exactly this purpose:
```
fallback: A function to call when an unknown value is encountered. If not provided,
    a [`PydanticSerializationError`][pydantic_core.PydanticSerializationError] error is raised.
```
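The standard library has a comparable hook: the `default` argument of `json.dumps` is called only for values the encoder cannot handle natively. The sketch below uses it to illustrate the fallback idea without needing pydantic or pyflink. `FakeRow` and `serialize_unknown` are hypothetical stand-ins, not the real `Row` or pydantic's API:

```python
import json
from typing import Any, Dict


class FakeRow:
    """Hypothetical stand-in for pyflink's Row; only stores positional values."""

    def __init__(self, *values: Any) -> None:
        self._values = list(values)


def serialize_unknown(value: Any) -> Dict[str, Any]:
    """Called by json.dumps only for values it cannot encode natively."""
    if isinstance(value, FakeRow):
        return {"type": "Row", "values": value._values}
    raise TypeError(f"Unable to serialize unknown type: {value.__class__}")


payload = {"input": FakeRow({"a": 1})}
encoded = json.dumps(payload, default=serialize_unknown)
print(encoded)
```

The object is walked only once, and the hook runs only for the unknown values, so adding a row-type field to a nested object does not switch the whole object to a different serializer.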
So I modified the Event code as follows:
```
from abc import ABC
from typing import Any, Dict
from uuid import UUID, uuid4

from pydantic import BaseModel, Field, model_validator
from pydantic_core import PydanticSerializationError
from pyflink.common import Row
from typing_extensions import override


class Event(BaseModel, ABC, extra="allow"):
    """Base class for all event types in the system.

    Events allow extra properties, but these properties must be instances of
    BaseModel or be JSON serializable.

    Attributes:
    ----------
    id : UUID
        Unique identifier for the event, automatically generated using uuid4.
    """

    id: UUID = Field(default_factory=uuid4)

    @staticmethod
    def __serialize_unknow(field: Any) -> Dict[str, Any]:
        # Fallback serializer: only pyflink Row is handled, anything else fails.
        if isinstance(field, Row):
            return {"type": "Row", "values": field._values}
        else:
            err_msg = f"Unable to serialize unknown type: {field.__class__}"
            raise PydanticSerializationError(err_msg)

    @override
    def model_dump_json(self, **kwargs: Any) -> str:
        """Override model_dump_json to handle Row objects."""
        # Forward caller options (e.g. indent) instead of silently dropping them.
        return super().model_dump_json(fallback=self.__serialize_unknow, **kwargs)

    @model_validator(mode="after")
    def validate_extra(self) -> "Event":
        """Ensure init fields are serializable."""
        self.model_dump_json()
        return self

    def __setattr__(self, name: str, value: Any) -> None:
        super().__setattr__(name, value)
        # Ensure the added property can be serialized.
        self.model_dump_json()
```
And it works:
```
def test_inject_super_class_to_row() -> None:
    event = InputEvent(input=Row({"a": 1}))
    print(event.model_dump_json())

{"id":"037d5556-4a99-4d5f-a3f4-c7ec12773dc0","input":{"type":"Row","values":[{"a":1}]}}
```
WDYT? @Kavishankarks @xintongsong