[
https://issues.apache.org/jira/browse/FLINK-17560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17116940#comment-17116940
]
josson paul kalapparambath edited comment on FLINK-17560 at 5/26/20, 8:22 PM:
------------------------------------------------------------------------------
[~xintongsong]
I was able to root-cause this problem to stuck tasks in a highly threaded
environment.
Below I have explained how I was able to reproduce this issue (not
consistently, though).
*Scenario: 1*
Job ID: JobID123 (parallelism 1)
The TM has only a single slot.
h5. Step 1: Schedule the job
JobID123 is running on the TM in a slot with allocation ID *AllocationID123*.
Now *JobID123* is mapped to allocation ID *AllocationID123*.
h5. Step 2: Zookeeper stop
The task manager tries to cancel/fail all the tasks on *AllocationID123*, but
some of the tasks get stuck and never stop, which means the *'finally'* block
that cleans things up never gets called.
At this point the above-mentioned tasks are in the *CANCELLING* state, but the
slot is still in the *ALLOCATED* state and is still allocated to job ID
*JobID123*.
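To make the failure mode concrete, here is a minimal sketch (illustrative names only, not Flink's actual task code) of how a task loop that swallows interrupts never leaves its try block, so the finally-based cleanup never runs even though the cancel/interrupter threads fire:
{code:java}
// Minimal sketch, assuming the stuck task swallows interrupts somewhere
// in its work loop. Names are made up for illustration.
public class StuckTaskSketch implements Runnable {
    private volatile boolean running = true; // nothing on the stuck path ever clears this

    @Override
    public void run() {
        try {
            while (running) {
                try {
                    Thread.sleep(50); // stand-in for any blocking poll/read
                } catch (InterruptedException e) {
                    // The interrupt sent by the canceller is swallowed here,
                    // so the loop just keeps going.
                }
            }
        } finally {
            // Slot/resource cleanup lives here. A task stuck in the loop
            // above never reaches it, leaving the slot ALLOCATED.
            System.out.println("cleanup: slot released");
        }
    }
}
{code}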
I can see that Flink has a cancel-task thread / interrupter thread / watchdog
thread, so why is this task still stuck? Below I have pasted lines from the
thread dump from one such instance where we had a stuck task. Are we hitting
this issue: [https://bugs.openjdk.java.net/browse/JDK-8227375]?
We are using Java 8.
Note: It is not always this particular task that gets stuck; it can be any
task from the pipelines.
{code:java}
"OutputFlusher for Source: KUS/snmp_trap_proto/Kafka-Read/Read(KafkaUnboundedSource) -> FlatMap -> KUS/snmp_trap_proto/KafkaRecordToCTuple/ParMultiDo(KafkaRecordToCTuple) -> HSTrapParDoInit117/ParMultiDo(HealthScoreTrapProcParDo) -> ParDo(GroupDataByEntityId)/ParMultiDo(GroupDataByEntityId) -> ApplyHSEventFixedWindow118/Window.Assign.out -> ToKeyedWorkItem" #1542 daemon prio=5 os_prio=0 tid=0x00007f5c0daff000 nid=0x638 sleeping[0x00007f5b91267000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.run(RecordWriter.java:362)

   Locked ownable synchronizers:
        - None

"deviceprocessor" #1538 prio=5 os_prio=0 tid=0x00007f5c0ed1a000 nid=0x634 runnable [0x00007f5bb45d6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x000000049fc13cd8> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
        at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
        at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:613)
        at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:228)
        at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:64)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:281)
        - locked <0x0000000499000420> (a java.lang.Object)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
        at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None
{code}
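The "deviceprocessor" thread above is parked in SynchronousQueue.poll, and that call is interruptible, which is why a lost or swallowed interrupt is my prime suspect. A quick standalone check (my own test code, not Flink's) shows that a delivered interrupt unparks poll almost immediately; so if the dumped thread stays parked across repeated cancel attempts, the interrupt was either never delivered (the JDK-8227375 hypothesis) or caught and retried higher up the stack:
{code:java}
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Standalone check: SynchronousQueue.poll(timeout, unit) throws
// InterruptedException promptly when the parked thread is interrupted.
public class PollInterruptCheck {
    public static void main(String[] args) throws Exception {
        SynchronousQueue<Object> queue = new SynchronousQueue<>();
        Thread reader = new Thread(() -> {
            try {
                queue.poll(10, TimeUnit.SECONDS); // parks like the frame in the dump
                System.out.println("poll returned normally");
            } catch (InterruptedException e) {
                System.out.println("poll was interrupted as expected");
            }
        });
        reader.start();
        Thread.sleep(200);
        reader.interrupt(); // mimics Flink's canceller/interrupter thread
        reader.join();      // finishes quickly if the interrupt is delivered
    }
}
{code}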
h5. Step 3: Zookeeper start
Even though the tasks are not fully cancelled, the status of the slot is still
*ALLOCATED*. Now the TM offers this slot (*AllocationID123*) to the JM. The JM
doesn't care and happily deploys the tasks. (*Is this intended?*) All are
happy and the job runs fine.
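My reading of this step, as a hedged sketch (the names below are made up for illustration, not the real TaskExecutor code): if the offer decision is driven purely by the slot's own state, a slot whose tasks are stuck in CANCELLING still looks offerable:
{code:java}
// Hypothetical sketch of the behaviour observed in Step 3.
enum SlotState { ALLOCATED, RELEASING, FREE }
enum TaskState { RUNNING, CANCELLING, CANCELED }

class SlotOfferSketch {
    SlotState slotState = SlotState.ALLOCATED;     // Step 2 left the slot here
    TaskState oldTaskState = TaskState.CANCELLING; // the stuck task

    boolean offerableToJobManager() {
        // Only the slot state is consulted; the stuck CANCELLING task is
        // invisible to this check, so the slot gets re-offered to the JM.
        return slotState == SlotState.ALLOCATED;
    }
}
{code}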
h5. Step 4: Job manager restart (Problem happens at this stage)
The TM offers the same slot (with allocation ID *AllocationID123*). Now,
unfortunately, the JM throws an exception. (This is a rare scenario and
happened because of an internal change of ours in the scheduling part; we are
fixing it.) When this happens, the TM sees that it still has tasks running
(the old stuck task) and changes the status of the slot to *RELEASING*. Once
this happens, the TM doesn't have any more slots to offer to the JM, and the
pipeline ends up in a scheduling loop on the JM side.
Even if the JM hadn't thrown any error, we would have got into a 'slot lost'
scenario if we tried to cancel the pipeline: cancelling the pipeline will not
clean up this slot because of the stuck task.
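A sketch of why *RELEASING* never completes (again with illustrative names, not Flink's actual TaskSlot implementation): the slot can only be freed once every task on it has terminated, and the stuck task never terminates:
{code:java}
// Hedged sketch of the RELEASING dead-end, assuming the slot is freed
// only after the last task on it terminates.
class ReleasingSlotSketch {
    boolean releasing = false;
    int activeTasks = 1; // the stuck task from Step 2

    void markReleasing() { // JM rejection in Step 4 triggers this
        releasing = true;
        maybeFree();
    }

    void onTaskTerminated() { // never invoked for the stuck task
        activeTasks--;
        maybeFree();
    }

    private void maybeFree() {
        if (releasing && activeTasks == 0) {
            // The slot would be returned to the free pool here. With the
            // stuck task this branch is unreachable, so the TM has no slot
            // to offer and the JM loops on scheduling.
            System.out.println("slot freed");
        }
    }
}
{code}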
*Scenario: 2*
Everything is the same except that we have *2 slots* instead of 1 on the TM side.
h5. Step 1: Schedule job
Same as Scenario 1.
h5. Step 2: Zookeeper stop
Same as Scenario 1.
h5. Step 3: Zookeeper start
At this point the JM requests a new slot. The TM associates both Slot1 (old
allocation ID) and Slot2 (new allocation ID) with job ID *Job123*. The JM
schedules *Job123* on Slot2, then tries to free up Slot1; Slot1 gets into
*RELEASING* status and never recovers because of the stuck task.
I am attaching the JM and TM logs and the thread dump as well.
Job ID: 3c632ca764b56741d2571ffae9b877f0
Allocation ID: 576a29115a9f0b44be4d61ab7dee233c
Execution ID: 10d5ea4915c3d50f96a3e8d1dad2c51f
Run {{grep "10d5ea4915c3d50f96a3e8d1dad2c51f" tm.log}}: you can see that the
'finally' block never gets called. [^tm.log]
> No Slots available exception in Apache Flink Job Manager while Scheduling
> -------------------------------------------------------------------------
>
> Key: FLINK-17560
> URL: https://issues.apache.org/jira/browse/FLINK-17560
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.8.3
> Environment: Flink version 1.8.3
> Session cluster
> Reporter: josson paul kalapparambath
> Priority: Major
> Attachments: jobmgr.log, threaddump-tm.txt, tm.log
>
>
> Set up
> ------
> Flink version 1.8.3
> Zookeeper HA cluster
> 1 ResourceManager/Dispatcher (Same Node)
> 1 TaskManager
> 4 pipelines running with various parallelisms
> Issue
> ------
> Occasionally, when the Job Manager gets restarted, we noticed that the
> pipelines are not getting scheduled. The error that is reported by the Job
> Manager is 'not enough slots are available'. This should not be the case
> because the task manager was deployed with sufficient slots for the number of
> pipelines/parallelism we have.
> We further noticed that the slot report sent by the task manager contains slots
> filled with old CANCELLED job IDs. I am not sure why the task manager still
> holds the details of the old jobs. A thread dump on the task manager confirms
> that the old pipelines are not running.
> I am aware of https://issues.apache.org/jira/browse/FLINK-12865. But this is
> not the issue happening in this case.