Hi Marco,

what is your intention? Do you want to upgrade the pipeline? Flink uses checkpoints / savepoints (see [1]), so cancelling the pipeline with a savepoint and then resuming from that savepoint should be safe. Another option is to enable offset commits to Kafka via [2]. That way you should be able to resume even without a savepoint (but you will lose any internal pipeline state, so that is mostly useful for stateless pipelines).
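
For [2], a minimal sketch of what the read could look like in the Java SDK (the broker address, topic, and group id below are placeholders; note that commitOffsetsInFinalize() needs a group.id configured on the consumer):

```
import java.util.Collections;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaCommitOffsetsExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092") // placeholder broker
            .withTopic("my_topic")                  // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Offset commits are keyed by consumer group, so group.id must be set.
            .withConsumerConfigUpdates(
                Collections.<String, Object>singletonMap(
                    ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
            // Commit consumed offsets back to Kafka when a checkpoint finalizes,
            // so a restarted job can pick up from the committed offsets instead
            // of re-reading the topic from scratch.
            .commitOffsetsInFinalize()
            .withoutMetadata()); // yields PCollection<KV<Long, String>>

    p.run();
  }
}
```

With that in place, restarting the job under the same group.id should resume reading from the last committed offsets, at the cost of losing any internal pipeline state as mentioned above.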

Would that work for you?

Jan

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/

[2] https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--

On 9/30/21 12:28 AM, Marco Costantini wrote:
Using a FlinkRunner, if I cancel the job, the pipeline blows up. Exceptions stream across my screen rapidly.

```
java.lang.InterruptedException: null
        at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:944) ~[?:1.8.0_282]
        at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:584) ~[blob_p-b0501fca08dc4506cf9cffe14466df74e4d010e9-d1cc5178ec372501d7c1e755b90de159:?]
```
How can I gracefully stop my Flink+Beam job that uses an unbounded KafkaIO source?

If it is not explicitly supported, are there any work-arounds?

Please and thank you,
Marco.
