If it is still the case of multiple slots in one TaskManager, it is the same as 
before. But you said you already used the single slot per TaskManager, right?

If it is the case of single slot in TaskManager, you could attach the jstack 
when occurs next time, otherwise it is not needed.

Best,
Zhijiang
------------------------------------------------------------------
From:Narayanaswamy, Krishna <krishna.narayanasw...@gs.com>
Send Time:2019年5月22日(星期三) 00:49
To:zhijiang <wangzhijiang...@aliyun.com>; Aljoscha Krettek 
<aljos...@apache.org>; Piotr Nowojski <pi...@data-artisans.com>
Cc:Nico Kruber <n...@data-artisans.com>; user@flink.apache.org 
<user@flink.apache.org>; "Chan, Regina" <regina.c...@gs.com>; "Erai, Rahul" 
<rahul.e...@gs.com>
Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


Hi Zhijiang,

I couldn’t get the jstack due to some constraints this time around. Will try 
and get them when it occurs next. But from the looks of it from the 
console/logs it appears to be the same as the 2 slot cases. DataSource 
finishing up and CoGroup looking to move from DEPLOYING to RUNNING (and stuck 
at DEPLOYING)

Thanks,
Krishna.

From: zhijiang <wangzhijiang...@aliyun.com> 
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek <aljos...@apache.org>; Piotr Nowojski 
<pi...@data-artisans.com>; Narayanaswamy, Krishna [Tech] 
<krishna.narayanasw...@ny.email.gs.com>
Cc: Nico Kruber <n...@data-artisans.com>; user@flink.apache.org; Chan, Regina 
[Tech] <regina.c...@ny.email.gs.com>; Erai, Rahul [Tech] 
<rahul.e...@ny.email.gs.com>
Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi  Krishna,
Could you show me or attach the jstack for the single slot case? Or is it the 
same jstack as before?
Best,
Zhijiang
------------------------------------------------------------------
From:Narayanaswamy, Krishna <krishna.narayanasw...@gs.com>
Send Time:2019年5月21日(星期二) 19:50
To:zhijiang <wangzhijiang...@aliyun.com>; Aljoscha Krettek 
<aljos...@apache.org>; Piotr Nowojski <pi...@data-artisans.com>
Cc:Nico Kruber <n...@data-artisans.com>; user@flink.apache.org 
<user@flink.apache.org>; "Chan, Regina" <regina.c...@gs.com>; "Erai, Rahul" 
<rahul.e...@gs.com>
Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks

We started to run jobs using the single slotted task managers which seemed to 
be ok for the past couple of days, but today morning we seem to be seeing these 
deadlocks even with 1 slot. Is there something else we could try out?

Thanks,
Krishna.

From: Narayanaswamy, Krishna [Tech] 
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' <wangzhijiang...@aliyun.com>; Aljoscha Krettek 
<aljos...@apache.org>; Piotr Nowojski <pi...@data-artisans.com>
Cc: Nico Kruber <n...@data-artisans.com>; user@flink.apache.org; Chan, Regina 
[Tech] <regina.c...@ny.email.gs.com>; Erai, Rahul [Tech] 
<rahul.e...@ny.email.gs.com>
Subject: RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Thanks Zhijiang. 

We will try these deadlock usecases with a single slot approach to see how they 
go. Will await the fix to start using more slots on the single TM.

Thanks,
Krishna.

From: zhijiang <wangzhijiang...@aliyun.com> 
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek <aljos...@apache.org>; Piotr Nowojski 
<pi...@data-artisans.com>; Narayanaswamy, Krishna [Tech] 
<krishna.narayanasw...@ny.email.gs.com>
Cc: Nico Kruber <n...@data-artisans.com>; user@flink.apache.org; Chan, Regina 
[Tech] <regina.c...@ny.email.gs.com>; Erai, Rahul [Tech] 
<rahul.e...@ny.email.gs.com>
Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
I already analyzed out this deadlock case based on the codes. FLINK-10491 has 
already solved on place to cause deadlock in SpillableSubpartition, but this is 
a different place to cause this issue.
When source task is trying to release subpartition memory, meanwhile another 
CoGroup task is submitted to trigger source task to release its memory, then it 
might cause deadlock.
I would create a jira ticket for this issue and think how to solve it soon. 
Currently if you still want to use the blocking type, the simple way to avoid 
this is to make only one slot in TM, then there never happen one task triggers 
another task to release memory in the same TM. Or you could increase the 
network buffer setting to work aournd, but not sure this way could work for 
your case because it is up to the total data size the source produced.
Best,
Zhijiang
------------------------------------------------------------------
From:Narayanaswamy, Krishna <krishna.narayanasw...@gs.com>
Send Time:2019年5月17日(星期五) 17:37
To:Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>; Aljoscha Krettek 
<aljos...@apache.org>; Piotr Nowojski <pi...@data-artisans.com>
Cc:Nico Kruber <n...@data-artisans.com>; user@flink.apache.org 
<user@flink.apache.org>; "Chan, Regina" <regina.c...@gs.com>; "Erai, Rahul" 
<rahul.e...@gs.com>
Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
We see this JIRA issue (FLINK-10491) as fixed and the fix is present in Flink 
v1.6.4 which we are using now but the problem now seems to come up for 
relatively simpler scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===================================================
"CoGroup (2/2)":
                at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
                - waiting to lock <0x000000062bf859b8> (a java.lang.Object)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
                at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
                at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
                - waiting to lock <0x000000063fdf4888> (a java.util.ArrayDeque)
                at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
                at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
                at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
                at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
                at 
org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
                at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
                at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
                - locked <0x000000063fdf4ac8> (a java.util.ArrayDeque)
                at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
                at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
                at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
                at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
                - locked <0x000000063c785350> (a java.lang.Object)
                at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
                at 
org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
                at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
                - locked <0x000000062bf859b8> (a java.lang.Object)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
                at java.lang.Thread.run(Thread.java:745)
"DataSource  (1/1)":
                at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
                - waiting to lock <0x000000063fdf4ac8> (a java.util.ArrayDeque)
                at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
                at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
                - locked <0x000000063fdf4888> (a java.util.ArrayDeque)
                at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
                at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
                at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
                at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
                at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
                at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
                at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
                at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.

We are not setting any slot sharing parameters since this batch based 
processing so it uses the default (and there don’t seem to be any options 
available to manipulate slot sharing for non-streaming).
If we disable slot sharing (assuming it will be through some config across the 
job) wouldn’t the job become relatively more slower?

Thanks,
Krishna.

From: Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com> 
Sent: Monday, October 08, 2018 1:39 PM
To: Aljoscha Krettek <aljos...@apache.org>; Piotr Nowojski 
<pi...@data-artisans.com>
Cc: Narayanaswamy, Krishna [Tech] <krishna.narayanasw...@ny.email.gs.com>; Nico 
Kruber <n...@data-artisans.com>; user@flink.apache.org
Subject: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when 
running a large job > 10k tasks
There actually exists this deadlock for special scenarios.
Before fixing the bug, we can avoid this issue by not deploying the map and 
sink tasks in the same task manager to work around.
Krishna, do you share the slot for these two tasks? If so, you can set disable 
slot sharing for this job.
Or I guess we can set the ExecutionMode#PIPELINED_FORCED to not generate 
blocking result partition to avoid this issue temporarily.
Best,
Zhijiang
------------------------------------------------------------------
发件人:Piotr Nowojski <pi...@data-artisans.com>
发送时间:2018年10月4日(星期四) 21:54
收件人:Aljoscha Krettek <aljos...@apache.org>
抄 送:"Narayanaswamy, Krishna" <krishna.narayanasw...@gs.com>; Nico Kruber 
<n...@data-artisans.com>; user@flink.apache.org <user@flink.apache.org>
主 题:Re: Memory Allocate/Deallocate related Thread Deadlock encountered when 
running a large job > 10k tasks
Hi,
Thanks for reporting the problem. This bug was previously unknown to us. I have 
created a jira ticket for this bug:
https://issues.apache.org/jira/browse/FLINK-10491
Unfortunately I’m not familiar with running batch jobs in Flink, so I don’t 
know if there is some hot fix or anything that can at least mitigate/decrease 
the probability of the bug for you until we fix it properly. 
Piotrek
On 4 Oct 2018, at 13:55, Aljoscha Krettek <aljos...@apache.org> wrote:
Hi,
this looks like a potential Flink bug. Looping in Nico and Piotr who have 
looked into that in the past. Could you please comment on that?
Best,
Aljoscha
On 3. Oct 2018, at 12:12, Narayanaswamy, Krishna <krishna.narayanasw...@gs.com> 
wrote:
Hi,

I am trying to run one large single job graph which has > 10k tasks. The form 
of the graph is something like
DataSource -> Filter -> Map [...multiple]
· Sink1
· Sink2
I am using a parallelism of 10 with 1 slot per task manager and a memory 
allocation of 32G per TM. The JM is running with 8G.

Everything starts up and runs fine with close to 6-7k tasks (this is variable 
and is mostly the source /filter/map portions) completing and then the graph 
just hangs.  I managed to connect to the task managers and get a thread dump 
just in time and found the following deadlock on one of the TMs which 
apparently seems to be holding up everything else.
Please could someone take a look and advise if there is something I could do or 
try out to fix this.

Marked below are the 2 isolated thread stacks marking the deadlock -

Thread-1
"DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
waiting for monitor entry
 waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
java.util.ArrayDeque)
   at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
   at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
   at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
   - locked <0x2dfd> (a java.util.ArrayDeque)
   at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
   at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
   - locked <0x2da5> (a java.lang.Object)
   at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
   at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
   at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
   at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
   at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
   at 
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
   at 
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
   at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
   at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
   at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
   at 
org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
   at java.lang.Thread.run(Thread.java:745)


Thread-2
"Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor 
entry
  java.lang.Thread.State: BLOCKED
 blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
 waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release 
lock on <0x2dfd> (a java.util.ArrayDeque)
   at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
   at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:171)
   at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:106)
   at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:146)
   at 
org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:110)
   at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:271)
   at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:117)
   - locked <0x2dfb> (a java.util.ArrayDeque)
   at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:96)
   - locked <0x2dfc> (a 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
   at 
org.apache.flink.runtime.io.network.partition.ResultPartition.addBufferConsumer(ResultPartition.java:255)
   at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:211)
   at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:142)
   at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
   at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
   at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
   at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:103)
   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
   at java.lang.Thread.run(Thread.java:745)

Thanks,
Krishna.


  ________________________________  

Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices


 Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to:  
www.gs.com/privacy-notices


 Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to:  
www.gs.com/privacy-notices


 Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to:  
www.gs.com/privacy-notices

Reply via email to