[ https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16970360#comment-16970360 ]

venkata subbarao chunduri edited comment on FLINK-10672 at 11/8/19 8:28 PM:
----------------------------------------------------------------------------

Could it be due to the local channel? It looks so.

I am also facing the same issue. It seems the local channel [can go to wait|https://github.com/apache/flink/blob/6322618bb0f1b7942d86cb1b2b7bc55290d9e330/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L539] and may never wake up, because data from the local channel is [pulled in the same thread|https://github.com/apache/flink/blob/6322618bb0f1b7942d86cb1b2b7bc55290d9e330/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java#L104]. Could you confirm?
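
For context, below is a minimal sketch of the wait/notify handshake those links point at. It is a simplified paraphrase of my reading of the code, not Flink's actual implementation; the class and method names only mirror SingleInputGate for readability:
{code:java}
import java.util.ArrayDeque;

// Simplified paraphrase of the SingleInputGate handshake (not the real code).
class GateHandshakeSketch {
    private final ArrayDeque<Object> inputChannelsWithData = new ArrayDeque<>();

    // Task thread: blocks until some channel reports data
    // (the wait at SingleInputGate.java:539).
    Object waitForChannelWithData() throws InterruptedException {
        synchronized (inputChannelsWithData) {
            while (inputChannelsWithData.isEmpty()) {
                // If the matching notify is never delivered, this never returns.
                inputChannelsWithData.wait();
            }
            return inputChannelsWithData.poll();
        }
    }

    // Producer side: queues the channel and wakes the task thread.
    void notifyChannelNonEmpty(Object channel) {
        synchronized (inputChannelsWithData) {
            inputChannelsWithData.add(channel);
            inputChannelsWithData.notifyAll();
        }
    }
}
{code}
The concern above is that, for a local channel, the data is pulled by the same task thread that parks in wait(), so once that thread is parked there is nobody left on the consumer side to make progress.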

The issue seems to happen in the following flow (a sketch of the resulting circular wait follows step 4):
 1. The source stops producing data for a moment.
 2. The input gate sees no data on the channel and calls [wait()|https://github.com/venkatsc/flink/blob/7297bacfe14b9f814c308d95003c837115a6bdd2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L539], expecting a notify in the future.
 3. The source starts producing data again, consumes all the buffers on the partition, and enters an [indefinite loop|https://github.com/apache/flink/blob/7297bacfe14b9f814c308d95003c837115a6bdd2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java#L251] waiting for a new buffer. This blocks any further writes to the partition.
{code:java}
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000f6d56450> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
{code}
4. But no buffers will ever be released, because the local channel is itself stuck in wait():
{code:java}
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:539)
{code}
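To make the circular wait concrete, here is a minimal, self-contained sketch of the shape of the hang as I understand it. All names (pool, dataQueue, BufferDeadlockSketch) are hypothetical stand-ins for LocalBufferPool and the gate's channel queue, and the wakeup is deliberately omitted to model a lost notification, so running it parks both threads forever:
{code:java}
import java.util.ArrayDeque;

// Hypothetical model of the hang, not Flink code. Deliberately deadlocks.
public class BufferDeadlockSketch {
    static final ArrayDeque<byte[]> pool = new ArrayDeque<>();      // LocalBufferPool stand-in
    static final ArrayDeque<byte[]> dataQueue = new ArrayDeque<>(); // channel data stand-in

    public static void main(String[] args) throws InterruptedException {
        pool.add(new byte[32 * 1024]); // deliberately tiny pool of two segments
        pool.add(new byte[32 * 1024]);

        // Consumer thread: step 2 - parks on the empty queue, expecting a notify.
        Thread consumer = new Thread(() -> {
            try {
                synchronized (dataQueue) {
                    while (dataQueue.isEmpty()) {
                        dataQueue.wait(); // analogue of the wait at SingleInputGate.java:539
                    }
                }
            } catch (InterruptedException ignored) { }
        }, "consumer");
        consumer.start();
        Thread.sleep(100); // let the consumer park first

        // Producer (main thread): step 3 - drains the pool, then blocks on recycling.
        while (true) {
            byte[] segment;
            synchronized (pool) {
                while (pool.isEmpty()) {
                    pool.wait(); // analogue of LocalBufferPool.requestMemorySegment
                }
                segment = pool.poll();
            }
            synchronized (dataQueue) {
                dataQueue.add(segment);
                // dataQueue.notifyAll() is deliberately omitted to model a lost
                // notification: the consumer never wakes, never recycles segments
                // into 'pool', and the producer parks in pool.wait() forever (step 4).
            }
        }
    }
}
{code}
Whether Flink actually loses the notification this way is exactly what I am asking above; the sketch only shows why the two stack traces, taken together, form a cycle that cannot resolve on its own.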
The common factor between the original issue report and my modified Flink code is that both have additional threads in the task manager (for example, the gRPC threads in the original report). Could these additional threads cause any issues?

The issue is clearly visible in the presence of these additional threads (my customization, targeted at RDMA experiments, uses a library that creates additional threads). Why could this happen? Does anyone see a reason for it?

When tested with Flink 1.8.1, the same job does not get stuck. But with the version that has additional threads in the task manager, the scenario explained above occurred and the tasks got stuck forever.



> Task stuck while writing output to flink
> ----------------------------------------
>
>                 Key: FLINK-10672
>                 URL: https://issues.apache.org/jira/browse/FLINK-10672
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.5.4
> Environment: OS: Debian rodente 4.17
> Flink version: 1.5.4
> ||Key||Value||
> |jobmanager.heap.mb|1024|
> |jobmanager.rpc.address|localhost|
> |jobmanager.rpc.port|6123|
> |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter|
> |metrics.reporter.jmx.port|9250-9260|
> |metrics.reporters|jmx|
> |parallelism.default|1|
> |rest.port|8081|
> |taskmanager.heap.mb|1024|
> |taskmanager.numberOfTaskSlots|1|
> |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26|
>  
> h1. Overview
> ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap Size||Flink Managed Memory||
> |43501|1|0|12|62.9 GB|922 MB|642 MB|
> h1. Memory
> h2. JVM (Heap/Non-Heap)
> ||Type||Committed||Used||Maximum||
> |Heap|922 MB|575 MB|922 MB|
> |Non-Heap|68.8 MB|64.3 MB|-1 B|
> |Total|991 MB|639 MB|922 MB|
> h2. Outside JVM
> ||Type||Count||Used||Capacity||
> |Direct|3,292|105 MB|105 MB|
> |Mapped|0|0 B|0 B|
> h1. Network
> h2. Memory Segments
> ||Type||Count||
> |Available|3,194|
> |Total|3,278|
> h1. Garbage Collection
> ||Collector||Count||Time||
> |G1_Young_Generation|13|336|
> |G1_Old_Generation|1|21|
>            Reporter: Ankur Goenka
>            Assignee: Yun Gao
>            Priority: Major
>              Labels: beam
>         Attachments: 0.14_all_jobs.jpg, 1uruvakHxBu.png, 3aDKQ24WvKk.png, 
> Po89UGDn58V.png, WithBroadcastJob.png, jmx_dump.json, jmx_dump_detailed.json, 
> jstack_129827.log, jstack_163822.log, jstack_66985.log
>
>
> I am running a fairly complex pipeline with 200+ tasks.
> The pipeline works fine with small data (on the order of 10 KB of input) but gets stuck with slightly larger data (300 KB of input).
>  
> The task gets stuck while writing the output to Flink; more specifically, it gets stuck while requesting a memory segment in the local buffer pool. The task manager UI shows that it has enough memory and memory segments to work with.
> The relevant stack trace is 
> {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 tid=0x00007fedb0163800 nid=0x30b7f in Object.wait() [0x00007fedb4f90000]
>  java.lang.Thread.State: TIMED_WAITING (on object monitor)
>  at (C/C++) 0x00007fef201c7dae (Unknown Source)
>  at (C/C++) 0x00007fef1f2aea07 (Unknown Source)
>  at (C/C++) 0x00007fef1f241cd3 (Unknown Source)
>  at java.lang.Object.wait(Native Method)
>  - waiting on <0x00000000f6d56450> (a java.util.ArrayDeque)
>  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
>  - locked <0x00000000f6d56450> (a java.util.ArrayDeque)
>  at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
>  at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
>  at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
>  at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>  at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>  at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
>  at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
>  at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>  at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
>  - locked <0x00000000f6a60bd0> (a java.lang.Object)
>  at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
>  at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
>  at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
>  at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
>  at org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
>  at org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>  at org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>  at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
>  at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
>  at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>  at org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748){quote}
>  
> The full stack trace and logs are attached.
>  Please take a look and let me know if more information is needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
