[ 
https://issues.apache.org/jira/browse/HIVE-24626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HIVE-24626:
----------------------------------
    Labels: pull-request-available  (was: )

> LLAP: reader threads could be starvated if all IO elevator threads are busy 
> to enqueue to another readers with full queue
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-24626
>                 URL: https://issues.apache.org/jira/browse/HIVE-24626
>             Project: Hive
>          Issue Type: Bug
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: executor_stack_cache_none_12_io_threads.log
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The root cause is that the readers cannot queue.offer items to full queues, 
> which belong to consumers that are blocked on other consumers. 
> Scenario is like below:
> {code}
> ----------------------------------------------------------------------------------------------
>         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  
> FAILED  KILLED
> ----------------------------------------------------------------------------------------------
> Map 2 ......          llap       RUNNING      3          2        1        0  
>      0       0
> Map 1                 llap       RUNNING    676          0      119      557  
>      0       0
> Map 3                 llap       RUNNING    108          0       21       87  
>      0      21
> Reducer 4             llap        INITED      1          0        0        1  
>      0       0
> Map 5                 llap        INITED    108          0        0      108  
>      0       0
> Reducer 6             llap        INITED      4          0        0        4  
>      0       0
> Reducer 7             llap        INITED      1          0        0        1  
>      0       0
> ----------------------------------------------------------------------------------------------
> VERTICES: 00/07  [>>--------------------------] 0%    ELAPSED TIME: 3489.83 s
> ----------------------------------------------------------------------------------------------
> {code}
> Map2 is MAPJOINed to Map1. In an LLAP daemon, the forever running Map2 task 
> is blocked on nextCvb:
> {code}
> "TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 
> nid=0x147 waiting on condition [0x00007f0ce005d000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00007f0de8025e00> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at 
> java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82)
>       at 
> org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362)
>       at 
> org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79)
>       at 
> org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33)
>       at 
> org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117)
>       at 
> org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151)
>       at 
> org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
>       at 
> org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
>       at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>       at 
> org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> while all the elevator threads are blocked here (all of them tries to read 
> for Map1):
> {code}
> "IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 tid=0x00007f0cddc48800 
> nid=0x267 waiting on condition [0x00007f0cd7af8000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00007f0e3095c480> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at 
> java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82)
>       at 
> org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268)
>       at 
> org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42)
>       at 
> org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117)
>       at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73)
>       at 
> org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> the problem here is that, as far as I can see, all the elevator threads try 
> to offer to LlapRecordReaders that belong to Map 1, however, Map 1 tasks 
> cannot progress beyond a certain point, because wait for Map 2's input
> this is because the queues of LlapRecordReader instances of Map1 are full, 
> otherwise LlapRecordReader.enqueueInternal should be able to offer the item 
> from the elevator thread
> In my example:
> Map2's queue limit is: 50000 (Map2: mapjoin source)
> Map1's queue limit is: 6931 (Map1: mapjoin target)
> the limits are calculated according to data characteristics in 
> LlapRecordReader.determineQueueLimit
> in my case, io threads for Map1 reached their queue limit, and cannot offer 
> new items, so holding IO elevator threads, which cannot be used for reading 
> for more important Map2 tasks, so Map1 tasks will eventually hang here:
> {code}
> "TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 
> nid=0x1ca waiting on condition [0x00007f0cdb6d9000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00007f0de90049a8> (a 
> java.util.concurrent.FutureTask)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:204)
>       at 
> org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436)
>       at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399)
>       at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572)
>       at 
> org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524)
>       at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
>       at 
> org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
>       at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>       at 
> org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> Operator.asyncInitOperations typically contains a dependency on map join 
> input.
> in my example Map2 reads small table, like microstrategy.lu_item, and Map1 
> reads a larger table microstrategy.order_detail
> this deadlock theoretically can be solved by:
> 1. IO elevator threads (reading for Map1) opt out as they cannot offer new 
> items
> 2. the freed elevator threads can be used for Map2
> 3. Map2 can finish its work, and let Map1 proceed by providing the mapjoin 
> data to it



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to