Hello, I am running a pipeline built in the Python SDK that reads from a Redis stream <https://redis.io/docs/data-types/streams/> via an SDF, in the following environment:
- Python 3.11
- Apache Beam 2.48.0
- Flink 1.16
- Checkpoint interval: 60s
- state.backend (Flink): hashmap
- state_backend (Beam): filesystem

(The P.P.S. at the end shows roughly how these settings are applied.)

The issue I am observing is that the checkpoint size keeps growing, even when there are no items to read on the Redis stream. Since the stream is empty, the Redis stream SDF simply repeats the following steps as part of DoFn.process, i.e. the user-initiated checkpoint pattern described in the Apache Beam programming guide <https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint> for polling for new items with some delay when the last poll returned no items:

1. Make the call to the Redis client to read items from the Redis stream.
2. Receive no items from the Redis stream, and hence,
3. Call tracker.defer_remainder(Duration.of(5)) and return, deferring execution for 5 seconds. That code is located here <https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-beam-py-L534-L541> (see also the simplified sketch in the P.S. below).
4. Go back to step 1.

This checkpoint size growth happens regardless of whether I am using heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows large enough to crash the task manager by exhausting Java heap space. The rate of checkpoint size growth is proportional to the number of tracker.defer_remainder() calls made, i.e. increasing the parallelism and/or decreasing the timeout passed to tracker.defer_remainder() increases the rate of checkpoint growth.

I took a look at the heap-based checkpoint files that were getting larger with each checkpoint (just using the less command) and noticed that they contained many copies of the residual restriction, which seemed like a red flag. The residual restriction here is the one produced by calling tracker.defer_remainder(), which internally performs a tracker.try_split(0.0).

I have included the SDF code and the jobmanager logs showing the growing checkpoint size here: https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. The restriction provider/tracker and other pieces are included for completeness, but the SDF itself is towards the bottom.

Any help would be appreciated! 🙏🏾

Thanks,
--
Nimalan Mahendran
ML Engineer at Liminal Insights
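
P.S. For anyone who would rather not dig through the gist, here is a minimal, self-contained sketch of the polling loop described in steps 1-4 above. It is simplified from the actual code: the class names, the offset-based restriction, and keeping the last-read stream ID as instance state (rather than encoding it in the restriction, as the real code does) are all illustrative choices to keep the sketch short.

import redis  # assumes the redis-py client package is installed

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider
from apache_beam.utils.timestamp import Duration

_MAX_OFFSET = 2**63 - 1  # effectively unbounded


class PollRestrictionProvider(RestrictionProvider):
    """One giant offset range; each non-empty poll claims one offset."""

    def initial_restriction(self, element):
        return OffsetRange(0, _MAX_OFFSET)

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return restriction.size()


class RedisStreamReadFn(beam.DoFn):
    """Polls a Redis stream, deferring for 5s whenever a poll returns nothing."""

    def __init__(self, host, stream_key):
        self._host = host
        self._stream_key = stream_key

    def setup(self):
        self._client = redis.Redis(host=self._host)
        # In the real code the last-read stream ID lives in the restriction so
        # it survives checkpointing; instance state is used here only for brevity.
        self._last_id = '0-0'

    @beam.DoFn.unbounded_per_element()
    def process(
        self,
        element,
        tracker=beam.DoFn.RestrictionParam(PollRestrictionProvider())):
        position = tracker.current_restriction().start
        while True:
            # Step 1: ask the Redis client for new items on the stream.
            response = self._client.xread({self._stream_key: self._last_id}, count=100)
            if not response:
                # Steps 2-3: nothing to read, so defer the residual restriction
                # for 5 seconds and return. Each such call splits the restriction
                # (try_split(0.0) under the hood) and hands the residual back to
                # the runner -- these residuals are what I see piling up in the
                # checkpoint files.
                tracker.defer_remainder(Duration.of(5))
                return
            # A poll returned items: claim the next offset and emit them.
            if not tracker.try_claim(position):
                return
            position += 1
            for _stream, entries in response:
                for entry_id, fields in entries:
                    self._last_id = entry_id
                    yield fields


# Usage: pipeline | beam.Impulse() | beam.ParDo(RedisStreamReadFn('localhost', 'mystream'))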
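
P.P.S. In case it helps with reproduction, this is roughly how the environment settings listed above are wired up; the snippet is simplified from my actual setup, so treat the exact spellings as illustrative:

# flink-conf.yaml (Flink 1.16):
#   state.backend: hashmap
#   execution.checkpointing.interval: 60s

# Beam pipeline options (Python SDK 2.48.0):
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--state_backend=filesystem',  # the Beam-side state_backend setting above
])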