I've got a Beam pipeline using the FlinkRunner that reads from two
different SQS sources (using SqsIO).  It does some stateful
processing on each stream, and then cogroups the results together to
generate a result and write it to SNS (using SnsIO).  The volume
of input data isn't particularly large (about 50 messages per minute
on one queue, and about 1 message per minute on the other queue).
It's using the global window, discarding fired panes, with a
processing-time trigger delayed by 1 minute.  Checkpointing is enabled
at a 1 minute interval, with a minimum delay between checkpoints of 30
seconds.  My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
predefined options.
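
For reference, the windowing and checkpoint settings above look
roughly like the following.  This is a simplified sketch, not the
exact pipeline code: the KV<String, String> element type and the
Repeatedly.forever wrapper around the trigger are assumptions, and the
SqsIO/SnsIO transforms, the RocksDB state backend and the
checkpointing mode are left out.

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class PipelineConfigSketch {

  // Flink runner settings: parallelism 2, a checkpoint every 60 s,
  // and at least 30 s between the end of one checkpoint and the next.
  static FlinkPipelineOptions flinkOptions(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setParallelism(2);
    options.setCheckpointingInterval(60_000L);      // milliseconds
    options.setMinPauseBetweenCheckpoints(30_000L); // milliseconds
    return options;
  }

  // Global window, discarding fired panes, firing 1 minute (processing
  // time) after the first element of each pane. The element type and
  // Repeatedly.forever(...) are illustrative assumptions.
  static Window<KV<String, String>> windowing() {
    return Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1))))
        .discardingFiredPanes();
  }
}
```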

This pipeline runs fine for a few hours with an average checkpoint
duration of 1s (with occasional spikes), but eventually the
checkpoint duration begins to grow until it's taking minutes on
average, and finally checkpoints won't even complete within a 10
minute period.  I'm using a parallelism of 2, and it seems to keep up
with the number of incoming messages just fine (until the checkpoint
duration grows too large and it is no longer able to delete the
messages).
To try to isolate the problem, I wrote an alternate SQS reader that
uses the Watch transform to periodically read from SQS.  This variant
doesn't show the same behavior, and has been running for a week
without issue (an average checkpoint time of 1-2s).

Some other experiments I tried:

* I observed that the operators that took a long time to checkpoint
were the deduplicating operators after the actual unbounded source
operator.  I disabled requiresDeduping and added a Reshuffle instead,
but that showed the same growth in checkpoint durations after a
period of time.
* I tried the AT_LEAST_ONCE checkpointing mode instead of
EXACTLY_ONCE, but that showed the same behavior as well.


Does anyone have any thoughts about what might cause this behavior
with the UnboundedSource (as opposed to the splittable DoFn variant)?

I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.

Stephen
