[ https://issues.apache.org/jira/browse/HIVE-24626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
László Bodor updated HIVE-24626: -------------------------------- Description: The root cause is that the readers cannot queue.offer items to full queues, which belong to consumers that are themselves blocked on other consumers. The scenario is shown below: {code} ---------------------------------------------------------------------------------------------- VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED ---------------------------------------------------------------------------------------------- Map 2 ...... llap RUNNING 3 2 1 0 0 0 Map 1 llap RUNNING 676 0 119 557 0 0 Map 3 llap RUNNING 108 0 21 87 0 21 Reducer 4 llap INITED 1 0 0 1 0 0 Map 5 llap INITED 108 0 0 108 0 0 Reducer 6 llap INITED 4 0 0 4 0 0 Reducer 7 llap INITED 1 0 0 1 0 0 ---------------------------------------------------------------------------------------------- VERTICES: 00/07 [>>--------------------------] 0% ELAPSED TIME: 3489.83 s ---------------------------------------------------------------------------------------------- {code} Map2 is MAPJOINed to Map1. 
In an LLAP daemon, the forever running Map2 task is blocked on nextCvb: {code} "TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 nid=0x147 waiting on condition [0x00007f0ce005d000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00007f0de8025e00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517) at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372) at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82) at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362) at org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79) at org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33) at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117) at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151) at org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115) at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68) at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437) at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267) at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250) at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374) at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75) at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62) at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38) at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} while all the elevator threads are blocked here (all of them tries to read for Map1): {code} "IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 tid=0x00007f0cddc48800 nid=0x267 waiting on condition [0x00007f0cd7af8000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00007f0e3095c480> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379) at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607) at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591) at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82) at org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268) at org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79) at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122) at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42) at org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535) at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430) at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279) at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276) at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117) at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88) at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73) at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} The problem here is that, as far as I can see, all the elevator threads try to offer to LlapRecordReaders that belong to Map 1; however, Map 1 tasks cannot progress beyond a certain point, because they wait for Map 2's input. The elevator threads are blocked because the queues of the LlapRecordReader instances of Map1 are full; otherwise LlapRecordReader.enqueueInternal should be able to offer the item from the elevator thread. In my example: Map2's queue limit is: 50000 (Map2: mapjoin source) Map1's queue limit is: 6931 (Map1: mapjoin target) The limits are calculated according to data characteristics in LlapRecordReader.determineQueueLimit. In my case, the IO threads for Map1 reached their queue limit and cannot offer new items, so they keep holding IO elevator threads, which therefore cannot be used for reading for the more important Map2 tasks, so Map1 tasks will eventually hang here: {code} "TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 nid=0x1ca waiting on condition [0x00007f0cdb6d9000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00007f0de90049a8> (a java.util.concurrent.FutureTask) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426) at java.util.concurrent.FutureTask.get(FutureTask.java:204) at org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436) at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399) at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572) at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524) at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385) at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353) at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266) at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250) at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374) at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75) at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62) at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38) at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} Operator.asyncInitOperations typically contains a dependency on map join input. In my example, Map2 reads a small table, like microstrategy.lu_item, and Map1 reads a larger table, microstrategy.order_detail. was: The root cause is that the readers cannot queue.offer items to full queues, which belong to consumers that are blocked on other consumers. 
Scenario is like below: {code} ---------------------------------------------------------------------------------------------- VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED ---------------------------------------------------------------------------------------------- Map 2 ...... llap RUNNING 3 2 1 0 0 0 Map 1 llap RUNNING 676 0 119 557 0 0 Map 3 llap RUNNING 108 0 21 87 0 21 Reducer 4 llap INITED 1 0 0 1 0 0 Map 5 llap INITED 108 0 0 108 0 0 Reducer 6 llap INITED 4 0 0 4 0 0 Reducer 7 llap INITED 1 0 0 1 0 0 ---------------------------------------------------------------------------------------------- VERTICES: 00/07 [>>--------------------------] 0% ELAPSED TIME: 3489.83 s ---------------------------------------------------------------------------------------------- {code} Map2 is MAPJOINed to Map1. In an LLAP daemon, the forever running Map2 task is blocked on nextCvb: {code} "TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 nid=0x147 waiting on condition [0x00007f0ce005d000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00007f0de8025e00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517) at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372) at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82) at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362) at org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79) at 
org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33) at org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117) at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151) at org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115) at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68) at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437) at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267) at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250) at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374) at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75) at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62) at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38) at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} while all the elevator threads are blocked here: {code} "IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 
tid=0x00007f0cddc48800 nid=0x267 waiting on condition [0x00007f0cd7af8000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00007f0e3095c480> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379) at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607) at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591) at org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82) at org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268) at org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79) at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122) at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42) at org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535) at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430) at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279) at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276) at org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117) at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88) at org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73) at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} the problem here is that, as far as I can see, all the elevator threads try to offer to LlapRecordReaders that belong to Map 1, however, Map 1 tasks cannot progress beyond a certain point, because wait for Map 2's input this is because the queues of LlapRecordReader instances of Map1 are full, otherwise LlapRecordReader.enqueueInternal should be able to offer the item from the elevator thread In my example: Map2's queue limit is: 50000 Map1's queue limit is: 6931 which is calculated according to data characteristics in LlapRecordReader.determineQueueLimit in my case, I guess Map1 reader reached its queue limit, and cannot offer new items, that's why Map2 task's thread is blocked here: {code} "TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 nid=0x1ca waiting on condition [0x00007f0cdb6d9000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00007f0de90049a8> (a java.util.concurrent.FutureTask) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at 
java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426) at java.util.concurrent.FutureTask.get(FutureTask.java:204) at org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436) at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399) at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572) at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524) at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385) at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353) at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266) at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250) at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374) at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75) at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62) at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38) at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} Operator.asyncInitOperations typically contains a dependency on map join input. 
in my example Map2 reads small table, like microstrategy.lu_item, and Map1 reads a larger table microstrategy.order_detail > LLAP: reader threads could be starvated if all IO elevator threads are busy > to enqueue to another readers with full queue > ------------------------------------------------------------------------------------------------------------------------- > > Key: HIVE-24626 > URL: https://issues.apache.org/jira/browse/HIVE-24626 > Project: Hive > Issue Type: Bug > Reporter: László Bodor > Assignee: László Bodor > Priority: Major > Attachments: executor_stack_cache_none_12_io_threads.log > > > The root cause is that the readers cannot queue.offer items to full queues, > which belong to consumers that are blocked on other consumers. > Scenario is like below: > {code} > ---------------------------------------------------------------------------------------------- > VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING > FAILED KILLED > ---------------------------------------------------------------------------------------------- > Map 2 ...... llap RUNNING 3 2 1 0 > 0 0 > Map 1 llap RUNNING 676 0 119 557 > 0 0 > Map 3 llap RUNNING 108 0 21 87 > 0 21 > Reducer 4 llap INITED 1 0 0 1 > 0 0 > Map 5 llap INITED 108 0 0 108 > 0 0 > Reducer 6 llap INITED 4 0 0 4 > 0 0 > Reducer 7 llap INITED 1 0 0 1 > 0 0 > ---------------------------------------------------------------------------------------------- > VERTICES: 00/07 [>>--------------------------] 0% ELAPSED TIME: 3489.83 s > ---------------------------------------------------------------------------------------------- > {code} > Map2 is MAPJOINed to Map1. 
In an LLAP daemon, the forever running Map2 task > is blocked on nextCvb: > {code} > "TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 > nid=0x147 waiting on condition [0x00007f0ce005d000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00007f0de8025e00> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at > java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) > at > org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517) > at > org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372) > at > org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82) > at > org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362) > at > org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79) > at > org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33) > at > org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117) > at > org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151) > at > org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115) > at > org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68) > at > org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437) > at > org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267) > at > org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250) > at > 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374) > at > org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75) > at > org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) > at > org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62) > at > org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38) > at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) > at > org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > while all the elevator threads are blocked here (all of them tries to read > for Map1): > {code} > "IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 tid=0x00007f0cddc48800 > nid=0x267 waiting on condition [0x00007f0cd7af8000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00007f0e3095c480> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at > java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379) > at > 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607) > at > org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591) > at > org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82) > at > org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268) > at > org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79) > at > org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122) > at > org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42) > at > org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535) > at > org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430) > at > org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279) > at > org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) > at > org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276) > at > org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117) > at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) > at > org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88) > at > org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73) > at > 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > the problem here is that, as far as I can see, all the elevator threads try > to offer to LlapRecordReaders that belong to Map 1, however, Map 1 tasks > cannot progress beyond a certain point, because wait for Map 2's input > this is because the queues of LlapRecordReader instances of Map1 are full, > otherwise LlapRecordReader.enqueueInternal should be able to offer the item > from the elevator thread > In my example: > Map2's queue limit is: 50000 (Map2: mapjoin source) > Map1's queue limit is: 6931 (Map1: mapjoin target) > the limits are calculated according to data characteristics in > LlapRecordReader.determineQueueLimit > in my case, io threads for Map1 reached their queue limit, and cannot offer > new items, so holding IO elevator threads, which cannot be used for reading > for more important Map2 tasks, so Map1 tasks will eventually hang here: > {code} > "TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 > nid=0x1ca waiting on condition [0x00007f0cdb6d9000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00007f0de90049a8> (a > java.util.concurrent.FutureTask) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426) > at java.util.concurrent.FutureTask.get(FutureTask.java:204) > at > org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436) > at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399) > at 
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572) > at > org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524) > at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385) > at > org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353) > at > org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266) > at > org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250) > at > org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374) > at > org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75) > at > org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) > at > org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62) > at > org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38) > at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) > at > org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > Operator.asyncInitOperations typically contains a dependency on map join > input. > in my example Map2 reads small table, like microstrategy.lu_item, and Map1 > reads a larger table microstrategy.order_detail -- This message was sent by Atlassian Jira (v8.3.4#803005)