weiqingy opened a new issue, #845:
URL: https://github.com/apache/flink-agents/issues/845

   ### Description
   
   Follow-up to #723 / #839.
   
   The `set()`-time memory value validator added in #839
(`validate_memory_value` in `python/flink_agents/api/memory_object.py`) accepts
only values that recursively satisfy a checkpoint-stable contract:
`None | bool | int | float | str | list[...] | dict[str, ...]`.
The accepted scalar set is exact-typed:
   
   ```python
   _CHECKPOINT_STABLE_SCALARS = (bool, int, float, str)
   ```
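
For illustration, an exact-typed recursive check of this shape might look like the sketch below. This is not the code merged in #839: the function name `sketch_validate_memory_value`, the `path` parameter, and the error messages are hypothetical, and the sketch assumes "exact-typed" means comparing `type(value)` directly so that subclasses are rejected.

```python
from typing import Any

_CHECKPOINT_STABLE_SCALARS = (bool, int, float, str)


def sketch_validate_memory_value(value: Any, path: str = "value") -> None:
    """Raise TypeError if `value` falls outside the contract (illustrative only)."""
    # Exact type match: subclasses of the scalar types are not accepted.
    if value is None or type(value) in _CHECKPOINT_STABLE_SCALARS:
        return
    if type(value) is list:
        # Recurse into every element, tracking where a failure occurred.
        for i, item in enumerate(value):
            sketch_validate_memory_value(item, f"{path}[{i}]")
        return
    if type(value) is dict:
        for key, item in value.items():
            if type(key) is not str:
                raise TypeError(f"{path}: dict key {key!r} is not a str")
            sketch_validate_memory_value(item, f"{path}[{key!r}]")
        return
    # Anything else, including bytes, is rejected at set() time.
    raise TypeError(f"{path}: unsupported type {type(value).__name__}")
```

With a check like this, `bytes` is rejected simply because it is absent from the scalar tuple, whether it appears at the top level or nested inside a `list` or `dict`.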
   
   `bytes` was part of the contract originally proposed in #723:
   
   ```python
   None | bool | int | float | str | bytes | list[MemoryValue] | dict[str, MemoryValue]
   ```
   
   but was **intentionally excluded** from the validator. The reason: the
Python→Java `bytes` conversion through Pemja is unverified end-to-end, and
accepting an unsafe value would defeat the validator's purpose by turning an
early, debuggable rejection back into a JVM crash when restoring from a
checkpoint. Excluding it was the safe default. This issue tracks closing that gap.
   
   ### Task
   
   Verify whether `bytes` survives the full Pemja → Flink state → checkpoint → 
TaskManager-restart → restore path, the same real-job path used to reproduce 
#723 (not just direct Flink serializer behavior):
   
   1. Confirm whether Pemja materializes a Python `bytes` value into a native, 
checkpoint-stable JVM type (e.g. `byte[]`) on the `FlinkMemoryObject.set()` 
path, rather than a `PyObject`/`PyJObject` wrapper.
   2. If it does materialize safely:
      - Add `bytes` to `_CHECKPOINT_STABLE_SCALARS`.
      - Extend `test_memory_value_validation.py` to cover `bytes` accept (incl. 
nested in `list`/`dict`).
      - Update the Python memory value contract docs (added in #840) to include 
`bytes`.
   3. If it does **not** materialize safely:
      - Keep `bytes` rejected, and document in the contract docs that `bytes`
must be converted to a supported type before storing (e.g. base64-encoded to a
`str`), along with the rationale.
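
If `bytes` stays rejected (option 3), the base64 workaround could be documented along the lines of the sketch below. The helper names `encode_bytes_for_memory` and `decode_bytes_from_memory` are hypothetical and not part of `flink_agents`; the sketch only uses the standard-library `base64` module and assumes the encoded `str` is what gets passed to `set()`.

```python
import base64


def encode_bytes_for_memory(raw: bytes) -> str:
    """Encode raw bytes as an ASCII base64 str, a type the contract accepts."""
    return base64.b64encode(raw).decode("ascii")


def decode_bytes_from_memory(stored: str) -> bytes:
    """Reverse encode_bytes_for_memory after reading the value back."""
    # validate=True rejects characters outside the base64 alphabet.
    return base64.b64decode(stored.encode("ascii"), validate=True)


payload = b"\x00\xffbinary"
stored = encode_bytes_for_memory(payload)

# The stored form is a plain str, and the round trip is lossless.
assert type(stored) is str
assert decode_bytes_from_memory(stored) == payload
```

The cost is roughly a one-third size increase for the stored value, which is worth noting in the docs next to the rationale.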
   
   ### Notes
   
   A true restore test can't run on the MiniCluster — in-place recovery does 
not recreate the JVM, so the Pemja conversion path isn't crossed. Verification 
needs the real restart path described in #723 (RocksDB + filesystem 
checkpointing + manual TaskManager kill/restart), or the e2e 
checkpoint-recovery harness tracked in #836.
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!
   

