Your SDF looks fine. I wonder if there is an issue with how Flink is
implementing SDFs (e.g. not garbage collecting previous remainders).

On Tue, Jul 18, 2023 at 5:43 PM Nimalan Mahendran
<nima...@liminalinsights.com> wrote:
>
> Hello,
>
> I am running a pipeline built in the Python SDK that reads from a Redis 
> stream via an SDF, in the following environment:
>
> Python 3.11
> Apache Beam 2.48.0
> Flink 1.16
> Checkpoint interval: 60s
> state.backend (Flink): hashmap
> state_backend (Beam): filesystem
>
> The issue that I am observing is that the checkpoint size keeps growing, even 
> when there are no items to read on the Redis stream. Since there are no items 
> to read, the Redis stream SDF simply repeats the following steps as part of 
> DoFn.process, i.e. the user-initiated checkpoint pattern described in the 
> Apache Beam programming guide for polling for new items with some delay when 
> the last poll returned no items (a rough sketch of this loop follows the list):
> 
> 1. Make the call to the Redis client to read items from the Redis stream.
> 2. Receive no items from the Redis stream, and hence,
> 3. Call tracker.defer_remainder(Duration.of(5)) and return, to defer 
>    execution for 5 seconds. That code is located here.
> 4. Go back to step 1.
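> 
> For reference, here is a rough sketch of that loop, not the actual code from 
> the gist; StreamPositionProvider, read_batch and the offset-based restriction 
> are placeholder names standing in for the real Redis stream restriction 
> provider and client call:
> 
>   import apache_beam as beam
>   from apache_beam.io.restriction_trackers import (
>       OffsetRange, OffsetRestrictionTracker)
>   from apache_beam.transforms.core import RestrictionProvider
>   from apache_beam.utils.timestamp import Duration
> 
>   _MAX_OFFSET = 2**63 - 1
> 
>   def read_batch(stream_key):
>       # Placeholder for the actual Redis client call; returns a list of
>       # (offset, item) pairs, empty when the stream has nothing new.
>       return []
> 
>   class StreamPositionProvider(RestrictionProvider):
>       # Simplified stand-in: treats the stream as one huge offset range.
>       def initial_restriction(self, element):
>           return OffsetRange(0, _MAX_OFFSET)
> 
>       def create_tracker(self, restriction):
>           return OffsetRestrictionTracker(restriction)
> 
>       def restriction_size(self, element, restriction):
>           return restriction.size()
> 
>   class PollRedisStream(beam.DoFn):
>       @beam.DoFn.unbounded_per_element()
>       def process(
>               self, stream_key,
>               tracker=beam.DoFn.RestrictionParam(StreamPositionProvider())):
>           while True:
>               batch = read_batch(stream_key)   # step 1: call the Redis client
>               if not batch:                    # step 2: nothing came back
>                   # step 3: defer execution for 5 seconds and return; the
>                   # runner checkpoints the residual restriction and
>                   # reschedules it later, which takes us back to step 1.
>                   tracker.defer_remainder(Duration.of(5))
>                   return
>               for offset, item in batch:
>                   if not tracker.try_claim(offset):
>                       return
>                   yield item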
>
> This checkpoint size growth happens regardless of whether I'm using 
> heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows 
> large enough to cause the task manager to crash, due to exhausting Java heap 
> space. The rate of checkpoint size growth is proportional to the number of 
> tracker.defer_remainder() calls I have done, i.e. increasing parallelism 
> and/or decreasing the timeout used in tracker.defer_remainder will increase 
> the rate of checkpoint growth.
>
> I took a look at the heap-based checkpoint files that I observed were getting 
> larger with each checkpoint (just using the less command) and noticed that 
> many copies of the residual restriction were present, which seemed like a red 
> flag. The residual restriction here is the one that results from calling 
> tracker.defer_remainder(), which results in a tracker.try_split(0.0).
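> 
> To spell out what I mean by the residual restriction: for an offset-based 
> tracker (illustrative only, not my actual restriction type), deferring 
> amounts to roughly this:
> 
>   from apache_beam.io.restriction_trackers import (
>       OffsetRange, OffsetRestrictionTracker)
> 
>   tracker = OffsetRestrictionTracker(OffsetRange(0, 100))
>   tracker.try_claim(10)
>   primary, residual = tracker.try_split(0.0)
>   # primary  covers offsets [0, 11)  -- the work already claimed
>   # residual covers offsets [11, 100) -- handed back to the runner to be
>   # rescheduled; it is copies of this residual that show up over and over
>   # in the checkpoint files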
>
> I've included the SDF code and jobmanager logs showing growing checkpoint 
> size here: https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. 
> I've included the restriction provider/tracker and other pieces for 
> completeness, but the SDF is towards the bottom.
>
> Any help would be appreciated! 🙏🏾
>
> Thanks,
> --
> Nimalan Mahendran
> ML Engineer at Liminal Insights
