[ 
https://issues.apache.org/jira/browse/HIVE-24626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

László Bodor updated HIVE-24626:
--------------------------------
    Description: 
The root cause is that the readers cannot queue.offer items to full queues, 
which belong to consumers that are blocked on other consumers. 
Scenario is like below:
{code}
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  
FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 2 ......          llap       RUNNING      3          2        1        0    
   0       0
Map 1                 llap       RUNNING    676          0      119      557    
   0       0
Map 3                 llap       RUNNING    108          0       21       87    
   0      21
Reducer 4             llap        INITED      1          0        0        1    
   0       0
Map 5                 llap        INITED    108          0        0      108    
   0       0
Reducer 6             llap        INITED      4          0        0        4    
   0       0
Reducer 7             llap        INITED      1          0        0        1    
   0       0
----------------------------------------------------------------------------------------------
VERTICES: 00/07  [>>--------------------------] 0%    ELAPSED TIME: 3489.83 s
----------------------------------------------------------------------------------------------
{code}

Map2 is MAPJOINed to Map1. In an LLAP daemon, the forever running Map2 task is 
blocked on nextCvb:
{code}
"TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 
nid=0x147 waiting on condition [0x00007f0ce005d000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0de8025e00> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82)
        at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362)
        at 
org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79)
        at 
org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33)
        at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117)
        at 
org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151)
        at 
org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
        at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

while all the elevator threads are blocked here (all of them tries to read for 
Map1):
{code}
"IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 tid=0x00007f0cddc48800 
nid=0x267 waiting on condition [0x00007f0cd7af8000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0e3095c480> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82)
        at 
org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268)
        at 
org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42)
        at 
org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

the problem here is that, as far as I can see, all the elevator threads try to 
offer to LlapRecordReaders that belong to Map 1, however, Map 1 tasks cannot 
progress beyond a certain point, because wait for Map 2's input
this is because the queues of LlapRecordReader instances of Map1 are full, 
otherwise LlapRecordReader.enqueueInternal should be able to offer the item 
from the elevator thread

In my example:
Map2's queue limit is: 50000 (Map2: mapjoin source)
Map1's queue limit is: 6931 (Map1: mapjoin target)
the limits are calculated according to data characteristics in 
LlapRecordReader.determineQueueLimit

in my case, io threads for Map1 reached their queue limit, and cannot offer new 
items, so holding IO elevator threads, which cannot be used for reading for 
more important Map2 tasks, so Map1 tasks will eventually hang here:
{code}
"TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 
nid=0x1ca waiting on condition [0x00007f0cdb6d9000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0de90049a8> (a 
java.util.concurrent.FutureTask)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
        at java.util.concurrent.FutureTask.get(FutureTask.java:204)
        at 
org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572)
        at 
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
        at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}
Operator.asyncInitOperations typically contains a dependency on map join input.

in my example Map2 reads small table, like microstrategy.lu_item, and Map1 
reads a larger table microstrategy.order_detail

  was:
The root cause is that the readers cannot queue.offer items to full queues, 
which belong to consumers that are blocked on other consumers. 
Scenario is like below:
{code}
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  
FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 2 ......          llap       RUNNING      3          2        1        0    
   0       0
Map 1                 llap       RUNNING    676          0      119      557    
   0       0
Map 3                 llap       RUNNING    108          0       21       87    
   0      21
Reducer 4             llap        INITED      1          0        0        1    
   0       0
Map 5                 llap        INITED    108          0        0      108    
   0       0
Reducer 6             llap        INITED      4          0        0        4    
   0       0
Reducer 7             llap        INITED      1          0        0        1    
   0       0
----------------------------------------------------------------------------------------------
VERTICES: 00/07  [>>--------------------------] 0%    ELAPSED TIME: 3489.83 s
----------------------------------------------------------------------------------------------
{code}

Map2 is MAPJOINed to Map1. In an LLAP daemon, the forever running Map2 task is 
blocked on nextCvb:
{code}
"TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 
nid=0x147 waiting on condition [0x00007f0ce005d000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0de8025e00> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82)
        at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362)
        at 
org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79)
        at 
org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33)
        at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117)
        at 
org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151)
        at 
org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
        at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

while all the elevator threads are blocked here:
{code}
"IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 tid=0x00007f0cddc48800 
nid=0x267 waiting on condition [0x00007f0cd7af8000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0e3095c480> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82)
        at 
org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268)
        at 
org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42)
        at 
org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

the problem here is that, as far as I can see, all the elevator threads try to 
offer to LlapRecordReaders that belong to Map 1, however, Map 1 tasks cannot 
progress beyond a certain point, because wait for Map 2's input
this is because the queues of LlapRecordReader instances of Map1 are full, 
otherwise LlapRecordReader.enqueueInternal should be able to offer the item 
from the elevator thread

In my example:
Map2's queue limit is: 50000
Map1's queue limit is: 6931
which is calculated according to data characteristics in 
LlapRecordReader.determineQueueLimit
in my case, I guess Map1 reader reached its queue limit, and cannot offer new 
items, that's why Map2 task's thread is blocked here:
{code}
"TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 
nid=0x1ca waiting on condition [0x00007f0cdb6d9000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0de90049a8> (a 
java.util.concurrent.FutureTask)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
        at java.util.concurrent.FutureTask.get(FutureTask.java:204)
        at 
org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572)
        at 
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
        at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}
Operator.asyncInitOperations typically contains a dependency on map join input.

in my example Map2 reads small table, like microstrategy.lu_item, and Map1 
reads a larger table microstrategy.order_detail


> LLAP: reader threads could be starvated if all IO elevator threads are busy 
> to enqueue to another readers with full queue
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-24626
>                 URL: https://issues.apache.org/jira/browse/HIVE-24626
>             Project: Hive
>          Issue Type: Bug
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>         Attachments: executor_stack_cache_none_12_io_threads.log
>
>
> The root cause is that the readers cannot queue.offer items to full queues, 
> which belong to consumers that are blocked on other consumers. 
> Scenario is like below:
> {code}
> ----------------------------------------------------------------------------------------------
>         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  
> FAILED  KILLED
> ----------------------------------------------------------------------------------------------
> Map 2 ......          llap       RUNNING      3          2        1        0  
>      0       0
> Map 1                 llap       RUNNING    676          0      119      557  
>      0       0
> Map 3                 llap       RUNNING    108          0       21       87  
>      0      21
> Reducer 4             llap        INITED      1          0        0        1  
>      0       0
> Map 5                 llap        INITED    108          0        0      108  
>      0       0
> Reducer 6             llap        INITED      4          0        0        4  
>      0       0
> Reducer 7             llap        INITED      1          0        0        1  
>      0       0
> ----------------------------------------------------------------------------------------------
> VERTICES: 00/07  [>>--------------------------] 0%    ELAPSED TIME: 3489.83 s
> ----------------------------------------------------------------------------------------------
> {code}
> Map2 is MAPJOINed to Map1. In an LLAP daemon, the forever running Map2 task 
> is blocked on nextCvb:
> {code}
> "TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 
> nid=0x147 waiting on condition [0x00007f0ce005d000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00007f0de8025e00> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at 
> java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82)
>       at 
> org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362)
>       at 
> org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79)
>       at 
> org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33)
>       at 
> org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117)
>       at 
> org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151)
>       at 
> org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
>       at 
> org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
>       at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>       at 
> org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> while all the elevator threads are blocked here (all of them tries to read 
> for Map1):
> {code}
> "IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 tid=0x00007f0cddc48800 
> nid=0x267 waiting on condition [0x00007f0cd7af8000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00007f0e3095c480> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at 
> java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82)
>       at 
> org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268)
>       at 
> org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42)
>       at 
> org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117)
>       at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73)
>       at 
> org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> the problem here is that, as far as I can see, all the elevator threads try 
> to offer to LlapRecordReaders that belong to Map 1, however, Map 1 tasks 
> cannot progress beyond a certain point, because wait for Map 2's input
> this is because the queues of LlapRecordReader instances of Map1 are full, 
> otherwise LlapRecordReader.enqueueInternal should be able to offer the item 
> from the elevator thread
> In my example:
> Map2's queue limit is: 50000 (Map2: mapjoin source)
> Map1's queue limit is: 6931 (Map1: mapjoin target)
> the limits are calculated according to data characteristics in 
> LlapRecordReader.determineQueueLimit
> in my case, io threads for Map1 reached their queue limit, and cannot offer 
> new items, so holding IO elevator threads, which cannot be used for reading 
> for more important Map2 tasks, so Map1 tasks will eventually hang here:
> {code}
> "TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 
> nid=0x1ca waiting on condition [0x00007f0cdb6d9000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00007f0de90049a8> (a 
> java.util.concurrent.FutureTask)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:204)
>       at 
> org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436)
>       at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399)
>       at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572)
>       at 
> org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524)
>       at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
>       at 
> org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
>       at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>       at 
> org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> Operator.asyncInitOperations typically contains a dependency on map join 
> input.
> in my example Map2 reads small table, like microstrategy.lu_item, and Map1 
> reads a larger table microstrategy.order_detail



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to