Hi,

I am using Flink 1.12 to read from and write to a RabbitMQ cluster. Flink's
RabbitMQ source has some surprising behavior when a stop-with-savepoint
request is made.

*Expected Behavior:*
The stop-with-savepoint request stops the job with a FINISHED state.

*Actual Behavior:*
The stop-with-savepoint request either times out or hangs indefinitely
unless, after the request is made, at least one message arrives in every
queue that the job consumes from.
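
For reference, the request is made with Flink's CLI, roughly as follows
(the job ID and savepoint path here are placeholders):

    ./bin/flink stop --savepointPath s3://my-bucket/savepoints <job-id>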


I know that one possible workaround is to send, to each of the queues
consumed by the job, a sentinel value that the deserialization schema
checks for in its isEndOfStream method. However, this is somewhat cumbersome and
complicates the continuous delivery of a Flink job. For example,
Ververica Platform will trigger a stop-with-savepoint for the user if one
of many possible Flink configurations for a job is changed. The
stop-with-savepoint can then hang indefinitely because only some of the
RabbitMQ sources will have reached a FINISHED state.
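
For context, here is a minimal sketch of that workaround, assuming a String
payload and a made-up sentinel value "__STOP__":

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class SentinelAwareSchema implements DeserializationSchema<String> {

        // Hypothetical sentinel; one copy must be published to every queue
        // the job consumes from before stop-with-savepoint can complete.
        private static final String SENTINEL = "__STOP__";

        @Override
        public String deserialize(byte[] message) throws IOException {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // Returning true lets this RMQSource subtask reach FINISHED,
            // which is what stop-with-savepoint waits for.
            return SENTINEL.equals(nextElement);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }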

I have attached the TaskManager thread dump after the stop-with-savepoint
request was made. Nearly every thread is either sleeping or waiting for
locks to be released, and then there are a handful of threads trying to
read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom
method.

Ideally, once a stop-with-savepoint request is made, the threads trying to
read data from RabbitMQ would be interrupted so that all RabbitMQ sources
would reach a FINISHED state.
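
To illustrate the idea (this is only a sketch, not the actual RMQSource
code), a consume loop that polls with a timeout could observe a cancellation
flag instead of blocking forever in a socket read:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;

    public class PollingConsumeLoop {

        private volatile boolean running = true;

        // Would be called when the stop-with-savepoint request arrives.
        public void cancel() {
            running = false;
        }

        public void run(Channel channel, String queueName) throws Exception {
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, false, consumer);
            while (running) {
                // Bounded poll instead of a blocking nextDelivery():
                // returns null on timeout so the flag is re-checked.
                QueueingConsumer.Delivery delivery = consumer.nextDelivery(100);
                if (delivery == null) {
                    continue;
                }
                // ... process the delivery, then acknowledge it ...
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }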

Regular checkpoints and savepoints complete successfully; it is only the
stop-with-savepoint request where I see this behavior.


Respectfully,


Jose Vargas

Software Engineer, Data Engineering

E: jose.var...@fiscalnote.com

fiscalnote.com <https://www.fiscalnote.com>  |  info.cq.com
<http://www.info.cq.com>  | rollcall.com <https://www.rollcall.com>

Attachment: taskmanager_thread_dump.json
Description: application/json
