Re: timeout exception when consuming from kafka

2019-07-28 Thread Yitzchak Lieberman
Hi.

It turned out that the cause was non-replicated topics in Kafka
(replication factor = 1).
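
To illustrate why a replication factor of 1 matters in this setup, here is a minimal, self-contained sketch in plain Java. It does not use the Kafka client at all. It assumes a simplified round-robin placement of replicas over 5 brokers, with 2 of them down as in the original report; the class and method names are hypothetical, and the real replica assignment in a cluster may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Toy model (not Kafka code): which partitions lose every replica
 * when some brokers are down? Replica placement is assumed to be
 * simple round-robin, which is a simplification.
 */
final class ReplicaAvailability {

    /** Brokers hosting a replica of the given partition under round-robin placement. */
    static Set<Integer> replicaBrokers(int partition, int brokerCount, int replicationFactor) {
        Set<Integer> brokers = new HashSet<>();
        for (int r = 0; r < replicationFactor; r++) {
            brokers.add((partition + r) % brokerCount);
        }
        return brokers;
    }

    /** Partitions for which every replica sits on a broker that is down. */
    static List<Integer> partitionsWithoutLiveReplica(
            int partitionCount, int brokerCount, int replicationFactor, Set<Integer> downBrokers) {
        List<Integer> unavailable = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (downBrokers.containsAll(replicaBrokers(p, brokerCount, replicationFactor))) {
                unavailable.add(p);
            }
        }
        return unavailable;
    }

    public static void main(String[] args) {
        // Brokers are numbered 0..4; brokers 3 and 4 are assumed to be down.
        Set<Integer> down = new HashSet<>(Arrays.asList(3, 4));

        // Replication factor 1: partitions 3 and 4 have no live replica.
        System.out.println("RF=1: " + partitionsWithoutLiveReplica(5, 5, 1, down)); // RF=1: [3, 4]

        // Replication factor 3: every partition keeps at least one live replica.
        System.out.println("RF=3: " + partitionsWithoutLiveReplica(5, 5, 3, down)); // RF=3: []
    }
}
```

In this toy model, the partitions placed on the two down brokers have no replica left when the replication factor is 1, which would be consistent with a consumer timing out while fetching metadata for them. With a replication factor of 3, any two brokers can be down and each partition still has a live replica.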


Re: timeout exception when consuming from kafka

2019-07-24 Thread Yitzchak Lieberman
Hi.

Does anyone have an idea what causes this exception?

Thanks,
Yitzchak.


Re: timeout exception when consuming from kafka

2019-07-23 Thread Fabian Hueske
Hi Yitzchak,

Thanks for reaching out.
I'm not an expert on the Kafka consumer, but I think the number of
partitions and the number of source tasks might be interesting to know.

Maybe Gordon (in CC) has an idea of what's going wrong here.

Best, Fabian


Re: timeout exception when consuming from kafka

2019-07-23 Thread Yitzchak Lieberman
Hi.

Another question: what happens during a triggered checkpoint if one of
the Kafka brokers is unavailable?
I would appreciate your insights.

Thanks.



timeout exception when consuming from kafka

2019-07-22 Thread Yitzchak Lieberman
Hi.

I'm running a Flink application (version 1.8.0) that uses
FlinkKafkaConsumer to fetch topic data and transform it. The
checkpointing and state backend are configured as follows:
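StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Trigger a checkpoint every 5 seconds with at-least-once guarantees.
env.enableCheckpointing(5_000, CheckpointingMode.AT_LEAST_ONCE);
// Store checkpoint data on the local file system.
env.setStateBackend((StateBackend) new FsStateBackend("file:///test"));
// Abort a checkpoint that takes longer than 30 seconds.
env.getCheckpointConfig().setCheckpointTimeout(30_000);
// Keep externalized checkpoints when the job is cancelled.
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Allow one checkpoint at a time, with at least 1 second between checkpoints.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);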

My problem is with the Kafka brokers: of the 5 brokers in the cluster,
3 are running and 2 are down.
I was able to consume data, but when a checkpoint was triggered, the
following exception was thrown:

[INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18] o.a.f.r.c.CheckpointCoordinator - Decline checkpoint 6 by task 457b1f801fee89d6f9544409877e29d8 of job 1c46aa5719bac1f0bea436d460b79db1.
[INFO ] 2019-07-22 12:29:14.636 [flink-akka.actor.default-dispatcher-28] o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out 457b1f801fee89d6f9544409877e29d8.
[INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18] o.a.f.r.c.CheckpointCoordinator - Discarding checkpoint 6 of job 1c46aa5719bac1f0bea436d460b79db1.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source -> Sink: Print to Std. Out (2/4) was not running
    at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1198) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:700) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_201]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) ~[akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:502) ~[akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.Actor$class.aroundReceive(Actor.scala) ~[akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) ~[akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.ActorCell.invoke(ActorCell.scala:495) ~[akka-actor_2.11-2.4.20.jar:?]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) ~[akka-actor_2.11-2.4.20.jar:?]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:?]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:?]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.12.jar:?]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.12.jar:?]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:?]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:?]
[INFO ] 2019-07-22 12:29:14.637 [flink-akka.actor.default-dispatcher-28] o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (2/4) (457b1f801fee89d6f9544409877e29d8) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

My question: why is it trying to fetch topic metadata from the brokers
that are down? (I assume this is what the checkpoint is trying to do.)

Thanks,
Yitzchak.