Author: sseth
Date: Tue Apr  7 20:57:02 2015
New Revision: 1671950

URL: http://svn.apache.org/r1671950
Log:
HIVE-10025. LLAP: Send heartbeats for queued work. (Siddharth Seth)

Added:
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
Modified:
    
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
    
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java

Modified: 
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java?rev=1671950&r1=1671949&r2=1671950&view=diff
==============================================================================
--- 
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
 (original)
+++ 
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
 Tue Apr  7 20:57:02 2015
@@ -45,6 +45,9 @@ public class LlapConfiguration extends C
   public static final String LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED = 
LLAP_DAEMON_PREFIX + "shuffle.dir-watcher.enabled";
   public static final boolean LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT 
= false;
 
+  // Interval, in milliseconds, used by the daemon to schedule liveness heartbeats.
+  public static final String LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS = LLAP_DAEMON_PREFIX + "liveness.heartbeat.interval-ms";
+  // Default heartbeat interval: 5 seconds. Upper-case 'L' suffix so the literal is not misread as 50001.
+  public static final long LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000L;
+
 
   // Section for configs used in AM and executors
   public static final String LLAP_DAEMON_NUM_EXECUTORS = LLAP_DAEMON_PREFIX + 
"num.executors";

Added: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java?rev=1671950&view=auto
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
 (added)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
 Tue Apr  7 20:57:02 2015
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * Identifier for an LLAP node, consisting of a hostname and a port.
+ *
+ * <p>Instances cannot be constructed directly; callers go through
+ * {@link #getInstance(String, int)}, which looks the requested hostname/port pair up in a
+ * shared cache and returns the cached instance for that pair.
+ */
+public class LlapNodeId {
+
+  /** Loader that maps every key to itself, turning the cache into an instance pool. */
+  private static final class SelfLoader extends CacheLoader<LlapNodeId, LlapNodeId> {
+    @Override
+    public LlapNodeId load(LlapNodeId candidate) throws Exception {
+      return candidate;
+    }
+  }
+
+  private static final LoadingCache<LlapNodeId, LlapNodeId> CACHE =
+      CacheBuilder.newBuilder().softValues().build(new SelfLoader());
+
+  private final String hostname;
+  private final int port;
+
+  private LlapNodeId(String hostname, int port) {
+    this.hostname = hostname;
+    this.port = port;
+  }
+
+  /**
+   * Returns the cached identifier for the given hostname and port, creating and caching it on
+   * first use.
+   *
+   * @param hostname hostname of the node
+   * @param port port of the node
+   * @return the pooled identifier equal to the requested pair
+   */
+  public static LlapNodeId getInstance(String hostname, int port) {
+    LlapNodeId lookupKey = new LlapNodeId(hostname, port);
+    return CACHE.getUnchecked(lookupKey);
+  }
+
+  /** @return the hostname component of this identifier */
+  public String getHostname() {
+    return hostname;
+  }
+
+  /** @return the port component of this identifier */
+  public int getPort() {
+    return port;
+  }
+
+  /** Two identifiers are equal when they have the same runtime class, port and hostname. */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    LlapNodeId other = (LlapNodeId) o;
+    return port == other.port && hostname.equals(other.hostname);
+  }
+
+  @Override
+  public int hashCode() {
+    return 1009 * hostname.hashCode() + port;
+  }
+
+  @Override
+  public String toString() {
+    return "LlapNodeId{hostname='" + hostname + "', port=" + port + "}";
+  }
+}

Added: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java?rev=1671950&view=auto
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
 (added)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
 Tue Apr  7 20:57:02 2015
@@ -0,0 +1,318 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.CallableWithNdc;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sends periodic node heartbeats to the application masters (AMs) that have tasks registered
+ * with this daemon.
+ *
+ * <p>Each AM (host + port) is tracked by a reference-counted {@link AMNodeInfo}.
+ * {@link #registerTask} increments the count and {@link #unregisterTask} decrements it. A single
+ * drainer thread waits on a {@link DelayQueue} ordered by next heartbeat time. When an entry
+ * becomes due it is either re-queued for the next interval and heartbeated on a separate thread
+ * pool (count &gt; 0), or dropped and its RPC proxy stopped (count == 0).
+ */
+public class AMReporter extends AbstractService {
+
+  /*
+  Design notes:
+
+  Registrations and un-registrations will happen as and when tasks are submitted or are removed.
+  Reference counting is likely required.
+
+  A connection needs to be established to each app master.
+
+  Ignore exceptions when communicating with the AM.
+  At a later point, report back saying the AM is dead so that tasks can be removed from the
+  running queue.
+
+  Use a cachedThreadPool so that a few AMs going down does not affect other AppMasters.
+
+  Race: When a task completes - it sends out its message via the regular TaskReporter. The AM
+  after this may run another DAG, or may die. This may need to be consolidated with the
+  LlapTaskReporter. Try ensuring there's no race between the two.
+
+  Single thread which sends heartbeats to AppMasters as events drain off a queue.
+   */
+
+  private static final Logger LOG = LoggerFactory.getLogger(AMReporter.class);
+
+  private final LlapNodeId nodeId;
+  private final Configuration conf;
+  // Single thread which waits on the delay queue.
+  private final ListeningExecutorService queueLookupExecutor;
+  // Cached pool on which the actual heartbeat RPCs are made.
+  private final ListeningExecutorService executor;
+  private final DelayQueue<AMNodeInfo> pendingHeartbeatQueue = new DelayQueue<>();
+  private final long heartbeatInterval;
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+  // All access is guarded by synchronizing on the map itself.
+  private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
+  volatile ListenableFuture<Void> queueLookupFuture;
+
+  /**
+   * @param nodeId identity (host and port) of this daemon, sent with every heartbeat
+   * @param conf configuration; supplies the heartbeat interval and is used to create RPC proxies
+   */
+  public AMReporter(LlapNodeId nodeId, Configuration conf) {
+    super(AMReporter.class.getName());
+    this.nodeId = nodeId;
+    this.conf = conf;
+    ExecutorService rawExecutor = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
+    this.executor = MoreExecutors.listeningDecorator(rawExecutor);
+    ExecutorService rawExecutor2 = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build());
+    this.queueLookupExecutor = MoreExecutors.listeningDecorator(rawExecutor2);
+    this.heartbeatInterval =
+        conf.getLong(LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS,
+            LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT);
+  }
+
+  /** Starts the drainer thread and logs when (and how) it eventually exits. */
+  @Override
+  public void serviceStart() {
+    QueueLookupCallable queueDrainerCallable = new QueueLookupCallable();
+    queueLookupFuture = queueLookupExecutor.submit(queueDrainerCallable);
+    Futures.addCallback(queueLookupFuture, new FutureCallback<Void>() {
+      @Override
+      public void onSuccess(Void result) {
+        LOG.info("AMReporter QueueDrainer exited");
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.error("AMReporter QueueDrainer exited with error", t);
+      }
+    });
+    LOG.info("Started service: " + getName());
+  }
+
+  /** Cancels the drainer and shuts both executors down. Only the first call has any effect. */
+  @Override
+  public void serviceStop() {
+    if (!isShutdown.getAndSet(true)) {
+      if (queueLookupFuture != null) {
+        queueLookupFuture.cancel(true);
+      }
+      queueLookupExecutor.shutdownNow();
+      executor.shutdownNow();
+      LOG.info("Stopped service: " + getName());
+    }
+  }
+
+  /**
+   * Registers one more task for the given AM. The first registration for an AM creates its
+   * tracking entry and schedules the first heartbeat one interval from now.
+   *
+   * @param amLocation hostname of the AM
+   * @param port port of the AM
+   * @param user user on whose behalf the AM is contacted
+   * @param jobToken token added to the user's credentials when connecting to the AM
+   */
+  public void registerTask(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Registering for heartbeat: " + amLocation + ":" + port);
+    }
+    LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
+    synchronized (knownAppMasters) {
+      AMNodeInfo amNodeInfo = knownAppMasters.get(amNodeId);
+      if (amNodeInfo == null) {
+        amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf);
+        knownAppMasters.put(amNodeId, amNodeInfo);
+        // Queued here only on first registration; afterwards the drainer re-queues the entry
+        // each time it takes it off the queue, for as long as it has tasks.
+        amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval);
+        pendingHeartbeatQueue.add(amNodeInfo);
+      }
+      amNodeInfo.incrementAndGetTaskCount();
+    }
+  }
+
+  /**
+   * Unregisters one task for the given AM. A request for an unknown AM is logged and ignored.
+   *
+   * @param amLocation hostname of the AM
+   * @param port port of the AM
+   */
+  public void unregisterTask(String amLocation, int port) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Un-registering for heartbeat: " + amLocation + ":" + port);
+    }
+    LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
+    synchronized (knownAppMasters) {
+      AMNodeInfo amNodeInfo = knownAppMasters.get(amNodeId);
+      if (amNodeInfo == null) {
+        LOG.error("Ignoring unexpected unregisterRequest for am at: " + amLocation + ":" + port);
+        // Nothing to decrement; returning here avoids a NullPointerException below.
+        return;
+      }
+      amNodeInfo.decrementAndGetTaskCount();
+      // Not removing this here. Will be removed when taken off the queue and discovered to
+      // have 0 pending tasks.
+    }
+  }
+
+  /** Drainer: waits for entries to become due, then heartbeats or retires them. */
+  private class QueueLookupCallable extends CallableWithNdc<Void> {
+
+    @Override
+    protected Void callInternal() {
+      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+        try {
+          AMNodeInfo amNodeInfo = pendingHeartbeatQueue.take();
+          boolean stillReferenced;
+          // Decide under the map lock so a concurrent registerTask cannot bump the count of an
+          // entry that is being removed.
+          synchronized (knownAppMasters) {
+            stillReferenced = amNodeInfo.getTaskCount() > 0;
+            if (stillReferenced) {
+              // Schedule the next heartbeat. The time is only changed while the entry is off
+              // the queue, so the queue ordering is never invalidated.
+              amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval);
+              pendingHeartbeatQueue.add(amNodeInfo);
+            } else {
+              knownAppMasters.remove(amNodeInfo.amNodeId);
+            }
+          }
+          if (stillReferenced) {
+            executor.submit(new AMHeartbeatCallable(amNodeInfo));
+          } else {
+            amNodeInfo.stopUmbilical();
+          }
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info("QueueLookup thread interrupted after shutdown");
+          } else {
+            // Best effort: keep draining after an interrupt that is not part of shutdown.
+            LOG.warn("Received unexpected interrupt while waiting on heartbeat queue");
+          }
+        }
+      }
+      return null;
+    }
+  }
+
+  /** Sends a single node heartbeat to one AM, if it still has registered tasks. */
+  private class AMHeartbeatCallable extends CallableWithNdc<Void> {
+
+    final AMNodeInfo amNodeInfo;
+
+    public AMHeartbeatCallable(AMNodeInfo amNodeInfo) {
+      this.amNodeInfo = amNodeInfo;
+    }
+
+    @Override
+    protected Void callInternal() {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Attempting to heartbeat to AM: " + amNodeInfo);
+      }
+      if (amNodeInfo.getTaskCount() > 0) {
+        try {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("NodeHeartbeat to: " + amNodeInfo);
+          }
+          amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
+              nodeId.getPort());
+        } catch (IOException e) {
+          // TODO Ideally, this could be used to avoid running a task - AM down / unreachable,
+          // so there's no point running it.
+          LOG.warn("Failed to communicate with AM. May retry later: " + amNodeInfo.amNodeId, e);
+        } catch (InterruptedException e) {
+          if (!isShutdown.get()) {
+            LOG.warn("Failed to communicate with AM: " + amNodeInfo.amNodeId, e);
+          }
+          // Preserve the interrupt status for whoever runs this callable.
+          Thread.currentThread().interrupt();
+        }
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping node heartbeat to AM: " + amNodeInfo + ", since ref count is 0");
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Per-AM state: a task reference count, a lazily created RPC proxy, and the time at which the
+   * next heartbeat is due (which drives the {@link Delayed} ordering).
+   */
+  private static class AMNodeInfo implements Delayed {
+    private final AtomicInteger taskCount = new AtomicInteger(0);
+    private final String user;
+    private final Token<JobTokenIdentifier> jobToken;
+    private final Configuration conf;
+    private final LlapNodeId amNodeId;
+    // Guarded by this object's monitor.
+    private LlapTaskUmbilicalProtocol umbilical;
+    // Wall-clock milliseconds; volatile because it is written by registering/drainer threads and
+    // read inside DelayQueue operations.
+    private volatile long nextHeartbeatTime;
+
+    public AMNodeInfo(LlapNodeId amNodeId, String user,
+                      Token<JobTokenIdentifier> jobToken,
+                      Configuration conf) {
+      this.user = user;
+      this.jobToken = jobToken;
+      this.conf = conf;
+      this.amNodeId = amNodeId;
+    }
+
+    /** Returns the RPC proxy to the AM, creating it as {@code user} on first use. */
+    synchronized LlapTaskUmbilicalProtocol getUmbilical() throws IOException, InterruptedException {
+      if (umbilical == null) {
+        final InetSocketAddress address =
+            NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort());
+        SecurityUtil.setTokenService(this.jobToken, address);
+        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+        ugi.addToken(jobToken);
+        umbilical = ugi.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
+          @Override
+          public LlapTaskUmbilicalProtocol run() throws Exception {
+            return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
+                LlapTaskUmbilicalProtocol.versionID, address, conf);
+          }
+        });
+      }
+      return umbilical;
+    }
+
+    /** Stops the RPC proxy, if one was created, and forgets it. */
+    synchronized void stopUmbilical() {
+      if (umbilical != null) {
+        RPC.stopProxy(umbilical);
+      }
+      umbilical = null;
+    }
+
+    int incrementAndGetTaskCount() {
+      return taskCount.incrementAndGet();
+    }
+
+    int decrementAndGetTaskCount() {
+      return taskCount.decrementAndGet();
+    }
+
+    int getTaskCount() {
+      return taskCount.get();
+    }
+
+    /** Must only be called while this entry is not in the delay queue. */
+    void setNextHeartbeatTime(long nextTime) {
+      nextHeartbeatTime = nextTime;
+    }
+
+    /** Remaining time until the next heartbeat is due; zero or negative once it is due. */
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(nextHeartbeatTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    /** Orders entries by next heartbeat time, earliest first. */
+    @Override
+    public int compareTo(Delayed o) {
+      AMNodeInfo other = (AMNodeInfo) o;
+      return Long.compare(this.nextHeartbeatTime, other.nextHeartbeatTime);
+    }
+
+    @Override
+    public String toString() {
+      return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount();
+    }
+  }
+}

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java?rev=1671950&r1=1671949&r2=1671950&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 Tue Apr  7 20:57:02 2015
@@ -33,14 +33,15 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.CallableWithNdc;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.hive.llap.tezplugins.Converters;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -56,7 +57,6 @@ import org.apache.hadoop.yarn.util.Auxil
 import org.apache.log4j.Logger;
 import org.apache.log4j.NDC;
 import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -65,7 +65,7 @@ import org.apache.tez.dag.api.TezExcepti
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
-import org.apache.tez.runtime.task.TaskReporter;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult;
 
 import com.google.common.base.Preconditions;
@@ -84,31 +84,27 @@ public class ContainerRunnerImpl extends
   public static final String THREAD_NAME_FORMAT = THREAD_NAME_FORMAT_PREFIX + 
"%d";
   private static final Logger LOG = 
Logger.getLogger(ContainerRunnerImpl.class);
 
+  private volatile AMReporter amReporter;
   private final ListeningExecutorService executorService;
   private final AtomicReference<InetSocketAddress> localAddress;
   private final String[] localDirsBase;
-  private final Map<String, String> localEnv = new HashMap<String, String>();
-  private volatile FileSystem localFs;
+  private final Map<String, String> localEnv = new HashMap<>();
+  private final FileSystem localFs;
   private final long memoryPerExecutor;
   private final LlapDaemonExecutorMetrics metrics;
+  private final Configuration conf;
   private final ConfParams confParams;
   // TODO Support for removing queued containers, interrupting / killing 
specific containers
 
-  public ContainerRunnerImpl(int numExecutors, String[] localDirsBase, int 
localShufflePort,
+  public ContainerRunnerImpl(Configuration conf, int numExecutors, String[] 
localDirsBase, int localShufflePort,
       AtomicReference<InetSocketAddress> localAddress,
       long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics) {
     super("ContainerRunnerImpl");
+    this.conf = conf;
     Preconditions.checkState(numExecutors > 0,
         "Invalid number of executors: " + numExecutors + ". Must be > 0");
     this.localDirsBase = localDirsBase;
     this.localAddress = localAddress;
-    this.confParams = new ConfParams();
-    // Setup to defaults to start with
-    confParams.amMaxEventsPerHeartbeat = 
TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT;
-    confParams.amHeartbeatIntervalMsMax =
-        TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT;
-    confParams.amCounterHeartbeatInterval =
-        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT;
 
     ExecutorService raw = Executors.newFixedThreadPool(numExecutors,
         new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build());
@@ -121,36 +117,42 @@ public class ContainerRunnerImpl extends
     // TODO Tune this based on the available size.
     this.memoryPerExecutor = (long)(totalMemoryAvailableBytes * 0.8 / (float) 
numExecutors);
     this.metrics = metrics;
-    LOG.info("ContainerRunnerImpl config: " +
-        "memoryPerExecutorDerviced=" + memoryPerExecutor
-    );
-  }
 
-  @Override
-  public void serviceInit(Configuration conf) {
     try {
       localFs = FileSystem.getLocal(conf);
-      // TODO Fix visibility of these parameters - which
-      confParams.amCounterHeartbeatInterval = conf.getLong(
-          TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
-          TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT);
-      confParams.amHeartbeatIntervalMsMax =
-          conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
-              TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-      confParams.amMaxEventsPerHeartbeat =
-          conf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
-              TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
     } catch (IOException e) {
       throw new RuntimeException("Failed to setup local filesystem instance", 
e);
     }
+    confParams = new ConfParams(
+        conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+            TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT),
+        conf.getLong(
+            TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
+            
TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT),
+        conf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
+            TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT)
+    );
+
+    LOG.info("ContainerRunnerImpl config: " +
+        "memoryPerExecutorDerviced=" + memoryPerExecutor
+    );
   }
 
   @Override
   public void serviceStart() {
+    // The node id will only be available at this point, since the server has 
been started in LlapDaemon
+    LlapNodeId llapNodeId = 
LlapNodeId.getInstance(localAddress.get().getHostName(), 
localAddress.get().getPort());
+    this.amReporter = new AMReporter(llapNodeId, conf);
+    amReporter.init(conf);
+    amReporter.start();
   }
 
   @Override
   protected void serviceStop() throws Exception {
+    if (amReporter != null) {
+      amReporter.stop();
+      amReporter = null;
+    }
     super.serviceStop();
   }
 
@@ -198,11 +200,8 @@ public class ContainerRunnerImpl extends
       if (LOG.isDebugEnabled()) {
         LOG.debug("Dirs are: " + Arrays.toString(localDirs));
       }
-
-
-      // Setup workingDir. This is otherwise setup as Environment.PWD
+      // May need to setup localDir for re-localization, which is usually 
setup as Environment.PWD.
       // Used for re-localization, to add the user specified configuration 
(conf_pb_binary_stream)
-      String workingDir = localDirs[0];
 
       Credentials credentials = new Credentials();
       DataInputBuffer dib = new DataInputBuffer();
@@ -218,7 +217,7 @@ public class ContainerRunnerImpl extends
 
       TaskRunnerCallable callable = new TaskRunnerCallable(request, new 
Configuration(getConfig()),
           new ExecutionContextImpl(localAddress.get().getHostName()), env, 
localDirs,
-          workingDir, credentials, memoryPerExecutor, confParams);
+          credentials, memoryPerExecutor, amReporter, confParams);
       ListenableFuture<ContainerExecutionResult> future = 
executorService.submit(callable);
       Futures.addCallback(future, new TaskRunnerCallback(request, callable));
       metrics.incrExecutorTotalRequestsHandled();
@@ -232,7 +231,6 @@ public class ContainerRunnerImpl extends
 
     private final SubmitWorkRequestProto request;
     private final Configuration conf;
-    private final String workingDir;
     private final String[] localDirs;
     private final Map<String, String> envMap;
     private final String pid = null;
@@ -240,29 +238,46 @@ public class ContainerRunnerImpl extends
     private final ExecutionContext executionContext;
     private final Credentials credentials;
     private final long memoryAvailable;
-    private final ListeningExecutorService executor;
     private final ConfParams confParams;
+    private final Token<JobTokenIdentifier> jobToken;
+    private final AMReporter amReporter;
     private volatile TezTaskRunner taskRunner;
-    private volatile TaskReporter taskReporter;
-    private TezTaskUmbilicalProtocol umbilical;
+    private volatile TaskReporterInterface taskReporter;
+    private volatile ListeningExecutorService executor;
+    private LlapTaskUmbilicalProtocol umbilical;
     private volatile long startTime;
     private volatile String threadName;
+    private volatile boolean cancelled = false;
+
 
 
     TaskRunnerCallable(SubmitWorkRequestProto request, Configuration conf,
                             ExecutionContext executionContext, Map<String, 
String> envMap,
-                            String[] localDirs, String workingDir, Credentials 
credentials,
-                            long memoryAvailable, ConfParams confParams) {
+                            String[] localDirs, Credentials credentials,
+                            long memoryAvailable, AMReporter amReporter, 
ConfParams confParams) {
       this.request = request;
       this.conf = conf;
       this.executionContext = executionContext;
       this.envMap = envMap;
-      this.workingDir = workingDir;
       this.localDirs = localDirs;
       this.objectRegistry = new ObjectRegistryImpl();
       this.credentials = credentials;
       this.memoryAvailable = memoryAvailable;
       this.confParams = confParams;
+      jobToken = TokenCache.getSessionToken(credentials);
+      this.amReporter = amReporter;
+      // Register with the AMReporter when the callable is setup. Unregister 
once it starts running.
+      this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), 
request.getUser(), jobToken);
+    }
+
+    @Override
+    protected ContainerExecutionResult callInternal() throws Exception {
+      this.startTime = System.currentTimeMillis();
+      this.threadName = Thread.currentThread().getName();
+
+      // Unregister from the AMReporter, since the task is now running.
+      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+
       // TODO This executor seems unnecessary. Here and TezChild
       ExecutorService executorReal = Executors.newFixedThreadPool(1,
           new ThreadFactoryBuilder()
@@ -271,51 +286,45 @@ public class ContainerRunnerImpl extends
                   "TezTaskRunner_" + 
request.getFragmentSpec().getTaskAttemptIdString())
               .build());
       executor = MoreExecutors.listeningDecorator(executorReal);
-    }
 
-    @Override
-    protected ContainerExecutionResult callInternal() throws Exception {
-      this.startTime = System.currentTimeMillis();
-      this.threadName = Thread.currentThread().getName();
       // TODO Consolidate this code with TezChild.
       Stopwatch sw = new Stopwatch().start();
-       UserGroupInformation taskUgi = 
UserGroupInformation.createRemoteUser(request.getUser());
-       taskUgi.addCredentials(credentials);
+      UserGroupInformation taskUgi = 
UserGroupInformation.createRemoteUser(request.getUser());
+      taskUgi.addCredentials(credentials);
 
-       Token<JobTokenIdentifier> jobToken = 
TokenCache.getSessionToken(credentials);
-       Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, 
ByteBuffer>();
-       serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
-           TezCommonUtils.convertJobTokenToBytes(jobToken));
-       Multimap<String, String> startedInputsMap = HashMultimap.create();
-
-       UserGroupInformation taskOwner =
-           UserGroupInformation.createRemoteUser(request.getTokenIdentifier());
-       final InetSocketAddress address =
-           NetUtils.createSocketAddrForHost(request.getAmHost(), 
request.getAmPort());
-       SecurityUtil.setTokenService(jobToken, address);
-       taskOwner.addToken(jobToken);
-       umbilical = taskOwner.doAs(new 
PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
-         @Override
-         public TezTaskUmbilicalProtocol run() throws Exception {
-           return RPC.getProxy(TezTaskUmbilicalProtocol.class,
-               TezTaskUmbilicalProtocol.versionID, address, conf);
-         }
-       });
-
-       taskReporter = new TaskReporter(
-           umbilical,
-           confParams.amHeartbeatIntervalMsMax,
-           confParams.amCounterHeartbeatInterval,
-           confParams.amMaxEventsPerHeartbeat,
-           new AtomicLong(0),
-           request.getContainerIdString());
-
-       taskRunner = new TezTaskRunner(conf, taskUgi, localDirs,
-           Converters.getTaskSpecfromProto(request.getFragmentSpec()), 
umbilical,
-           request.getAppAttemptNumber(),
-           serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, 
executor, objectRegistry,
-           pid,
-           executionContext, memoryAvailable);
+      Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
+      serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+          TezCommonUtils.convertJobTokenToBytes(jobToken));
+      Multimap<String, String> startedInputsMap = HashMultimap.create();
+
+      UserGroupInformation taskOwner =
+          UserGroupInformation.createRemoteUser(request.getTokenIdentifier());
+      final InetSocketAddress address =
+          NetUtils.createSocketAddrForHost(request.getAmHost(), 
request.getAmPort());
+      SecurityUtil.setTokenService(jobToken, address);
+      taskOwner.addToken(jobToken);
+      umbilical = taskOwner.doAs(new 
PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
+        @Override
+        public LlapTaskUmbilicalProtocol run() throws Exception {
+          return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
+              LlapTaskUmbilicalProtocol.versionID, address, conf);
+        }
+      });
+
+      taskReporter = new LlapTaskReporter(
+          umbilical,
+          confParams.amHeartbeatIntervalMsMax,
+          confParams.amCounterHeartbeatInterval,
+          confParams.amMaxEventsPerHeartbeat,
+          new AtomicLong(0),
+          request.getContainerIdString());
+
+      taskRunner = new TezTaskRunner(conf, taskUgi, localDirs,
+          Converters.getTaskSpecfromProto(request.getFragmentSpec()),
+          request.getAppAttemptNumber(),
+          serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, 
executor, objectRegistry,
+          pid,
+          executionContext, memoryAvailable);
 
        boolean shouldDie;
        try {
@@ -398,6 +407,7 @@ public class ContainerRunnerImpl extends
     @Override
     public void onFailure(Throwable t) {
       LOG.error("TezTaskRunner execution failed for : " + 
getTaskIdentifierString(request), t);
+      // TODO HIVE-10236 Report a fatal error over the umbilical
       taskRunnerCallable.shutdown();
       HistoryLogger
           .logFragmentEnd(request.getApplicationIdString(), 
request.getContainerIdString(),
@@ -422,9 +432,16 @@ public class ContainerRunnerImpl extends
   }
 
   private static class ConfParams {
-    int amHeartbeatIntervalMsMax;
-    long amCounterHeartbeatInterval;
-    int amMaxEventsPerHeartbeat;
+    final int amHeartbeatIntervalMsMax;
+    final long amCounterHeartbeatInterval;
+    final int amMaxEventsPerHeartbeat;
+
+    public ConfParams(int amHeartbeatIntervalMsMax, long 
amCounterHeartbeatInterval,
+                      int amMaxEventsPerHeartbeat) {
+      this.amHeartbeatIntervalMsMax = amHeartbeatIntervalMsMax;
+      this.amCounterHeartbeatInterval = amCounterHeartbeatInterval;
+      this.amMaxEventsPerHeartbeat = amMaxEventsPerHeartbeat;
+    }
   }
 
   private String stringifyRequest(SubmitWorkRequestProto request) {

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java?rev=1671950&r1=1671949&r2=1671950&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
 Tue Apr  7 20:57:02 2015
@@ -131,7 +131,7 @@ public class LlapDaemon extends Abstract
     LOG.info("Started LlapMetricsSystem with displayName: " + displayName +
         " sessionId: " + sessionId);
 
-    this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, 
shufflePort, address,
+    this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, 
localDirs, shufflePort, address,
         executorMemoryBytes, metrics);
     
     this.registry = new LlapRegistryService();

Added: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java?rev=1671950&view=auto
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
 (added)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
 Tue Apr  7 20:57:02 2015
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.io.Text;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.internals.api.TaskReporterInterface;
+import org.apache.tez.runtime.task.ErrorReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Responsible for communication between tasks running in a Container and the 
ApplicationMaster.
+ * Takes care of sending heartbeats (regular and OOB) to the AM - to send 
generated events, and to
+ * retrieve events specific to this task.
+ *
+ */
+public class LlapTaskReporter implements TaskReporterInterface {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LlapTaskReporter.class);
+
+  private final LlapTaskUmbilicalProtocol umbilical;
+  private final long pollInterval;
+  private final long sendCounterInterval;
+  private final int maxEventsToGet;
+  private final AtomicLong requestCounter;
+  private final String containerIdStr;
+
+  private final ListeningExecutorService heartbeatExecutor;
+
+  @VisibleForTesting
+  HeartbeatCallable currentCallable;
+
+  /**
+   * Creates a reporter that heartbeats to the AM over the given umbilical.
+   *
+   * @param umbilical protocol instance used for heartbeats and canCommit calls
+   * @param amPollInterval time, in milliseconds, to wait between regular heartbeats
+   * @param sendCounterInterval minimum time, in milliseconds, between counter updates
+   * @param maxEventsToGet maximum number of events requested in each heartbeat
+   * @param requestCounter source of heartbeat request ids
+   * @param containerIdStr container id string placed in each heartbeat request
+   */
+  public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval,
+                          long sendCounterInterval, int maxEventsToGet,
+                          AtomicLong requestCounter, String containerIdStr) {
+    // One daemon thread runs the heartbeat loop of the registered task.
+    this.heartbeatExecutor = MoreExecutors.listeningDecorator(
+        Executors.newFixedThreadPool(1,
+            new ThreadFactoryBuilder()
+                .setNameFormat("TaskHeartbeatThread")
+                .setDaemon(true)
+                .build()));
+    this.umbilical = umbilical;
+    this.containerIdStr = containerIdStr;
+    this.requestCounter = requestCounter;
+    this.maxEventsToGet = maxEventsToGet;
+    this.sendCounterInterval = sendCounterInterval;
+    this.pollInterval = amPollInterval;
+  }
+
+  /**
+   * Starts tracking a task. A new heartbeat loop is created for the task, stored
+   * as the current callable and submitted to the heartbeat executor.
+   *
+   * @param task the task on whose behalf heartbeats are sent
+   * @param errorReporter notified when the heartbeat loop fails or returns false
+   */
+  @Override
+  public synchronized void registerTask(RuntimeTask task,
+                                        ErrorReporter errorReporter) {
+    HeartbeatCallable callable = new HeartbeatCallable(task, umbilical, pollInterval,
+        sendCounterInterval, maxEventsToGet, requestCounter, containerIdStr);
+    currentCallable = callable;
+    Futures.addCallback(heartbeatExecutor.submit(callable),
+        new HeartbeatCallback(errorReporter));
+  }
+
+  /**
+   * Stops tracking the current task and wakes up its heartbeat loop.
+   * This method should always be invoked before setting up heartbeats for
+   * another task running in the same container.
+   *
+   * @param taskAttemptID the attempt being unregistered (not consulted; only one
+   *                      callable is tracked at a time)
+   */
+  @Override
+  public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
+    // An unregister without a matching register, or a repeated unregister,
+    // previously failed with a NullPointerException.
+    if (currentCallable != null) {
+      currentCallable.markComplete();
+      currentCallable = null;
+    }
+  }
+
+  /**
+   * Shuts down the heartbeat executor immediately via shutdownNow().
+   */
+  @Override
+  public void shutdown() {
+    heartbeatExecutor.shutdownNow();
+  }
+
+  @VisibleForTesting
+  static class HeartbeatCallable implements Callable<Boolean> {
+
+    private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
+    private static final float LOG_COUNTER_BACKOFF = 1.3f;
+
+    private final RuntimeTask task;
+    private EventMetaData updateEventMetadata;
+
+    private final LlapTaskUmbilicalProtocol umbilical;
+
+    private final long pollInterval;
+    private final long sendCounterInterval;
+    private final int maxEventsToGet;
+    private final String containerIdStr;
+
+    private final AtomicLong requestCounter;
+
+    private LinkedBlockingQueue<TezEvent> eventsToSend = new 
LinkedBlockingQueue<TezEvent>();
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    /*
+     * Keeps track of regular timed heartbeats. Is primarily used as a timing 
mechanism to send /
+     * log counters.
+     */
+    private AtomicInteger nonOobHeartbeatCounter = new AtomicInteger(0);
+    private int nextHeartbeatNumToLog = 0;
+    /*
+     * Tracks the last non-OOB heartbeat number at which counters were sent to 
the AM. 
+     */
+    private int prevCounterSendHeartbeatNum = 0;
+
+    /**
+     * Creates the heartbeat loop for a single task.
+     *
+     * @param task the task on whose behalf heartbeats are sent
+     * @param umbilical protocol instance used to send heartbeats
+     * @param amPollInterval time, in milliseconds, to wait between regular heartbeats
+     * @param sendCounterInterval minimum time, in milliseconds, between counter updates
+     * @param maxEventsToGet maximum number of events requested in each heartbeat
+     * @param requestCounter source of heartbeat request ids
+     * @param containerIdStr container id string placed in each heartbeat request
+     */
+    public HeartbeatCallable(RuntimeTask task,
+                             LlapTaskUmbilicalProtocol umbilical, long amPollInterval,
+                             long sendCounterInterval, int maxEventsToGet,
+                             AtomicLong requestCounter, String containerIdStr) {
+
+      this.pollInterval = amPollInterval;
+      this.sendCounterInterval = sendCounterInterval;
+      this.maxEventsToGet = maxEventsToGet;
+      this.requestCounter = requestCounter;
+      this.containerIdStr = containerIdStr;
+
+      this.task = task;
+      this.umbilical = umbilical;
+      this.updateEventMetadata = new EventMetaData(EventProducerConsumerType.SYSTEM,
+          task.getVertexName(), "", task.getTaskAttemptID());
+
+      // First heartbeat number at which counters are logged: the number of poll
+      // intervals that fit into LOG_COUNTER_START_INTERVAL, but never less than 1.
+      // A zero poll interval is replaced by a tiny divisor to avoid dividing by zero.
+      float divisor = amPollInterval == 0 ? 0.000001f : (float) amPollInterval;
+      int heartbeatsPerLogInterval = (int) (LOG_COUNTER_START_INTERVAL / divisor);
+      this.nextHeartbeatNumToLog = Math.max(1, heartbeatsPerLogInterval);
+    }
+
+    /**
+     * Heartbeat loop. Runs until the task is done, has had a fatal error, or the
+     * AM asks the task to die.
+     *
+     * @return false if the AM asked the task to die, true otherwise
+     * @throws Exception if a heartbeat fails or the wait between heartbeats is interrupted
+     */
+    @Override
+    public Boolean call() throws Exception {
+      // Heartbeat only for active tasks. Errors, etc will be reported directly.
+      while (!(task.isTaskDone() || task.hadFatalError())) {
+        ResponseWrapper response = heartbeat(null);
+
+        if (response.shouldDie) {
+          // AM sent a shouldDie=true
+          LOG.info("Asked to die via task heartbeat");
+          return false;
+        }
+        if (response.numEvents >= maxEventsToGet) {
+          // A full batch of events came back: heartbeat again without waiting.
+          // This is considered an OOB heartbeat.
+          continue;
+        }
+
+        // Wait before sending another heartbeat.
+        lock.lock();
+        try {
+          // await() returns false only when the poll interval elapsed; true means
+          // the wait ended early (for example because markComplete() signalled).
+          boolean wokenEarly = condition.await(pollInterval, TimeUnit.MILLISECONDS);
+          if (!wokenEarly) {
+            nonOobHeartbeatCounter.incrementAndGet();
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+
+      int pendingEventCount = eventsToSend.size();
+      if (pendingEventCount > 0) {
+        LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
+      }
+      return true;
+    }
+
+    /**
+     * Sends a single heartbeat to the AM and hands any returned events to the task.
+     *
+     * @param eventsArg additional events to send with this heartbeat; may be null
+     * @return the AM's shouldDie flag together with the number of events handed to
+     *         the task (reported as 1 when shouldDie is set)
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg)
+        throws IOException, TezException {
+
+      if (eventsArg != null) {
+        eventsToSend.addAll(eventsArg);
+      }
+
+      // Everything queued up to this point goes out with this request.
+      List<TezEvent> outgoingEvents = new ArrayList<TezEvent>();
+      eventsToSend.drainTo(outgoingEvents);
+
+      boolean taskActive = !task.isTaskDone() && !task.hadFatalError();
+      if (taskActive) {
+        /**
+         * Increasing the heartbeat interval can delay the delivery of events. Sending
+         * just updated records would save CPU in DAG AM, but certain counters are
+         * updated very frequently. Until real time decisions are made based on these
+         * counters, it can be sent once per second.
+         */
+        // Not completely accurate, since OOB heartbeats could go out.
+        long sinceLastCounterSend =
+            (nonOobHeartbeatCounter.get() - prevCounterSendHeartbeatNum) * pollInterval;
+        TezCounters counters = null;
+        if (sinceLastCounterSend >= sendCounterInterval) {
+          counters = task.getCounters();
+          prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get();
+        }
+        // A status update is always sent for an active task; counters may be null.
+        outgoingEvents.add(new TezEvent(
+            new TaskStatusUpdateEvent(counters, task.getProgress()), updateEventMetadata));
+      }
+
+      long requestId = requestCounter.incrementAndGet();
+      TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, outgoingEvents,
+          containerIdStr, task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending heartbeat to AM, request=" + request);
+      }
+
+      maybeLogCounters();
+
+      TezHeartbeatResponse response = umbilical.heartbeat(request);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received heartbeat response from AM, response=" + response);
+      }
+
+      if (response.shouldDie()) {
+        LOG.info("Received should die response from AM");
+        return new ResponseWrapper(true, 1);
+      }
+      if (response.getLastRequestId() != requestId) {
+        throw new TezException("AM and Task out of sync" + ", responseReqId="
+            + response.getLastRequestId() + ", expectedReqId=" + requestId);
+      }
+
+      // The same umbilical is used by multiple tasks. Problematic in the case where
+      // multiple tasks are running using the same umbilical.
+      boolean taskFinished = task.isTaskDone() || task.hadFatalError();
+      boolean hasEvents = response.getEvents() != null && !response.getEvents().isEmpty();
+      if (!hasEvents) {
+        return new ResponseWrapper(false, 0);
+      }
+      if (taskFinished) {
+        LOG.warn("Current task already complete, Ignoring all event in"
+            + " heartbeat response, eventCount=" + response.getEvents().size());
+        return new ResponseWrapper(false, 0);
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
+            + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
+      }
+      // This should ideally happen in a separate thread
+      int numEventsReceived = response.getEvents().size();
+      task.handleEvents(response.getEvents());
+      return new ResponseWrapper(false, numEventsReceived);
+    }
+
+    /**
+     * Signals the condition the heartbeat loop waits on between heartbeats, so a
+     * waiting loop wakes up before its poll interval has elapsed.
+     */
+    public void markComplete() {
+      // Notify to clear pending events, if any.
+      final ReentrantLock heartbeatLock = lock;
+      heartbeatLock.lock();
+      try {
+        condition.signal();
+      } finally {
+        heartbeatLock.unlock();
+      }
+    }
+
+    /**
+     * Logs the task counters at debug level when the regular heartbeat count reaches
+     * the next scheduled log point, then moves the log point further out by
+     * LOG_COUNTER_BACKOFF.
+     */
+    private void maybeLogCounters() {
+      if (LOG.isDebugEnabled()) {
+        if (nonOobHeartbeatCounter.get() == nextHeartbeatNumToLog) {
+          LOG.debug("Counters: " + task.getCounters().toShortString());
+          // Truncating n * 1.3 to an int yields n again for n <= 3, which would pin
+          // the log point in the past and stop all further counter logging. Always
+          // advance by at least one heartbeat.
+          nextHeartbeatNumToLog = Math.max(nextHeartbeatNumToLog + 1,
+              (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF)));
+        }
+      }
+    }
+
+    /**
+     * Sends out final events for task success: a status update carrying the task's
+     * counters and progress, followed by a task-attempt-completed event.
+     *
+     * @param taskAttemptID the attempt that succeeded (not consulted here)
+     * @return true unless the AM answered the heartbeat with shouldDie
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskSucceeded(TezTaskAttemptID taskAttemptID)
+        throws IOException, TezException {
+      List<TezEvent> finalEvents = Lists.newArrayList(
+          new TezEvent(new TaskStatusUpdateEvent(task.getCounters(), task.getProgress()),
+              updateEventMetadata),
+          new TezEvent(new TaskAttemptCompletedEvent(), updateEventMetadata));
+      ResponseWrapper response = heartbeat(finalEvents);
+      return !response.shouldDie;
+    }
+
+    /**
+     * Sends out final events for task failure: a status update carrying the task's
+     * counters and progress, followed by a task-attempt-failed event.
+     *
+     * @param taskAttemptID the attempt that failed (not consulted here)
+     * @param t the failure cause; its stack trace is appended to the diagnostics.
+     *          May be null, in which case only {@code diagnostics} is reported.
+     * @param diagnostics optional diagnostic message; may be null
+     * @param srcMeta metadata identifying the source of the failure; when null the
+     *                task-level metadata is used
+     * @return true unless the AM answered the heartbeat with shouldDie
+     * @throws IOException
+     *           indicates an RPC communication failure.
+     * @throws TezException
+     *           indicates an exception somewhere in the AM.
+     */
+    private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t,
+                               String diagnostics, EventMetaData srcMeta)
+        throws IOException, TezException {
+      TezEvent statusUpdateEvent = new TezEvent(
+          new TaskStatusUpdateEvent(task.getCounters(), task.getProgress()),
+          updateEventMetadata);
+      // ExceptionUtils.getStackTrace dereferences the throwable, so a failure
+      // reported with diagnostics only must skip the stack trace.
+      if (t != null) {
+        String stackTrace = ExceptionUtils.getStackTrace(t);
+        diagnostics = diagnostics == null ? stackTrace : diagnostics + ":" + stackTrace;
+      }
+      TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
+          srcMeta == null ? updateEventMetadata : srcMeta);
+      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
+    }
+
+    /**
+     * Queues events to be sent with the next heartbeat.
+     *
+     * @param taskAttemptID the attempt producing the events (not consulted here)
+     * @param events events to queue; null or empty collections are ignored
+     */
+    private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+      if (events == null || events.isEmpty()) {
+        return;
+      }
+      eventsToSend.addAll(events);
+    }
+  }
+
+  /**
+   * Forwards the outcome of a heartbeat loop to the task's ErrorReporter.
+   */
+  private static class HeartbeatCallback implements FutureCallback<Boolean> {
+
+    private final ErrorReporter reporter;
+
+    HeartbeatCallback(ErrorReporter errorReporter) {
+      this.reporter = errorReporter;
+    }
+
+    /** A false result from the heartbeat loop is reported as a shutdown request. */
+    @Override
+    public void onSuccess(Boolean result) {
+      if (!result) {
+        reporter.shutdownRequested();
+      }
+    }
+
+    /** Any failure of the heartbeat loop is reported as an error. */
+    @Override
+    public void onFailure(Throwable t) {
+      reporter.reportError(t);
+    }
+  }
+
+  /**
+   * Reports task success through the currently registered heartbeat callable.
+   *
+   * @param taskAttemptID the attempt that succeeded
+   * @return the result of the callable's taskSucceeded call
+   * @throws IOException indicates an RPC communication failure
+   * @throws TezException indicates an exception somewhere in the AM
+   */
+  public boolean taskSucceeded(TezTaskAttemptID taskAttemptID)
+      throws IOException, TezException {
+    final HeartbeatCallable callable = currentCallable;
+    return callable.taskSucceeded(taskAttemptID);
+  }
+
+  /**
+   * Reports task failure through the currently registered heartbeat callable.
+   *
+   * @param taskAttemptID the attempt that failed
+   * @param t the failure cause
+   * @param diagnostics optional diagnostic message
+   * @param srcMeta metadata identifying the source of the failure
+   * @return the result of the callable's taskFailed call
+   * @throws IOException indicates an RPC communication failure
+   * @throws TezException indicates an exception somewhere in the AM
+   */
+  @Override
+  public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t,
+                            String diagnostics, EventMetaData srcMeta)
+      throws IOException, TezException {
+    final HeartbeatCallable callable = currentCallable;
+    return callable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
+  }
+
+  /**
+   * Queues events on the currently registered heartbeat callable.
+   *
+   * @param taskAttemptID the attempt producing the events
+   * @param events the events to queue
+   */
+  @Override
+  public void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
+    final HeartbeatCallable callable = currentCallable;
+    callable.addEvents(taskAttemptID, events);
+  }
+
+  /**
+   * Asks the AM, over the umbilical, whether the given attempt may commit.
+   *
+   * @param taskAttemptID the attempt asking to commit
+   * @return the answer returned by the umbilical's canCommit call
+   * @throws IOException if the umbilical call fails
+   */
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
+    final boolean commitAllowed = umbilical.canCommit(taskAttemptID);
+    return commitAllowed;
+  }
+
+  /**
+   * Outcome of a single heartbeat: whether the AM asked the task to die, and how
+   * many events the heartbeat reported.
+   */
+  private static final class ResponseWrapper {
+    boolean shouldDie;
+    int numEvents;
+
+    private ResponseWrapper(boolean die, int eventCount) {
+      shouldDie = die;
+      numEvents = eventCount;
+    }
+  }
+}

Added: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java?rev=1671950&view=auto
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
 (added)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
 Tue Apr  7 20:57:02 2015
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+
+/**
+ * RPC protocol combining the Tez task calls (canCommit, heartbeat) with a
+ * node-level heartbeat.
+ */
+public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
+
+  /** Protocol version; interface fields are implicitly public, static and final. */
+  long versionID = 1L;
+
+  // From Tez. Eventually changes over to the LLAP protocol and ProtocolBuffers
+
+  /**
+   * @param taskid the task attempt asking whether it may commit
+   * @return whether the attempt may commit
+   * @throws IOException if the call fails
+   */
+  boolean canCommit(TezTaskAttemptID taskid) throws IOException;
+
+  /**
+   * @param request the heartbeat request of a task
+   * @return the response to the heartbeat
+   * @throws IOException if the call fails
+   * @throws TezException declared for failures raised while handling the heartbeat
+   */
+  TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
+      throws IOException, TezException;
+
+  /**
+   * Node-level heartbeat.
+   *
+   * @param hostname hostname of the node sending the heartbeat
+   * @param port port of the node sending the heartbeat
+   */
+  void nodeHeartbeat(Text hostname, int port);
+
+}

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java?rev=1671950&r1=1671949&r2=1671950&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 Tue Apr  7 20:57:02 2015
@@ -20,26 +20,42 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 
 public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
@@ -48,12 +64,16 @@ public class LlapTaskCommunicator extend
   private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
   private final ConcurrentMap<String, ByteBuffer> credentialMap;
 
+  private final EntityTracker entityTracker = new EntityTracker();
+
   private TaskCommunicator communicator;
+  private final LlapTaskUmbilicalProtocol umbilical;
 
   public LlapTaskCommunicator(
       TaskCommunicatorContext taskCommunicatorContext) {
     super(taskCommunicatorContext);
 
+    umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical());
     SubmitWorkRequestProto.Builder baseBuilder = 
SubmitWorkRequestProto.newBuilder();
 
     // TODO Avoid reading this from the environment
@@ -92,15 +112,45 @@ public class LlapTaskCommunicator extend
     }
   }
 
+  /**
+   * Starts the RPC server that serves LlapTaskUmbilicalProtocol, using a
+   * JobTokenSecretManager holding the session token, and records the address the
+   * server can be reached at.
+   */
+  @Override
+  protected void startRpcServer() {
+    Configuration conf = getConfig();
+    try {
+      JobTokenSecretManager secretManager = new JobTokenSecretManager();
+      secretManager.addTokenForJob(tokenIdentifier, sessionToken);
+
+      int handlerCount = conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
+          TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT);
+
+      // Bind to all interfaces on an ephemeral port.
+      RPC.Builder builder = new RPC.Builder(conf)
+          .setProtocol(LlapTaskUmbilicalProtocol.class)
+          .setBindAddress("0.0.0.0")
+          .setPort(0)
+          .setInstance(umbilical)
+          .setNumHandlers(handlerCount)
+          .setSecretManager(secretManager);
+      server = builder.build();
+
+      // Do serviceACLs need to be refreshed, like in Tez ?
+
+      server.start();
+      this.address = NetUtils.getConnectAddress(server);
+      LOG.info("Started LlapUmbilical: " + umbilical.getClass().getName()
+          + " at address: " + address);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
 
   @Override
   public void registerRunningContainer(ContainerId containerId, String 
hostname, int port) {
     super.registerRunningContainer(containerId, hostname, port);
+    entityTracker.registerContainer(containerId, hostname, port);
+
   }
 
   @Override
   public void registerContainerEnd(ContainerId containerId) {
     super.registerContainerEnd(containerId);
+    entityTracker.unregisterContainer(containerId);
   }
 
   @Override
@@ -111,6 +161,7 @@ public class LlapTaskCommunicator extend
                                          int priority)  {
     super.registerRunningTaskAttempt(containerId, taskSpec, 
additionalResources, credentials,
         credentialsChanged, priority);
+
     SubmitWorkRequestProto requestProto;
     try {
       requestProto = constructSubmitWorkRequest(containerId, taskSpec);
@@ -130,6 +181,9 @@ public class LlapTaskCommunicator extend
       throw new RuntimeException("ContainerInfo not found for container: " + 
containerId +
           ", while trying to launch task: " + taskSpec.getTaskAttemptID());
     }
+
+    entityTracker.registerTaskAttempt(containerId, 
taskSpec.getTaskAttemptID(), host, port);
+
     // Have to register this up front right now. Otherwise, it's possible for 
the task to start
     // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how 
to handle them.
     getTaskCommunicatorContext()
@@ -180,7 +234,10 @@ public class LlapTaskCommunicator extend
   @Override
   public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
     super.unregisterRunningTaskAttempt(taskAttemptID);
-    // Nothing else to do for now. The push API in the test does not support 
termination of a running task
+    entityTracker.unregisterTaskAttempt(taskAttemptID);
+    // TODO Inform the daemon that this task is no longer running.
+    // Currently, a task will end up moving into the RUNNING queue and will
+    // be told that it needs to die since it isn't recognized.
   }
 
   private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId 
containerId,
@@ -214,4 +271,144 @@ public class LlapTaskCommunicator extend
     containerCredentials.writeTokenStorageToStream(containerTokens_dob);
     return ByteBuffer.wrap(containerTokens_dob.getData(), 0, 
containerTokens_dob.getLength());
   }
+
+  /**
+   * Umbilical implementation that hands the task-level calls to the Tez umbilical
+   * and records node heartbeats in the entity tracker.
+   */
+  protected class LlapTaskUmbilicalProtocolImpl implements LlapTaskUmbilicalProtocol {
+
+    private final TezTaskUmbilicalProtocol delegate;
+
+    public LlapTaskUmbilicalProtocolImpl(TezTaskUmbilicalProtocol tezUmbilical) {
+      this.delegate = tezUmbilical;
+    }
+
+    /** Delegates to the Tez umbilical. */
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+      return delegate.canCommit(taskid);
+    }
+
+    /** Delegates to the Tez umbilical. */
+    @Override
+    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
+        throws IOException, TezException {
+      return delegate.heartbeat(request);
+    }
+
+    /** Records the ping for the node and logs it at debug level. */
+    @Override
+    public void nodeHeartbeat(Text hostname, int port) {
+      entityTracker.nodePinged(hostname.toString(), port);
+      if (!LOG.isDebugEnabled()) {
+        return;
+      }
+      LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]");
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return versionID;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+                                                  int clientMethodsHash)
+        throws IOException {
+      return ProtocolSignature.getProtocolSignature(
+          this, protocol, clientVersion, clientMethodsHash);
+    }
+  }
+
+  /**
+   * Tracks which containers and task attempts are registered against each LLAP
+   * node, so that a node heartbeat can be turned into taskAlive / containerAlive
+   * notifications for everything registered on that node.
+   */
+  private final class EntityTracker {
+    private final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap =
+        new ConcurrentHashMap<>();
+    private final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap =
+        new ConcurrentHashMap<>();
+    // Each BiMap is accessed only while synchronized on that BiMap instance.
+    private final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap =
+        new ConcurrentHashMap<>();
+    // Time at which a ping from an unknown node was last logged.
+    private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
+
+    /**
+     * Records that the given attempt runs in the given container on host:port.
+     */
+    void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId,
+                             String host, int port) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registering " + containerId + ", " + taskAttemptId + " for node: "
+            + host + ":" + port);
+      }
+      LlapNodeId llapNodeId = LlapNodeId.getInstance(host, port);
+      attemptToNodeMap.putIfAbsent(taskAttemptId, llapNodeId);
+      BiMap<ContainerId, TezTaskAttemptID> tmpMap = HashBiMap.create();
+      BiMap<ContainerId, TezTaskAttemptID> old = nodeMap.putIfAbsent(llapNodeId, tmpMap);
+      BiMap<ContainerId, TezTaskAttemptID> usedInstance;
+      usedInstance = old == null ? tmpMap : old;
+      synchronized(usedInstance) {
+        usedInstance.put(containerId, taskAttemptId);
+      }
+      // Make sure to put the instance back again, in case it was removed as part of a
+      // containerEnd/taskEnd invocation.
+      nodeMap.putIfAbsent(llapNodeId, usedInstance);
+    }
+
+    /**
+     * Removes the attempt, and the container it was paired with, from all maps.
+     */
+    void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
+      LlapNodeId llapNodeId = attemptToNodeMap.remove(attemptId);
+      if (llapNodeId == null) {
+        // Possible since either container / task can be unregistered.
+        return;
+      }
+
+      BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+      ContainerId matched = null;
+      // The node entry may already be gone; the emptiness check therefore has to
+      // stay inside the null guard (it previously threw a NullPointerException)
+      // and inside the monitor that guards the BiMap.
+      if (bMap != null) {
+        synchronized(bMap) {
+          matched = bMap.inverse().remove(attemptId);
+          // Removing here. Registration into the map has to make sure to put
+          // the instance back.
+          if (bMap.isEmpty()) {
+            nodeMap.remove(llapNodeId);
+          }
+        }
+      }
+
+      // Remove the container mapping
+      if (matched != null) {
+        containerToNodeMap.remove(matched);
+      }
+
+    }
+
+    /**
+     * Records the node a container runs on.
+     */
+    void registerContainer(ContainerId containerId, String hostname, int port) {
+      containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port));
+    }
+
+    /**
+     * Removes the container, and the attempt it was paired with, from all maps.
+     */
+    void unregisterContainer(ContainerId containerId) {
+      LlapNodeId llapNodeId = containerToNodeMap.remove(containerId);
+      if (llapNodeId == null) {
+        // Possible since either container / task can be unregistered.
+        return;
+      }
+
+      BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+      TezTaskAttemptID matched = null;
+      // Same null / locking considerations as in unregisterTaskAttempt.
+      if (bMap != null) {
+        synchronized(bMap) {
+          matched = bMap.remove(containerId);
+          // Removing here. Registration into the map has to make sure to put
+          // the instance back.
+          if (bMap.isEmpty()) {
+            nodeMap.remove(llapNodeId);
+          }
+        }
+      }
+
+      // Remove the attempt mapping
+      if (matched != null) {
+        attemptToNodeMap.remove(matched);
+      }
+    }
+
+    /**
+     * Handles a heartbeat from a node: every attempt and container registered on
+     * the node is reported alive. Pings from unknown nodes are logged, at most
+     * once every 5 seconds.
+     */
+    void nodePinged(String hostname, int port) {
+      LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
+      BiMap<ContainerId, TezTaskAttemptID> biMap = nodeMap.get(nodeId);
+      if (biMap != null) {
+        synchronized(biMap) {
+          for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
+            getTaskCommunicatorContext().taskAlive(entry.getValue());
+            getTaskCommunicatorContext().containerAlive(entry.getKey());
+          }
+        }
+      } else {
+        if (System.currentTimeMillis() > nodeNotFoundLogTime.get() + 5000L) {
+          LOG.warn("Recevied ping from unknown node: " + hostname + ":" + port +
+              ". Could be caused by pre-emption by the AM," +
+              " or a mismatched hostname. Enable debug logging for mismatched host names");
+          nodeNotFoundLogTime.set(System.currentTimeMillis());
+        }
+      }
+    }
+  }
 }

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java?rev=1671950&r1=1671949&r2=1671950&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
 Tue Apr  7 20:57:02 2015
@@ -15,6 +15,9 @@
 package org.apache.tez.dag.app.rm;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -52,6 +55,7 @@ import com.google.common.util.concurrent
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.types.AddressTypes;
 import org.apache.hadoop.registry.client.types.Endpoint;
@@ -194,6 +198,20 @@ public class LlapTaskSchedulerService ex
       Preconditions.checkState(hosts != null && hosts.length != 0,
           LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + "must be defined");
       for (String host : hosts) {
+        // If reading addresses from conf, try resolving local addresses so that
+        // this matches with the address reported by daemons.
+        InetAddress inetAddress = null;
+        try {
+          inetAddress = InetAddress.getByName(host);
+          if (NetUtils.isLocalAddress(inetAddress)) {
+            InetSocketAddress socketAddress = new InetSocketAddress(0);
+            socketAddress = NetUtils.getConnectAddress(socketAddress);
+            LOG.info("Adding host identified as local: " + host + " as " + 
socketAddress.getHostName());
+            host = socketAddress.getHostName();
+          }
+        } catch (UnknownHostException e) {
+          LOG.warn("Ignoring resolution issues for host: " + host, e);
+        }
         NodeInfo nodeInfo = new NodeInfo(host, BACKOFF_FACTOR, clock);
         activeHosts.put(host, nodeInfo);
         allHosts.put(host, nodeInfo);
@@ -780,7 +798,6 @@ public class LlapTaskSchedulerService ex
           numLocalAllocations++;
           _registerAllocationInHostMap(allocatedHost, 
localityBasedNumAllocationsPerHost);
         } else {
-          // KKK TODO Log all non-local allocations
           numNonLocalAllocations++;
         }
       } else {


Reply via email to