[ https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443716#comment-16443716 ]

Nico Kruber commented on FLINK-2685:
------------------------------------

Hi [~amit.jain],
if an operator is blocking in {{LocalBufferPool.requestBuffer}}, this usually only means that it is backpressured, i.e. the following/receiving operator is not processing data (or not fast enough), so at some point all buffers have been filled and no more are filled until the existing ones have been processed. You should have a look at the downstream operators to identify where the backpressure is happening and whether the job is making slow progress or none at all. The Web UI should help you with that.

> TaskManager deadlock on NetworkBufferPool
> -----------------------------------------
>
>                 Key: FLINK-2685
>                 URL: https://issues.apache.org/jira/browse/FLINK-2685
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, Network
>    Affects Versions: 0.10.0
>            Reporter: Greg Hogan
>            Assignee: Ufuk Celebi
>            Priority: Major
>         Attachments: job_manager_19_feb_15_30_running,
> task_manager_19_feb_15_30_running
>
>
> This deadlock occurs intermittently. I have a {{join}} followed by a
> {{chain<join,filter>}} followed by a {{reduceGroup}}. Stack traces and local
> variables from one each of the {{join}} threads below.
> The {{join}}'s are waiting on a buffer to become available
> ({{networkBufferPool.availableMemorySegments.count=0}}). Both
> {{LocalBufferPool}}'s have been given extra capacity
> ({{currentPoolSize=60 > numberOfRequiredMemorySegments=32}}). The first
> {{join}} is at full capacity
> ({{currentPoolSize=numberOfRequestedMemorySegments=60}}) yet the second
> {{join}} has not acquired any ({{numberOfRequestedMemorySegments=0}}).
> {{LocalBufferPool.returnExcessMemorySegments}} only recycles
> {{MemorySegment}}'s from its {{availableMemorySegments}}, so any requested
> {{Buffer}}'s will only be released when explicitly recycled.
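
The blocking behaviour discussed above can be illustrated with a minimal sketch. This is not Flink code: {{BoundedPoolSketch}}, its method names, the pool size, and the timeouts are all hypothetical, chosen only for illustration. It shows that a request against an exhausted fixed-size pool keeps waiting until some holder explicitly recycles a buffer, which is what a backpressured sender looks like from the outside.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a fixed-size buffer pool; NOT Flink's LocalBufferPool. */
class BoundedPoolSketch {
    private final BlockingQueue<byte[]> available;

    BoundedPoolSketch(int numBuffers, int bufferSize) {
        available = new ArrayBlockingQueue<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    /** Waits up to timeoutMillis for a free buffer; returns null if none became available. */
    byte[] requestBuffer(long timeoutMillis) {
        try {
            return available.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    /** Hands a buffer back; only this call makes it visible to waiting requesters. */
    void recycle(byte[] buffer) {
        available.add(buffer);
    }

    public static void main(String[] args) {
        BoundedPoolSketch pool = new BoundedPoolSketch(2, 32768);
        byte[] a = pool.requestBuffer(10);
        byte[] b = pool.requestBuffer(10);

        // Pool is exhausted: nobody recycles, so this request times out.
        byte[] c = pool.requestBuffer(50);
        System.out.println(c == null ? "timed out" : "got buffer"); // prints "timed out"

        // Once a holder recycles, the next request succeeds.
        pool.recycle(a);
        byte[] d = pool.requestBuffer(50);
        System.out.println(d == null ? "timed out" : "got buffer"); // prints "got buffer"
    }
}
```

The sketch uses a timeout so the example terminates. A request that waits without a timeout would simply stay parked in the exhausted case, which is consistent with a thread sitting in {{Object.wait}} below a buffer request.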
> First join stack trace and variable values from
> {{LocalBufferPool.requestBuffer}}:
> {noformat}
> owns: SpanningRecordSerializer<T>  (id=723)
> waiting for: ArrayDeque<E>  (id=724)
> Object.wait(long) line: not available [native method]
> LocalBufferPool.requestBuffer(boolean) line: 163
> LocalBufferPool.requestBufferBlocking() line: 133
> RecordWriter<T>.emit(T) line: 92
> OutputCollector<T>.collect(T) line: 65
> JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137
> JoinDriver<IT1,IT2,OT>.run() line: 208
> RegularPactTask<S,OT>.run() line: 489
> RegularPactTask<S,OT>.invoke() line: 354
> Task.run() line: 581
> Thread.run() line: 745
> {noformat}
> {noformat}
> this  LocalBufferPool  (id=403)
>   availableMemorySegments  ArrayDeque<E>  (id=398)
>     elements  Object[16]  (id=422)
>     head  14
>     tail  14
>   currentPoolSize  60
>   isDestroyed  false
>   networkBufferPool  NetworkBufferPool  (id=354)
>     allBufferPools  HashSet<E>  (id=424)
>     availableMemorySegments  ArrayBlockingQueue<E>  (id=427)
>       count  0
>       items  Object[10240]  (id=674)
>       itrs  null
>       lock  ReentrantLock  (id=675)
>       notEmpty  AbstractQueuedSynchronizer$ConditionObject  (id=678)
>       notFull  AbstractQueuedSynchronizer$ConditionObject  (id=679)
>       putIndex  6954
>       takeIndex  6954
>     factoryLock  Object  (id=430)
>     isDestroyed  false
>     managedBufferPools  HashSet<E>  (id=431)
>     memorySegmentSize  32768
>     numTotalRequiredBuffers  3226
>     totalNumberOfMemorySegments  10240
>   numberOfRequestedMemorySegments  60
>   numberOfRequiredMemorySegments  32
>   owner  null
>   registeredListeners  ArrayDeque<E>  (id=421)
>     elements  Object[16]  (id=685)
>     head  0
>     tail  0
> askToRecycle  false
> isBlocking  true
> {noformat}
> Second join stack trace and variable values from
> {{SingleInputGate.getNextBufferOrEvent}}:
> {noformat}
> Unsafe.park(boolean, long) line: not available [native method]
> LockSupport.parkNanos(Object, long) line: 215
> AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078
> LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467
> SingleInputGate.getNextBufferOrEvent() line: 414
> MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79
> MutableRecordReader<T>.next(T) line: 34
> ReaderIterator<T>.next(T) line: 59
> MutableHashTable$ProbeIterator<PT>.next() line: 1581
> MutableHashTable<BT,PT>.processProbeIter() line: 457
> MutableHashTable<BT,PT>.nextRecord() line: 555
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110
> JoinDriver<IT1,IT2,OT>.run() line: 208
> RegularPactTask<S,OT>.run() line: 489
> RegularPactTask<S,OT>.invoke() line: 354
> Task.run() line: 581
> Thread.run() line: 745
> {noformat}
> {noformat}
> this  SingleInputGate  (id=693)
>   bufferPool  LocalBufferPool  (id=706)
>     availableMemorySegments  ArrayDeque<E>  (id=716)
>       elements  Object[16]  (id=717)
>       head  0
>       tail  0
>     currentPoolSize  60
>     isDestroyed  false
>     networkBufferPool  NetworkBufferPool  (id=354)
>       allBufferPools  HashSet<E>  (id=424)
>       availableMemorySegments  ArrayBlockingQueue<E>  (id=427)
>         count  0
>         items  Object[10240]  (id=674)
>         itrs  null
>         lock  ReentrantLock  (id=675)
>         notEmpty  AbstractQueuedSynchronizer$ConditionObject  (id=678)
>         notFull  AbstractQueuedSynchronizer$ConditionObject  (id=679)
>         putIndex  6954
>         takeIndex  6954
>       factoryLock  Object  (id=430)
>       isDestroyed  false
>       managedBufferPools  HashSet<E>  (id=431)
>       memorySegmentSize  32768
>       numTotalRequiredBuffers  3226
>       totalNumberOfMemorySegments  10240
>     numberOfRequestedMemorySegments  0
>     numberOfRequiredMemorySegments  32
>     owner  null
>     registeredListeners  ArrayDeque<E>  (id=718)
>   channelsWithEndOfPartitionEvents  BitSet  (id=707)
>   consumedResultId  IntermediateDataSetID  (id=708)
>   consumedSubpartitionIndex  24
>   executionId  ExecutionAttemptID  (id=709)
>   hasReceivedAllEndOfPartitionEvents  false
>   inputChannels  HashMap<K,V>  (id=710)
>   inputChannelsWithData  LinkedBlockingQueue<E>  (id=692)
>     capacity  2147483647
>     count  AtomicInteger  (id=698)
>       value  0
>     head  LinkedBlockingQueue$Node<E>  (id=701)
>     last  LinkedBlockingQueue$Node<E>  (id=701)
>     notEmpty  AbstractQueuedSynchronizer$ConditionObject  (id=691)
>     notFull  AbstractQueuedSynchronizer$ConditionObject  (id=703)
>     putLock  ReentrantLock  (id=704)
>     takeLock  ReentrantLock  (id=705)
>   isReleased  false
>   jobId  JobID  (id=711)
>   numberOfInputChannels  32
>   numberOfUninitializedChannels  0
>   owningTaskName  "Join (25/32) (d88748c8d07d430a85bec52cb82c0214)"  (id=712)
>   partitionStateChecker  NetworkEnvironment$JobManagerPartitionStateChecker  (id=363)
>   pendingEvents  ArrayList<E>  (id=713)
>   registeredListeners  CopyOnWriteArrayList<E>  (id=714)
>   requestedPartitionsFlag  true
>   requestLock  Object  (id=715)
>   retriggerLocalRequestTimer  null
> currentChannel  null
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)