Repository: hive
Updated Branches:
  refs/heads/llap 785ec8e75 -> b321c55a3
HIVE-10961. LLAP: Fix ShuffleHandler + Submit work init race condition. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b321c55a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b321c55a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b321c55a Branch: refs/heads/llap Commit: b321c55a326093ea554df77e806e2601eab24832 Parents: 785ec8e Author: Siddharth Seth <ss...@apache.org> Authored: Tue Jun 9 13:58:49 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Tue Jun 9 13:58:49 2015 -0700 ---------------------------------------------------------------------- .../hive/llap/daemon/impl/AMReporter.java | 18 +++++++++++------ .../llap/daemon/impl/ContainerRunnerImpl.java | 18 +++++------------ .../hive/llap/daemon/impl/LlapDaemon.java | 21 ++++++++++++++++---- 3 files changed, 34 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b321c55a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 7376792..1ba18fc 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -27,6 +27,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -77,11 +78,12 @@ public class AMReporter extends AbstractService { private 
static final Logger LOG = LoggerFactory.getLogger(AMReporter.class); - private final LlapNodeId nodeId; + private volatile LlapNodeId nodeId; private final Configuration conf; private final ListeningExecutorService queueLookupExecutor; private final ListeningExecutorService executor; private final DelayQueue<AMNodeInfo> pendingHeartbeatQueeu = new DelayQueue(); + private final AtomicReference<InetSocketAddress> localAddress; private final long heartbeatInterval; private final AtomicBoolean isShutdown = new AtomicBoolean(false); // Tracks appMasters to which heartbeats are being sent. This should not be used for any other @@ -89,9 +91,9 @@ public class AMReporter extends AbstractService { private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>(); volatile ListenableFuture<Void> queueLookupFuture; - public AMReporter(LlapNodeId nodeId, Configuration conf) { + public AMReporter(AtomicReference<InetSocketAddress> localAddress, Configuration conf) { super(AMReporter.class.getName()); - this.nodeId = nodeId; + this.localAddress = localAddress; this.conf = conf; ExecutorService rawExecutor = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build()); @@ -102,7 +104,7 @@ public class AMReporter extends AbstractService { this.heartbeatInterval = conf.getLong(LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS, LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT); - LOG.info("AMReporter running with NodeId: {}", nodeId); + } @Override @@ -125,7 +127,8 @@ public class AMReporter extends AbstractService { } } }); - LOG.info("Started service: " + getName()); + nodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort()); + LOG.info("AMReporter running with NodeId: {}", nodeId); } @Override @@ -170,7 +173,7 @@ public class AMReporter extends AbstractService { synchronized (knownAppMasters) { amNodeInfo = knownAppMasters.get(amNodeId); if 
(amNodeInfo == null) { - LOG.info(("Ignoring duplocate unregisterRequest for am at: " + amLocation + ":" + port)); + LOG.info(("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port)); } amNodeInfo.decrementAndGetTaskCount(); // Not removing this here. Will be removed when taken off the queue and discovered to have 0 @@ -184,6 +187,9 @@ public class AMReporter extends AbstractService { // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection. LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); AMNodeInfo amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf); + + // Even if the service hasn't started up. It's OK to make this invocation since this will + // only happen after the AtomicReference address has been populated. Not adding an additional check. ListenableFuture<Void> future = executor.submit(new KillTaskCallable(taskAttemptId, amNodeInfo)); Futures.addCallback(future, new FutureCallback<Void>() { http://git-wip-us.apache.org/repos/asf/hive/blob/b321c55a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 6f9f429..10e192e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -25,7 +25,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import 
org.apache.hadoop.hive.llap.daemon.HistoryLogger; @@ -67,7 +66,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu private static final Logger LOG = LoggerFactory.getLogger(ContainerRunnerImpl.class); public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor "; - private volatile AMReporter amReporter; + private final AMReporter amReporter; private final QueryTracker queryTracker; private final Scheduler<TaskRunnerCallable> executorService; private final AtomicReference<InetSocketAddress> localAddress; @@ -81,12 +80,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize, boolean enablePreemption, String[] localDirsBase, int localShufflePort, AtomicReference<InetSocketAddress> localAddress, - long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics) { + long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics, + AMReporter amReporter) { super("ContainerRunnerImpl"); this.conf = conf; Preconditions.checkState(numExecutors > 0, "Invalid number of executors: " + numExecutors + ". 
Must be > 0"); this.localAddress = localAddress; + this.amReporter = amReporter; this.queryTracker = new QueryTracker(conf, localDirsBase); addIfService(queryTracker); @@ -122,22 +123,13 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu @Override public void serviceStart() throws Exception { - // The node id will only be available at this point, since the server has been started in LlapDaemon super.serviceStart(); - LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), - localAddress.get().getPort()); - this.amReporter = new AMReporter(llapNodeId, conf); - amReporter.init(conf); - amReporter.start(); + } @Override protected void serviceStop() throws Exception { super.serviceStop(); - if (amReporter != null) { - amReporter.stop(); - amReporter = null; - } } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/b321c55a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 75377d4..1801212 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -63,6 +63,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla private final Configuration shuffleHandlerConf; private final LlapDaemonProtocolServerImpl server; private final ContainerRunnerImpl containerRunner; + private final AMReporter amReporter; private final LlapRegistryService registry; private final LlapWebServices webServices; private final AtomicLong numSubmissions = new AtomicLong(0); @@ -155,8 +156,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla LOG.info("Started LlapMetricsSystem with 
displayName: " + displayName + " sessionId: " + sessionId); + + this.amReporter = new AMReporter(address, daemonConf); + + this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, rpcPort); - addIfService(server); + this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, @@ -166,13 +171,19 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla shufflePort, address, executorMemoryBytes, - metrics); + metrics, + amReporter); addIfService(containerRunner); this.registry = new LlapRegistryService(true); addIfService(registry); this.webServices = new LlapWebServices(); addIfService(webServices); + // Bring up the server only after all other components have started. + addIfService(server); + // AMReporter after the server so that it gets the correct address. It knows how to deal with + // requests before it is started. + addIfService(amReporter); } private long getTotalHeapSize() { @@ -220,14 +231,16 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla @Override public void serviceStart() throws Exception { - super.serviceStart(); + // Start the Shuffle service before the listener - until it's a service as well. ShuffleHandler.initializeAndStart(shuffleHandlerConf); + super.serviceStart(); + LOG.info("LlapDaemon serviceStart complete"); } public void serviceStop() throws Exception { super.serviceStop(); - shutdown(); ShuffleHandler.shutdown(); + shutdown(); LOG.info("LlapDaemon shutdown complete"); }