Your SDF looks fine. I wonder if there is an issue with how Flink is implementing SDFs (e.g. not garbage collecting previous remainders).
On Tue, Jul 18, 2023 at 5:43 PM Nimalan Mahendran <[email protected]> wrote:
>
> Hello,
>
> I am running a pipeline built with the Python SDK that reads from a Redis
> stream via an SDF, in the following environment:
>
> Python 3.11
> Apache Beam 2.48.0
> Flink 1.16
> Checkpoint interval: 60s
> state.backend (Flink): hashmap
> state_backend (Beam): filesystem
>
> The issue I am observing is that the checkpoint size keeps growing, even
> when there are no items to read on the Redis stream. Since there are no
> items to read, the Redis stream SDF simply repeats the following steps in
> DoFn.process, i.e. the user-initiated checkpoint pattern described in the
> Apache Beam programming guide for polling for new items with some delay
> when the last poll returned no items:
>
> 1. Make the call to the Redis client to read items from the Redis stream.
> 2. Receive no items from the Redis stream, and hence,
> 3. Call tracker.defer_remainder(Duration.of(5)) and return, deferring
>    execution for 5 seconds. That code is located here.
> 4. Go back to step 1.
>
> This checkpoint size growth happens regardless of whether I'm using
> heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows
> large enough to crash the task manager by exhausting Java heap space. The
> rate of checkpoint size growth is proportional to the number of
> tracker.defer_remainder() calls made, i.e. increasing parallelism and/or
> decreasing the timeout used in tracker.defer_remainder will increase the
> rate of checkpoint growth.
>
> I took a look at the heap-based checkpoint files that were getting larger
> with each checkpoint (just using the less command) and noticed that many
> copies of the residual restriction were present, which seemed like a red
> flag. The residual restriction here is the one that results from calling
> tracker.defer_remainder(), which in turn calls tracker.try_split(0.0).
>
> I've included the SDF code and jobmanager logs showing the growing
> checkpoint size here:
> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. I've
> included the restriction provider/tracker and other pieces for
> completeness, but the SDF is towards the bottom.
>
> Any help would be appreciated! 🙏🏾
>
> Thanks,
> --
> Nimalan Mahendran
> ML Engineer at Liminal Insights
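
For readers following along, the polling loop described above (the
user-initiated checkpoint pattern from the programming guide) boils down to
roughly the sketch below. RedisStreamRestrictionProvider and
read_pending_items are illustrative stand-ins for the pieces in the linked
gist, not the actual code, and the watermark estimator is elided for brevity:

import apache_beam as beam
from apache_beam.io.restriction_trackers import (
    OffsetRange, OffsetRestrictionTracker)
from apache_beam.transforms.core import RestrictionProvider
from apache_beam.utils.timestamp import Duration


class RedisStreamRestrictionProvider(RestrictionProvider):
  """Stand-in provider: stream offsets form a practically unbounded range."""

  def initial_restriction(self, element):
    return OffsetRange(0, 2**63 - 1)

  def create_tracker(self, restriction):
    return OffsetRestrictionTracker(restriction)

  def restriction_size(self, element, restriction):
    return restriction.size()


class PollRedisStreamDoFn(beam.DoFn):

  @beam.DoFn.unbounded_per_element()
  def process(
      self,
      stream_key,
      tracker=beam.DoFn.RestrictionParam(RedisStreamRestrictionProvider())):
    position = tracker.current_restriction().start
    while True:
      # read_pending_items is a hypothetical helper wrapping the Redis
      # client read (e.g. XREAD) starting from the given offset.
      items = read_pending_items(stream_key, position)
      if not items:
        # Nothing to read: defer the residual restriction and ask the
        # runner to resume in ~5 seconds. Internally this performs the
        # tracker.try_split(0.0) mentioned above; that residual is what
        # appears repeatedly in the checkpoint files.
        tracker.defer_remainder(Duration.of(5))
        return
      for offset, item in items:
        if not tracker.try_claim(offset):
          return
        position = offset + 1
        yield item

If each of those deferred residuals is being retained across checkpoints
rather than replaced, that would line up with the growth you're seeing.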
