Bumping this and adding +John Casey <johnjca...@google.com> who knows about
KafkaIO and unbounded sources, though probably less about the FlinkRunner.
It seems you have isolated it to the Flink translation logic. I'm not sure
who would be the best expert to evaluate if that logic is still OK.

Kenn

On Wed, Jun 29, 2022 at 11:07 AM Kathula, Sandeep <
sandeep_kath...@intuit.com> wrote:

> Hi,
>
>    We have a stateless application which:
>
>    1. Reads from Kafka.
>    2. Performs some stateless transformations, reading from in-memory
>    databases and updating the records.
>    3. Writes back to Kafka.
>
> *With Beam 2.23 and Flink 1.9, checkpoints work fine (they take at most
> 1 min).*
>
> *With Beam 2.29 and Flink 1.12, checkpoints take longer (sometimes up to
> 6-7 min).*
>
> *With Beam 2.38 and Flink 1.14, checkpoints time out after 10 minutes.*
>
> I have been going through the Beam code, and after adding some logging and
> analysis I found that the problem is at
> https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L287-L307
>
> We are still using the old API to read from Kafka, not yet the KafkaIO
> based on SplittableDoFn.
>
> There are two threads:
>
>    1. The legacy source thread, which reads from Kafka and does the entire
>    processing.
>    2. The thread which emits watermarks on a timer:
>    https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L454-L474
>
> Both of these code blocks synchronize on the same checkpoint lock. Under
> heavy load, the thread reading from Kafka keeps running in the while loop
> indefinitely, while the thread emitting watermarks waits indefinitely for
> the lock, so no watermarks are emitted and the checkpoint times out.
>
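A minimal, hypothetical Java model of the contention described above (not the actual `UnboundedSourceWrapper` code). A "reader" thread holds a shared checkpoint lock, and a "watermark-timer" thread that synchronizes on the same object stays `BLOCKED` until the reader releases it. The class and method names are invented for illustration. The latch is only a stand-in for a long-running read loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model: two threads synchronizing on one shared checkpoint lock. */
final class CheckpointLockContention {

    /** Returns the state the watermark thread was in while the reader held the lock. */
    static Thread.State observeWatermarkThreadState() throws InterruptedException {
        final Object checkpointLock = new Object();
        final CountDownLatch readerHoldsLock = new CountDownLatch(1);
        final CountDownLatch releaseReader = new CountDownLatch(1);
        final AtomicBoolean watermarkEmitted = new AtomicBoolean(false);

        // Stand-in for the legacy source thread: holds the lock while "processing".
        Thread reader = new Thread(() -> {
            synchronized (checkpointLock) {
                readerHoldsLock.countDown();
                try {
                    // CountDownLatch.await does NOT release the monitor,
                    // so the lock stays held, like a busy read/emit loop.
                    releaseReader.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "reader");

        // Stand-in for the timer thread that emits watermarks.
        Thread watermark = new Thread(() -> {
            synchronized (checkpointLock) {
                watermarkEmitted.set(true);
            }
        }, "watermark-timer");

        reader.start();
        readerHoldsLock.await();
        watermark.start();

        // Poll until the watermark thread is stuck waiting to enter the monitor.
        Thread.State state;
        while ((state = watermark.getState()) != Thread.State.BLOCKED) {
            Thread.sleep(1);
        }

        // Let the reader leave its synchronized block; the watermark thread can now run.
        releaseReader.countDown();
        reader.join();
        watermark.join();
        if (!watermarkEmitted.get()) {
            throw new IllegalStateException("watermark was never emitted");
        }
        return state;
    }
}
```

In the real wrapper, the reader releases and re-acquires the lock on every loop iteration. Java intrinsic locks make no fairness guarantee, though, so a thread that re-enters immediately can keep winning, and the waiting thread may starve.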
> Is this a known issue, and is there a solution? For now, we are adding a
> Thread.sleep(1) once every 10 seconds after the synchronized block so that
> the thread emitting watermarks gets unblocked and can run.
>
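Here is a hedged sketch of the `Thread.sleep(1)` workaround described above. The clock and the sleep are injected so the logic can be exercised without waiting 10 seconds. `YieldingReaderLoop`, `Sleeper`, `step`, and `withDefaults` are hypothetical names, not Beam APIs. The key point is that the sleep happens only after the `synchronized` block has exited, so the lock is free while the reader sleeps.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch: emit under the lock, periodically sleep 1 ms outside it. */
final class YieldingReaderLoop {

    /** Abstraction over Thread.sleep so the sketch can be tested without real waiting. */
    @FunctionalInterface
    interface Sleeper {
        void sleepMillis(long millis) throws InterruptedException;
    }

    private final Object checkpointLock;
    private final LongSupplier nanoClock;
    private final Sleeper sleeper;
    private final long yieldIntervalNanos;
    private long lastYieldNanos;
    private int yieldCount;

    YieldingReaderLoop(Object checkpointLock, LongSupplier nanoClock,
                       Sleeper sleeper, long yieldIntervalNanos) {
        this.checkpointLock = checkpointLock;
        this.nanoClock = nanoClock;
        this.sleeper = sleeper;
        this.yieldIntervalNanos = yieldIntervalNanos;
        this.lastYieldNanos = nanoClock.getAsLong();
    }

    /** Real wiring: System.nanoTime() and Thread.sleep, yielding roughly every 10 seconds. */
    static YieldingReaderLoop withDefaults(Object checkpointLock) {
        return new YieldingReaderLoop(checkpointLock, System::nanoTime, Thread::sleep, 10_000_000_000L);
    }

    /** Emits one record while holding the lock, then yields outside it if the interval elapsed. */
    void step(Runnable emitRecord) {
        synchronized (checkpointLock) {
            emitRecord.run();
        }
        long now = nanoClock.getAsLong();
        if (now - lastYieldNanos >= yieldIntervalNanos) {
            lastYieldNanos = now;
            yieldCount++;
            try {
                // The lock is not held here, so a waiting thread can acquire it.
                sleeper.sleepMillis(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status for the caller
            }
        }
    }

    int yieldCount() {
        return yieldCount;
    }
}
```

In a read loop, this would look like `YieldingReaderLoop loop = YieldingReaderLoop.withDefaults(lock); while (running) { loop.step(() -> emit(record)); }`, where `emit` and `record` are placeholders. Sleeping for 1 ms only gives a waiting thread the opportunity to take the lock. It does not guarantee that the thread will get it.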
> One of my colleagues tried to follow up on this earlier (previous email
> attached), but we didn't get any reply. Any help would be appreciated.
>
> Thanks,
>
> Sandeep
>
