Hi Yitzchak,

Thanks for reaching out.
I'm not an expert on the Kafka consumer, but I think the number of
partitions and the number of source tasks might be interesting to know.
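
For example, here is a quick (untested) sketch that reads the partition
count with Kafka's AdminClient; the topic name and broker addresses are
placeholders for your setup. You can then compare the count against the
parallelism of your source:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // partition count of the topic; compare it with the parallelism
    // of your FlinkKafkaConsumer source
    int partitions = admin.describeTopics(Collections.singleton("my-topic"))
            .all().get().get("my-topic").partitions().size();
    System.out.println("partitions: " + partitions);
}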

Maybe Gordon (in CC) has an idea of what's going wrong here.
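
One thing that might be worth checking in the meantime (just a guess on
my side, not a confirmed cause): whether bootstrap.servers lists brokers
that are actually reachable, since the consumer fetches its initial topic
metadata from the bootstrap brokers. A minimal sketch, with placeholder
addresses and topic name:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
// list several brokers so metadata can still be fetched while some are down
props.setProperty("bootstrap.servers",
        "broker1:9092,broker2:9092,broker3:9092");
props.setProperty("group.id", "my-group");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);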

Best, Fabian

On Tue, Jul 23, 2019 at 08:50 Yitzchak Lieberman <
yitzch...@sentinelone.com> wrote:

> Hi.
>
> Another question: what will happen during a triggered checkpoint if one
> of the Kafka brokers is unavailable?
> I'd appreciate your insights.
>
> Thanks.
>
> On Mon, Jul 22, 2019 at 12:42 PM Yitzchak Lieberman <
> yitzch...@sentinelone.com> wrote:
>
>> Hi.
>>
>> I'm running a Flink application (version 1.8.0) that
>> uses FlinkKafkaConsumer to fetch topic data and perform transformations
>> on the data, with the state backend configured as below:
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> // checkpoint every 5 seconds with at-least-once guarantees
>> env.enableCheckpointing(5_000, CheckpointingMode.AT_LEAST_ONCE);
>> env.setStateBackend((StateBackend) new FsStateBackend("file:///test"));
>> env.getCheckpointConfig().setCheckpointTimeout(30_000);
>> // keep externalized checkpoints when the job is cancelled
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>>
>> My problem is with the Kafka brokers: the cluster has 5 brokers in
>> total, of which 3 are operating and 2 are down.
>> I was able to consume the data, but when a checkpoint is triggered it
>> throws this exception:
>>
>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18]
>> o.a.f.r.c.CheckpointCoordinator - Decline checkpoint 6 by task
>> 457b1f801fee89d6f9544409877e29d8 of job 1c46aa5719bac1f0bea436d460b79db1.
>> [INFO ] 2019-07-22 12:29:14.636 [flink-akka.actor.default-dispatcher-28]
>> o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution
>> state FAILED to JobManager for task Source: Custom Source -> Sink: Print to
>> Std. Out 457b1f801fee89d6f9544409877e29d8.
>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18]
>> o.a.f.r.c.CheckpointCoordinator - Discarding checkpoint 6 of job
>> 1c46aa5719bac1f0bea436d460b79db1.
>> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
>> Task Source: Custom Source -> Sink: Print to Std. Out (2/4) was not running
>> at
>> org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1198)
>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:700)
>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>> at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_201]
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274)
>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189)
>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:502)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.actor.Actor$class.aroundReceive(Actor.scala)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> ~[akka-actor_2.11-2.4.20.jar:?]
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [scala-library-2.11.12.jar:?]
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [scala-library-2.11.12.jar:?]
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [scala-library-2.11.12.jar:?]
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> [scala-library-2.11.12.jar:?]
>> [INFO ] 2019-07-22 12:29:14.637 [flink-akka.actor.default-dispatcher-28]
>> o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out
>> (2/4) (457b1f801fee89d6f9544409877e29d8) switched from RUNNING to FAILED.
>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
>> fetching topic metadata
>>
>> My question is (as far as I understand what the checkpoint tries to
>> do): why is it trying to fetch topic metadata from the brokers that are
>> down?
>>
>> Thanks,
>> Yitzchak.
>>
>
