Hi Jose, hey Austin!!

I know we were just recently looking at consuming a fixed number of
messages from an RMQ source, processing them and writing them to an RMQ sink.
As a naive first attempt at stopping the job when the target number of messages
had been processed, we put a counter state in the process function and tried
throwing an exception when the counter >= the target message count.

The job had:

  *   parallelism: 1
  *   checkpointing: 1000 ms (1 sec)
  *   restartStrategy: noRestart
  *   prefetchCount: 100

Running it with 150 messages in the input queue and 150 also as the target 
number, at the end the queues had:

  *   output queue - 150
  *   input queue - 50

So it looks like it did transfer all the messages, but some unack'd ones also
got requeued back at the source, so they end up as duplicates. I know throwing
an exception in the Flink job is not the same as triggering a stateful
shutdown, but it might be hitting similar unack issues.
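
To make the numbers concrete, here is a toy model in plain Java (not Flink or
RabbitMQ code) of what I think might be happening. It assumes the source only
acks deliveries when a checkpoint completes, the sink publishes immediately,
and the broker requeues anything unacked when the job fails. The class name,
the run method and the messagesPerCheckpoint parameter are made up for
illustration, and the value 100 is picked only because it reproduces the
counts above; I have not confirmed where the last checkpoint really landed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of ack-on-checkpoint plus requeue-on-failure. Hypothetical, not Flink code. */
class RequeueSketch {

    /** Returns {outputQueueSize, inputQueueSize} after the simulated job fails or drains. */
    static int[] run(int inputMessages, int target, int messagesPerCheckpoint) {
        Deque<Integer> input = new ArrayDeque<>();
        for (int i = 0; i < inputMessages; i++) {
            input.add(i);
        }
        List<Integer> output = new ArrayList<>();
        List<Integer> unacked = new ArrayList<>(); // delivered to the job, not yet acked
        int processed = 0;
        try {
            while (!input.isEmpty()) {
                int msg = input.poll();
                unacked.add(msg);
                output.add(msg); // assumption: the sink publishes right away
                processed++;
                if (processed >= target) {
                    // the naive stop: fail the job from the process function
                    throw new IllegalStateException("target message count reached");
                }
                if (processed % messagesPerCheckpoint == 0) {
                    unacked.clear(); // assumption: checkpoint completes, deliveries get acked
                }
            }
        } catch (IllegalStateException e) {
            // assumption: the connection drops and the broker requeues unacked deliveries
            input.addAll(unacked);
        }
        return new int[] {output.size(), input.size()};
    }

    public static void main(String[] args) {
        int[] result = run(150, 150, 100);
        System.out.println("output=" + result[0] + " input=" + result[1]);
    }
}
```

Under those assumptions, the 50 messages delivered after the last completed
checkpoint are written to the output queue and also requeued at the source.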

John

________________________________
From: Austin Cawley-Edwards <austin.caw...@gmail.com>
Sent: Thursday 13 May 2021 16:49
To: Jose Vargas <jose.var...@fiscalnote.com>; John Morrow 
<johnniemor...@hotmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: RabbitMQ source does not stop unless message arrives in queue

Hey Jose,

Thanks for bringing this up – it indeed sounds like a bug. There is ongoing 
work to update the RMQ source to the new interface, which might address some of 
these issues (or should, if it is not already), tracked in FLINK-20628[1]. 
Would you be able to create a JIRA issue for this, or would you like me to?

At my previous company, we only consumed one Rabbit queue per application, so 
we didn't run into this exactly but did see other weird behavior in the RMQ 
source that could be related. I'm going to cc @John
Morrow<mailto:johnniemor...@hotmail.com>, who might be able to share what he's
seen working with the source, if he's around. I remember some messages not
being properly ack'ed during a stateful shutdown via the Ververica Platform's
stop-with-savepoint functionality that you mention, though that might be more
related to FLINK-20244[2].


Best,
Austin

[1]: https://issues.apache.org/jira/browse/FLINK-20628
[2]: https://issues.apache.org/jira/browse/FLINK-20244

On Thu, May 13, 2021 at 10:23 AM Jose Vargas 
<jose.var...@fiscalnote.com<mailto:jose.var...@fiscalnote.com>> wrote:
Hi,

I am using Flink 1.12 to read from and write to a RabbitMQ cluster. Flink's 
RabbitMQ source has some surprising behavior when a stop-with-savepoint request 
is made.

Expected Behavior:
The stop-with-savepoint request stops the job with a FINISHED state.

Actual Behavior:
The stop-with-savepoint request either times out or hangs indefinitely unless a 
message arrives in all the queues that the job consumes from after the 
stop-with-savepoint request is made.


I know that one possible workaround is to send a sentinel value to each of the 
queues consumed by the job that the deserialization schema checks in its 
isEndOfStream method. However, this is somewhat cumbersome and complicates the 
continuous delivery of a Flink job. For example, Ververica Platform will
trigger a stop-with-savepoint for the user if any one of many possible Flink
configurations for a job is changed. The stop-with-savepoint can then hang
indefinitely because only some of the RabbitMQ sources will have reached a
FINISHED state.
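
For reference, the sentinel workaround mentioned above boils down to a check
like the following. This is a plain-Java stand-in rather than my actual
schema: in the job the same comparison would sit in the isEndOfStream method
of the DeserializationSchema. The class name and the marker string are
hypothetical, and it assumes messages are UTF-8 strings.

```java
import java.nio.charset.StandardCharsets;

/** Stand-in for the sentinel check a deserialization schema would perform. */
class SentinelCheck {

    // Hypothetical marker value; anything that cannot occur in real payloads would do.
    static final String SENTINEL = "__END_OF_STREAM__";

    /** Assumption: message bodies are UTF-8 encoded strings. */
    static String deserialize(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    /** Mirrors the role of isEndOfStream: true only for the sentinel message. */
    static boolean isEndOfStream(String nextElement) {
        return SENTINEL.equals(nextElement);
    }

    public static void main(String[] args) {
        byte[] body = SENTINEL.getBytes(StandardCharsets.UTF_8);
        System.out.println(isEndOfStream(deserialize(body)));
    }
}
```

The cumbersome part is that one such sentinel has to be published to every
queue the job consumes from, every time a stop is requested.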

I have attached the TaskManager thread dump taken after the stop-with-savepoint
request was made. Almost every thread is either sleeping or waiting for locks
to be released, and a handful of threads are trying to read data from a socket
via the com.rabbitmq.client.impl.Frame.readFrom method.

Ideally, once a stop-with-savepoint request is made, the threads trying to read 
data from RabbitMQ would be interrupted so that all RabbitMQ sources would 
reach a FINISHED state.

Regular checkpoints and savepoints complete successfully; it is only with the
stop-with-savepoint request that I see this behavior.


Respectfully,

Jose Vargas

Software Engineer, Data Engineering

E: jose.var...@fiscalnote.com<mailto:jose.var...@fiscalnote.com>

fiscalnote.com<https://www.fiscalnote.com>  |  
info.cq.com<http://www.info.cq.com>  | rollcall.com<https://www.rollcall.com>
