RE: failures during job start

2021-08-20 Thread Colletta, Edward
Thanks, will try that.

From: Chesnay Schepler 
Sent: Friday, August 20, 2021 8:06 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: failures during job start


I don't think there are any metrics; logging-wise you will need to do some 
detective work.

We do know which tasks have started deployment by this message from the 
JobManager:
ExecutionGraph [] - <task name> (<subtask index>/<parallelism>) (<execution ID>) 
switched from SCHEDULED to DEPLOYING.

We also know which have completed deployment by this message from the 
JobManager:
ExecutionGraph [] - <task name> (<subtask index>/<parallelism>) (<execution ID>) 
switched from DEPLOYING to RUNNING.

So what I would do is pick the task that failed with the 
PartitionNotFoundException, then work out from the application which tasks 
it consumes data from, then check which of those have not finished deployment.


On 19/08/2021 22:34, Colletta, Edward wrote:
Thank you. I am going to try the first option for now, but I do need to 
figure out why deployment takes so long.
Are there any metrics or log patterns that would indicate which task is waiting 
and which task is being waited on?


From: Chesnay Schepler
Sent: Thursday, August 19, 2021 2:23 PM
To: Colletta, Edward; user@flink.apache.org
Subject: Re: failures during job start


This exception means that a task was deployed, but the task that produces the 
data it wants to consume was not available yet (even after waiting for a while).

Your case sounds similar to https://issues.apache.org/jira/browse/FLINK-9413, 
where this happens because the deployment of the producing task takes too long.

You have 2 options to solve this:
a) Have Flink wait longer for the partition to be created by increasing 
taskmanager.network.request-backoff.max
b) Speed up the deployment; for this you'd naturally have to investigate why 
the deployment takes so long in the first place.

On 19/08/2021 07:15, Colletta, Edward wrote:
Any help with this would be appreciated. Is it possible that this is a 
data/application issue or a Flink config/resource issue?

Using Flink 1.11.2, Java 11, session cluster, 5 nodes with 32 cores each.

I have an issue where starting a job takes a long time and sometimes fails 
with PartitionNotFoundException, but succeeds on restart. The job has 10 
Kafka sources (10 partitions for each topic) and parallelism 5.
The failure does not happen when the Kafka logs are empty.

Note that during the scenario below, CPU usage on the task managers and job 
managers is low (below 30%).

The scenario we see

  1.  Run the request to load and run a jar; the job appears on the dashboard 
with all 160 subtasks in the Deploying state.
  2.  After 2 minutes, some subtasks start transitioning to Running.
  3.  After another 30 seconds, a failure occurs and the job goes into the 
Restarting state.
  4.  After another minute, the restart completes with all nodes running.

Exception history shows
2021-08-15 07:55:02
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 205a0867c6ef540009acd962d556f981#0@a6b547c5096f3c33eb9059cfe767a2ec not found.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765)
    at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)










Re: failures during job start

2021-08-20 Thread Chesnay Schepler
I don't think there are any metrics; logging-wise you will need to do 
some detective work.


We do know which tasks have started deployment by this message from the 
JobManager:
ExecutionGraph [] - <task name> (<subtask index>/<parallelism>) 
(<execution ID>) switched from SCHEDULED to DEPLOYING.


We also know which have completed deployment by this message from the 
JobManager:
ExecutionGraph [] - <task name> (<subtask index>/<parallelism>) 
(<execution ID>) switched from DEPLOYING to RUNNING.


So what I would do is pick the task that failed with the 
PartitionNotFoundException, then work out from the application which 
tasks it consumes data from, then check which of those have not 
finished deployment.
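
The log check described above could be scripted roughly as follows. This is only a sketch: the regular expression assumes the JobManager lines look like "ExecutionGraph [] - name (index/parallelism) (id) switched from X to Y.", and the function name `pending_deployments`, the task names, and the shortened IDs in `sample` are made up for illustration. Adjust the pattern to whatever your log layout actually produces.

```python
import re

# Assumed shape of the JobManager state-transition lines quoted above.
TRANSITION = re.compile(
    r"ExecutionGraph\s+\[\] - (?P<task>.+) "
    r"\((?P<index>\d+)/(?P<parallelism>\d+)\) "
    r"\((?P<attempt>[0-9a-f]+)\) "
    r"switched from (?P<old>[A-Z_]+) to (?P<new>[A-Z_]+)\."
)


def pending_deployments(log_lines):
    """Return subtasks whose last seen state is DEPLOYING.

    Each result is a (task name, subtask index, execution ID) tuple.
    """
    last_state = {}
    for line in log_lines:
        match = TRANSITION.search(line)
        if match is None:
            continue  # not a state-transition line
        key = (match["task"], int(match["index"]), match["attempt"])
        last_state[key] = match["new"]
    return sorted(key for key, state in last_state.items() if state == "DEPLOYING")


# Hypothetical log excerpt: the consumer is RUNNING, its producer is not yet.
sample = [
    "2021-08-15 07:52:00,000 INFO  ExecutionGraph [] - Source: topic-a (1/5) (a1f0) switched from SCHEDULED to DEPLOYING.",
    "2021-08-15 07:52:00,010 INFO  ExecutionGraph [] - Window (1/5) (b2e4) switched from SCHEDULED to DEPLOYING.",
    "2021-08-15 07:54:10,500 INFO  ExecutionGraph [] - Window (1/5) (b2e4) switched from DEPLOYING to RUNNING.",
]

for task, index, attempt in pending_deployments(sample):
    print(f"still deploying: {task} ({index}) attempt {attempt}")
```

Running this over the JobManager log up to the timestamp of the PartitionNotFoundException, then filtering the result to the upstream tasks of the failed task, should narrow down which producer was slow to deploy.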









RE: failures during job start

2021-08-19 Thread Colletta, Edward
Thank you. I am going to try the first option for now, but I do need to 
figure out why deployment takes so long.
Are there any metrics or log patterns that would indicate which task is waiting 
and which task is being waited on?










Re: failures during job start

2021-08-19 Thread Chesnay Schepler
This exception means that a task was deployed, but the task that 
produces the data it wants to consume was not available yet (even after 
waiting for a while).


Your case sounds similar to 
https://issues.apache.org/jira/browse/FLINK-9413, where this happens 
because the deployment of the producing task takes too long.


You have 2 options to solve this:
a) Have Flink wait longer for the partition to be created by increasing 
taskmanager.network.request-backoff.max
b) Speed up the deployment; for this you'd naturally have to investigate 
why the deployment takes so long in the first place.
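
To get a feel for what option a) buys, here is a small sketch of the retry schedule. It rests on assumptions that should be checked against your Flink version: that the delay between partition requests starts at taskmanager.network.request-backoff.initial and doubles until it reaches taskmanager.network.request-backoff.max, and that the defaults are 100 ms and 10000 ms. The function name `request_backoff_schedule` is made up for illustration.

```python
def request_backoff_schedule(initial_ms=100, max_ms=10_000):
    """List the assumed waits between partition requests, in milliseconds.

    The wait doubles from initial_ms and is capped at max_ms; after the
    wait of max_ms has been used, the request is assumed to fail.
    """
    waits = []
    backoff = initial_ms
    while backoff < max_ms:
        waits.append(backoff)
        backoff = min(backoff * 2, max_ms)
    waits.append(max_ms)
    return waits


# Compare the assumed default with a raised maximum of 60 seconds.
for max_ms in (10_000, 60_000):
    schedule = request_backoff_schedule(max_ms=max_ms)
    print(f"max={max_ms} ms: {len(schedule)} waits, {sum(schedule) / 1000} s in total")
```

Under these assumptions the default allows about 22.7 seconds of waiting in total, while a maximum of 60000 ms allows about 162.3 seconds. The value is set in flink-conf.yaml on the cluster.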








failures during job start

2021-08-18 Thread Colletta, Edward
Any help with this would be appreciated. Is it possible that this is a 
data/application issue or a Flink config/resource issue?

Using Flink 1.11.2, Java 11, session cluster, 5 nodes with 32 cores each.

I have an issue where starting a job takes a long time and sometimes fails 
with PartitionNotFoundException, but succeeds on restart. The job has 10 
Kafka sources (10 partitions for each topic) and parallelism 5.
The failure does not happen when the Kafka logs are empty.

Note that during the scenario below, CPU usage on the task managers and job 
managers is low (below 30%).

The scenario we see

  *   Run the request to load and run a jar; the job appears on the dashboard 
with all 160 subtasks in the Deploying state.
  *   After 2 minutes, some subtasks start transitioning to Running.
  *   After another 30 seconds, a failure occurs and the job goes into the 
Restarting state.
  *   After another minute, the restart completes with all nodes running.

Exception history shows
2021-08-15 07:55:02
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 205a0867c6ef540009acd962d556f981#0@a6b547c5096f3c33eb9059cfe767a2ec not found.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765)
    at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)