[ 
https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443716#comment-16443716
 ] 

Nico Kruber commented on FLINK-2685:
------------------------------------

Hi [~amit.jain],
if an operator is blocking in {{LocalBufferPool.requestBuffer}}, this usually 
just means that it is backpressured: the downstream (receiving) operator is not 
processing data fast enough, so eventually all buffers are filled and the 
sender waits until existing buffers have been processed and recycled.
Have a look at the downstream operators to identify where the backpressure 
originates and whether the job is making slow progress or none at all. The Web 
UI should help you with that.
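
To make the mechanism concrete, here is a minimal sketch of a sender stalling 
on a bounded buffer pool. It is a toy model only: {{ToyBufferPool}}, 
{{buffersBeforeStall}} and {{recycleUnblocks}} are hypothetical names, the 
50 ms timeout is arbitrary, and none of this is Flink's actual 
{{LocalBufferPool}} code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of backpressure on a bounded buffer pool.
// ToyBufferPool is hypothetical and is NOT Flink's LocalBufferPool.
final class BackpressureSketch {

    static final class ToyBufferPool {
        private final BlockingQueue<byte[]> available;

        // numBuffers must be at least 1 (ArrayBlockingQueue rejects smaller capacities).
        ToyBufferPool(int numBuffers, int bufferSize) {
            available = new ArrayBlockingQueue<>(numBuffers);
            for (int i = 0; i < numBuffers; i++) {
                available.add(new byte[bufferSize]);
            }
        }

        // Waits up to timeoutMillis for a free buffer.
        // Returns null if none was recycled in time or the thread was interrupted.
        byte[] requestBuffer(long timeoutMillis) {
            try {
                return available.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }

        // Called by the receiving side once it has processed a buffer.
        void recycle(byte[] buffer) {
            available.add(buffer);
        }
    }

    // Counts how many buffers a sender obtains before it stalls,
    // given a receiver that never recycles anything.
    static int buffersBeforeStall(int poolSize) {
        ToyBufferPool pool = new ToyBufferPool(poolSize, 1024);
        int granted = 0;
        while (pool.requestBuffer(50) != null) {
            granted++;
        }
        return granted;
    }

    // Drains the pool, confirms the next request stalls, then recycles one
    // buffer and checks that the following request succeeds again.
    static boolean recycleUnblocks(int poolSize) {
        ToyBufferPool pool = new ToyBufferPool(poolSize, 1024);
        byte[] last = null;
        for (int i = 0; i < poolSize; i++) {
            last = pool.requestBuffer(50);
        }
        boolean stalled = pool.requestBuffer(50) == null;
        pool.recycle(last);
        return stalled && pool.requestBuffer(50) != null;
    }

    public static void main(String[] args) {
        System.out.println("buffers granted before stall: " + buffersBeforeStall(4));
        System.out.println("recycling unblocks the sender: " + recycleUnblocks(4));
    }
}
```

In this model a stalled sender is not deadlocked by itself: it resumes as soon 
as the receiving side recycles a buffer, which is why the interesting question 
is whether the downstream operators still make any progress.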

> TaskManager deadlock on NetworkBufferPool
> -----------------------------------------
>
>                 Key: FLINK-2685
>                 URL: https://issues.apache.org/jira/browse/FLINK-2685
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, Network
>    Affects Versions: 0.10.0
>            Reporter: Greg Hogan
>            Assignee: Ufuk Celebi
>            Priority: Major
>         Attachments: job_manager_19_feb_15_30_running, 
> task_manager_19_feb_15_30_running
>
>
> This deadlock occurs intermittently. I have a {{join}} followed by a 
> {{chain<join,filter>}} followed by a {{reduceGroup}}. Stack traces and local 
> variables from one thread of each {{join}} are below.
> The {{join}}s are waiting on a buffer to become available 
> ({{networkBufferPool.availableMemorySegments.count=0}}). Both 
> {{LocalBufferPool}}s have been given extra capacity ({{currentPoolSize=60 > 
> numberOfRequiredMemorySegments=32}}). The first {{join}} is at full capacity 
> ({{currentPoolSize=numberOfRequestedMemorySegments=60}}) yet the second 
> {{join}} has not acquired any ({{numberOfRequestedMemorySegments=0}}).
> {{LocalBufferPool.returnExcessMemorySegments}} only recycles 
> {{MemorySegment}}s from its {{availableMemorySegments}}, so any requested 
> {{Buffer}}s will only be released when explicitly recycled.
> First join stack trace and variable values from 
> {{LocalBufferPool.requestBuffer}}:
> {noformat}
> owns: SpanningRecordSerializer<T>  (id=723)   
> waiting for: ArrayDeque<E>  (id=724)  
> Object.wait(long) line: not available [native method] 
> LocalBufferPool.requestBuffer(boolean) line: 163      
> LocalBufferPool.requestBufferBlocking() line: 133     
> RecordWriter<T>.emit(T) line: 92      
> OutputCollector<T>.collect(T) line: 65        
> JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137
> JoinDriver<IT1,IT2,OT>.run() line: 208        
> RegularPactTask<S,OT>.run() line: 489 
> RegularPactTask<S,OT>.invoke() line: 354      
> Task.run() line: 581  
> Thread.run() line: 745        
> {noformat}
> {noformat}
> this  LocalBufferPool  (id=403)       
>       availableMemorySegments ArrayDeque<E>  (id=398) 
>               elements        Object[16]  (id=422)    
>               head    14      
>               tail    14      
>       currentPoolSize 60      
>       isDestroyed     false   
>       networkBufferPool       NetworkBufferPool  (id=354)     
>               allBufferPools  HashSet<E>  (id=424)    
>               availableMemorySegments ArrayBlockingQueue<E>  (id=427) 
>                       count   0       
>                       items   Object[10240]  (id=674) 
>                       itrs    null    
>                       lock    ReentrantLock  (id=675) 
>                       notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=678)
>                       notFull AbstractQueuedSynchronizer$ConditionObject  (id=679)
>                       putIndex        6954    
>                       takeIndex       6954    
>               factoryLock     Object  (id=430)        
>               isDestroyed     false   
>               managedBufferPools      HashSet<E>  (id=431)    
>               memorySegmentSize       32768   
>               numTotalRequiredBuffers 3226    
>               totalNumberOfMemorySegments     10240   
>       numberOfRequestedMemorySegments 60      
>       numberOfRequiredMemorySegments  32      
>       owner   null    
>       registeredListeners     ArrayDeque<E>  (id=421) 
>               elements        Object[16]  (id=685)    
>               head    0       
>               tail    0       
> askToRecycle  false   
> isBlocking    true    
> {noformat}
> Second join stack trace and variable values from 
> {{SingleInputGate.getNextBufferOrEvent}}:
> {noformat}
> Unsafe.park(boolean, long) line: not available [native method]        
> LockSupport.parkNanos(Object, long) line: 215 
> AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078        
> LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467 
> SingleInputGate.getNextBufferOrEvent() line: 414      
> MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79     
> MutableRecordReader<T>.next(T) line: 34       
> ReaderIterator<T>.next(T) line: 59    
> MutableHashTable$ProbeIterator<PT>.next() line: 1581  
> MutableHashTable<BT,PT>.processProbeIter() line: 457  
> MutableHashTable<BT,PT>.nextRecord() line: 555        
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110
> JoinDriver<IT1,IT2,OT>.run() line: 208        
> RegularPactTask<S,OT>.run() line: 489 
> RegularPactTask<S,OT>.invoke() line: 354      
> Task.run() line: 581  
> Thread.run() line: 745        
> {noformat}
> {noformat}
> this  SingleInputGate  (id=693)       
>       bufferPool      LocalBufferPool  (id=706)       
>               availableMemorySegments ArrayDeque<E>  (id=716) 
>                       elements        Object[16]  (id=717)    
>                       head    0       
>                       tail    0       
>               currentPoolSize 60      
>               isDestroyed     false   
>               networkBufferPool       NetworkBufferPool  (id=354)     
>                       allBufferPools  HashSet<E>  (id=424)    
>                       availableMemorySegments ArrayBlockingQueue<E>  (id=427) 
>                               count   0       
>                               items   Object[10240]  (id=674) 
>                               itrs    null    
>                               lock    ReentrantLock  (id=675) 
>                               notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=678)
>                               notFull AbstractQueuedSynchronizer$ConditionObject  (id=679)
>                               putIndex        6954    
>                               takeIndex       6954    
>                       factoryLock     Object  (id=430)        
>                       isDestroyed     false   
>                       managedBufferPools      HashSet<E>  (id=431)    
>                       memorySegmentSize       32768   
>                       numTotalRequiredBuffers 3226    
>                       totalNumberOfMemorySegments     10240   
>               numberOfRequestedMemorySegments 0       
>               numberOfRequiredMemorySegments  32      
>               owner   null    
>               registeredListeners     ArrayDeque<E>  (id=718) 
>       channelsWithEndOfPartitionEvents        BitSet  (id=707)        
>       consumedResultId        IntermediateDataSetID  (id=708) 
>       consumedSubpartitionIndex       24      
>       executionId     ExecutionAttemptID  (id=709)    
>       hasReceivedAllEndOfPartitionEvents      false   
>       inputChannels   HashMap<K,V>  (id=710)  
>       inputChannelsWithData   LinkedBlockingQueue<E>  (id=692)        
>               capacity        2147483647      
>               count   AtomicInteger  (id=698) 
>                       value   0       
>               head    LinkedBlockingQueue$Node<E>  (id=701)   
>               last    LinkedBlockingQueue$Node<E>  (id=701)   
>               notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=691)
>               notFull AbstractQueuedSynchronizer$ConditionObject  (id=703)    
>               putLock ReentrantLock  (id=704) 
>               takeLock        ReentrantLock  (id=705) 
>       isReleased      false   
>       jobId   JobID  (id=711) 
>       numberOfInputChannels   32      
>       numberOfUninitializedChannels   0       
>       owningTaskName  "Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)
>       partitionStateChecker   NetworkEnvironment$JobManagerPartitionStateChecker  (id=363)
>       pendingEvents   ArrayList<E>  (id=713)  
>       registeredListeners     CopyOnWriteArrayList<E>  (id=714)       
>       requestedPartitionsFlag true    
>       requestLock     Object  (id=715)        
>       retriggerLocalRequestTimer      null    
> currentChannel        null    
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
