Repository: hive
Updated Branches:
  refs/heads/master b340ecb5e -> 0cbf45cfc


HIVE-12577: NPE in LlapTaskCommunicator when unregistering containers (Siddarth Seth, reviewed by Sergey Shelukhin)
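
The fix itself is small: in EntityTracker.unregisterTaskAttempt() and unregisterContainer(), the
per-node BiMap was previously tested with bMap.isEmpty() outside the bMap != null guard, so an
unregistration for a node that was never (or no longer) tracked dereferenced null. The hunks below
move the empty-map cleanup inside the null check and the synchronized block. A minimal sketch of
the corrected pattern, using plain String keys in place of LlapNodeId/ContainerId purely for
illustration:

    import java.util.HashMap;
    import java.util.Map;

    // Simplified stand-in for EntityTracker.unregisterContainer(); the real code uses
    // ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>>.
    class GuardedRemovalSketch {
      private final Map<String, Map<String, String>> nodeMap = new HashMap<>();

      void unregisterContainer(String node, String containerId) {
        Map<String, String> bMap = nodeMap.get(node);
        if (bMap != null) {
          synchronized (bMap) {
            bMap.remove(containerId);
            // Cleanup now happens inside the null check, so an unknown node
            // no longer causes a NullPointerException.
            if (bMap.isEmpty()) {
              nodeMap.remove(node);
            }
          }
        }
      }
    }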


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0cbf45cf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0cbf45cf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0cbf45cf

Branch: refs/heads/master
Commit: 0cbf45cfc046f39bed4533ab83542002e79b4f5b
Parents: b340ecb
Author: Gunther Hagleitner <gunt...@apache.org>
Authored: Wed Dec 23 11:18:10 2015 -0800
Committer: Gunther Hagleitner <gunt...@apache.org>
Committed: Wed Dec 23 11:18:10 2015 -0800

----------------------------------------------------------------------
 .../LlapDaemonProtocolClientProxy.java          | 509 ++++++++++++++++++
 .../llap/tezplugins/LlapTaskCommunicator.java   | 187 +++++--
 .../hive/llap/tezplugins/TaskCommunicator.java  | 512 -------------------
 .../TestLlapDaemonProtocolClientProxy.java      | 143 ++++++
 .../tezplugins/TestLlapTaskCommunicator.java    | 100 ++++
 .../llap/tezplugins/TestTaskCommunicator.java   | 143 ------
 6 files changed, 895 insertions(+), 699 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java
new file mode 100644
index 0000000..2884e40
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import javax.net.SocketFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapDaemonProtocolClientProxy extends AbstractService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolClientProxy.class);
+
+  private final ConcurrentMap<String, LlapDaemonProtocolBlockingPB> hostProxies;
+
+  private final RequestManager requestManager;
+  private final RetryPolicy retryPolicy;
+  private final SocketFactory socketFactory;
+
+  private final ListeningExecutorService requestManagerExecutor;
+  private volatile ListenableFuture<Void> requestManagerFuture;
+  private final Token<LlapTokenIdentifier> llapToken;
+
+  public LlapDaemonProtocolClientProxy(
+      int numThreads, Configuration conf, Token<LlapTokenIdentifier> llapToken) {
+    super(LlapDaemonProtocolClientProxy.class.getSimpleName());
+    this.hostProxies = new ConcurrentHashMap<>();
+    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    this.llapToken = llapToken;
+
+    long connectionTimeout = HiveConf.getTimeVar(conf,
+        ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    long retrySleep = HiveConf.getTimeVar(conf,
+        ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+        TimeUnit.MILLISECONDS);
+    this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+        connectionTimeout, retrySleep, TimeUnit.MILLISECONDS);
+
+    this.requestManager = new RequestManager(numThreads);
+    ExecutorService localExecutor = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
+    this.requestManagerExecutor = MoreExecutors.listeningDecorator(localExecutor);
+
+    LOG.info("Setting up taskCommunicator with" +
+        " numThreads=" + numThreads +
+        ", retryTime(millis)=" + connectionTimeout +
+        ", retrySleep(millis)=" + retrySleep);
+  }
+
+  @Override
+  public void serviceStart() {
+    requestManagerFuture = requestManagerExecutor.submit(requestManager);
+    Futures.addCallback(requestManagerFuture, new FutureCallback<Void>() {
+      @Override
+      public void onSuccess(Void result) {
+        LOG.info("RequestManager shutdown");
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.warn("RequestManager shutdown with error", t);
+      }
+    });
+  }
+
+  @Override
+  public void serviceStop() {
+    if (requestManagerFuture != null) {
+      requestManager.shutdown();
+      requestManagerFuture.cancel(true);
+    }
+    requestManagerExecutor.shutdown();
+  }
+
+  public void sendSubmitWork(SubmitWorkRequestProto request, String host, int port,
+                         final ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
+    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+    requestManager.queueRequest(new SubmitWorkCallable(nodeId, request, callback));
+  }
+
+  public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host,
+                                    final int port,
+                                    final ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) {
+    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+    requestManager.queueRequest(
+        new SendSourceStateUpdateCallable(nodeId, request, callback));
+  }
+
+  public void sendQueryComplete(final QueryCompleteRequestProto request, final String host,
+                                final int port,
+                                final ExecuteRequestCallback<QueryCompleteResponseProto> callback) {
+    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+    requestManager.queueRequest(new SendQueryCompleteCallable(nodeId, request, callback));
+  }
+
+  public void sendTerminateFragment(final TerminateFragmentRequestProto request, final String host,
+                                    final int port,
+                                    final ExecuteRequestCallback<TerminateFragmentResponseProto> callback) {
+    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+    requestManager.queueRequest(new SendTerminateFragmentCallable(nodeId, request, callback));
+  }
+
+  @VisibleForTesting
+  static class RequestManager implements Callable<Void> {
+
+    private final Lock lock = new ReentrantLock();
+    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+    private final Condition queueCondition = lock.newCondition();
+    private final AtomicBoolean shouldRun = new AtomicBoolean(false);
+
+    private final int maxConcurrentRequestsPerNode = 1;
+    private final ListeningExecutorService executor;
+
+
+    // Tracks new additions via add, while the loop is processing existing ones.
+    private final LinkedList<CallableRequest> newRequestList = new LinkedList<>();
+
+    // Tracks existing requests which are cycled through.
+    private final LinkedList<CallableRequest> pendingRequests = new LinkedList<>();
+
+    // Tracks requests executing per node
+    private final ConcurrentMap<LlapNodeId, AtomicInteger> runningRequests = new ConcurrentHashMap<>();
+
+    // Tracks completed requests per node
+    private final LinkedList<LlapNodeId> completedNodes = new LinkedList<>();
+
+    public RequestManager(int numThreads) {
+      ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build());
+      executor = MoreExecutors.listeningDecorator(localExecutor);
+    }
+
+
+    @VisibleForTesting
+    Set<LlapNodeId> currentLoopDisabledNodes = new HashSet<>();
+    @VisibleForTesting
+    List<CallableRequest> currentLoopSkippedRequests = new LinkedList<>();
+    @Override
+    public Void call() {
+      // Caches disabled nodes for quicker lookups and ensures a request on a node which was skipped
+      // does not go out of order.
+      while (!isShutdown.get()) {
+        lock.lock();
+        try {
+          while (!shouldRun.get()) {
+            queueCondition.await();
+            break; // Break out and try executing.
+          }
+          boolean shouldBreak = process();
+          if (shouldBreak) {
+            break;
+          }
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            break;
+          } else {
+            LOG.warn("RunLoop interrupted without being shutdown first");
+            throw new RuntimeException(e);
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+      LOG.info("CallScheduler loop exiting");
+      return null;
+    }
+
+    /* Add a new request to be executed */
+    public void queueRequest(CallableRequest request) {
+      synchronized (newRequestList) {
+        newRequestList.add(request);
+        shouldRun.set(true);
+      }
+      notifyRunLoop();
+    }
+
+    /* Indicates a request has completed on a node */
+    public void requestFinished(LlapNodeId nodeId) {
+      synchronized (completedNodes) {
+        completedNodes.add(nodeId);
+        shouldRun.set(true);
+      }
+      notifyRunLoop();
+    }
+
+    public void shutdown() {
+      if (!isShutdown.getAndSet(true)) {
+        executor.shutdownNow();
+        notifyRunLoop();
+      }
+    }
+
+    @VisibleForTesting
+    void submitToExecutor(CallableRequest request, LlapNodeId nodeId) {
+      ListenableFuture<SourceStateUpdatedResponseProto> future =
+          executor.submit(request);
+      Futures.addCallback(future, new ResponseCallback(request.getCallback(), nodeId, this));
+    }
+
+    @VisibleForTesting
+    boolean process() {
+      if (isShutdown.get()) {
+        return true;
+      }
+      currentLoopDisabledNodes.clear();
+      currentLoopSkippedRequests.clear();
+
+      // Set to false to block the next loop. This must be called before draining the lists,
+      // otherwise an add/completion after draining the lists but before setting it to false,
+      // will not trigger a run. May cause one unnecessary run if an add comes in before drain.
+      // drain list. add request (setTrue). setFalse needs to be avoided.
+      shouldRun.compareAndSet(true, false);
+      // Drain any calls which may have come in during the last execution of the loop.
+      drainNewRequestList();  // Locks newRequestList
+      drainCompletedNodes();  // Locks completedNodes
+
+
+      Iterator<CallableRequest> iterator = pendingRequests.iterator();
+      while (iterator.hasNext()) {
+        CallableRequest request = iterator.next();
+        iterator.remove();
+        LlapNodeId nodeId = request.getNodeId();
+        if (canRunForNode(nodeId, currentLoopDisabledNodes)) {
+          submitToExecutor(request, nodeId);
+        } else {
+          currentLoopDisabledNodes.add(nodeId);
+          currentLoopSkippedRequests.add(request);
+        }
+      }
+      // Tried scheduling everything that could be scheduled in this loop.
+      pendingRequests.addAll(0, currentLoopSkippedRequests);
+      return false;
+    }
+
+    private void drainNewRequestList() {
+      synchronized (newRequestList) {
+        if (!newRequestList.isEmpty()) {
+          pendingRequests.addAll(newRequestList);
+          newRequestList.clear();
+        }
+      }
+    }
+
+    private void drainCompletedNodes() {
+      synchronized (completedNodes) {
+        if (!completedNodes.isEmpty()) {
+          for (LlapNodeId nodeId : completedNodes) {
+            runningRequests.get(nodeId).decrementAndGet();
+          }
+        }
+        completedNodes.clear();
+      }
+    }
+
+    private boolean canRunForNode(LlapNodeId nodeId, Set<LlapNodeId> currentRunDisabledNodes) {
+      if (currentRunDisabledNodes.contains(nodeId)) {
+        return false;
+      } else {
+        AtomicInteger count = runningRequests.get(nodeId);
+        if (count == null) {
+          count = new AtomicInteger(0);
+          AtomicInteger old = runningRequests.putIfAbsent(nodeId, count);
+          count = old != null ? old : count;
+        }
+        if (count.incrementAndGet() <= maxConcurrentRequestsPerNode) {
+          return true;
+        } else {
+          count.decrementAndGet();
+          return false;
+        }
+      }
+    }
+
+    private void notifyRunLoop() {
+      lock.lock();
+      try {
+        queueCondition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+
+  private static final class ResponseCallback<TYPE extends Message>
+      implements FutureCallback<TYPE> {
+
+    private final ExecuteRequestCallback<TYPE> callback;
+    private final LlapNodeId nodeId;
+    private final RequestManager requestManager;
+
+    public ResponseCallback(ExecuteRequestCallback<TYPE> callback, LlapNodeId nodeId,
+                            RequestManager requestManager) {
+      this.callback = callback;
+      this.nodeId = nodeId;
+      this.requestManager = requestManager;
+    }
+
+    @Override
+    public void onSuccess(TYPE result) {
+      try {
+        callback.setResponse(result);
+      } finally {
+        requestManager.requestFinished(nodeId);
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      try {
+        callback.indicateError(t);
+      } finally {
+        requestManager.requestFinished(nodeId);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message>
+      implements Callable {
+
+    final LlapNodeId nodeId;
+    final ExecuteRequestCallback<RESPONSE> callback;
+    final REQUEST request;
+
+
+    protected CallableRequest(LlapNodeId nodeId, REQUEST request, ExecuteRequestCallback<RESPONSE> callback) {
+      this.nodeId = nodeId;
+      this.request = request;
+      this.callback = callback;
+    }
+
+    public LlapNodeId getNodeId() {
+      return nodeId;
+    }
+
+    public ExecuteRequestCallback<RESPONSE> getCallback() {
+      return callback;
+    }
+
+    public abstract RESPONSE call() throws Exception;
+  }
+
+  private class SubmitWorkCallable extends CallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> {
+
+    protected SubmitWorkCallable(LlapNodeId nodeId,
+                          SubmitWorkRequestProto submitWorkRequestProto,
+                                 ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
+      super(nodeId, submitWorkRequestProto, callback);
+    }
+
+    @Override
+    public SubmitWorkResponseProto call() throws Exception {
+      return getProxy(nodeId).submitWork(null, request);
+    }
+  }
+
+  private class SendSourceStateUpdateCallable
+      extends CallableRequest<SourceStateUpdatedRequestProto, SourceStateUpdatedResponseProto> {
+
+    public SendSourceStateUpdateCallable(LlapNodeId nodeId,
+                                         SourceStateUpdatedRequestProto request,
+                                         ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) {
+      super(nodeId, request, callback);
+    }
+
+    @Override
+    public SourceStateUpdatedResponseProto call() throws Exception {
+      return getProxy(nodeId).sourceStateUpdated(null, request);
+    }
+  }
+
+  private class SendQueryCompleteCallable
+      extends CallableRequest<QueryCompleteRequestProto, QueryCompleteResponseProto> {
+
+    protected SendQueryCompleteCallable(LlapNodeId nodeId,
+                                        QueryCompleteRequestProto queryCompleteRequestProto,
+                                        ExecuteRequestCallback<QueryCompleteResponseProto> callback) {
+      super(nodeId, queryCompleteRequestProto, callback);
+    }
+
+    @Override
+    public QueryCompleteResponseProto call() throws Exception {
+      return getProxy(nodeId).queryComplete(null, request);
+    }
+  }
+
+  private class SendTerminateFragmentCallable
+      extends CallableRequest<TerminateFragmentRequestProto, TerminateFragmentResponseProto> {
+
+    protected SendTerminateFragmentCallable(LlapNodeId nodeId,
+                                            TerminateFragmentRequestProto terminateFragmentRequestProto,
+                                            ExecuteRequestCallback<TerminateFragmentResponseProto> callback) {
+      super(nodeId, terminateFragmentRequestProto, callback);
+    }
+
+    @Override
+    public TerminateFragmentResponseProto call() throws Exception {
+      return getProxy(nodeId).terminateFragment(null, request);
+    }
+  }
+
+  public interface ExecuteRequestCallback<T extends Message> {
+    void setResponse(T response);
+    void indicateError(Throwable t);
+  }
+
+  private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
+    String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
+
+    LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId);
+    if (proxy == null) {
+      if (llapToken == null) {
+        proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+            nodeId.getPort(), retryPolicy, socketFactory);
+      } else {
+        UserGroupInformation ugi;
+        try {
+          ugi = UserGroupInformation.getCurrentUser();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        Token<LlapTokenIdentifier> nodeToken = new Token<LlapTokenIdentifier>(llapToken);
+        SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost(
+            nodeId.getHostname(), nodeId.getPort()));
+        ugi.addToken(nodeToken);
+        proxy = ugi.doAs(new PrivilegedAction<LlapDaemonProtocolBlockingPB>() {
+          @Override
+          public LlapDaemonProtocolBlockingPB run() {
+           return new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+               nodeId.getPort(), retryPolicy, socketFactory);
+          }
+        });
+      }
+
+      LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
+      if (proxyOld != null) {
+        // TODO Shutdown the new proxy.
+        proxy = proxyOld;
+      }
+    }
+    return proxy;
+  }
+
+  private String getHostIdentifier(String hostname, int port) {
+    return hostname + ":" + port;
+  }
+}
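
A note on how the class above is driven: requests are queued through the asynchronous send*
methods, each of which takes an ExecuteRequestCallback, and the RequestManager run loop dispatches
at most one request per daemon at a time (maxConcurrentRequestsPerNode = 1), re-queuing skipped
requests at the head of the pending list so per-node ordering is preserved. A hypothetical caller
sketch follows; the host, port, and request values are placeholders and do not come from the patch
itself:

    // Sketch only: assumes an already-populated Configuration and SubmitWorkRequestProto.
    void submitExample(Configuration conf, SubmitWorkRequestProto proto, int numThreads) {
      LlapDaemonProtocolClientProxy proxy =
          new LlapDaemonProtocolClientProxy(numThreads, conf, null /* no LLAP token: unsecured path */);
      proxy.init(conf);   // AbstractService lifecycle
      proxy.start();
      proxy.sendSubmitWork(proto, "llap-host-1", 15001,
          new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto>() {
            @Override
            public void setResponse(SubmitWorkResponseProto response) {
              // success path; the per-node slot is released via the internal ResponseCallback
            }

            @Override
            public void indicateError(Throwable t) {
              // failure path; the slot is released here as well
            }
          });
      proxy.stop();
    }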

http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 9d47940..5c370ee 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -22,8 +22,10 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
@@ -79,6 +81,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class);
 
+  private static final boolean isInfoEnabled = LOG.isInfoEnabled();
+  private static final boolean isDebugEnabed = LOG.isDebugEnabled();
+
   private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
   private final ConcurrentMap<String, ByteBuffer> credentialMap;
 
@@ -88,11 +93,17 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   private final SourceStateTracker sourceStateTracker;
   private final Set<LlapNodeId> nodesForQuery = new HashSet<>();
 
-  private TaskCommunicator communicator;
+  private LlapDaemonProtocolClientProxy communicator;
   private long deleteDelayOnDagComplete;
   private final LlapTaskUmbilicalProtocol umbilical;
   private final Token<LlapTokenIdentifier> token;
 
+  // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats.
+  // Primarily for debugging purposes a.t.m, since there are some unexplained TASK_TIMEOUTS which are currently being observed.
+  private final ConcurrentMap<LlapNodeId, Long> knownNodeMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap = new ConcurrentHashMap<>();
+
+
   private volatile String currentDagName;
 
   public LlapTaskCommunicator(
@@ -131,7 +142,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     super.initialize();
     Configuration conf = getConf();
     int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
-    this.communicator = new TaskCommunicator(numThreads, conf, token);
+    this.communicator = new LlapDaemonProtocolClientProxy(numThreads, conf, token);
     this.deleteDelayOnDagComplete = HiveConf.getTimeVar(
         conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
     LOG.info("Running LlapTaskCommunicator with "
@@ -235,6 +246,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
    }
 
    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+    registerKnownNode(nodeId);
    entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port);
     nodesForQuery.add(nodeId);
 
@@ -254,7 +266,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
    getContext()
        .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
    communicator.sendSubmitWork(requestProto, host, port,
-        new TaskCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
+        new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto>() {
           @Override
           public void setResponse(SubmitWorkResponseProto response) {
             if (response.hasSubmissionState()) {
@@ -333,14 +345,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
    LOG.info(
        "DBG: Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}",
        taskAttemptId.toString(), invokedByContainerEnd ? "containerEnd" : "taskEnd");
-    LlapNodeId nodeId = entityTracker.getNodeIfForTaskAttempt(taskAttemptId);
+    LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(taskAttemptId);
    // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself
    if (nodeId != null) {
      TerminateFragmentRequestProto request =
          TerminateFragmentRequestProto.newBuilder().setDagName(currentDagName)
              .setFragmentIdentifierString(taskAttemptId.toString()).build();
      communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
-          new TaskCommunicator.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
+          new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
             @Override
             public void setResponse(TerminateFragmentResponseProto response) {
             }
@@ -365,7 +377,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
    for (final LlapNodeId llapNodeId : nodesForQuery) {
      LOG.info("Sending dagComplete message for {}, to {}", dagName, llapNodeId);
      communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(),
-          new TaskCommunicator.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() {
+          new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() {
            @Override
            public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) {
             }
@@ -391,7 +403,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
  public void sendStateUpdate(final String host, final int port,
                              final SourceStateUpdatedRequestProto request) {
    communicator.sendSourceStateUpdate(request, host, port,
-        new TaskCommunicator.ExecuteRequestCallback<SourceStateUpdatedResponseProto>() {
+        new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<SourceStateUpdatedResponseProto>() {
           @Override
           public void setResponse(SourceStateUpdatedResponseProto response) {
           }
@@ -409,6 +421,79 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   }
 
 
+  private static class PingingNodeInfo {
+    final AtomicLong logTimestamp;
+    final AtomicInteger pingCount;
+
+    PingingNodeInfo(long currentTs) {
+      logTimestamp = new AtomicLong(currentTs);
+      pingCount = new AtomicInteger(1);
+    }
+  }
+
+  public void registerKnownNode(LlapNodeId nodeId) {
+    Long old = knownNodeMap.putIfAbsent(nodeId,
+        TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS));
+    if (old == null) {
+      if (isInfoEnabled) {
+        LOG.info("Added new known node: {}", nodeId);
+      }
+    }
+  }
+
+  public void registerPingingNode(LlapNodeId nodeId) {
+    long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+    PingingNodeInfo ni = new PingingNodeInfo(currentTs);
+    PingingNodeInfo old = pingedNodeMap.put(nodeId, ni);
+    if (old == null) {
+      if (isInfoEnabled) {
+        LOG.info("Added new pinging node: [{}]", nodeId);
+      }
+    } else {
+      old.pingCount.incrementAndGet();
+    }
+    // The node should always be known by this point. Log occasionally if it is not known.
+    if (!knownNodeMap.containsKey(nodeId)) {
+      if (old == null) {
+        // First time this is seen. Log it.
+        LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, ni.pingCount.get());
+      } else {
+        // Pinged before. Log only occasionally.
+        if (currentTs > old.logTimestamp.get() + 5000l) { // 5 seconds elapsed. Log again.
+          LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, old.pingCount.get());
+          old.logTimestamp.set(currentTs);
+        }
+      }
+
+    }
+  }
+
+
+  private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
+
+  void nodePinged(String hostname, int port) {
+    LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
+    registerPingingNode(nodeId);
+    BiMap<ContainerId, TezTaskAttemptID> biMap =
+        entityTracker.getContainerAttemptMapForNode(nodeId);
+    if (biMap != null) {
+      synchronized (biMap) {
+        for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
+          getContext().taskAlive(entry.getValue());
+          getContext().containerAlive(entry.getKey());
+        }
+      }
+    } else {
+      long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+      if (currentTs > nodeNotFoundLogTime.get() + 5000l) {
+        LOG.warn("Received ping from node without any registered tasks or containers: " + hostname +
+            ":" + port +
+            ". Could be caused by pre-emption by the AM," +
+            " or a mismatched hostname. Enable debug logging for mismatched host names");
+        nodeNotFoundLogTime.set(currentTs);
+      }
+    }
+  }
 
   private void resetCurrentDag(String newDagName) {
     // Working on the assumption that a single DAG runs at a time per AM.
@@ -454,6 +539,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
    return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
   }
 
+
+
  protected class LlapTaskUmbilicalProtocolImpl implements LlapTaskUmbilicalProtocol {
 
     private final TezTaskUmbilicalProtocol tezUmbilical;
@@ -475,7 +562,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
     @Override
     public void nodeHeartbeat(Text hostname, int port) throws IOException {
-      entityTracker.nodePinged(hostname.toString(), port);
+      nodePinged(hostname.toString(), port);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]");
       }
@@ -502,10 +589,17 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
   }
 
-  private final class EntityTracker {
-    private final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<>();
-    private final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<>();
-    private final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<>();
+  /**
+   * Track the association between known containers and taskAttempts, along with the nodes they are assigned to.
+   */
+  @VisibleForTesting
+  static final class EntityTracker {
+    @VisibleForTesting
+    final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<>();
 
    void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId, String host, int port) {
      if (LOG.isDebugEnabled()) {
@@ -513,6 +607,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
       }
       LlapNodeId llapNodeId = LlapNodeId.getInstance(host, port);
       attemptToNodeMap.putIfAbsent(taskAttemptId, llapNodeId);
+
+      registerContainer(containerId, host, port);
+
+      // nodeMap registration.
       BiMap<ContainerId, TezTaskAttemptID> tmpMap = HashBiMap.create();
      BiMap<ContainerId, TezTaskAttemptID> old = nodeMap.putIfAbsent(llapNodeId, tmpMap);
       BiMap<ContainerId, TezTaskAttemptID> usedInstance;
@@ -538,10 +636,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
         synchronized(bMap) {
           matched = bMap.inverse().remove(attemptId);
         }
-      }
-      // Removing here. Registration into the map has to make sure to put
-      if (bMap.isEmpty()) {
-        nodeMap.remove(llapNodeId);
+        if (bMap.isEmpty()) {
+          nodeMap.remove(llapNodeId);
+        }
       }
 
       // Remove the container mapping
@@ -552,23 +649,29 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
 
    void registerContainer(ContainerId containerId, String hostname, int port) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registering " + containerId + " for node: " + hostname + ":" + port);
+      }
      containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port));
+      // nodeMap registration is not required, since there's no taskId association.
     }
 
     LlapNodeId getNodeIdForContainer(ContainerId containerId) {
       return containerToNodeMap.get(containerId);
     }
 
-    LlapNodeId getNodeIfForTaskAttempt(TezTaskAttemptID taskAttemptId) {
+    LlapNodeId getNodeIdForTaskAttempt(TezTaskAttemptID taskAttemptId) {
       return attemptToNodeMap.get(taskAttemptId);
     }
 
     ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) {
-      LlapNodeId llapNodeId = getNodeIfForTaskAttempt(taskAttemptId);
+      LlapNodeId llapNodeId = getNodeIdForTaskAttempt(taskAttemptId);
       if (llapNodeId != null) {
        BiMap<TezTaskAttemptID, ContainerId> bMap = nodeMap.get(llapNodeId).inverse();
         if (bMap != null) {
-          return bMap.get(taskAttemptId);
+          synchronized (bMap) {
+            return bMap.get(taskAttemptId);
+          }
         } else {
           return null;
         }
@@ -582,7 +685,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
       if (llapNodeId != null) {
         BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
         if (bMap != null) {
-          return bMap.get(containerId);
+          synchronized (bMap) {
+            return bMap.get(containerId);
+          }
         } else {
           return null;
         }
@@ -604,10 +709,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
         synchronized(bMap) {
           matched = bMap.remove(containerId);
         }
-      }
-      // Removing here. Registration into the map has to make sure to put
-      if (bMap.isEmpty()) {
-        nodeMap.remove(llapNodeId);
+        if (bMap.isEmpty()) {
+          nodeMap.remove(llapNodeId);
+        }
       }
 
       // Remove the container mapping
@@ -616,25 +720,20 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
       }
     }
 
-    private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
-    void nodePinged(String hostname, int port) {
-      LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
-      BiMap<ContainerId, TezTaskAttemptID> biMap = nodeMap.get(nodeId);
-      if (biMap != null) {
-        synchronized(biMap) {
-          for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
-            getContext().taskAlive(entry.getValue());
-            getContext().containerAlive(entry.getKey());
-          }
-        }
-      } else {
-        if (System.currentTimeMillis() > nodeNotFoundLogTime.get() + 5000l) {
-          LOG.warn("Received ping from unknown node: " + hostname + ":" + port +
-              ". Could be caused by pre-emption by the AM," +
-              " or a mismatched hostname. Enable debug logging for mismatched host names");
-          nodeNotFoundLogTime.set(System.currentTimeMillis());
-        }
-      }
+    /**
+     * Return a {@link BiMap} containing container->taskAttemptId mapping for the host specified.
+     * <p/>
+     * This method returns the internal structure used by the EntityTracker. Users must synchronize
+     * on the structure to ensure correct usage.
+     *
+     * @param llapNodeId
+     * @return
+     */
+    BiMap<ContainerId, TezTaskAttemptID> getContainerAttemptMapForNode(LlapNodeId llapNodeId) {
+      BiMap<ContainerId, TezTaskAttemptID> biMap = nodeMap.get(llapNodeId);
+      return biMap;
     }
+
   }
-}
\ No newline at end of file
+}
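
The ping handling that used to live in EntityTracker.nodePinged() now sits directly in
LlapTaskCommunicator: registerKnownNode()/registerPingingNode() record which daemons have been
assigned work and which are heartbeating, and warnings about pings from untracked nodes are
rate-limited to one per five seconds using a monotonic clock (System.nanoTime() converted to
milliseconds) rather than System.currentTimeMillis(). A condensed sketch of that rate-limiting
pattern follows; the helper name is invented here, and the original uses a plain get/set on the
AtomicLong rather than compareAndSet:

    private final AtomicLong lastWarnMs = new AtomicLong(0);

    private void warnAtMostEveryFiveSeconds(String message) {
      // Monotonic time converted to millis, mirroring the diff above.
      long nowMs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
      long last = lastWarnMs.get();
      if (nowMs > last + 5000L && lastWarnMs.compareAndSet(last, nowMs)) {
        LOG.warn(message);
      }
    }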

http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
deleted file mode 100644
index f9ca677..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.tezplugins;
-
-import javax.net.SocketFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Message;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapNodeId;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.service.AbstractService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TaskCommunicator extends AbstractService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TaskCommunicator.class);
-
-  private final ConcurrentMap<String, LlapDaemonProtocolBlockingPB> hostProxies;
-
-  private final RequestManager requestManager;
-  private final RetryPolicy retryPolicy;
-  private final SocketFactory socketFactory;
-
-  private final ListeningExecutorService requestManagerExecutor;
-  private volatile ListenableFuture<Void> requestManagerFuture;
-  private final Token<LlapTokenIdentifier> llapToken;
-
-  public TaskCommunicator(
-      int numThreads, Configuration conf, Token<LlapTokenIdentifier> llapToken) {
-    super(TaskCommunicator.class.getSimpleName());
-    this.hostProxies = new ConcurrentHashMap<>();
-    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
-    this.llapToken = llapToken;
-
-    long connectionTimeout = HiveConf.getTimeVar(conf,
-        ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-    long retrySleep = HiveConf.getTimeVar(conf,
-        ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
-        TimeUnit.MILLISECONDS);
-    this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
-        connectionTimeout, retrySleep, TimeUnit.MILLISECONDS);
-
-    this.requestManager = new RequestManager(numThreads);
-    ExecutorService localExecutor = Executors.newFixedThreadPool(1,
-        new ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
-    this.requestManagerExecutor = MoreExecutors.listeningDecorator(localExecutor);
-
-    LOG.info("Setting up taskCommunicator with" +
-        "numThreads=" + numThreads +
-        "retryTime(millis)=" + connectionTimeout +
-        "retrySleep(millis)=" + retrySleep);
-  }
-
-  @Override
-  public void serviceStart() {
-    requestManagerFuture = requestManagerExecutor.submit(requestManager);
-    Futures.addCallback(requestManagerFuture, new FutureCallback<Void>() {
-      @Override
-      public void onSuccess(Void result) {
-        LOG.info("RequestManager shutdown");
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        LOG.warn("RequestManager shutdown with error", t);
-      }
-    });
-  }
-
-  @Override
-  public void serviceStop() {
-    if (requestManagerFuture != null) {
-      requestManager.shutdown();
-      requestManagerFuture.cancel(true);
-    }
-    requestManagerExecutor.shutdown();
-  }
-
-  public void sendSubmitWork(SubmitWorkRequestProto request, String host, int port,
-                         final ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
-    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
-    requestManager.queueRequest(new SubmitWorkCallable(nodeId, request, callback));
-  }
-
-  public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host,
-                                    final int port,
-                                    final ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) {
-    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
-    requestManager.queueRequest(
-        new SendSourceStateUpdateCallable(nodeId, request, callback));
-  }
-
-  public void sendQueryComplete(final QueryCompleteRequestProto request, final String host,
-                                final int port,
-                                final ExecuteRequestCallback<QueryCompleteResponseProto> callback) {
-    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
-    requestManager.queueRequest(new SendQueryCompleteCallable(nodeId, request, callback));
-  }
-
-  public void sendTerminateFragment(final TerminateFragmentRequestProto request, final String host,
-                                    final int port,
-                                    final ExecuteRequestCallback<TerminateFragmentResponseProto> callback) {
-    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
-    requestManager.queueRequest(new SendTerminateFragmentCallable(nodeId, request, callback));
-  }
-
-  @VisibleForTesting
-  static class RequestManager implements Callable<Void> {
-
-    private final Lock lock = new ReentrantLock();
-    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
-    private final Condition queueCondition = lock.newCondition();
-    private final AtomicBoolean shouldRun = new AtomicBoolean(false);
-
-    private final int maxConcurrentRequestsPerNode = 1;
-    private final ListeningExecutorService executor;
-
-
-    // Tracks new additions via add, while the loop is processing existing ones.
-    private final LinkedList<CallableRequest> newRequestList = new LinkedList<>();
-
-    // Tracks existing requests which are cycled through.
-    private final LinkedList<CallableRequest> pendingRequests = new LinkedList<>();
-
-    // Tracks requests executing per node
-    private final ConcurrentMap<LlapNodeId, AtomicInteger> runningRequests = new ConcurrentHashMap<>();
-
-    // Tracks completed requests pre node
-    private final LinkedList<LlapNodeId> completedNodes = new LinkedList<>();
-
-    public RequestManager(int numThreads) {
-      ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads,
-          new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build());
-      executor = MoreExecutors.listeningDecorator(localExecutor);
-    }
-
-
-    @VisibleForTesting
-    Set<LlapNodeId> currentLoopDisabledNodes = new HashSet<>();
-    @VisibleForTesting
-    List<CallableRequest> currentLoopSkippedRequests = new LinkedList<>();
-    @Override
-    public Void call() {
-      // Caches disabled nodes for quicker lookups and ensures a request on a node which was skipped
-      // does not go out of order.
-      while (!isShutdown.get()) {
-        lock.lock();
-        try {
-          while (!shouldRun.get()) {
-            queueCondition.await();
-            break; // Break out and try executing.
-          }
-          boolean shouldBreak = process();
-          if (shouldBreak) {
-            break;
-          }
-        } catch (InterruptedException e) {
-          if (isShutdown.get()) {
-            break;
-          } else {
-            LOG.warn("RunLoop interrupted without being shutdown first");
-            throw new RuntimeException(e);
-          }
-        } finally {
-          lock.unlock();
-        }
-      }
-      LOG.info("CallScheduler loop exiting");
-      return null;
-    }
-
-    /* Add a new request to be executed */
-    public void queueRequest(CallableRequest request) {
-      synchronized (newRequestList) {
-        newRequestList.add(request);
-        shouldRun.set(true);
-      }
-      notifyRunLoop();
-    }
-
-    /* Indicates a request has completed on a node */
-    public void requestFinished(LlapNodeId nodeId) {
-      synchronized (completedNodes) {
-        completedNodes.add(nodeId);
-        shouldRun.set(true);
-      }
-      notifyRunLoop();
-    }
-
-    public void shutdown() {
-      if (!isShutdown.getAndSet(true)) {
-        executor.shutdownNow();
-        notifyRunLoop();
-      }
-    }
-
-    @VisibleForTesting
-    void submitToExecutor(CallableRequest request, LlapNodeId nodeId) {
-      ListenableFuture<SourceStateUpdatedResponseProto> future =
-          executor.submit(request);
-      Futures.addCallback(future, new ResponseCallback(request.getCallback(), nodeId, this));
-    }
-
-    @VisibleForTesting
-    boolean process() {
-      if (isShutdown.get()) {
-        return true;
-      }
-      currentLoopDisabledNodes.clear();
-      currentLoopSkippedRequests.clear();
-
-      // Set to false to block the next loop. This must be called before draining the lists,
-      // otherwise an add/completion after draining the lists but before setting it to false,
-      // will not trigger a run. May cause one unnecessary run if an add comes in before drain.
-      // drain list. add request (setTrue). setFalse needs to be avoided.
-      shouldRun.compareAndSet(true, false);
-      // Drain any calls which may have come in during the last execution of the loop.
-      drainNewRequestList();  // Locks newRequestList
-      drainCompletedNodes();  // Locks completedNodes
-
-
-      Iterator<CallableRequest> iterator = pendingRequests.iterator();
-      while (iterator.hasNext()) {
-        CallableRequest request = iterator.next();
-        iterator.remove();
-        LlapNodeId nodeId = request.getNodeId();
-        if (canRunForNode(nodeId, currentLoopDisabledNodes)) {
-          submitToExecutor(request, nodeId);
-        } else {
-          currentLoopDisabledNodes.add(nodeId);
-          currentLoopSkippedRequests.add(request);
-        }
-      }
-      // Tried scheduling everything that could be scheduled in this loop.
-      pendingRequests.addAll(0, currentLoopSkippedRequests);
-      return false;
-    }
-
-    private void drainNewRequestList() {
-      synchronized (newRequestList) {
-        if (!newRequestList.isEmpty()) {
-          pendingRequests.addAll(newRequestList);
-          newRequestList.clear();
-        }
-      }
-    }
-
-    private void drainCompletedNodes() {
-      synchronized (completedNodes) {
-        if (!completedNodes.isEmpty()) {
-          for (LlapNodeId nodeId : completedNodes) {
-            runningRequests.get(nodeId).decrementAndGet();
-          }
-        }
-        completedNodes.clear();
-      }
-    }
-
-    private boolean canRunForNode(LlapNodeId nodeId, Set<LlapNodeId> currentRunDisabledNodes) {
-      if (currentRunDisabledNodes.contains(nodeId)) {
-        return false;
-      } else {
-        AtomicInteger count = runningRequests.get(nodeId);
-        if (count == null) {
-          count = new AtomicInteger(0);
-          AtomicInteger old = runningRequests.putIfAbsent(nodeId, count);
-          count = old != null ? old : count;
-        }
-        if (count.incrementAndGet() <= maxConcurrentRequestsPerNode) {
-          return true;
-        } else {
-          count.decrementAndGet();
-          return false;
-        }
-      }
-    }
-
-    private void notifyRunLoop() {
-      lock.lock();
-      try {
-        queueCondition.signal();
-      } finally {
-        lock.unlock();
-      }
-    }
-  }
-
-
-  private static final class ResponseCallback<TYPE extends Message>
-      implements FutureCallback<TYPE> {
-
-    private final ExecuteRequestCallback<TYPE> callback;
-    private final LlapNodeId nodeId;
-    private final RequestManager requestManager;
-
-    public ResponseCallback(ExecuteRequestCallback<TYPE> callback, LlapNodeId nodeId,
-                            RequestManager requestManager) {
-      this.callback = callback;
-      this.nodeId = nodeId;
-      this.requestManager = requestManager;
-    }
-
-    @Override
-    public void onSuccess(TYPE result) {
-      try {
-        callback.setResponse(result);
-      } finally {
-        requestManager.requestFinished(nodeId);
-      }
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      try {
-        callback.indicateError(t);
-      } finally {
-        requestManager.requestFinished(nodeId);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message>
-      implements Callable {
-
-    final LlapNodeId nodeId;
-    final ExecuteRequestCallback<RESPONSE> callback;
-    final REQUEST request;
-
-
-    protected CallableRequest(LlapNodeId nodeId, REQUEST request, ExecuteRequestCallback<RESPONSE> callback) {
-      this.nodeId = nodeId;
-      this.request = request;
-      this.callback = callback;
-    }
-
-    public LlapNodeId getNodeId() {
-      return nodeId;
-    }
-
-    public ExecuteRequestCallback<RESPONSE> getCallback() {
-      return callback;
-    }
-
-    public abstract RESPONSE call() throws Exception;
-  }
-
-  private class SubmitWorkCallable extends CallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> {
-
-    protected SubmitWorkCallable(LlapNodeId nodeId,
-                          SubmitWorkRequestProto submitWorkRequestProto,
-                                 ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
-      super(nodeId, submitWorkRequestProto, callback);
-    }
-
-    @Override
-    public SubmitWorkResponseProto call() throws Exception {
-      return getProxy(nodeId).submitWork(null, request);
-    }
-  }
-
-  private class SendSourceStateUpdateCallable
-      extends CallableRequest<SourceStateUpdatedRequestProto, SourceStateUpdatedResponseProto> {
-
-    public SendSourceStateUpdateCallable(LlapNodeId nodeId,
-                                         SourceStateUpdatedRequestProto request,
-                                         ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) {
-      super(nodeId, request, callback);
-    }
-
-    @Override
-    public SourceStateUpdatedResponseProto call() throws Exception {
-      return getProxy(nodeId).sourceStateUpdated(null, request);
-    }
-  }
-
-  private class SendQueryCompleteCallable
-      extends CallableRequest<QueryCompleteRequestProto, QueryCompleteResponseProto> {
-
-    protected SendQueryCompleteCallable(LlapNodeId nodeId,
-                                        QueryCompleteRequestProto queryCompleteRequestProto,
-                                        ExecuteRequestCallback<QueryCompleteResponseProto> callback) {
-      super(nodeId, queryCompleteRequestProto, callback);
-    }
-
-    @Override
-    public QueryCompleteResponseProto call() throws Exception {
-      return getProxy(nodeId).queryComplete(null, request);
-    }
-  }
-
-  private class SendTerminateFragmentCallable
-      extends CallableRequest<TerminateFragmentRequestProto, TerminateFragmentResponseProto> {
-
-    protected SendTerminateFragmentCallable(LlapNodeId nodeId,
-                                            TerminateFragmentRequestProto terminateFragmentRequestProto,
-                                            ExecuteRequestCallback<TerminateFragmentResponseProto> callback) {
-      super(nodeId, terminateFragmentRequestProto, callback);
-    }
-
-    @Override
-    public TerminateFragmentResponseProto call() throws Exception {
-      return getProxy(nodeId).terminateFragment(null, request);
-    }
-  }
-
-  public interface ExecuteRequestCallback<T extends Message> {
-    void setResponse(T response);
-    void indicateError(Throwable t);
-  }
-
-  private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
-    String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
-
-    LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId);
-    if (proxy == null) {
-      if (llapToken == null) {
-        proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
-            nodeId.getPort(), retryPolicy, socketFactory);
-      } else {
-        UserGroupInformation ugi;
-        try {
-          ugi = UserGroupInformation.getCurrentUser();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-        Token<LlapTokenIdentifier> nodeToken = new Token<LlapTokenIdentifier>(llapToken);
-        SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost(
-            nodeId.getHostname(), nodeId.getPort()));
-        ugi.addToken(nodeToken);
-        proxy = ugi.doAs(new PrivilegedAction<LlapDaemonProtocolBlockingPB>() {
-          @Override
-          public LlapDaemonProtocolBlockingPB run() {
-           return new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
-               nodeId.getPort(), retryPolicy, socketFactory);
-          }
-        });
-      }
-
-      LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
-      if (proxyOld != null) {
-        // TODO Shutdown the new proxy.
-        proxy = proxyOld;
-      }
-    }
-    return proxy;
-  }
-
-  private String getHostIdentifier(String hostname, int port) {
-    return hostname + ":" + port;
-  }
-}
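[Note: the getProxy() method removed above uses the standard create-then-putIfAbsent caching idiom over a ConcurrentMap so that concurrent callers for the same host:port share a single client. Below is a minimal, generic sketch of that idiom for reference; the CachedClient type and newClient() factory are illustrative placeholders, not classes from this patch.]

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class ClientCache {
      // Placeholder client type; stands in for LlapDaemonProtocolBlockingPB in the real code.
      interface CachedClient {}

      private final ConcurrentMap<String, CachedClient> clients = new ConcurrentHashMap<>();

      CachedClient getClient(String hostname, int port) {
        String key = hostname + ":" + port;
        CachedClient client = clients.get(key);
        if (client == null) {
          // Build a candidate outside the map; another thread may race us here.
          CachedClient candidate = newClient(hostname, port);
          CachedClient existing = clients.putIfAbsent(key, candidate);
          // If another thread won the race, reuse its client and drop the candidate.
          client = (existing != null) ? existing : candidate;
        }
        return client;
      }

      private CachedClient newClient(String hostname, int port) {
        return new CachedClient() {}; // placeholder construction
      }
    }

[In the removed code the same pattern is used, with the client construction additionally wrapped in a UserGroupInformation.doAs() carrying the LLAP token when one is present.]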

http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
new file mode 100644
index 0000000..a6af8c2
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.protobuf.Message;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.junit.Test;
+
+public class TestLlapDaemonProtocolClientProxy {
+
+  @Test (timeout = 5000)
+  public void testMultipleNodes() {
+    RequestManagerForTest requestManager = new RequestManagerForTest(1);
+
+    LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
+    LlapNodeId nodeId2 = LlapNodeId.getInstance("host2", 1025);
+
+    Message mockMessage = mock(Message.class);
+    LlapDaemonProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock(
+        LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class);
+
+    // Request two messages
+    requestManager.queueRequest(
+        new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
+    requestManager.queueRequest(
+        new CallableRequestForTest(nodeId2, mockMessage, mockExecuteRequestCallback));
+
+    // Should go through in a single process call
+    requestManager.process();
+    assertEquals(2, requestManager.numSubmissionsCounters);
+    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId2));
+    assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+    assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue());
+    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
+    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
+    assertEquals(0, requestManager.currentLoopDisabledNodes.size());
+  }
+
+  @Test(timeout = 5000)
+  public void testSingleInvocationPerNode() {
+    RequestManagerForTest requestManager = new RequestManagerForTest(1);
+
+    LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
+
+    Message mockMessage = mock(Message.class);
+    LlapDaemonProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock(
+        LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class);
+
+    // First request for host.
+    requestManager.queueRequest(
+        new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
+    requestManager.process();
+    assertEquals(1, requestManager.numSubmissionsCounters);
+    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+    assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
+
+    // Second request for host. Single invocation since the last has not completed.
+    requestManager.queueRequest(
+        new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
+    requestManager.process();
+    assertEquals(1, requestManager.numSubmissionsCounters);
+    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+    assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+    assertEquals(1, requestManager.currentLoopSkippedRequests.size());
+    assertEquals(1, requestManager.currentLoopDisabledNodes.size());
+    assertTrue(requestManager.currentLoopDisabledNodes.contains(nodeId1));
+
+    // Complete first request. Second pending request should go through.
+    requestManager.requestFinished(nodeId1);
+    requestManager.process();
+    assertEquals(2, requestManager.numSubmissionsCounters);
+    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+    assertEquals(2, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
+    assertEquals(0, requestManager.currentLoopDisabledNodes.size());
+    assertFalse(requestManager.currentLoopDisabledNodes.contains(nodeId1));
+  }
+
+
+  static class RequestManagerForTest extends LlapDaemonProtocolClientProxy.RequestManager {
+
+    int numSubmissionsCounters = 0;
+    private Map<LlapNodeId, MutableInt> numInvocationsPerNode = new HashMap<>();
+
+    public RequestManagerForTest(int numThreads) {
+      super(numThreads);
+    }
+
+    protected void submitToExecutor(LlapDaemonProtocolClientProxy.CallableRequest request, LlapNodeId nodeId) {
+      numSubmissionsCounters++;
+      MutableInt nodeCount = numInvocationsPerNode.get(nodeId);
+      if (nodeCount == null) {
+        nodeCount = new MutableInt(0);
+        numInvocationsPerNode.put(nodeId, nodeCount);
+      }
+      nodeCount.increment();
+    }
+
+    void reset() {
+      numSubmissionsCounters = 0;
+      numInvocationsPerNode.clear();
+    }
+
+  }
+
+  static class CallableRequestForTest extends LlapDaemonProtocolClientProxy.CallableRequest<Message, Message> {
+
+    protected CallableRequestForTest(LlapNodeId nodeId, Message message,
+                                     LlapDaemonProtocolClientProxy.ExecuteRequestCallback<Message> callback) {
+      super(nodeId, message, callback);
+    }
+
+    @Override
+    public Message call() throws Exception {
+      return null;
+    }
+  }
+
+}
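[Note: the test above drives RequestManager directly with a mocked callback. In real use a caller supplies an ExecuteRequestCallback to receive either the response or the failure asynchronously. A minimal sketch of such a callback, assuming only the two-method interface shown in the removed TaskCommunicator code earlier in this diff (setResponse / indicateError), might look like the following; the class name and logging are illustrative, not part of this patch.]

    import com.google.protobuf.Message;
    import org.apache.hadoop.hive.llap.tezplugins.LlapDaemonProtocolClientProxy.ExecuteRequestCallback;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative only: a callback that logs the outcome of an async request.
    public class LoggingRequestCallback<T extends Message> implements ExecuteRequestCallback<T> {

      private static final Logger LOG = LoggerFactory.getLogger(LoggingRequestCallback.class);

      @Override
      public void setResponse(T response) {
        // Invoked once the request for this node completes successfully.
        LOG.info("Request completed: {}", response);
      }

      @Override
      public void indicateError(Throwable t) {
        // Invoked if the request could not be completed.
        LOG.error("Request failed", t);
      }
    }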

http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
new file mode 100644
index 0000000..8f3d104
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.junit.Test;
+
+public class TestLlapTaskCommunicator {
+
+  @Test (timeout = 5000)
+  public void testEntityTracker1() {
+    LlapTaskCommunicator.EntityTracker entityTracker = new LlapTaskCommunicator.EntityTracker();
+
+    String host1 = "host1";
+    String host2 = "host2";
+    String host3 = "host3";
+    int port = 1451;
+
+
+    // Simple container registration and un-registration without any task attempt being involved.
+    ContainerId containerId101 = constructContainerId(101);
+    entityTracker.registerContainer(containerId101, host1, port);
+    assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId101));
+
+    entityTracker.unregisterContainer(containerId101);
+    assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port)));
+    assertNull(entityTracker.getNodeIdForContainer(containerId101));
+    assertEquals(0, entityTracker.nodeMap.size());
+    assertEquals(0, entityTracker.attemptToNodeMap.size());
+    assertEquals(0, entityTracker.containerToNodeMap.size());
+
+
+    // Simple task registration and un-registration.
+    ContainerId containerId1 = constructContainerId(1);
+    TezTaskAttemptID taskAttemptId1 = constructTaskAttemptId(1);
+    entityTracker.registerTaskAttempt(containerId1, taskAttemptId1, host1, port);
+    assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId1));
+    assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForTaskAttempt(taskAttemptId1));
+
+    entityTracker.unregisterTaskAttempt(taskAttemptId1);
+    assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port)));
+    assertNull(entityTracker.getNodeIdForContainer(containerId1));
+    assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId1));
+    assertEquals(0, entityTracker.nodeMap.size());
+    assertEquals(0, entityTracker.attemptToNodeMap.size());
+    assertEquals(0, entityTracker.containerToNodeMap.size());
+
+    // Register taskAttempt, unregister container. TaskAttempt should also be unregistered
+    ContainerId containerId201 = constructContainerId(201);
+    TezTaskAttemptID taskAttemptId201 = constructTaskAttemptId(201);
+    entityTracker.registerTaskAttempt(containerId201, taskAttemptId201, host1, port);
+    assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId201));
+    assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForTaskAttempt(taskAttemptId201));
+
+    entityTracker.unregisterContainer(containerId201);
+    assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port)));
+    assertNull(entityTracker.getNodeIdForContainer(containerId201));
+    assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId201));
+    assertEquals(0, entityTracker.nodeMap.size());
+    assertEquals(0, entityTracker.attemptToNodeMap.size());
+    assertEquals(0, entityTracker.containerToNodeMap.size());
+
+    entityTracker.unregisterTaskAttempt(taskAttemptId201); // No errors
+  }
+
+
+  private ContainerId constructContainerId(int id) {
+    ContainerId containerId = mock(ContainerId.class);
+    doReturn(id).when(containerId).getId();
+    doReturn((long)id).when(containerId).getContainerId();
+    return containerId;
+  }
+
+  private TezTaskAttemptID constructTaskAttemptId(int id) {
+    TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class);
+    doReturn(id).when(taskAttemptId).getId();
+    return taskAttemptId;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestTaskCommunicator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestTaskCommunicator.java
deleted file mode 100644
index 2aef4ed..0000000
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestTaskCommunicator.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.tezplugins;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.protobuf.Message;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.hive.llap.LlapNodeId;
-import org.junit.Test;
-
-public class TestTaskCommunicator {
-
-  @Test (timeout = 5000)
-  public void testMultipleNodes() {
-    RequestManagerForTest requestManager = new RequestManagerForTest(1);
-
-    LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
-    LlapNodeId nodeId2 = LlapNodeId.getInstance("host2", 1025);
-
-    Message mockMessage = mock(Message.class);
-    TaskCommunicator.ExecuteRequestCallback mockExecuteRequestCallback = mock(
-        TaskCommunicator.ExecuteRequestCallback.class);
-
-    // Request two messages
-    requestManager.queueRequest(
-        new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
-    requestManager.queueRequest(
-        new CallableRequestForTest(nodeId2, mockMessage, mockExecuteRequestCallback));
-
-    // Should go through in a single process call
-    requestManager.process();
-    assertEquals(2, requestManager.numSubmissionsCounters);
-    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
-    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId2));
-    assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
-    assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue());
-    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
-    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
-    assertEquals(0, requestManager.currentLoopDisabledNodes.size());
-  }
-
-  @Test(timeout = 5000)
-  public void testSingleInvocationPerNode() {
-    RequestManagerForTest requestManager = new RequestManagerForTest(1);
-
-    LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
-
-    Message mockMessage = mock(Message.class);
-    TaskCommunicator.ExecuteRequestCallback mockExecuteRequestCallback = mock(
-        TaskCommunicator.ExecuteRequestCallback.class);
-
-    // First request for host.
-    requestManager.queueRequest(
-        new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
-    requestManager.process();
-    assertEquals(1, requestManager.numSubmissionsCounters);
-    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
-    assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
-    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
-
-    // Second request for host. Single invocation since the last has not completed.
-    requestManager.queueRequest(
-        new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
-    requestManager.process();
-    assertEquals(1, requestManager.numSubmissionsCounters);
-    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
-    assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
-    assertEquals(1, requestManager.currentLoopSkippedRequests.size());
-    assertEquals(1, requestManager.currentLoopDisabledNodes.size());
-    assertTrue(requestManager.currentLoopDisabledNodes.contains(nodeId1));
-
-    // Complete first request. Second pending request should go through.
-    requestManager.requestFinished(nodeId1);
-    requestManager.process();
-    assertEquals(2, requestManager.numSubmissionsCounters);
-    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
-    assertEquals(2, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
-    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
-    assertEquals(0, requestManager.currentLoopDisabledNodes.size());
-    assertFalse(requestManager.currentLoopDisabledNodes.contains(nodeId1));
-  }
-
-
-  static class RequestManagerForTest extends TaskCommunicator.RequestManager {
-
-    int numSubmissionsCounters = 0;
-    private Map<LlapNodeId, MutableInt> numInvocationsPerNode = new HashMap<>();
-
-    public RequestManagerForTest(int numThreads) {
-      super(numThreads);
-    }
-
-    protected void submitToExecutor(TaskCommunicator.CallableRequest request, LlapNodeId nodeId) {
-      numSubmissionsCounters++;
-      MutableInt nodeCount = numInvocationsPerNode.get(nodeId);
-      if (nodeCount == null) {
-        nodeCount = new MutableInt(0);
-        numInvocationsPerNode.put(nodeId, nodeCount);
-      }
-      nodeCount.increment();
-    }
-
-    void reset() {
-      numSubmissionsCounters = 0;
-      numInvocationsPerNode.clear();
-    }
-
-  }
-
-  static class CallableRequestForTest extends TaskCommunicator.CallableRequest<Message, Message> {
-
-    protected CallableRequestForTest(LlapNodeId nodeId, Message message,
-                                     TaskCommunicator.ExecuteRequestCallback<Message> callback) {
-      super(nodeId, message, callback);
-    }
-
-    @Override
-    public Message call() throws Exception {
-      return null;
-    }
-  }
-
-}
