Author: sseth Date: Tue Apr 7 20:57:02 2015 New Revision: 1671950 URL: http://svn.apache.org/r1671950 Log: HIVE-10025. LLAP: Send heartbeats for queued work. (Siddharth Seth)
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java?rev=1671950&r1=1671949&r2=1671950&view=diff ============================================================================== --- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java (original) +++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java Tue Apr 7 20:57:02 2015 @@ -45,6 +45,9 @@ public class LlapConfiguration extends C public static final String LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED = LLAP_DAEMON_PREFIX + "shuffle.dir-watcher.enabled"; public static final boolean LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false; + public static final String LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS = LLAP_DAEMON_PREFIX + 
"liveness.heartbeat.interval-ms"; + public static final long LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000l; + // Section for configs used in AM and executors public static final String LLAP_DAEMON_NUM_EXECUTORS = LLAP_DAEMON_PREFIX + "num.executors"; Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java?rev=1671950&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java (added) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java Tue Apr 7 20:57:02 2015 @@ -0,0 +1,89 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.lang.ref.WeakReference;
+import java.util.Map;
+import java.util.Objects;
+import java.util.WeakHashMap;
+
+/**
+ * Immutable (hostname, port) pair identifying a node.
+ *
+ * <p>Instances are obtained through {@link #getInstance(String, int)}, which hands back one
+ * canonical object per distinct (hostname, port) for as long as that object is still referenced
+ * somewhere. Equality and hashing are value based, so callers never need to rely on identity.
+ */
+public final class LlapNodeId {
+
+  /*
+   * Canonicalising map. Both the key and the value refer to the same object through weak
+   * references, so an id that nobody holds any more can be reclaimed. (A cache whose value is the
+   * very object used as its strongly-held key can never release an entry.)
+   * All access is guarded by synchronizing on the map itself.
+   */
+  private static final Map<LlapNodeId, WeakReference<LlapNodeId>> INSTANCES =
+      new WeakHashMap<>();
+
+  /**
+   * Returns the canonical id for the given host and port, creating it on first use.
+   *
+   * @param hostname host name of the node; must not be null
+   * @param port     port of the node
+   * @return the shared instance equal to (hostname, port)
+   * @throws NullPointerException if {@code hostname} is null
+   */
+  public static LlapNodeId getInstance(String hostname, int port) {
+    LlapNodeId candidate = new LlapNodeId(Objects.requireNonNull(hostname, "hostname"), port);
+    synchronized (INSTANCES) {
+      WeakReference<LlapNodeId> ref = INSTANCES.get(candidate);
+      LlapNodeId canonical = (ref == null) ? null : ref.get();
+      if (canonical == null) {
+        // First request for this id, or the previous instance has already been reclaimed.
+        canonical = candidate;
+        INSTANCES.put(candidate, new WeakReference<>(candidate));
+      }
+      return canonical;
+    }
+  }
+
+
+  private final String hostname;
+  private final int port;
+
+
+  private LlapNodeId(String hostname, int port) {
+    this.hostname = hostname;
+    this.port = port;
+  }
+
+  /** @return the host name this id was created with */
+  public String getHostname() {
+    return hostname;
+  }
+
+  /** @return the port this id was created with */
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LlapNodeId that = (LlapNodeId) o;
+    return port == that.port && hostname.equals(that.hostname);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = hostname.hashCode();
+    result = 1009 * result + port;
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "LlapNodeId{" +
+        "hostname='" + hostname + '\'' +
+        ", port=" + port +
+        '}';
+  }
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java?rev=1671950&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java (added) +++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java Tue Apr 7 20:57:02 2015 @@ -0,0 +1,318 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.CallableWithNdc; +import org.apache.hadoop.hive.llap.LlapNodeId; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; 
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sends status updates to various AMs.
+ *
+ * <p>Every AM that has at least one registered task is kept in a {@link DelayQueue}. A single
+ * drainer thread takes entries off the queue when their heartbeat time arrives, re-queues them
+ * for the next interval and hands the actual RPC to a cached thread pool. An entry found with no
+ * registered tasks is dropped and its umbilical proxy is stopped.
+ */
+public class AMReporter extends AbstractService {
+
+  /*
+  registrations and un-registrations will happen as and when tasks are submitted or are removed.
+  reference counting is likely required.
+
+  A connection needs to be established to each app master.
+
+  Ignore exceptions when communicating with the AM.
+  At a later point, report back saying the AM is dead so that tasks can be removed from the running queue.
+
+  Use a cachedThreadPool so that a few AMs going down does not affect other AppMasters.
+
+  Race: When a task completes - it sends out its message via the regular TaskReporter. The AM after this may run another DAG,
+  or may die. This may need to be consolidated with the LlapTaskReporter. Try ensuring there's no race between the two.
+
+  Single thread which sends heartbeats to AppMasters as events drain off a queue.
+  */
+
+  private static final Logger LOG = LoggerFactory.getLogger(AMReporter.class);
+
+  private final LlapNodeId nodeId;
+  private final Configuration conf;
+  private final ListeningExecutorService queueLookupExecutor;
+  private final ListeningExecutorService executor;
+  private final DelayQueue<AMNodeInfo> pendingHeartbeatQueue = new DelayQueue<>();
+  private final long heartbeatInterval;
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+  // Guarded by synchronizing on the map itself. Task counts are also only changed under this
+  // lock, so "count is zero" and "remove from the map" can be decided atomically.
+  private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
+  volatile ListenableFuture<Void> queueLookupFuture;
+
+  /**
+   * @param nodeId id of the local node; its hostname and port are sent with every heartbeat
+   * @param conf   configuration used to read the heartbeat interval and to create RPC proxies
+   */
+  public AMReporter(LlapNodeId nodeId, Configuration conf) {
+    super(AMReporter.class.getName());
+    this.nodeId = nodeId;
+    this.conf = conf;
+    ExecutorService rawExecutor = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
+    this.executor = MoreExecutors.listeningDecorator(rawExecutor);
+    ExecutorService rawExecutor2 = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build());
+    this.queueLookupExecutor = MoreExecutors.listeningDecorator(rawExecutor2);
+    this.heartbeatInterval =
+        conf.getLong(LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS,
+            LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT);
+  }
+
+  /** Starts the queue drainer thread. */
+  @Override
+  public void serviceStart() {
+    QueueLookupCallable queueDrainerCallable = new QueueLookupCallable();
+    queueLookupFuture = queueLookupExecutor.submit(queueDrainerCallable);
+    Futures.addCallback(queueLookupFuture, new FutureCallback<Void>() {
+      @Override
+      public void onSuccess(Void result) {
+        LOG.info("AMReporter QueueDrainer exited");
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.error("AMReporter QueueDrainer exited with error", t);
+      }
+    });
+    LOG.info("Started service: " + getName());
+  }
+
+  /** Cancels the drainer and shuts both executors down. Only the first call has any effect. */
+  @Override
+  public void serviceStop() {
+    if (!isShutdown.getAndSet(true)) {
+      if (queueLookupFuture != null) {
+        queueLookupFuture.cancel(true);
+      }
+      queueLookupExecutor.shutdownNow();
+      executor.shutdownNow();
+      LOG.info("Stopped service: " + getName());
+    }
+  }
+
+
+  /**
+   * Registers one more task for the AM at the given address. The first registration for an AM
+   * schedules its first heartbeat one interval from now.
+   *
+   * @param amLocation host of the AM
+   * @param port       port of the AM
+   * @param user       user the umbilical connection is created as
+   * @param jobToken   token added to that user's credentials for the connection
+   */
+  public void registerTask(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Registering for heartbeat: " + amLocation + ":" + port);
+    }
+    AMNodeInfo amNodeInfo;
+    synchronized (knownAppMasters) {
+      LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
+      amNodeInfo = knownAppMasters.get(amNodeId);
+      if (amNodeInfo == null) {
+        amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf);
+        knownAppMasters.put(amNodeId, amNodeInfo);
+        // Add to the queue only the first time this is registered. The drainer thread re-adds
+        // it every time it is taken off the queue while tasks are still registered.
+        // The time must be set before the entry is queued: the queue orders entries by it.
+        amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval);
+        pendingHeartbeatQueue.add(amNodeInfo);
+      }
+      amNodeInfo.incrementAndGetTaskCount();
+    }
+  }
+
+  /**
+   * Removes one task registration for the AM at the given address. A request for an AM that is
+   * not known is logged and otherwise ignored.
+   *
+   * @param amLocation host of the AM
+   * @param port       port of the AM
+   */
+  public void unregisterTask(String amLocation, int port) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Un-registering for heartbeat: " + amLocation + ":" + port);
+    }
+    LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
+    synchronized (knownAppMasters) {
+      AMNodeInfo amNodeInfo = knownAppMasters.get(amNodeId);
+      if (amNodeInfo == null) {
+        LOG.error("Ignoring unexpected unregisterRequest for am at: " + amLocation + ":" + port);
+        // Nothing to decrement; falling through would dereference null.
+        return;
+      }
+      amNodeInfo.decrementAndGetTaskCount();
+      // Not removing this here. Will be removed when taken off the queue and discovered to have 0
+      // pending tasks.
+    }
+  }
+
+  /**
+   * Drainer loop: waits for the next AM whose heartbeat is due, then either retires it (no tasks
+   * registered) or re-queues it and submits a heartbeat.
+   */
+  private class QueueLookupCallable extends CallableWithNdc<Void> {
+
+    @Override
+    protected Void callInternal() {
+      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+        try {
+          AMNodeInfo amNodeInfo = pendingHeartbeatQueue.take();
+          boolean retired;
+          // Check and remove under the same lock registerTask uses, so a registration cannot be
+          // attached to an entry that is being discarded.
+          synchronized (knownAppMasters) {
+            retired = amNodeInfo.getTaskCount() == 0;
+            if (retired) {
+              knownAppMasters.remove(amNodeInfo.amNodeId);
+            }
+          }
+          if (retired) {
+            amNodeInfo.stopUmbilical();
+          } else {
+            // Schedule the next heartbeat before sending this one. The entry is currently off
+            // the queue, so changing its time cannot corrupt the queue ordering.
+            amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval);
+            pendingHeartbeatQueue.add(amNodeInfo);
+            executor.submit(new AMHeartbeatCallable(amNodeInfo));
+          }
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info("QueueLookup thread interrupted after shutdown");
+          } else {
+            LOG.warn("Received unexpected interrupt while waiting on heartbeat queue");
+          }
+        }
+
+      }
+      return null;
+    }
+  }
+
+  /** Sends a single node heartbeat to one AM, provided it still has registered tasks. */
+  private class AMHeartbeatCallable extends CallableWithNdc<Void> {
+
+    final AMNodeInfo amNodeInfo;
+
+    public AMHeartbeatCallable(AMNodeInfo amNodeInfo) {
+      this.amNodeInfo = amNodeInfo;
+    }
+
+    @Override
+    protected Void callInternal() {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Attempting to heartbeat to AM: " + amNodeInfo);
+      }
+      if (amNodeInfo.getTaskCount() > 0) {
+        try {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("NodeHeartbeat to: " + amNodeInfo);
+          }
+          amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
+              nodeId.getPort());
+        } catch (IOException e) {
+          // TODO Ideally, this could be used to avoid running a task - AM down / unreachable, so there's no point running it.
+          LOG.warn("Failed to communicate with AM. May retry later: " + amNodeInfo.amNodeId, e);
+        } catch (InterruptedException e) {
+          if (!isShutdown.get()) {
+            LOG.warn("Failed to communicate with AM: " + amNodeInfo.amNodeId, e);
+          }
+          // Keep the interrupt visible to the pool thread that ran this callable.
+          Thread.currentThread().interrupt();
+        }
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping node heartbeat to AM: " + amNodeInfo + ", since ref count is 0");
+        }
+      }
+      return null;
+    }
+  }
+
+
+
+  /**
+   * Per-AM state: number of registered tasks, the lazily created umbilical proxy, and the time
+   * at which the next heartbeat is due (used as the {@link Delayed} ordering key).
+   */
+  private static class AMNodeInfo implements Delayed {
+    private final AtomicInteger taskCount = new AtomicInteger(0);
+    private final String user;
+    private final Token<JobTokenIdentifier> jobToken;
+    private final Configuration conf;
+    private final LlapNodeId amNodeId;
+    private LlapTaskUmbilicalProtocol umbilical;
+    // Read by the delay queue from several threads; must only be changed while this entry is
+    // not in the queue.
+    private volatile long nextHeartbeatTime;
+
+
+    public AMNodeInfo(LlapNodeId amNodeId, String user,
+                      Token<JobTokenIdentifier> jobToken,
+                      Configuration conf) {
+      this.user = user;
+      this.jobToken = jobToken;
+      this.conf = conf;
+      this.amNodeId = amNodeId;
+    }
+
+    /** Returns the umbilical proxy for this AM, creating it on first use. */
+    synchronized LlapTaskUmbilicalProtocol getUmbilical() throws IOException, InterruptedException {
+      if (umbilical == null) {
+        final InetSocketAddress address =
+            NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort());
+        SecurityUtil.setTokenService(this.jobToken, address);
+        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+        ugi.addToken(jobToken);
+        umbilical = ugi.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
+          @Override
+          public LlapTaskUmbilicalProtocol run() throws Exception {
+            return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
+                LlapTaskUmbilicalProtocol.versionID, address, conf);
+          }
+        });
+      }
+      return umbilical;
+    }
+
+    /** Stops the umbilical proxy if one was created. */
+    synchronized void stopUmbilical() {
+      if (umbilical != null) {
+        RPC.stopProxy(umbilical);
+      }
+      umbilical = null;
+    }
+
+    int incrementAndGetTaskCount() {
+      return taskCount.incrementAndGet();
+    }
+
+    int decrementAndGetTaskCount() {
+      return taskCount.decrementAndGet();
+    }
+
+    int getTaskCount() {
+      return taskCount.get();
+    }
+
+    void setNextHeartbeatTime(long nextTime) {
+      nextHeartbeatTime = nextTime;
+    }
+
+    /** Remaining time until the next heartbeat is due; zero or negative once it is due. */
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(nextHeartbeatTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      AMNodeInfo other = (AMNodeInfo)o;
+      return Long.compare(this.nextHeartbeatTime, other.nextHeartbeatTime);
+    }
+
+    @Override
+    public String toString() {
+      return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount();
+    }
+  }
+}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java?rev=1671950&r1=1671949&r2=1671950&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java Tue Apr 7 20:57:02 2015 @@ -33,14 +33,15 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.CallableWithNdc; +import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import 
org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.hive.llap.tezplugins.Converters; import org.apache.hadoop.io.DataInputBuffer; @@ -56,7 +57,6 @@ import org.apache.hadoop.yarn.util.Auxil import org.apache.log4j.Logger; import org.apache.log4j.NDC; import org.apache.tez.common.TezCommonUtils; -import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; @@ -65,7 +65,7 @@ import org.apache.tez.dag.api.TezExcepti import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; -import org.apache.tez.runtime.task.TaskReporter; +import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult; import com.google.common.base.Preconditions; @@ -84,31 +84,27 @@ public class ContainerRunnerImpl extends public static final String THREAD_NAME_FORMAT = THREAD_NAME_FORMAT_PREFIX + "%d"; private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class); + private volatile AMReporter amReporter; private final ListeningExecutorService executorService; private final AtomicReference<InetSocketAddress> localAddress; private final String[] localDirsBase; - private final Map<String, String> localEnv = new HashMap<String, String>(); - private volatile FileSystem localFs; + private final Map<String, String> localEnv = new HashMap<>(); + private final FileSystem localFs; private final long memoryPerExecutor; private final LlapDaemonExecutorMetrics metrics; + private final Configuration conf; private final ConfParams confParams; // TODO Support for removing queued 
containers, interrupting / killing specific containers - public ContainerRunnerImpl(int numExecutors, String[] localDirsBase, int localShufflePort, + public ContainerRunnerImpl(Configuration conf, int numExecutors, String[] localDirsBase, int localShufflePort, AtomicReference<InetSocketAddress> localAddress, long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics) { super("ContainerRunnerImpl"); + this.conf = conf; Preconditions.checkState(numExecutors > 0, "Invalid number of executors: " + numExecutors + ". Must be > 0"); this.localDirsBase = localDirsBase; this.localAddress = localAddress; - this.confParams = new ConfParams(); - // Setup to defaults to start with - confParams.amMaxEventsPerHeartbeat = TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT; - confParams.amHeartbeatIntervalMsMax = - TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT; - confParams.amCounterHeartbeatInterval = - TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT; ExecutorService raw = Executors.newFixedThreadPool(numExecutors, new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build()); @@ -121,36 +117,42 @@ public class ContainerRunnerImpl extends // TODO Tune this based on the available size. 
this.memoryPerExecutor = (long)(totalMemoryAvailableBytes * 0.8 / (float) numExecutors); this.metrics = metrics; - LOG.info("ContainerRunnerImpl config: " + - "memoryPerExecutorDerviced=" + memoryPerExecutor - ); - } - @Override - public void serviceInit(Configuration conf) { try { localFs = FileSystem.getLocal(conf); - // TODO Fix visibility of these parameters - which - confParams.amCounterHeartbeatInterval = conf.getLong( - TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS, - TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT); - confParams.amHeartbeatIntervalMsMax = - conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, - TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT); - confParams.amMaxEventsPerHeartbeat = - conf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT, - TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT); } catch (IOException e) { throw new RuntimeException("Failed to setup local filesystem instance", e); } + confParams = new ConfParams( + conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, + TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT), + conf.getLong( + TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS, + TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT), + conf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT, + TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT) + ); + + LOG.info("ContainerRunnerImpl config: " + + "memoryPerExecutorDerviced=" + memoryPerExecutor + ); } @Override public void serviceStart() { + // The node id will only be available at this point, since the server has been started in LlapDaemon + LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort()); + this.amReporter = new AMReporter(llapNodeId, conf); + amReporter.init(conf); + amReporter.start(); } @Override protected void serviceStop() throws Exception { + if (amReporter != null) { + 
amReporter.stop(); + amReporter = null; + } super.serviceStop(); } @@ -198,11 +200,8 @@ public class ContainerRunnerImpl extends if (LOG.isDebugEnabled()) { LOG.debug("Dirs are: " + Arrays.toString(localDirs)); } - - - // Setup workingDir. This is otherwise setup as Environment.PWD + // May need to setup localDir for re-localization, which is usually setup as Environment.PWD. // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream) - String workingDir = localDirs[0]; Credentials credentials = new Credentials(); DataInputBuffer dib = new DataInputBuffer(); @@ -218,7 +217,7 @@ public class ContainerRunnerImpl extends TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()), new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, - workingDir, credentials, memoryPerExecutor, confParams); + credentials, memoryPerExecutor, amReporter, confParams); ListenableFuture<ContainerExecutionResult> future = executorService.submit(callable); Futures.addCallback(future, new TaskRunnerCallback(request, callable)); metrics.incrExecutorTotalRequestsHandled(); @@ -232,7 +231,6 @@ public class ContainerRunnerImpl extends private final SubmitWorkRequestProto request; private final Configuration conf; - private final String workingDir; private final String[] localDirs; private final Map<String, String> envMap; private final String pid = null; @@ -240,29 +238,46 @@ public class ContainerRunnerImpl extends private final ExecutionContext executionContext; private final Credentials credentials; private final long memoryAvailable; - private final ListeningExecutorService executor; private final ConfParams confParams; + private final Token<JobTokenIdentifier> jobToken; + private final AMReporter amReporter; private volatile TezTaskRunner taskRunner; - private volatile TaskReporter taskReporter; - private TezTaskUmbilicalProtocol umbilical; + private volatile TaskReporterInterface taskReporter; + private 
volatile ListeningExecutorService executor; + private LlapTaskUmbilicalProtocol umbilical; private volatile long startTime; private volatile String threadName; + private volatile boolean cancelled = false; + TaskRunnerCallable(SubmitWorkRequestProto request, Configuration conf, ExecutionContext executionContext, Map<String, String> envMap, - String[] localDirs, String workingDir, Credentials credentials, - long memoryAvailable, ConfParams confParams) { + String[] localDirs, Credentials credentials, + long memoryAvailable, AMReporter amReporter, ConfParams confParams) { this.request = request; this.conf = conf; this.executionContext = executionContext; this.envMap = envMap; - this.workingDir = workingDir; this.localDirs = localDirs; this.objectRegistry = new ObjectRegistryImpl(); this.credentials = credentials; this.memoryAvailable = memoryAvailable; this.confParams = confParams; + jobToken = TokenCache.getSessionToken(credentials); + this.amReporter = amReporter; + // Register with the AMReporter when the callable is setup. Unregister once it starts running. + this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken); + } + + @Override + protected ContainerExecutionResult callInternal() throws Exception { + this.startTime = System.currentTimeMillis(); + this.threadName = Thread.currentThread().getName(); + + // Unregister from the AMReporter, since the task is now running. + this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort()); + // TODO This executor seems unnecessary. 
Here and TezChild ExecutorService executorReal = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() @@ -271,51 +286,45 @@ public class ContainerRunnerImpl extends "TezTaskRunner_" + request.getFragmentSpec().getTaskAttemptIdString()) .build()); executor = MoreExecutors.listeningDecorator(executorReal); - } - @Override - protected ContainerExecutionResult callInternal() throws Exception { - this.startTime = System.currentTimeMillis(); - this.threadName = Thread.currentThread().getName(); // TODO Consolidate this code with TezChild. Stopwatch sw = new Stopwatch().start(); - UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser()); - taskUgi.addCredentials(credentials); + UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser()); + taskUgi.addCredentials(credentials); - Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); - Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>(); - serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, - TezCommonUtils.convertJobTokenToBytes(jobToken)); - Multimap<String, String> startedInputsMap = HashMultimap.create(); - - UserGroupInformation taskOwner = - UserGroupInformation.createRemoteUser(request.getTokenIdentifier()); - final InetSocketAddress address = - NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort()); - SecurityUtil.setTokenService(jobToken, address); - taskOwner.addToken(jobToken); - umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() { - @Override - public TezTaskUmbilicalProtocol run() throws Exception { - return RPC.getProxy(TezTaskUmbilicalProtocol.class, - TezTaskUmbilicalProtocol.versionID, address, conf); - } - }); - - taskReporter = new TaskReporter( - umbilical, - confParams.amHeartbeatIntervalMsMax, - confParams.amCounterHeartbeatInterval, - confParams.amMaxEventsPerHeartbeat, - new AtomicLong(0), - 
request.getContainerIdString()); - - taskRunner = new TezTaskRunner(conf, taskUgi, localDirs, - Converters.getTaskSpecfromProto(request.getFragmentSpec()), umbilical, - request.getAppAttemptNumber(), - serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, - pid, - executionContext, memoryAvailable); + Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>(); + serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, + TezCommonUtils.convertJobTokenToBytes(jobToken)); + Multimap<String, String> startedInputsMap = HashMultimap.create(); + + UserGroupInformation taskOwner = + UserGroupInformation.createRemoteUser(request.getTokenIdentifier()); + final InetSocketAddress address = + NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort()); + SecurityUtil.setTokenService(jobToken, address); + taskOwner.addToken(jobToken); + umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() { + @Override + public LlapTaskUmbilicalProtocol run() throws Exception { + return RPC.getProxy(LlapTaskUmbilicalProtocol.class, + LlapTaskUmbilicalProtocol.versionID, address, conf); + } + }); + + taskReporter = new LlapTaskReporter( + umbilical, + confParams.amHeartbeatIntervalMsMax, + confParams.amCounterHeartbeatInterval, + confParams.amMaxEventsPerHeartbeat, + new AtomicLong(0), + request.getContainerIdString()); + + taskRunner = new TezTaskRunner(conf, taskUgi, localDirs, + Converters.getTaskSpecfromProto(request.getFragmentSpec()), + request.getAppAttemptNumber(), + serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, + pid, + executionContext, memoryAvailable); boolean shouldDie; try { @@ -398,6 +407,7 @@ public class ContainerRunnerImpl extends @Override public void onFailure(Throwable t) { LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t); + // TODO HIVE-10236 Report a fatal error over the umbilical 
taskRunnerCallable.shutdown(); HistoryLogger .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), @@ -422,9 +432,16 @@ public class ContainerRunnerImpl extends } private static class ConfParams { - int amHeartbeatIntervalMsMax; - long amCounterHeartbeatInterval; - int amMaxEventsPerHeartbeat; + final int amHeartbeatIntervalMsMax; + final long amCounterHeartbeatInterval; + final int amMaxEventsPerHeartbeat; + + public ConfParams(int amHeartbeatIntervalMsMax, long amCounterHeartbeatInterval, + int amMaxEventsPerHeartbeat) { + this.amHeartbeatIntervalMsMax = amHeartbeatIntervalMsMax; + this.amCounterHeartbeatInterval = amCounterHeartbeatInterval; + this.amMaxEventsPerHeartbeat = amMaxEventsPerHeartbeat; + } } private String stringifyRequest(SubmitWorkRequestProto request) { Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java?rev=1671950&r1=1671949&r2=1671950&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java Tue Apr 7 20:57:02 2015 @@ -131,7 +131,7 @@ public class LlapDaemon extends Abstract LOG.info("Started LlapMetricsSystem with displayName: " + displayName + " sessionId: " + sessionId); - this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, shufflePort, address, + this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, localDirs, shufflePort, address, executorMemoryBytes, metrics); this.registry = new LlapRegistryService(); Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java?rev=1671950&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java Tue Apr 7 20:57:02 2015
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.io.Text;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
+import org.apache.tez.runtime.task.ErrorReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Responsible for communication between tasks running in a Container and the ApplicationMaster.
+ * Takes care of sending heartbeats (regular and OOB) to the AM - to send generated events, and to
+ * retrieve events specific to this task.
+ *
+ */
+public class LlapTaskReporter implements TaskReporterInterface {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTaskReporter.class);
+
+  private final LlapTaskUmbilicalProtocol umbilical;
+  private final long pollInterval;
+  private final long sendCounterInterval;
+  private final int maxEventsToGet;
+  private final AtomicLong requestCounter;
+  private final String containerIdStr;
+
+  private final ListeningExecutorService heartbeatExecutor;
+
+  @VisibleForTesting
+  HeartbeatCallable currentCallable;
+
+  /**
+   * Creates a reporter that heartbeats over the given umbilical from a single daemon thread.
+   * The interval, event-limit and request-counter arguments are passed to every heartbeat task.
+   */
+  public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval,
+      long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
+    this.umbilical = umbilical;
+    this.pollInterval = amPollInterval;
+    this.sendCounterInterval = sendCounterInterval;
+    this.maxEventsToGet = maxEventsToGet;
+    this.requestCounter = requestCounter;
+    this.containerIdStr = containerIdStr;
+    ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("TaskHeartbeatThread").build());
+    heartbeatExecutor = MoreExecutors.listeningDecorator(executor);
+  }
+
+  /**
+   * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc.
+   */
+  @Override
+  public synchronized void registerTask(RuntimeTask task,
+      ErrorReporter errorReporter) {
+    currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
+        maxEventsToGet, requestCounter, containerIdStr);
+    ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
+    Futures.addCallback(future, new HeartbeatCallback(errorReporter));
+  }
+
+  /**
+   * This method should always be invoked before setting up heartbeats for another task running in
+   * the same container.
+   */
+  @Override
+  public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
+    // Tolerate an unregister with no task registered (e.g. a repeated call) instead of an NPE.
+    if (currentCallable != null) {
+      currentCallable.markComplete();
+      currentCallable = null;
+    }
+  }
+
+  /** Shuts down the heartbeat executor immediately (shutdownNow). */
+  @Override
+  public void shutdown() {
+    heartbeatExecutor.shutdownNow();
+  }
+
+  @VisibleForTesting
+  static class HeartbeatCallable implements Callable<Boolean> {
+
+    private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
+    private static final float LOG_COUNTER_BACKOFF = 1.3f;
+
+    private final RuntimeTask task;
+    private final EventMetaData updateEventMetadata;
+
+    private final LlapTaskUmbilicalProtocol umbilical;
+
+    private final long pollInterval;
+    private final long sendCounterInterval;
+    private final int maxEventsToGet;
+    private final String containerIdStr;
+
+    private final AtomicLong requestCounter;
+
+    private final LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue<TezEvent>();
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    /*
+     * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
+     * log counters.
+     */
+    private final AtomicInteger nonOobHeartbeatCounter = new AtomicInteger(0);
+    private int nextHeartbeatNumToLog = 0;
+    /*
+     * Tracks the last non-OOB heartbeat number at which counters were sent to the AM.
+     */
+    private int prevCounterSendHeartbeatNum = 0;
+
+    public HeartbeatCallable(RuntimeTask task,
+        LlapTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval,
+        int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
+
+      this.pollInterval = amPollInterval;
+      this.sendCounterInterval = sendCounterInterval;
+      this.maxEventsToGet = maxEventsToGet;
+      this.requestCounter = requestCounter;
+      this.containerIdStr = containerIdStr;
+
+      this.task = task;
+      this.umbilical = umbilical;
+      this.updateEventMetadata = new EventMetaData(EventProducerConsumerType.SYSTEM,
+          task.getVertexName(), "", task.getTaskAttemptID());
+
+      nextHeartbeatNumToLog = (Math.max(1,
+          (int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f
+              : (float) amPollInterval))));
+    }
+
+    /**
+     * Heartbeats until the task is done, has hit a fatal error, or the AM asks it to die.
+     * @return false if the AM requested shutdown, true if the loop ended because of task state
+     */
+    @Override
+    public Boolean call() throws Exception {
+      // Heartbeat only for active tasks. Errors, etc will be reported directly.
+      while (!task.isTaskDone() && !task.hadFatalError()) {
+        ResponseWrapper response = heartbeat(null);
+
+        if (response.shouldDie) {
+          // AM sent a shouldDie=true
+          LOG.info("Asked to die via task heartbeat");
+          return false;
+        } else {
+          if (response.numEvents < maxEventsToGet) {
+            // Wait before sending another heartbeat. Otherwise consider as an OOB heartbeat
+            lock.lock();
+            try {
+              // await() returns false only when the full poll interval elapsed; true means we
+              // were woken early (see markComplete), which is not a regular timed heartbeat.
+              boolean signalled = condition.await(pollInterval, TimeUnit.MILLISECONDS);
+              if (!signalled) {
+                nonOobHeartbeatCounter.incrementAndGet();
+              }
+            } finally {
+              lock.unlock();
+            }
+          }
+        }
+      }
+      int pendingEventCount = eventsToSend.size();
+      if (pendingEventCount > 0) {
+        LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
+      }
+      return true;
+    }
+
+    /**
+     * Sends one heartbeat carrying all queued events and routes any returned events to the task.
+     * @param eventsArg extra events to queue ahead of this heartbeat; may be null
+     * @return whether the AM asked the task to die, and how many events it returned
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) throws IOException,
+        TezException {
+
+      if (eventsArg != null) {
+        eventsToSend.addAll(eventsArg);
+      }
+
+      TezEvent updateEvent = null;
+      List<TezEvent> events = new ArrayList<TezEvent>();
+      eventsToSend.drainTo(events);
+
+      if (!task.isTaskDone() && !task.hadFatalError()) {
+        TezCounters counters = null;
+        /**
+         * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
+         * records would save CPU in DAG AM, but certain counters are updated very frequently. Until
+         * real time decisions are made based on these counters, it can be sent once per second.
+         */
+        // Not completely accurate, since OOB heartbeats could go out.
+        if ((nonOobHeartbeatCounter.get() - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
+          counters = task.getCounters();
+          prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get();
+        }
+        updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
+            updateEventMetadata);
+        events.add(updateEvent);
+      }
+
+      long requestId = requestCounter.incrementAndGet();
+      TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
+          task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending heartbeat to AM, request=" + request);
+      }
+
+      maybeLogCounters();
+
+      TezHeartbeatResponse response = umbilical.heartbeat(request);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received heartbeat response from AM, response=" + response);
+      }
+
+      if (response.shouldDie()) {
+        LOG.info("Received should die response from AM");
+        return new ResponseWrapper(true, 1);
+      }
+      if (response.getLastRequestId() != requestId) {
+        throw new TezException("AM and Task out of sync" + ", responseReqId="
+            + response.getLastRequestId() + ", expectedReqId=" + requestId);
+      }
+
+      // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
+      // are running using the same umbilical.
+      int numEventsReceived = 0;
+      if (task.isTaskDone() || task.hadFatalError()) {
+        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
+          LOG.warn("Current task already complete, Ignoring all event in"
+              + " heartbeat response, eventCount=" + response.getEvents().size());
+        }
+      } else {
+        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
+                + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
+          }
+          // This should ideally happen in a separate thread
+          numEventsReceived = response.getEvents().size();
+          task.handleEvents(response.getEvents());
+        }
+      }
+      return new ResponseWrapper(false, numEventsReceived);
+    }
+
+    /** Wakes the heartbeat loop so it re-checks task state without waiting out the poll interval. */
+    public void markComplete() {
+      // Notify to clear pending events, if any.
+      lock.lock();
+      try {
+        condition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /** Logs the task counters at debug level on a backing-off schedule of heartbeat numbers. */
+    private void maybeLogCounters() {
+      if (LOG.isDebugEnabled()) {
+        if (nonOobHeartbeatCounter.get() == nextHeartbeatNumToLog) {
+          LOG.debug("Counters: " + task.getCounters().toShortString());
+          // Always advance by at least one: (int) (n * 1.3f) == n for n <= 3, which would
+          // otherwise pin the threshold and stop counter logging for good.
+          nextHeartbeatNumToLog = Math.max(nextHeartbeatNumToLog + 1,
+              (int) (nextHeartbeatNumToLog * LOG_COUNTER_BACKOFF));
+        }
+      }
+    }
+
+    /**
+     * Sends out final events for task success.
+     * @param taskAttemptID not consulted here; events carry this callable's task metadata
+     * @return false if the AM responded that the task should die
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
+      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
+          task.getProgress()), updateEventMetadata);
+      TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
+          updateEventMetadata);
+      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
+    }
+
+    /**
+     * Sends out final events for task failure.
+     * @param taskAttemptID not consulted here; events carry this callable's task metadata
+     * @param t the failure; its stack trace is appended to the diagnostics
+     * @param diagnostics optional message placed before the stack trace; may be null
+     * @param srcMeta metadata for the failure event; the task's own metadata is used when null
+     * @return false if the AM responded that the task should die
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+        EventMetaData srcMeta) throws IOException, TezException {
+      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
+          task.getProgress()), updateEventMetadata);
+      if (diagnostics == null) {
+        diagnostics = ExceptionUtils.getStackTrace(t);
+      } else {
+        diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
+      }
+      TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+          srcMeta == null ? updateEventMetadata : srcMeta);
+      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
+    }
+
+    /** Queues the given events (if any) to go out with the next heartbeat. */
+    private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+      if (events != null && !events.isEmpty()) {
+        eventsToSend.addAll(events);
+      }
+    }
+  }
+
+  /** Relays the outcome of the heartbeat future to the task's ErrorReporter. */
+  private static class HeartbeatCallback implements FutureCallback<Boolean> {
+
+    private final ErrorReporter errorReporter;
+
+    HeartbeatCallback(ErrorReporter errorReporter) {
+      this.errorReporter = errorReporter;
+    }
+
+    @Override
+    public void onSuccess(Boolean result) {
+      if (Boolean.FALSE.equals(result)) {
+        errorReporter.shutdownRequested();
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      errorReporter.reportError(t);
+    }
+  }
+
+  public boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
+    return currentCallable.taskSucceeded(taskAttemptID);
+  }
+
+  @Override
+  public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+      EventMetaData srcMeta) throws IOException, TezException {
+    return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
+  }
+
+  @Override
+  public void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+    currentCallable.addEvents(taskAttemptID, events);
+  }
+
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
+    return umbilical.canCommit(taskAttemptID);
+  }
+
+  /** Result of one heartbeat: the AM's die request and the number of events it returned. */
+  private static final class ResponseWrapper {
+    final boolean shouldDie;
+    final int numEvents;
+
+    private ResponseWrapper(boolean shouldDie, int numEvents) {
+      this.shouldDie = shouldDie;
+      this.numEvents = numEvents;
+    }
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java?rev=1671950&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java Tue Apr 7 20:57:02 2015
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+
+/**
+ * RPC protocol spoken between an LLAP daemon and the Tez ApplicationMaster's task communicator.
+ * Carries the per-task Tez umbilical calls plus a node-level liveness heartbeat.
+ */
+public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
+
+  /** Protocol version handed to {@code RPC.getProxy} by clients and reported by the server. */
+  long versionID = 1L;
+
+  // From Tez. Eventually changes over to the LLAP protocol and ProtocolBuffers
+
+  /** Asks the AM whether the given task attempt may commit. */
+  boolean canCommit(TezTaskAttemptID taskid) throws IOException;
+
+  /** Sends a task heartbeat request to the AM and returns its response. */
+  TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, TezException;
+
+  /** Reports that the daemon at {@code hostname:port} is alive. */
+  void nodeHeartbeat(Text hostname, int port);
+
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java?rev=1671950&r1=1671949&r2=1671950&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java Tue Apr 7 20:57:02 2015 @@ -20,26 +20,42 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; import com.google.protobuf.ByteString; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import 
org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.common.TezTaskUmbilicalProtocol; +import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { @@ -48,12 +64,16 @@ public class LlapTaskCommunicator extend private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST; private final ConcurrentMap<String, ByteBuffer> credentialMap; + private final EntityTracker entityTracker = new EntityTracker(); + private TaskCommunicator communicator; + private final LlapTaskUmbilicalProtocol umbilical; public LlapTaskCommunicator( TaskCommunicatorContext taskCommunicatorContext) { super(taskCommunicatorContext); + umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical()); SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder(); // TODO Avoid reading this from the environment @@ -92,15 +112,45 @@ public class LlapTaskCommunicator extend } } + @Override + protected void startRpcServer() { + Configuration conf = getConfig(); + try { + JobTokenSecretManager jobTokenSecretManager = + new JobTokenSecretManager(); + jobTokenSecretManager.addTokenForJob(tokenIdentifier, 
sessionToken); + + server = new RPC.Builder(conf) + .setProtocol(LlapTaskUmbilicalProtocol.class) + .setBindAddress("0.0.0.0") + .setPort(0) + .setInstance(umbilical) + .setNumHandlers( + conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT, + TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT)) + .setSecretManager(jobTokenSecretManager).build(); + + // Do serviceACLs need to be refreshed, like in Tez ? + + server.start(); + this.address = NetUtils.getConnectAddress(server); + LOG.info("Started LlapUmbilical: " + umbilical.getClass().getName() + " at address: " + address); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + } @Override public void registerRunningContainer(ContainerId containerId, String hostname, int port) { super.registerRunningContainer(containerId, hostname, port); + entityTracker.registerContainer(containerId, hostname, port); + } @Override public void registerContainerEnd(ContainerId containerId) { super.registerContainerEnd(containerId); + entityTracker.unregisterContainer(containerId); } @Override @@ -111,6 +161,7 @@ public class LlapTaskCommunicator extend int priority) { super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority); + SubmitWorkRequestProto requestProto; try { requestProto = constructSubmitWorkRequest(containerId, taskSpec); @@ -130,6 +181,9 @@ public class LlapTaskCommunicator extend throw new RuntimeException("ContainerInfo not found for container: " + containerId + ", while trying to launch task: " + taskSpec.getTaskAttemptID()); } + + entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port); + // Have to register this up front right now. Otherwise, it's possible for the task to start // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them. 
getTaskCommunicatorContext() @@ -180,7 +234,10 @@ public class LlapTaskCommunicator extend @Override public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) { super.unregisterRunningTaskAttempt(taskAttemptID); - // Nothing else to do for now. The push API in the test does not support termination of a running task + entityTracker.unregisterTaskAttempt(taskAttemptID); + // TODO Inform the daemon that this task is no longer running. + // Currently, a task will end up moving into the RUNNING queue and will + // be told that it needs to die since it isn't recognized. } private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
@@ -214,4 +271,163 @@ public class LlapTaskCommunicator extend
     containerCredentials.writeTokenStorageToStream(containerTokens_dob);
     return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
   }
+
+  /**
+   * Server side of the LLAP umbilical: task-level calls are delegated to the wrapped Tez
+   * umbilical, node heartbeats are recorded in the {@link EntityTracker}.
+   */
+  protected class LlapTaskUmbilicalProtocolImpl implements LlapTaskUmbilicalProtocol {
+
+    private final TezTaskUmbilicalProtocol tezUmbilical;
+
+    public LlapTaskUmbilicalProtocolImpl(TezTaskUmbilicalProtocol tezUmbilical) {
+      this.tezUmbilical = tezUmbilical;
+    }
+
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+      return tezUmbilical.canCommit(taskid);
+    }
+
+    @Override
+    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
+        TezException {
+      return tezUmbilical.heartbeat(request);
+    }
+
+    @Override
+    public void nodeHeartbeat(Text hostname, int port) {
+      entityTracker.nodePinged(hostname.toString(), port);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]");
+      }
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+      return versionID;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+        int clientMethodsHash) throws IOException {
+      return ProtocolSignature.getProtocolSignature(this, protocol,
+          clientVersion, clientMethodsHash);
+    }
+  }
+
+  /**
+   * Tracks which task attempts and containers are placed on which LLAP node, so that one node
+   * heartbeat can be turned into liveness notifications for everything registered on that node.
+   * Each per-node BiMap is a HashBiMap and is only accessed while synchronized on the instance.
+   */
+  private final class EntityTracker {
+    private final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<>();
+
+    /** Records that the attempt, running in the given container, is placed on host:port. */
+    void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId, String host, int port) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registering " + containerId + ", " + taskAttemptId + " for node: " + host + ":" + port);
+      }
+      LlapNodeId llapNodeId = LlapNodeId.getInstance(host, port);
+      attemptToNodeMap.putIfAbsent(taskAttemptId, llapNodeId);
+      BiMap<ContainerId, TezTaskAttemptID> tmpMap = HashBiMap.create();
+      BiMap<ContainerId, TezTaskAttemptID> old = nodeMap.putIfAbsent(llapNodeId, tmpMap);
+      BiMap<ContainerId, TezTaskAttemptID> usedInstance;
+      usedInstance = old == null ? tmpMap : old;
+      synchronized(usedInstance) {
+        usedInstance.put(containerId, taskAttemptId);
+      }
+      // Make sure to put the instance back again, in case it was removed as part of a
+      // containerEnd/taskEnd invocation.
+      nodeMap.putIfAbsent(llapNodeId, usedInstance);
+    }
+
+    /** Drops the attempt, its container mapping, and the node entry once nothing is left on it. */
+    void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
+      LlapNodeId llapNodeId = attemptToNodeMap.remove(attemptId);
+      if (llapNodeId == null) {
+        // Possible since either container / task can be unregistered.
+        return;
+      }
+
+      BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+      ContainerId matched = null;
+      if (bMap != null) {
+        synchronized(bMap) {
+          matched = bMap.inverse().remove(attemptId);
+          // Removing here. Registration into the map has to make sure to put the instance back.
+          // Checked under the lock and only when the node entry exists - the entry may already
+          // have been removed by a concurrent container/task unregistration.
+          if (bMap.isEmpty()) {
+            nodeMap.remove(llapNodeId);
+          }
+        }
+      }
+
+      // Remove the container mapping
+      if (matched != null) {
+        containerToNodeMap.remove(matched);
+      }
+
+    }
+
+    /** Records the node a container lives on; an existing mapping is left untouched. */
+    void registerContainer(ContainerId containerId, String hostname, int port) {
+      containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port));
+    }
+
+    /** Drops the container, its attempt mapping, and the node entry once nothing is left on it. */
+    void unregisterContainer(ContainerId containerId) {
+      LlapNodeId llapNodeId = containerToNodeMap.remove(containerId);
+      if (llapNodeId == null) {
+        // Possible since either container / task can be unregistered.
+        return;
+      }
+
+      BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+      TezTaskAttemptID matched = null;
+      if (bMap != null) {
+        synchronized(bMap) {
+          matched = bMap.remove(containerId);
+          // Removing here. Registration into the map has to make sure to put the instance back.
+          // Checked under the lock and only when the node entry exists - the entry may already
+          // have been removed by a concurrent container/task unregistration.
+          if (bMap.isEmpty()) {
+            nodeMap.remove(llapNodeId);
+          }
+        }
+      }
+
+      // Remove the attempt mapping
+      if (matched != null) {
+        attemptToNodeMap.remove(matched);
+      }
+    }
+
+    // Time the unknown-node warning was last logged; limits it to about once every 5 seconds.
+    private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
+    /** Marks every attempt and container registered on hostname:port as alive. */
+    void nodePinged(String hostname, int port) {
+      LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
+      BiMap<ContainerId, TezTaskAttemptID> biMap = nodeMap.get(nodeId);
+      if (biMap != null) {
+        synchronized(biMap) {
+          for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
+            getTaskCommunicatorContext().taskAlive(entry.getValue());
+            getTaskCommunicatorContext().containerAlive(entry.getKey());
+          }
+        }
+      } else {
+        if (System.currentTimeMillis() > nodeNotFoundLogTime.get() + 5000L) {
+          LOG.warn("Received ping from unknown node: " + hostname + ":" + port +
+              ". Could be caused by pre-emption by the AM," +
+              " or a mismatched hostname. Enable debug logging for mismatched host names");
+          nodeNotFoundLogTime.set(System.currentTimeMillis());
+        }
+      }
+    }
+  }
 }
 Modified: hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java?rev=1671950&r1=1671949&r2=1671950&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java Tue Apr 7 20:57:02 2015 @@ -15,6 +15,9 @@ package org.apache.tez.dag.app.rm; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Comparator; import java.util.EnumSet; @@ -52,6 +55,7 @@ import com.google.common.util.concurrent import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.types.AddressTypes; import org.apache.hadoop.registry.client.types.Endpoint; @@ -194,6 +198,20 @@ public class LlapTaskSchedulerService ex Preconditions.checkState(hosts != null && hosts.length != 0, LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + "must be defined"); for (String host : hosts) { + // If reading addresses from conf, try resolving local addresses so that + // this matches with the address reported by daemons. 
+ InetAddress inetAddress = null; + try { + inetAddress = InetAddress.getByName(host); + if (NetUtils.isLocalAddress(inetAddress)) { + InetSocketAddress socketAddress = new InetSocketAddress(0); + socketAddress = NetUtils.getConnectAddress(socketAddress); + LOG.info("Adding host identified as local: " + host + " as " + socketAddress.getHostName()); + host = socketAddress.getHostName(); + } + } catch (UnknownHostException e) { + LOG.warn("Ignoring resolution issues for host: " + host, e); + } NodeInfo nodeInfo = new NodeInfo(host, BACKOFF_FACTOR, clock); activeHosts.put(host, nodeInfo); allHosts.put(host, nodeInfo); @@ -780,7 +798,6 @@ public class LlapTaskSchedulerService ex numLocalAllocations++; _registerAllocationInHostMap(allocatedHost, localityBasedNumAllocationsPerHost); } else { - // KKK TODO Log all non-local allocations numNonLocalAllocations++; } } else {