[ https://issues.apache.org/jira/browse/HIVE-27078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
László Bodor updated HIVE-27078:
--------------------------------
Description:
Considering this DAG:
{code}
| Map 1 <- Reducer 3 (CUSTOM_EDGE) |
| Map 2 <- Map 4 (CUSTOM_EDGE)    |
| Map 5 <- Map 1 (CUSTOM_EDGE)    |
| Reducer 3 <- Map 2 (SIMPLE_EDGE)
{code}
This DAG (taken from a customer query) can be simplified further; the problematic vertices and edge are:
{code}
| Map 1 <- Reducer 3 (CUSTOM_EDGE) |
{code}
Reducer 3 was initially scheduled with 20 tasks, and auto reducer parallelism later decided that only 4 tasks are needed:
{code}
2023-02-07 13:00:36,078 [INFO] [App Shared Pool - #4] |vertexmanager.ShuffleVertexManager|: Reducing auto parallelism for vertex: Reducer 3 from 20 to 4
{code}
In this case, Map 1 can hang, as it still expects 20 inputs:
{code}
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 4 ..........      container     SUCCEEDED     16         16        0        0       0       0
Map 2 ..........      container     SUCCEEDED     48         48        0        0       0       0
Reducer 3 ......      container     SUCCEEDED      4          4        0        0       0       0
Map 1                 container       RUNNING    192          0       13      179       0       0
Map 5                 container        INITED    241          0        0      241       0       0
----------------------------------------------------------------------------------------------
VERTICES: 03/05  [===>>-----------------------] 13%  ELAPSED TIME: 901.18 s
----------------------------------------------------------------------------------------------
{code}
The logs look like this: the input is initialized with numInputs=20:
{code}
2022-12-08 09:42:26,845 [INFO] [I/O Setup 2 Start: {Reducer 3}] |impl.ShuffleManager|: Reducer_3: numInputs=20, compressionCodec=org.apache.hadoop.io.compress.SnappyCodec, numFetchers=10, ifileBufferSize=4096, ifileReadAheadEnabled=true, ifileReadAheadLength=4194304, localDiskFetchEnabled=true, sharedFetchEnabled=false, keepAlive=true, keepAliveMaxConnections=20, connectionTimeout=180000, readTimeout=180000, bufferSize=8192, bufferSize=8192, maxTaskOutputAtOnce=20, asyncHttp=false
...
{code}
It receives the input event:
{code}
2022-12-08 09:42:27,134 [INFO] [TaskHeartbeatThread] |task.TaskReporter|: Routing events from heartbeat response to task, currentTaskAttemptId=attempt_1670331499491_1408_1_03_000039_0, eventCount=1 fromEventId=0 nextFromEventId=0
{code}
...but then it hangs while waiting for further inputs:
{code}
"TezChild" #29 daemon prio=5 os_prio=0 tid=0x00007f3fae141000 nid=0x9581 waiting on condition [0x00007f3f737ba000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x000000071ad90a00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
	at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
	at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.getNextInput(ShuffleManager.java:1033)
	at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.moveToNextInput(UnorderedKVReader.java:202)
	at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.next(UnorderedKVReader.java:125)
	at org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader.load(VectorMapJoinFastHashTableLoader.java:129)
	at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTableInternal(MapJoinOperator.java:385)
	at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTable(MapJoinOperator.java:454)
	at org.apache.hadoop.hive.ql.exec.MapJoinOperator.initializeOp(MapJoinOperator.java:241)
	at org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinCommonOperator.initializeOp(VectorMapJoinCommonOperator.java:555)
	at org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinGenerateResultOperator.initializeOp(VectorMapJoinGenerateResultOperator.java:111)
	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:374)
	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:571)
	at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:523)
	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:384)
	at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:268)
	at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:252)
	at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
	at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
	at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
{code}
As a quick, temporary workaround, we can disable auto reducer parallelism on a vertex if it is the source of a bucket map join.
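The thread dump above shows the task parked in ShuffleManager.getNextInput, inside LinkedBlockingDeque.take. The mechanism can be modeled with a small toy sketch (illustrative names only, this is NOT Tez code): a consumer that was told to expect 20 completion events parks forever once the rescheduled upstream vertex only ever delivers 4. The sketch uses poll with a timeout instead of take so that it terminates:

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Toy model of the hang. The consumer keeps waiting for completion events
// until it has seen `expected` of them; when the producer side is shrunk by
// auto reducer parallelism, the real take() parks indefinitely.
public class HangSketch {

    static int drain(int expected, int actuallyProduced) throws InterruptedException {
        LinkedBlockingDeque<String> completedInputs = new LinkedBlockingDeque<>();
        for (int i = 0; i < actuallyProduced; i++) {
            completedInputs.add("input-" + i);
        }
        int received = 0;
        while (received < expected) {
            // The real code uses completedInputs.take(), which blocks forever --
            // that is the parked "TezChild" thread in the stack trace above.
            String next = completedInputs.poll(100, TimeUnit.MILLISECONDS);
            if (next == null) {
                break; // take() would never reach this line
            }
            received++;
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // Map 1 was initialized with numInputs=20, but Reducer 3 shrank to 4 tasks.
        System.out.println("received=" + drain(20, 4) + ", expected=20");
    }
}
```

As a session-level stopgap, `set hive.tez.auto.reducer.parallelism=false;` (the stock Hive setting) avoids the mismatch entirely, at the cost of disabling the optimization for the whole query.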
> Bucket Map Join can hang if the source task's parallelism is changed by
> reducer autoparallelism
> ----------------------------------------------------------------------------------------------
>
>                 Key: HIVE-27078
>                 URL: https://issues.apache.org/jira/browse/HIVE-27078
>             Project: Hive
>          Issue Type: Bug
>            Reporter: László Bodor
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
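Postscript: until the per-vertex fix described in the issue lands, the same mitigation can be applied per session. This uses the standard Hive setting and is coarser than the proposed fix, since it disables auto reducer parallelism for the whole query, not only for vertices feeding a bucket map join:

```sql
-- Session-level stopgap: disable Tez auto reducer parallelism entirely,
-- so no vertex can be rescheduled to fewer tasks after its consumers
-- have been initialized with a fixed numInputs.
set hive.tez.auto.reducer.parallelism=false;
```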