hive git commit: HIVE-12904. Fix a deadlock in LLAP task scheduling. (Sergey Shelukhin and Siddharth Seth) (cherry picked from commit 19b508ecc862fae8997ec938edae4e094658544f)
Repository: hive Updated Branches: refs/heads/branch-2.0 9bf8d00cf -> 80ce0449e HIVE-12904. Fix a deadlock in LLAP task scheduling. (Sergey Shelukhin and Siddharth Seth) (cherry picked from commit 19b508ecc862fae8997ec938edae4e094658544f) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/80ce0449 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/80ce0449 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/80ce0449 Branch: refs/heads/branch-2.0 Commit: 80ce0449ef2bb4f849ce7c37df0c5f1f264e1df2 Parents: 9bf8d00 Author: Siddharth SethAuthored: Mon Jan 25 19:17:15 2016 -0800 Committer: Siddharth Seth Committed: Mon Jan 25 19:18:31 2016 -0800 -- .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 69 +--- .../llap/daemon/impl/TaskExecutorService.java | 64 +- 2 files changed, 80 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/80ce0449/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java -- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 8bec95f..64c2b58 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -20,11 +20,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; @@ -166,37 +168,60 @@ public class QueryInfo { private final Map trackedEntities = new HashMap<>(); private final Multimap sourceToEntity = HashMultimap.create(); -synchronized boolean registerForUpdates(FinishableStateUpdateHandler handler, +private final ReentrantLock lock = new ReentrantLock(); + +boolean registerForUpdates(FinishableStateUpdateHandler handler, List sources, QueryFragmentInfo fragmentInfo, boolean lastFinishableState) { - EntityInfo entityInfo = - new EntityInfo(handler, sources, fragmentInfo, lastFinishableState); - if (trackedEntities.put(handler, entityInfo) != null) { -throw new IllegalStateException( -"Only a single registration allowed per entity. Duplicate for " + handler.toString()); - } - for (String source : sources) { -sourceToEntity.put(source, entityInfo); - } + lock.lock(); + try { +EntityInfo entityInfo = +new EntityInfo(handler, sources, fragmentInfo, lastFinishableState); +if (trackedEntities.put(handler, entityInfo) != null) { + throw new IllegalStateException( + "Only a single registration allowed per entity. Duplicate for " + handler.toString()); +} +for (String source : sources) { + sourceToEntity.put(source, entityInfo); +} - if (lastFinishableState != fragmentInfo.canFinish()) { -entityInfo.setLastFinishableState(fragmentInfo.canFinish()); -return false; - } else { -return true; +if (lastFinishableState != fragmentInfo.canFinish()) { + entityInfo.setLastFinishableState(fragmentInfo.canFinish()); + return false; +} else { + return true; +} + } finally { +lock.unlock(); } } -synchronized void unregisterForUpdates(FinishableStateUpdateHandler handler) { - EntityInfo info = trackedEntities.remove(handler); - Preconditions.checkState(info != null, "Cannot invoke unregister on an entity which has not been registered"); - for (String source : info.getSources()) { -sourceToEntity.remove(source, info); +void unregisterForUpdates(FinishableStateUpdateHandler handler) { + lock.lock(); + try { +EntityInfo info = trackedEntities.remove(handler); +Preconditions.checkState(info != null, +"Cannot invoke unregister on an entity which has not been registered"); +for (String source : info.getSources()) { + sourceToEntity.remove(source, info); +} + } finally { +lock.unlock(); } } -synchronized void
hive git commit: HIVE-12904. Fix a deadlock in LLAP task scheduling. (Sergey Shelukhin and Siddharth Seth)
Repository: hive Updated Branches: refs/heads/master 228624b1f -> 19b508ecc HIVE-12904. Fix a deadlock in LLAP task scheduling. (Sergey Shelukhin and Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/19b508ec Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/19b508ec Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/19b508ec Branch: refs/heads/master Commit: 19b508ecc862fae8997ec938edae4e094658544f Parents: 228624b Author: Siddharth SethAuthored: Mon Jan 25 19:17:15 2016 -0800 Committer: Siddharth Seth Committed: Mon Jan 25 19:17:15 2016 -0800 -- .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 69 +--- .../llap/daemon/impl/TaskExecutorService.java | 64 +- 2 files changed, 80 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/19b508ec/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java -- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 8bec95f..64c2b58 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -20,11 +20,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; @@ -166,37 +168,60 @@ public class QueryInfo { private final Map trackedEntities = new HashMap<>(); private final Multimap sourceToEntity = HashMultimap.create(); -synchronized boolean registerForUpdates(FinishableStateUpdateHandler handler, +private final ReentrantLock lock = new ReentrantLock(); + +boolean registerForUpdates(FinishableStateUpdateHandler handler, List sources, QueryFragmentInfo fragmentInfo, boolean lastFinishableState) { - EntityInfo entityInfo = - new EntityInfo(handler, sources, fragmentInfo, lastFinishableState); - if (trackedEntities.put(handler, entityInfo) != null) { -throw new IllegalStateException( -"Only a single registration allowed per entity. Duplicate for " + handler.toString()); - } - for (String source : sources) { -sourceToEntity.put(source, entityInfo); - } + lock.lock(); + try { +EntityInfo entityInfo = +new EntityInfo(handler, sources, fragmentInfo, lastFinishableState); +if (trackedEntities.put(handler, entityInfo) != null) { + throw new IllegalStateException( + "Only a single registration allowed per entity. Duplicate for " + handler.toString()); +} +for (String source : sources) { + sourceToEntity.put(source, entityInfo); +} - if (lastFinishableState != fragmentInfo.canFinish()) { -entityInfo.setLastFinishableState(fragmentInfo.canFinish()); -return false; - } else { -return true; +if (lastFinishableState != fragmentInfo.canFinish()) { + entityInfo.setLastFinishableState(fragmentInfo.canFinish()); + return false; +} else { + return true; +} + } finally { +lock.unlock(); } } -synchronized void unregisterForUpdates(FinishableStateUpdateHandler handler) { - EntityInfo info = trackedEntities.remove(handler); - Preconditions.checkState(info != null, "Cannot invoke unregister on an entity which has not been registered"); - for (String source : info.getSources()) { -sourceToEntity.remove(source, info); +void unregisterForUpdates(FinishableStateUpdateHandler handler) { + lock.lock(); + try { +EntityInfo info = trackedEntities.remove(handler); +Preconditions.checkState(info != null, +"Cannot invoke unregister on an entity which has not been registered"); +for (String source : info.getSources()) { + sourceToEntity.remove(source, info); +} + } finally { +lock.unlock(); } } -synchronized void sourceStateUpdated(String sourceName) { - Collection interestedEntityInfos =