[ https://issues.apache.org/jira/browse/HIVE-27078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

László Bodor updated HIVE-27078:
--------------------------------
    Description: 
Considering this DAG:
{code}
|         Map 1 <- Reducer 3 (CUSTOM_EDGE)           |
|         Map 2 <- Map 4 (CUSTOM_EDGE)               |
|         Map 5 <- Map 1 (CUSTOM_EDGE)               |
|         Reducer 3 <- Map 2 (SIMPLE_EDGE)          |
{code}
This could be simplified further; it is taken as-is from a customer query. The 
problematic vertices and edge are:
{code}
|         Map 1 <- Reducer 3 (CUSTOM_EDGE)           |
{code}

Reducer 3 was initially scheduled with 20 tasks; later, auto reducer 
parallelism decided that only 4 tasks were needed:
{code}
2023-02-07 13:00:36,078 [INFO] [App Shared Pool - #4] |vertexmanager.ShuffleVertexManager|: Reducing auto parallelism for vertex: Reducer 3 from 20 to 4
{code}

In this case, Map 1 can hang because it still expects 20 inputs:
{code}
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 4 .......... container     SUCCEEDED     16         16        0        0       0       0
Map 2 .......... container     SUCCEEDED     48         48        0        0       0       0
Reducer 3 ...... container     SUCCEEDED      4          4        0        0       0       0
Map 1            container       RUNNING    192          0       13      179       0       0
Map 5            container        INITED    241          0        0      241       0       0
----------------------------------------------------------------------------------------------
VERTICES: 03/05  [===>>-----------------------] 13%   ELAPSED TIME: 901.18 s
----------------------------------------------------------------------------------------------
{code}

In the logs it looks like this: the ShuffleManager on Map 1 is initialized for 20 inputs (the original parallelism of Reducer 3):
{code}
2022-12-08 09:42:26,845 [INFO] [I/O Setup 2 Start: {Reducer 3}] |impl.ShuffleManager|: Reducer_3: numInputs=20, compressionCodec=org.apache.hadoop.io.compress.SnappyCodec, numFetchers=10, ifileBufferSize=4096, ifileReadAheadEnabled=true, ifileReadAheadLength=4194304, localDiskFetchEnabled=true, sharedFetchEnabled=false, keepAlive=true, keepAliveMaxConnections=20, connectionTimeout=180000, readTimeout=180000, bufferSize=8192, bufferSize=8192, maxTaskOutputAtOnce=20, asyncHttp=false
...
receives the input event:
2022-12-08 09:42:27,134 [INFO] [TaskHeartbeatThread] |task.TaskReporter|: Routing events from heartbeat response to task, currentTaskAttemptId=attempt_1670331499491_1408_1_03_000039_0, eventCount=1 fromEventId=0 nextFromEventId=0

...but then it hangs while waiting for further inputs:

"TezChild" #29 daemon prio=5 os_prio=0 tid=0x00007f3fae141000 nid=0x9581 
waiting on condition [0x00007f3f737ba000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000071ad90a00> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
        at 
java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
        at 
org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.getNextInput(ShuffleManager.java:1033)
        at 
org.apache.tez.runtime.library.common.readers.UnorderedKVReader.moveToNextInput(UnorderedKVReader.java:202)
        at 
org.apache.tez.runtime.library.common.readers.UnorderedKVReader.next(UnorderedKVReader.java:125)
        at 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader.load(VectorMapJoinFastHashTableLoader.java:129)
        at 
org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTableInternal(MapJoinOperator.java:385)
        at 
org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTable(MapJoinOperator.java:454)
        at 
org.apache.hadoop.hive.ql.exec.MapJoinOperator.initializeOp(MapJoinOperator.java:241)
        at 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinCommonOperator.initializeOp(VectorMapJoinCommonOperator.java:555)
        at 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinGenerateResultOperator.initializeOp(VectorMapJoinGenerateResultOperator.java:111)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:374)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:571)
        at 
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:523)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:384)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:268)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:252)
        at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
        at 
com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
        at 
com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
{code}
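
The stack shows the essence of the hang: ShuffleManager.getNextInput blocks on a LinkedBlockingDeque that is only ever fed by completion events from the producer tasks, and after auto parallelism only 4 producers exist while the consumer was sized for 20. A minimal, self-contained Java sketch of that blocking pattern (this is not Tez code; all names and numbers here are made up for illustration):
{code}
import java.util.concurrent.LinkedBlockingDeque;

// Toy illustration of the hang: the consumer side was sized for the
// original parallelism (20), but only 4 producer tasks ever existed.
public class ExpectedInputsHang {
    public static void main(String[] args) throws InterruptedException {
        final int expectedInputs = 20;  // like numInputs=20 seen by ShuffleManager
        final int actualProducers = 4;  // tasks left after auto reducer parallelism

        LinkedBlockingDeque<String> completedInputs = new LinkedBlockingDeque<>();

        // Only 4 "completed input" events are ever delivered.
        for (int i = 0; i < actualProducers; i++) {
            completedInputs.add("input-" + i);
        }

        // The consumer loops until it has seen all 20 expected inputs;
        // the 5th take() parks forever, like ShuffleManager.getNextInput.
        for (int i = 0; i < expectedInputs; i++) {
            String input = completedInputs.take();
            System.out.println("consumed " + input);
        }
    }
}
{code}
Run as-is, this consumes four items and then parks forever in take(), which is exactly the state the thread dump above captures.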

As a quick workaround, we can temporarily disable auto reducer parallelism on 
a vertex if it is the source of a bucket map join (see the sketch below).
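
At the session level, auto reducer parallelism can also be switched off entirely with set hive.tez.auto.reducer.parallelism=false;, though that affects every reducer in the query. For the vertex-level guard, here is a minimal sketch of what the planner-side change could look like; it relies on Hive's real ReduceSinkDesc.setReducerTraits API, but the helper name and the exact hook point (somewhere in the Tez compilation path that detects the bucket map join edge) are hypothetical:
{code}
import java.util.EnumSet;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits;

// Hypothetical helper sketching the workaround: when a ReduceSink feeds a
// bucket map join (CUSTOM_EDGE), pin its parallelism so ShuffleVertexManager
// cannot shrink the task count at runtime.
public final class BucketMapJoinParallelismGuard {
  private BucketMapJoinParallelismGuard() {}

  public static void pinParallelism(ReduceSinkOperator rs) {
    ReduceSinkDesc desc = rs.getConf();
    // FIXED replaces AUTOPARALLEL/UNIFORM, so the vertex keeps the task
    // count it was planned with and produces every input that the
    // downstream bucket map join vertex expects.
    desc.setReducerTraits(EnumSet.of(ReducerTraits.FIXED));
  }
}
{code}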

> Bucket Map Join can hang if the source tasks parallelism is changed by 
> reducer autoparallelism
> ----------------------------------------------------------------------------------------------
>
>                 Key: HIVE-27078
>                 URL: https://issues.apache.org/jira/browse/HIVE-27078
>             Project: Hive
>          Issue Type: Bug
>            Reporter: László Bodor
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
