Hello,

I am running a pipeline built with the Python SDK that reads from a Redis
stream <https://redis.io/docs/data-types/streams/> via an SDF, in the
following environment (the corresponding runner options are sketched after
the list):

   - Python 3.11
   - Apache Beam 2.48.0
   - Flink 1.16
   - Checkpoint interval: 60s
   - state.backend (Flink): hashmap
   - state_backend (Beam): filesystem
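
In terms of pipeline options, this corresponds roughly to the following
(only the flags relevant to checkpointing/state are shown; storage paths
and the rest of my options are elided):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--checkpointing_interval=60000',  # 60s checkpoint interval, in ms
    '--state_backend=filesystem',      # Beam-side state backend
    '--parallelism=1',
])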

The issue that I am observing is that the checkpoint size keeps growing,
even when there are no items to read on the Redis stream. While the stream
is empty, the Redis stream SDF simply repeats the following steps as part of
DoFn.process, following the user-initiated checkpoint pattern described in
the Apache Beam programming guide
<https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint>
to poll for new items with a delay when the last poll returned nothing (a
simplified sketch follows the list):

   1. Make the call to the Redis client to read items from the Redis stream.
   2. Receive no items from the Redis stream, and hence,
   3. Call tracker.defer_remainder(Duration.of(5)) and return, deferring
   execution for 5 seconds. That code is located here:
   <https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-beam-py-L534-L541>
   4. Go back to step 1.
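
For reference, here is a simplified sketch of that loop (the names
RedisStreamRestrictionProvider and _read_new_entries are placeholders; the
actual SDF, restriction provider and Redis client wiring are in the gist
linked in step 3 and again below):

import apache_beam as beam
from apache_beam.utils.timestamp import Duration


class ReadRedisStreamFn(beam.DoFn):
    # Simplified polling SDF. RedisStreamRestrictionProvider and
    # _read_new_entries stand in for the real implementations.
    def process(
            self,
            stream_key,
            tracker=beam.DoFn.RestrictionParam(RedisStreamRestrictionProvider())):
        current_id = tracker.current_restriction().start
        while True:
            # Step 1: ask the Redis client for entries after the current stream ID.
            entries = self._read_new_entries(stream_key, current_id)
            if not entries:
                # Steps 2-3: nothing to read, so defer the residual restriction
                # for 5 seconds and return; the runner later resumes process()
                # with that residual restriction (step 4).
                tracker.defer_remainder(Duration.of(5))
                return
            for entry_id, fields in entries:
                # Claim each entry's position before emitting it downstream.
                if not tracker.try_claim(entry_id):
                    return
                current_id = entry_id
                yield fields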

This checkpoint size growth happens regardless of whether I'm using
heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows
large enough to crash the task manager by exhausting Java heap space. The
rate of checkpoint size growth is proportional to the number of
tracker.defer_remainder() calls being made, i.e. increasing parallelism
and/or decreasing the timeout used in tracker.defer_remainder will increase
the rate of checkpoint growth.

I took a look at the heap-based checkpoint files that were getting larger
with each checkpoint (just using the less command) and noticed that many
copies of the residual restriction were present, which seemed like a red
flag. The residual restriction here is the one that results from calling
tracker.defer_remainder(), which in turn performs a tracker.try_split(0.0).
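
To illustrate what I mean by the residual restriction, here is the same
primary/residual split using the built-in OffsetRestrictionTracker (my
actual tracker for Redis stream IDs is custom and lives in the gist; the
printed values below are approximate):

from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker

tracker = OffsetRestrictionTracker(OffsetRange(0, 100))
tracker.try_claim(10)  # claim up to position 10
# defer_remainder() effectively checkpoints via try_split(0.0): the primary
# covers what has been claimed so far, and the residual is rescheduled.
primary, residual = tracker.try_split(0.0)
print(primary)   # roughly OffsetRange(0, 11)
print(residual)  # roughly OffsetRange(11, 100)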

I've included the SDF code and jobmanager logs showing growing checkpoint
size here: https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e.
I've included the restriction provider/tracker and other pieces for
completeness, but the SDF is towards the bottom.

Any help would be appreciated! 🙏🏾

Thanks,
-- 
Nimalan Mahendran
ML Engineer at Liminal Insights
