http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
deleted file mode 100644
index 4c1cbb3..0000000
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ /dev/null
@@ -1,685 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.app.ControlledClock;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestLlapTaskSchedulerService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestLlapTaskSchedulerService.class);
-
-  private static final String HOST1 = "host1";
-  private static final String HOST2 = "host2";
-  private static final String HOST3 = "host3";
-
-  @Test (timeout = 5000)
-  public void testSimpleLocalAllocation() throws IOException, InterruptedException {
-
-    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
-
-    try {
-      Priority priority1 = Priority.newInstance(1);
-      String[] hosts1 = new String[]{HOST1};
-
-      Object task1 = new Object();
-      Object clientCookie1 = new Object();
-
-      tsWrapper.controlScheduler(true);
-      tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
-
-      tsWrapper.signalSchedulerRun();
-      tsWrapper.awaitSchedulerRun();
-
-      verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
-      // TODO Verify this is on host1.
-      assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
-    } finally {
-      tsWrapper.shutdown();
-    }
-  }
-
-  @Test (timeout = 5000)
-  public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException {
-    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
-
-    try {
-      Priority priority1 = Priority.newInstance(1);
-
-      Object task1 = new Object();
-      Object clientCookie1 = new Object();
-      tsWrapper.controlScheduler(true);
-      tsWrapper.allocateTask(task1, null, priority1, clientCookie1);
-      tsWrapper.signalSchedulerRun();
-      tsWrapper.awaitSchedulerRun();
-      verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
-      assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
-    } finally {
-      tsWrapper.shutdown();
-    }
-  }
-
-
-  @Test(timeout=5000)
-  public void testPreemption() throws InterruptedException, IOException {
-
-    Priority priority1 = Priority.newInstance(1);
-    Priority priority2 = Priority.newInstance(2);
-    String [] hosts = new String[] {HOST1};
-    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1);
-    try {
-
-      Object task1 = "task1";
-      Object clientCookie1 = "cookie1";
-      Object task2 = "task2";
-      Object clientCookie2 = "cookie2";
-      Object task3 = "task3";
-      Object clientCookie3 = "cookie3";
-      Object task4 = "task4";
-      Object clientCookie4 = "cookie4";
-
-      tsWrapper.controlScheduler(true);
-      tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1);
-      tsWrapper.allocateTask(task2, hosts, priority2, clientCookie2);
-      tsWrapper.allocateTask(task3, hosts, priority2, clientCookie3);
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numLocalAllocations == 2) {
-          break;
-        }
-      }
-      verify(tsWrapper.mockAppCallback, times(2)).taskAllocated(any(Object.class),
-          any(Object.class), any(Container.class));
-      assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
-      assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
-
-      reset(tsWrapper.mockAppCallback);
-
-      tsWrapper.allocateTask(task4, hosts, priority1, clientCookie4);
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) {
-          break;
-        }
-      }
-      verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class));
-
-
-      tsWrapper.deallocateTask(task2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
-
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
-          break;
-        }
-      }
-      verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
-          eq(clientCookie4), any(Container.class));
-
-    } finally {
-      tsWrapper.shutdown();
-    }
-
-  }
-
-  @Test(timeout=5000)
-  public void testNodeDisabled() throws IOException, InterruptedException {
-    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l);
-    try {
-      Priority priority1 = Priority.newInstance(1);
-      String[] hosts1 = new String[]{HOST1};
-      Object task1 = new Object();
-      Object clientCookie1 = new Object();
-      tsWrapper.controlScheduler(true);
-      tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numTotalAllocations == 1) {
-          break;
-        }
-      }
-      verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1),
-          any(Container.class));
-      assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
-      assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
-      assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations);
-      assertEquals(1, tsWrapper.ts.dagStats.numTotalAllocations);
-
-      tsWrapper.resetAppCallback();
-
-      tsWrapper.clock.setTime(10000l);
-      tsWrapper.rejectExecution(task1);
-
-      // Verify that the node is blacklisted
-      assertEquals(1, tsWrapper.ts.dagStats.numRejectedTasks);
-      assertEquals(3, tsWrapper.ts.instanceToNodeMap.size());
-      LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodesQueue.peek();
-      assertNotNull(disabledNodeInfo);
-      assertEquals(HOST1, disabledNodeInfo.serviceInstance.getHost());
-      assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.MILLISECONDS));
-      assertEquals((10000l + 10000l), disabledNodeInfo.expireTimeMillis);
-
-      Object task2 = new Object();
-      Object clientCookie2 = new Object();
-      tsWrapper.allocateTask(task2, hosts1, priority1, clientCookie2);
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numTotalAllocations == 2) {
-          break;
-        }
-      }
-      verify(tsWrapper.mockAppCallback).taskAllocated(eq(task2), eq(clientCookie2), any(Container.class));
-      assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
-      assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
-      assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
-      assertEquals(2, tsWrapper.ts.dagStats.numTotalAllocations);
-
-    } finally {
-      tsWrapper.shutdown();
-    }
-  }
-
-  @Test(timeout=5000)
-  public void testNodeReEnabled() throws InterruptedException, IOException {
-    // Based on actual timing.
-    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l);
-    try {
-      Priority priority1 = Priority.newInstance(1);
-      String[] hosts1 = new String[]{HOST1};
-      String[] hosts2 = new String[]{HOST2};
-      String[] hosts3 = new String[]{HOST3};
-
-      Object task1 = new Object();
-      Object clientCookie1 = new Object();
-      Object task2 = new Object();
-      Object clientCookie2 = new Object();
-      Object task3 = new Object();
-      Object clientCookie3 = new Object();
-
-      tsWrapper.controlScheduler(true);
-      tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
-      tsWrapper.allocateTask(task2, hosts2, priority1, clientCookie2);
-      tsWrapper.allocateTask(task3, hosts3, priority1, clientCookie3);
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
-          break;
-        }
-      }
-      verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), any(Container.class));
-      assertEquals(3, tsWrapper.ts.dagStats.numLocalAllocations);
-      assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
-      assertEquals(3, tsWrapper.ts.dagStats.numTotalAllocations);
-
-      tsWrapper.resetAppCallback();
-
-      tsWrapper.rejectExecution(task1);
-      tsWrapper.rejectExecution(task2);
-      tsWrapper.rejectExecution(task3);
-
-      // Verify that the node is blacklisted
-      assertEquals(3, tsWrapper.ts.dagStats.numRejectedTasks);
-      assertEquals(3, tsWrapper.ts.instanceToNodeMap.size());
-      assertEquals(3, tsWrapper.ts.disabledNodesQueue.size());
-
-
-      Object task4 = new Object();
-      Object clientCookie4 = new Object();
-      Object task5 = new Object();
-      Object clientCookie5 = new Object();
-      Object task6 = new Object();
-      Object clientCookie6 = new Object();
-      tsWrapper.allocateTask(task4, hosts1, priority1, clientCookie4);
-      tsWrapper.allocateTask(task5, hosts2, priority1, clientCookie5);
-      tsWrapper.allocateTask(task6, hosts3, priority1, clientCookie6);
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numTotalAllocations == 6) {
-          break;
-        }
-      }
-
-      ArgumentCaptor<Container> argumentCaptor = ArgumentCaptor.forClass(Container.class);
-      verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), argumentCaptor.capture());
-
-      // which affects the locality matching
-      assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
-      assertEquals(6, tsWrapper.ts.dagStats.numTotalAllocations);
-
-    } finally {
-      tsWrapper.shutdown();
-    }
-  }
-
-  @Test (timeout = 5000)
-  public void testForceLocalityTest1() throws IOException, InterruptedException {
-    // 2 hosts. 2 per host. 5 requests at the same priority.
-    // First 3 on host1, Next at host2, Last with no host.
-    // Third request on host1 should not be allocated immediately.
-    forceLocalityTest1(true);
-
-  }
-
-  @Test (timeout = 5000)
-  public void testNoForceLocalityCounterTest1() throws IOException, InterruptedException {
-    // 2 hosts. 2 per host. 5 requests at the same priority.
-    // First 3 on host1, Next at host2, Last with no host.
-    // Third should allocate on host2, 4th on host2, 5th will wait.
-
-    forceLocalityTest1(false);
-  }
-
-  private void forceLocalityTest1(boolean forceLocality) throws IOException, InterruptedException {
-    Priority priority1 = Priority.newInstance(1);
-
-    String[] hosts = new String[] {HOST1, HOST2};
-
-    String[] hostsH1 = new String[] {HOST1};
-    String[] hostsH2 = new String[] {HOST2};
-
-    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, (forceLocality ? -1l : 0l));
-
-    try {
-      Object task1 = "task1";
-      Object clientCookie1 = "cookie1";
-      Object task2 = "task2";
-      Object clientCookie2 = "cookie2";
-      Object task3 = "task3";
-      Object clientCookie3 = "cookie3";
-      Object task4 = "task4";
-      Object clientCookie4 = "cookie4";
-      Object task5 = "task5";
-      Object clientCookie5 = "cookie5";
-
-      tsWrapper.controlScheduler(true);
-      //H1 - should allocate
-      tsWrapper.allocateTask(task1, hostsH1, priority1, clientCookie1);
-      //H1 - should allocate
-      tsWrapper.allocateTask(task2, hostsH1, priority1, clientCookie2);
-      //H1 - no capacity if force, should allocate otherwise
-      tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3);
-      //H2 - should allocate
-      tsWrapper.allocateTask(task4, hostsH2, priority1, clientCookie4);
-      //No location - should allocate if force, no capacity otherwise
-      tsWrapper.allocateTask(task5, null, priority1, clientCookie5);
-
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numTotalAllocations == 4) {
-          break;
-        }
-      }
-
-      // Verify no preemption requests - since everything is at the same priority
-      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
-      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
-      verify(tsWrapper.mockAppCallback, times(4)).taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
-      assertEquals(4, argumentCaptor.getAllValues().size());
-      assertEquals(task1, argumentCaptor.getAllValues().get(0));
-      assertEquals(task2, argumentCaptor.getAllValues().get(1));
-      if (forceLocality) {
-        // task3 not allocated
-        assertEquals(task4, argumentCaptor.getAllValues().get(2));
-        assertEquals(task5, argumentCaptor.getAllValues().get(3));
-      } else {
-        assertEquals(task3, argumentCaptor.getAllValues().get(2));
-        assertEquals(task4, argumentCaptor.getAllValues().get(3));
-      }
-
-      //Complete one task on host1.
-      tsWrapper.deallocateTask(task1, true, null);
-
-      reset(tsWrapper.mockAppCallback);
-
-      // Try scheduling again.
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numTotalAllocations == 5) {
-          break;
-        }
-      }
-      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
-      argumentCaptor = ArgumentCaptor.forClass(Object.class);
-      verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
-      assertEquals(1, argumentCaptor.getAllValues().size());
-      if (forceLocality) {
-        assertEquals(task3, argumentCaptor.getAllValues().get(0));
-      } else {
-        assertEquals(task5, argumentCaptor.getAllValues().get(0));
-      }
-
-    } finally {
-      tsWrapper.shutdown();
-    }
-  }
-
-  @Test(timeout = 5000)
-  public void testForcedLocalityUnknownHost() throws IOException, InterruptedException {
-    Priority priority1 = Priority.newInstance(1);
-
-    String[] hostsKnown = new String[]{HOST1};
-    String[] hostsUnknown = new String[]{HOST2};
-
-    TestTaskSchedulerServiceWrapper tsWrapper =
-        new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 1, -1l);
-    try {
-      Object task1 = "task1";
-      Object clientCookie1 = "cookie1";
-
-      Object task2 = "task2";
-      Object clientCookie2 = "cookie2";
-
-      tsWrapper.controlScheduler(true);
-      // Should allocate since H2 is not known.
-      tsWrapper.allocateTask(task1, hostsUnknown, priority1, clientCookie1);
-      tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2);
-
-
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numTotalAllocations == 2) {
-          break;
-        }
-      }
-
-      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
-      verify(tsWrapper.mockAppCallback, times(2))
-          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
-      assertEquals(2, argumentCaptor.getAllValues().size());
-      assertEquals(task1, argumentCaptor.getAllValues().get(0));
-      assertEquals(task2, argumentCaptor.getAllValues().get(1));
-
-
-    } finally {
-      tsWrapper.shutdown();
-    }
-  }
-
-
-  @Test(timeout = 5000)
-  public void testForcedLocalityPreemption() throws IOException, InterruptedException {
-    Priority priority1 = Priority.newInstance(1);
-    Priority priority2 = Priority.newInstance(2);
-    String [] hosts = new String[] {HOST1, HOST2};
-
-    String [] hostsH1 = new String[] {HOST1};
-    String [] hostsH2 = new String[] {HOST2};
-
-    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
-
-    // Fill up host1 with p2 tasks.
-    // Leave host2 empty
-    // Try running p1 task on host1 - should preempt
-
-    try {
-      Object task1 = "task1";
-      Object clientCookie1 = "cookie1";
-      Object task2 = "task2";
-      Object clientCookie2 = "cookie2";
-      Object task3 = "task3";
-      Object clientCookie3 = "cookie3";
-      Object task4 = "task4";
-      Object clientCookie4 = "cookie4";
-
-      tsWrapper.controlScheduler(true);
-      tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1);
-      tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
-      // This request at a lower priority should not affect anything.
-      tsWrapper.allocateTask(task3, hostsH1, priority2, clientCookie3);
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numLocalAllocations == 2) {
-          break;
-        }
-      }
-
-      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
-      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
-      verify(tsWrapper.mockAppCallback, times(2))
-          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
-      assertEquals(2, argumentCaptor.getAllValues().size());
-      assertEquals(task1, argumentCaptor.getAllValues().get(0));
-      assertEquals(task2, argumentCaptor.getAllValues().get(1));
-
-      reset(tsWrapper.mockAppCallback);
-      // Allocate t4 at higher priority. t3 should not be allocated,
-      // and a preemption should be attempted on host1, despite host2 having available capacity
-      tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4);
-
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) {
-          break;
-        }
-      }
-      verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class));
-
-      tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
-
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
-          break;
-        }
-      }
-      verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
-          eq(clientCookie4), any(Container.class));
-
-    } finally {
-      tsWrapper.shutdown();
-    }
-  }
-
-  private static class TestTaskSchedulerServiceWrapper {
-    static final Resource resource = Resource.newInstance(1024, 1);
-    Configuration conf;
-    TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class);
-    ControlledClock clock = new ControlledClock(new SystemClock());
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
-    LlapTaskSchedulerServiceForTest ts;
-
-    TestTaskSchedulerServiceWrapper() throws IOException, InterruptedException {
-      this(2000l);
-    }
-
-    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException,
-        InterruptedException {
-      this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4,
-          ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.defaultIntVal);
-    }
-
-    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws
-        IOException, InterruptedException {
-      this(disableTimeoutMillis, hosts, numExecutors, waitQueueSize, 0l);
-    }
-
-    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize, long localityDelayMs) throws
-        IOException, InterruptedException {
-      conf = new Configuration();
-      conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts);
-      conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors);
-      conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize);
-      conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname,
-          disableTimeoutMillis + "ms");
-      conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false);
-      conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs);
-
-      doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();
-      doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier();
-      UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-      doReturn(userPayload).when(mockAppCallback).getInitialUserPayload();
-
-      ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
-
-      controlScheduler(true);
-      ts.initialize();
-      ts.start();
-      // One scheduler pass from the nodes that are added at startup
-      signalSchedulerRun();
-      controlScheduler(false);
-      awaitSchedulerRun();
-    }
-
-    void controlScheduler(boolean val) {
-      ts.forTestsetControlScheduling(val);
-    }
-
-    void signalSchedulerRun() throws InterruptedException {
-      ts.forTestSignalSchedulingRun();
-    }
-
-    void awaitSchedulerRun() throws InterruptedException {
-      ts.forTestAwaitSchedulingRun();
-    }
-    void resetAppCallback() {
-      reset(mockAppCallback);
-    }
-
-    void shutdown() {
-      ts.shutdown();
-    }
-
-    void allocateTask(Object task, String[] hosts, Priority priority, Object clientCookie) {
-      ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie);
-    }
-
-    void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) {
-      ts.deallocateTask(task, succeeded, endReason, null);
-    }
-
-    void rejectExecution(Object task) {
-      ts.deallocateTask(task, false, TaskAttemptEndReason.EXECUTOR_BUSY, null);
-    }
-  }
-
-  private static class LlapTaskSchedulerServiceForTest extends LlapTaskSchedulerService {
-
-    private AtomicBoolean controlScheduling = new AtomicBoolean(false);
-    private final Lock testLock = new ReentrantLock();
-    private final Condition schedulingCompleteCondition = testLock.newCondition();
-    private boolean schedulingComplete = false;
-    private final Condition triggerSchedulingCondition = testLock.newCondition();
-    private boolean schedulingTriggered = false;
-    private final AtomicInteger numSchedulerRuns = new AtomicInteger(0);
-
-
-    public LlapTaskSchedulerServiceForTest(
-        TaskSchedulerContext appClient, Clock clock) {
-      super(appClient, clock);
-    }
-
-    @Override
-    protected void schedulePendingTasks() {
-      testLock.lock();
-      try {
-        if (controlScheduling.get()) {
-          while (!schedulingTriggered) {
-            try {
-              triggerSchedulingCondition.await();
-            } catch (InterruptedException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        }
-        numSchedulerRuns.incrementAndGet();
-        super.schedulePendingTasks();
-        schedulingTriggered = false;
-        schedulingComplete = true;
-        schedulingCompleteCondition.signal();
-      } finally {
-        testLock.unlock();
-      }
-    }
-
-    // Enable or disable test scheduling control.
-    void forTestsetControlScheduling(boolean control) {
-      this.controlScheduling.set(control);
-    }
-
-    void forTestSignalSchedulingRun() throws InterruptedException {
-      testLock.lock();
-      try {
-        schedulingTriggered = true;
-        triggerSchedulingCondition.signal();
-      } finally {
-        testLock.unlock();
-      }
-    }
-
-    void forTestAwaitSchedulingRun() throws InterruptedException {
-      testLock.lock();
-      try {
-        while (!schedulingComplete) {
-          schedulingCompleteCondition.await();
-        }
-        schedulingComplete = false;
-      } finally {
-        testLock.unlock();
-      }
-    }
-
-  }
-}
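
For readers following the removal above: the deleted test drove LlapTaskSchedulerService deterministically by pausing the scheduler loop and stepping it one pass at a time (controlScheduler / signalSchedulerRun / awaitSchedulerRun). A minimal, self-contained sketch of that trigger/complete handshake, independent of the Hive classes and using illustrative names only, is:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: the same trigger/complete handshake the deleted test wrapper
// used to step the scheduler one pass at a time from the test thread.
class SchedulingStepGate {
  private final Lock lock = new ReentrantLock();
  private final Condition triggered = lock.newCondition();
  private final Condition completed = lock.newCondition();
  private boolean runRequested = false;
  private boolean runFinished = false;

  // Scheduler thread: block at the top of a scheduling pass until the test asks for one.
  void awaitTrigger() throws InterruptedException {
    lock.lock();
    try {
      while (!runRequested) {
        triggered.await();
      }
      runRequested = false;
    } finally {
      lock.unlock();
    }
  }

  // Scheduler thread: announce that the pass has finished.
  void signalComplete() {
    lock.lock();
    try {
      runFinished = true;
      completed.signal();
    } finally {
      lock.unlock();
    }
  }

  // Test thread: request exactly one scheduling pass.
  void signalRun() {
    lock.lock();
    try {
      runRequested = true;
      triggered.signal();
    } finally {
      lock.unlock();
    }
  }

  // Test thread: block until that pass has completed.
  void awaitRun() throws InterruptedException {
    lock.lock();
    try {
      while (!runFinished) {
        completed.await();
      }
      runFinished = false;
    } finally {
      lock.unlock();
    }
  }
}

Because the test thread requests exactly one pass and waits for it to complete, assertions such as dagStats.numLocalAllocations become deterministic rather than timing-dependent.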

http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-tez/pom.xml
----------------------------------------------------------------------
diff --git a/llap-tez/pom.xml b/llap-tez/pom.xml
new file mode 100644
index 0000000..ce020da
--- /dev/null
+++ b/llap-tez/pom.xml
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-llap-tez</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Llap Tez</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+    <!-- intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!-- inter-project -->
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commmons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commmons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commmons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commmons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commmons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- test intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- test inter-project -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-all.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commmons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/gen/protobuf/gen-java</source>
+                <source>src/gen/thrift/gen-javabean</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
new file mode 100644
index 0000000..a314391
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+class ContainerFactory {
+  final ApplicationAttemptId customAppAttemptId;
+  AtomicLong nextId;
+
+  public ContainerFactory(ApplicationAttemptId appAttemptId, long appIdLong) {
+    this.nextId = new AtomicLong(1);
+    ApplicationId appId =
+        ApplicationId.newInstance(appIdLong, appAttemptId.getApplicationId().getId());
+    this.customAppAttemptId =
+        ApplicationAttemptId.newInstance(appId, appAttemptId.getAttemptId());
+  }
+
+  public Container createContainer(Resource capability, Priority priority, String hostname,
+      int port) {
+    ContainerId containerId =
+        ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement());
+    NodeId nodeId = NodeId.newInstance(hostname, port);
+    String nodeHttpAddress = "hostname:0"; // TODO: include UI ports
+
+    Container container =
+        Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null);
+
+    return container;
+  }
+}
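
As a usage sketch (not part of the patch): the factory above fabricates YARN Container records so the LLAP plugins can report allocations back to Tez without going through the ResourceManager. The values below (the 11111L cluster identifier and the port) are illustrative only, and a real caller would sit in the same package since ContainerFactory is package-private:

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;

// Hypothetical caller, assumed to live in org.apache.hadoop.hive.llap.tezplugins.
class ContainerFactoryUsageSketch {
  public static void main(String[] args) {
    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
        ApplicationId.newInstance(1000L, 1), 1);
    // The second argument is a custom cluster identifier, so the fabricated
    // container ids cannot collide with ones handed out by the real RM.
    ContainerFactory factory = new ContainerFactory(attemptId, 11111L);
    // Host and port are placeholders for an LLAP daemon endpoint.
    Container container = factory.createContainer(
        Resource.newInstance(1024, 1), Priority.newInstance(1), "host1", 15001);
    System.out.println("Fabricated container: " + container.getId());
  }
}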

http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
new file mode 100644
index 0000000..07703a2
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapContainerLauncher extends ContainerLauncher {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapContainerLauncher.class);
+
+  public LlapContainerLauncher(ContainerLauncherContext containerLauncherContext) {
+    super(containerLauncherContext);
+  }
+
+  @Override
+  public void launchContainer(ContainerLaunchRequest containerLaunchRequest) {
+    LOG.info("No-op launch for container: " + 
containerLaunchRequest.getContainerId() +
+        " succeeded on host: " + containerLaunchRequest.getNodeId());
+    getContext().containerLaunched(containerLaunchRequest.getContainerId());
+  }
+
+  @Override
+  public void stopContainer(ContainerStopRequest containerStopRequest) {
+    LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + 
containerStopRequest);
+    getContext().containerStopRequested(containerStopRequest.getContainerId());
+  }
+}
\ No newline at end of file
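
A hedged wiring sketch (not in the patch): a Tez AM picks up plugins like the launcher above through service-plugin descriptors. This assumes the Tez 0.8-style descriptor API; the plugin name and the scheduler class location are assumptions, since the scheduler class itself is not shown in the hunks here:

import org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;

// Illustrative only; names and the scheduler class location are assumptions.
public class LlapPluginWiringSketch {
  private static final String PLUGIN_NAME = "LLAP"; // arbitrary label for this sketch
  private static final String SCHEDULER_CLASS =
      "org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService"; // assumed location

  public static ServicePluginsDescriptor buildServicePlugins() {
    TaskSchedulerDescriptor scheduler =
        TaskSchedulerDescriptor.create(PLUGIN_NAME, SCHEDULER_CLASS);
    ContainerLauncherDescriptor launcher =
        ContainerLauncherDescriptor.create(PLUGIN_NAME, LlapContainerLauncher.class.getName());
    TaskCommunicatorDescriptor communicator =
        TaskCommunicatorDescriptor.create(PLUGIN_NAME, LlapTaskCommunicator.class.getName());
    // First argument keeps regular container execution enabled alongside the LLAP plugins.
    return ServicePluginsDescriptor.create(true,
        new TaskSchedulerDescriptor[] { scheduler },
        new ContainerLauncherDescriptor[] { launcher },
        new TaskCommunicatorDescriptor[] { communicator });
  }
}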

http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
new file mode 100644
index 0000000..76d095a
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -0,0 +1,757 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class);
+
+  private static final boolean isInfoEnabled = LOG.isInfoEnabled();
+  private static final boolean isDebugEnabed = LOG.isDebugEnabled();
+
+  private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
+
+  private final ConcurrentMap<QueryIdentifierProto, ByteBuffer> credentialMap;
+
+  // Tracks containerIds and taskAttemptIds, so can be kept independent of the running DAG.
+  // When DAG specific cleanup happens, it'll be better to link this to a DAG though.
+  private final EntityTracker entityTracker = new EntityTracker();
+  private final SourceStateTracker sourceStateTracker;
+  private final Set<LlapNodeId> nodesForQuery = new HashSet<>();
+
+  private LlapProtocolClientProxy communicator;
+  private long deleteDelayOnDagComplete;
+  private final LlapTaskUmbilicalProtocol umbilical;
+  private final Token<LlapTokenIdentifier> token;
+
+  // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats.
+  // Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed.
+  private final ConcurrentMap<LlapNodeId, Long> knownNodeMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap = new ConcurrentHashMap<>();
+
+
+  private volatile int currentDagId;
+  private volatile QueryIdentifierProto currentQueryIdentifierProto;
+
+  public LlapTaskCommunicator(
+      TaskCommunicatorContext taskCommunicatorContext) {
+    super(taskCommunicatorContext);
+    Credentials credentials = taskCommunicatorContext.getCredentials();
+    if (credentials != null) {
+      @SuppressWarnings("unchecked")
+      Token<LlapTokenIdentifier> llapToken =
+          (Token<LlapTokenIdentifier>)credentials.getToken(LlapTokenIdentifier.KIND_NAME);
+      this.token = llapToken;
+    } else {
+      this.token = null;
+    }
+    Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled());
+
+    umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical());
+    SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
+
+    // TODO Avoid reading this from the environment
+    baseBuilder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+    baseBuilder.setApplicationIdString(
+        taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString());
+    baseBuilder
+        .setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId());
+    baseBuilder.setTokenIdentifier(getTokenIdentifier());
+
+    BASE_SUBMIT_WORK_REQUEST = baseBuilder.build();
+
+    credentialMap = new ConcurrentHashMap<>();
+    sourceStateTracker = new SourceStateTracker(getContext(), this);
+  }
+
+  @Override
+  public void initialize() throws Exception {
+    super.initialize();
+    Configuration conf = getConf();
+    int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
+    this.communicator = new LlapProtocolClientProxy(numThreads, conf, token);
+    this.deleteDelayOnDagComplete = HiveConf.getTimeVar(
+        conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
+    LOG.info("Running LlapTaskCommunicator with "
+        + "fileCleanupDelay=" + deleteDelayOnDagComplete
+        + ", numCommunicatorThreads=" + numThreads);
+    this.communicator.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    this.communicator.start();
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (this.communicator != null) {
+      this.communicator.stop();
+    }
+  }
+
+  @Override
+  protected void startRpcServer() {
+    Configuration conf = getConf();
+    try {
+      JobTokenSecretManager jobTokenSecretManager =
+          new JobTokenSecretManager();
+      jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);
+
+      int numHandlers = conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
+          TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT);
+      server = new RPC.Builder(conf)
+          .setProtocol(LlapTaskUmbilicalProtocol.class)
+          .setBindAddress("0.0.0.0")
+          .setPort(0)
+          .setInstance(umbilical)
+          .setNumHandlers(numHandlers)
+          .setSecretManager(jobTokenSecretManager).build();
+
+      if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
+        server.refreshServiceAcl(conf, new LlapUmbilicalPolicyProvider());
+      }
+
+      server.start();
+      this.address = NetUtils.getConnectAddress(server);
+      LOG.info(
+          "Started LlapUmbilical: " + umbilical.getClass().getName() + " at 
address: " + address +
+              " with numHandlers=" + numHandlers);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  @Override
+  public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
+    super.registerRunningContainer(containerId, hostname, port);
+    entityTracker.registerContainer(containerId, hostname, port);
+
+  }
+
+  @Override
+  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) {
+    super.registerContainerEnd(containerId, endReason, diagnostics);
+    if (endReason == ContainerEndReason.INTERNAL_PREEMPTION) {
+      LOG.info("Processing containerEnd for container {} caused by internal 
preemption", containerId);
+      TezTaskAttemptID taskAttemptId = 
entityTracker.getTaskAttemptIdForContainer(containerId);
+      if (taskAttemptId != null) {
+        sendTaskTerminated(taskAttemptId, true);
+      }
+    }
+    entityTracker.unregisterContainer(containerId);
+  }
+
+
+  @Override
+  public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec,
+                                         Map<String, LocalResource> additionalResources,
+                                         Credentials credentials,
+                                         boolean credentialsChanged,
+                                         int priority)  {
+    super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
+        credentialsChanged, priority);
+    int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
+    if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIdentifier())) {
+      resetCurrentDag(dagId);
+    }
+
+
+    ContainerInfo containerInfo = getContainerInfo(containerId);
+    String host;
+    int port;
+    if (containerInfo != null) {
+      synchronized (containerInfo) {
+        host = containerInfo.host;
+        port = containerInfo.port;
+      }
+    } else {
+      // TODO Handle this properly
+      throw new RuntimeException("ContainerInfo not found for container: " + 
containerId +
+          ", while trying to launch task: " + taskSpec.getTaskAttemptID());
+    }
+
+    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+    registerKnownNode(nodeId);
+    entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port);
+    nodesForQuery.add(nodeId);
+
+    sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs());
+    FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(
+        taskSpec.getVertexName(), taskSpec.getTaskAttemptID().getTaskID().getId(), priority);
+    SubmitWorkRequestProto requestProto;
+
+    try {
+      requestProto = constructSubmitWorkRequest(containerId, taskSpec, fragmentRuntimeInfo);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to construct request", e);
+    }
+
+    // Have to register this up front right now. Otherwise, it's possible for the task to start
+    // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
+    getContext()
+        .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
+    communicator.sendSubmitWork(requestProto, host, port,
+        new LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto>() {
+          @Override
+          public void setResponse(SubmitWorkResponseProto response) {
+            if (response.hasSubmissionState()) {
+              LlapDaemonProtocolProtos.SubmissionStateProto ss = response.getSubmissionState();
+              if (ss.equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
+                LOG.info(
+                    "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                        containerId + ", Service Busy");
+                getContext().taskKilled(taskSpec.getTaskAttemptID(),
+                    TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
+                return;
+              }
+            } else {
+              // TODO: Provide support for reporting errors
+              // This should never happen as server always returns a valid status on success
+              throw new RuntimeException("SubmissionState in response is expected!");
+            }
+            LOG.info("Successfully launched task: " + 
taskSpec.getTaskAttemptID());
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            if (t instanceof ServiceException) {
+              ServiceException se = (ServiceException) t;
+              t = se.getCause();
+            }
+            if (t instanceof RemoteException) {
+              RemoteException re = (RemoteException) t;
+              // All others from the remote service cause the task to FAIL.
+              LOG.info(
+                  "Failed to run task: " + taskSpec.getTaskAttemptID() + " on 
containerId: " +
+                      containerId, t);
+              getContext()
+                  .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
+                      t.toString());
+            } else {
+              // Exception from the RPC layer - communication failure, consider as KILLED / service down.
+              if (t instanceof IOException) {
+                LOG.info(
+                    "Unable to run task: " + taskSpec.getTaskAttemptID() + " 
on containerId: " +
+                        containerId + ", Communication Error");
+                getContext().taskKilled(taskSpec.getTaskAttemptID(),
+                    TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
+              } else {
+                // Anything else is a FAIL.
+                LOG.info(
+                    "Failed to run task: " + taskSpec.getTaskAttemptID() + " 
on containerId: " +
+                        containerId, t);
+                getContext()
+                    .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
+                        t.getMessage());
+              }
+            }
+          }
+        });
+  }
+
+  @Override
+  public void unregisterRunningTaskAttempt(final TezTaskAttemptID taskAttemptId,
+                                           TaskAttemptEndReason endReason,
+                                           String diagnostics) {
+    super.unregisterRunningTaskAttempt(taskAttemptId, endReason, diagnostics);
+
+    if (endReason == TaskAttemptEndReason.INTERNAL_PREEMPTION) {
+      LOG.info("Processing taskEnd for task {} caused by internal preemption", 
taskAttemptId);
+      sendTaskTerminated(taskAttemptId, false);
+    }
+    entityTracker.unregisterTaskAttempt(taskAttemptId);
+    // This will also be invoked for tasks which have been KILLED / rejected by the daemon.
+    // Informing the daemon becomes necessary once the LlapScheduler supports preemption
+    // and/or starts attempting to kill tasks which may be running on a node.
+  }
+
+  private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
+                                  boolean invokedByContainerEnd) {
+    LOG.info(
+        "DBG: Attempting to send terminateRequest for fragment {} due to 
internal preemption invoked by {}",
+        taskAttemptId.toString(), invokedByContainerEnd ? "containerEnd" : 
"taskEnd");
+    LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(taskAttemptId);
+    // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself
+    if (nodeId != null) {
+      TerminateFragmentRequestProto request =
+          TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifierProto)
+              .setFragmentIdentifierString(taskAttemptId.toString()).build();
+      communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
+          new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
+            @Override
+            public void setResponse(TerminateFragmentResponseProto response) {
+            }
+
+            @Override
+            public void indicateError(Throwable t) {
+              LOG.warn("Failed to send terminate fragment request for {}",
+                  taskAttemptId.toString());
+            }
+          });
+    } else {
+      LOG.info(
+          "Not sending terminate request for fragment {} since its node is not known. Already unregistered",
+          taskAttemptId.toString());
+    }
+  }
+
+  @Override
+  public void dagComplete(final int dagIdentifier) {
+    QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder()
+        .setQueryIdentifier(constructQueryIdentifierProto(dagIdentifier))
+        .setDeleteDelay(deleteDelayOnDagComplete).build();
+    for (final LlapNodeId llapNodeId : nodesForQuery) {
+      LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, 
llapNodeId);
+      communicator.sendQueryComplete(request, llapNodeId.getHostname(), 
llapNodeId.getPort(),
+          new 
LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>()
 {
+            @Override
+            public void 
setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) {
+            }
+
+            @Override
+            public void indicateError(Throwable t) {
+              LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagIdentifier, llapNodeId);
+            }
+          });
+    }
+
+    nodesForQuery.clear();
+    // TODO Ideally move some of the other cleanup code from resetCurrentDag over here
+  }
+
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
+    // Delegate updates over to the source state tracker.
+    sourceStateTracker
+        .sourceStateUpdated(vertexStateUpdate.getVertexName(), vertexStateUpdate.getVertexState());
+  }
+
+  public void sendStateUpdate(final String host, final int port,
+                              final SourceStateUpdatedRequestProto request) {
+    communicator.sendSourceStateUpdate(request, host, port,
+        new LlapProtocolClientProxy.ExecuteRequestCallback<SourceStateUpdatedResponseProto>() {
+          @Override
+          public void setResponse(SourceStateUpdatedResponseProto response) {
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            // TODO HIVE-10280.
+            // Ideally, this should be retried for a while, after which the node should be marked as failed.
+            // Considering tasks are supposed to run fast, failing the task immediately may be a good option.
+            LOG.error(
+                "Failed to send state update to node: " + host + ":" + port + ", StateUpdate=" +
+                    request, t);
+          }
+        });
+  }
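
The TODO above (HIVE-10280) reasons about retrying a failed state update for a while before marking the node as failed, versus failing fast because LLAP tasks are expected to be short. Purely as an illustration of the first option, here is a generic bounded-retry helper; it is not part of the patch, and the attempt count and backoff values are invented for the example:

import java.util.concurrent.Callable;

// Illustrative sketch of the bounded-retry idea mentioned in the HIVE-10280 TODO above.
class BoundedRetrySketch {
  static <T> T callWithRetries(Callable<T> action, int maxAttempts, long backoffMs) throws Exception {
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (Exception e) {
        last = e;                              // remember the failure and retry
        Thread.sleep(backoffMs * attempt);     // simple linear backoff between attempts
      }
    }
    throw last;                                // after maxAttempts, surface the error (e.g. mark the node failed)
  }

  public static void main(String[] args) throws Exception {
    // A stand-in action; in the communicator this would be the sendSourceStateUpdate call.
    String result = callWithRetries(() -> "state update sent", 3, 100L);
    System.out.println(result);
  }
}
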
+
+
+  private static class PingingNodeInfo {
+    final AtomicLong logTimestamp;
+    final AtomicInteger pingCount;
+
+    PingingNodeInfo(long currentTs) {
+      logTimestamp = new AtomicLong(currentTs);
+      pingCount = new AtomicInteger(1);
+    }
+  }
+
+  public void registerKnownNode(LlapNodeId nodeId) {
+    Long old = knownNodeMap.putIfAbsent(nodeId,
+        TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS));
+    if (old == null) {
+      if (isInfoEnabled) {
+        LOG.info("Added new known node: {}", nodeId);
+      }
+    }
+  }
+
+  public void registerPingingNode(LlapNodeId nodeId) {
+    long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+    PingingNodeInfo ni = new PingingNodeInfo(currentTs);
+    PingingNodeInfo old = pingedNodeMap.put(nodeId, ni);
+    if (old == null) {
+      if (isInfoEnabled) {
+        LOG.info("Added new pinging node: [{}]", nodeId);
+      }
+    } else {
+      old.pingCount.incrementAndGet();
+    }
+    // The node should always be known by this point. Log occasionally if it is not known.
+    if (!knownNodeMap.containsKey(nodeId)) {
+      if (old == null) {
+        // First time this is seen. Log it.
+        LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, ni.pingCount.get());
+      } else {
+        // Pinged before. Log only occasionally.
+        if (currentTs > old.logTimestamp.get() + 5000L) { // 5 seconds elapsed. Log again.
+          LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, old.pingCount.get());
+          old.logTimestamp.set(currentTs);
+        }
+      }
+
+    }
+  }
+
+
+  private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
+
+  void nodePinged(String hostname, int port) {
+    LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
+    registerPingingNode(nodeId);
+    BiMap<ContainerId, TezTaskAttemptID> biMap =
+        entityTracker.getContainerAttemptMapForNode(nodeId);
+    if (biMap != null) {
+      synchronized (biMap) {
+        for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
+          getContext().taskAlive(entry.getValue());
+          getContext().containerAlive(entry.getKey());
+        }
+      }
+    } else {
+      long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+      if (currentTs > nodeNotFoundLogTime.get() + 5000L) {
+        LOG.warn("Received ping from node without any registered tasks or containers: " + hostname +
+            ":" + port +
+            ". Could be caused by pre-emption by the AM," +
+            " or a mismatched hostname. Enable debug logging for mismatched host names");
+        nodeNotFoundLogTime.set(currentTs);
+      }
+    }
+  }
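
Both registerPingingNode and nodePinged above throttle their warnings with an AtomicLong timestamp, so a ping from an unexpected node is logged at most once per five-second window instead of on every heartbeat. A simplified, self-contained version of that pattern (the class and method names are made up for the example, and System.err stands in for the SLF4J logger):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the throttled-warning pattern used above. As in the patch,
// the check-then-set is not atomic, which is acceptable for best-effort log suppression.
class ThrottledWarnSketch {
  private static final long INTERVAL_MS = 5000L;
  private final AtomicLong lastLogTime = new AtomicLong(0);

  void maybeWarn(String message) {
    long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
    if (now > lastLogTime.get() + INTERVAL_MS) {
      System.err.println("WARN: " + message);   // stand-in for LOG.warn
      lastLogTime.set(now);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ThrottledWarnSketch throttle = new ThrottledWarnSketch();
    for (int i = 0; i < 10; i++) {
      throttle.maybeWarn("ping from unknown node");  // only the first call in each window logs
      Thread.sleep(100);
    }
  }
}
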
+
+  private void resetCurrentDag(int newDagId) {
+    // Working on the assumption that a single DAG runs at a time per AM.
+    currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId);
+    sourceStateTracker.resetState(newDagId);
+    nodesForQuery.clear();
+    LOG.info("CurrentDagId set to: " + newDagId + ", name=" + 
getContext().getCurrentDagName());
+    // TODO Is it possible for heartbeats to come in from lost tasks - those 
should be told to die, which
+    // is likely already happening.
+  }
+
+  private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
+                                                            TaskSpec taskSpec,
+                                                            FragmentRuntimeInfo fragmentRuntimeInfo) throws
+      IOException {
+    SubmitWorkRequestProto.Builder builder =
+        SubmitWorkRequestProto.newBuilder(BASE_SUBMIT_WORK_REQUEST);
+    builder.setContainerIdString(containerId.toString());
+    builder.setAmHost(getAddress().getHostName());
+    builder.setAmPort(getAddress().getPort());
+    Credentials taskCredentials = new Credentials();
+    // Credentials can change across DAGs. Ideally construct only once per DAG.
+    taskCredentials.addAll(getContext().getCredentials());
+
+    Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+        taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+    ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
+    if (credentialsBinary == null) {
+      credentialsBinary = serializeCredentials(getContext().getCredentials());
+      credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
+    } else {
+      credentialsBinary = credentialsBinary.duplicate();
+    }
+    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+    builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
+    builder.setFragmentRuntimeInfo(fragmentRuntimeInfo);
+    return builder.build();
+  }
+
+  private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+    Credentials containerCredentials = new Credentials();
+    containerCredentials.addAll(credentials);
+    DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+    containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+    return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
+  }
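
serializeCredentials above flattens the task's Hadoop Credentials into a ByteBuffer via DataOutputBuffer, and constructSubmitWorkRequest caches that buffer per query, handing out ByteBuffer.duplicate() views so each reader gets an independent position/limit without copying the bytes. A hedged round-trip sketch of the same idea; it assumes the stock Hadoop Credentials/DataInputBuffer APIs (readTokenStorageStream is not shown in this patch):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;

// Sketch of a Credentials <-> ByteBuffer round trip, mirroring serializeCredentials() above.
public class CredentialsRoundTripSketch {

  static ByteBuffer serialize(Credentials credentials) throws IOException {
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);        // same call as in the patch
    return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  }

  static Credentials deserialize(ByteBuffer buffer) throws IOException {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes);                     // duplicate() leaves the cached buffer's position untouched
    DataInputBuffer dib = new DataInputBuffer();
    dib.reset(bytes, bytes.length);
    Credentials credentials = new Credentials();
    credentials.readTokenStorageStream(dib);           // inverse of writeTokenStorageToStream
    return credentials;
  }

  public static void main(String[] args) throws IOException {
    Credentials creds = new Credentials();             // empty credentials are enough for a smoke test
    ByteBuffer buf = serialize(creds);
    Credentials copy = deserialize(buf);
    System.out.println("tokens after round trip: " + copy.numberOfTokens());
  }
}
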
+
+
+
+  protected class LlapTaskUmbilicalProtocolImpl implements LlapTaskUmbilicalProtocol {
+
+    private final TezTaskUmbilicalProtocol tezUmbilical;
+
+    public LlapTaskUmbilicalProtocolImpl(TezTaskUmbilicalProtocol tezUmbilical) {
+      this.tezUmbilical = tezUmbilical;
+    }
+
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+      return tezUmbilical.canCommit(taskid);
+    }
+
+    @Override
+    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
+        TezException {
+      return tezUmbilical.heartbeat(request);
+    }
+
+    @Override
+    public void nodeHeartbeat(Text hostname, int port) throws IOException {
+      nodePinged(hostname.toString(), port);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]");
+      }
+    }
+
+    @Override
+    public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
+      // TODO Unregister the task for state updates, which could in turn unregister the node.
+      getContext().taskKilled(taskAttemptId,
+          TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted");
+      entityTracker.unregisterTaskAttempt(taskAttemptId);
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+      return versionID;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+                                                  int clientMethodsHash) throws IOException {
+      return ProtocolSignature.getProtocolSignature(this, protocol,
+          clientVersion, clientMethodsHash);
+    }
+  }
+
+  /**
+   * Track the association between known containers and taskAttempts, along with the nodes they are assigned to.
+   */
+  @VisibleForTesting
+  static final class EntityTracker {
+    @VisibleForTesting
+    final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<>();
+
+    void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId, String host, int port) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registering " + containerId + ", " + taskAttemptId + " for node: " + host + ":" + port);
+      }
+      LlapNodeId llapNodeId = LlapNodeId.getInstance(host, port);
+      attemptToNodeMap.putIfAbsent(taskAttemptId, llapNodeId);
+
+      registerContainer(containerId, host, port);
+
+      // nodeMap registration.
+      BiMap<ContainerId, TezTaskAttemptID> tmpMap = HashBiMap.create();
+      BiMap<ContainerId, TezTaskAttemptID> old = nodeMap.putIfAbsent(llapNodeId, tmpMap);
+      BiMap<ContainerId, TezTaskAttemptID> usedInstance;
+      usedInstance = old == null ? tmpMap : old;
+      synchronized(usedInstance) {
+        usedInstance.put(containerId, taskAttemptId);
+      }
+      // Make sure to put the instance back again, in case it was removed as part of a
+      // containerEnd/taskEnd invocation.
+      nodeMap.putIfAbsent(llapNodeId, usedInstance);
+    }
+
+    void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
+      LlapNodeId llapNodeId = attemptToNodeMap.remove(attemptId);
+      if (llapNodeId == null) {
+        // Possible since either container / task can be unregistered.
+        return;
+      }
+
+      BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+      ContainerId matched = null;
+      if (bMap != null) {
+        synchronized(bMap) {
+          matched = bMap.inverse().remove(attemptId);
+        }
+        if (bMap.isEmpty()) {
+          nodeMap.remove(llapNodeId);
+        }
+      }
+
+      // Remove the container mapping
+      if (matched != null) {
+        containerToNodeMap.remove(matched);
+      }
+
+    }
+
+    void registerContainer(ContainerId containerId, String hostname, int port) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registering " + containerId + " for node: " + hostname + ":" + port);
+      }
+      containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port));
+      // nodeMap registration is not required, since there's no taskId association.
+    }
+
+    LlapNodeId getNodeIdForContainer(ContainerId containerId) {
+      return containerToNodeMap.get(containerId);
+    }
+
+    LlapNodeId getNodeIdForTaskAttempt(TezTaskAttemptID taskAttemptId) {
+      return attemptToNodeMap.get(taskAttemptId);
+    }
+
+    ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) {
+      LlapNodeId llapNodeId = getNodeIdForTaskAttempt(taskAttemptId);
+      if (llapNodeId != null) {
+        // Look up the forward map and synchronize on it (the same lock used by the other
+        // accessors), guarding against the node being removed concurrently.
+        BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+        if (bMap != null) {
+          synchronized (bMap) {
+            return bMap.inverse().get(taskAttemptId);
+          }
+        } else {
+          return null;
+        }
+      } else {
+        return null;
+      }
+    }
+
+    TezTaskAttemptID getTaskAttemptIdForContainer(ContainerId containerId) {
+      LlapNodeId llapNodeId = getNodeIdForContainer(containerId);
+      if (llapNodeId != null) {
+        BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+        if (bMap != null) {
+          synchronized (bMap) {
+            return bMap.get(containerId);
+          }
+        } else {
+          return null;
+        }
+      } else {
+        return null;
+      }
+    }
+
+    void unregisterContainer(ContainerId containerId) {
+      LlapNodeId llapNodeId = containerToNodeMap.remove(containerId);
+      if (llapNodeId == null) {
+        // Possible since either container / task can be unregistered.
+        return;
+      }
+
+      BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+      TezTaskAttemptID matched = null;
+      if (bMap != null) {
+        synchronized(bMap) {
+          matched = bMap.remove(containerId);
+        }
+        if (bMap.isEmpty()) {
+          nodeMap.remove(llapNodeId);
+        }
+      }
+
+      // Remove the task attempt mapping
+      if (matched != null) {
+        attemptToNodeMap.remove(matched);
+      }
+    }
+
+    /**
+     * Return a {@link BiMap} containing container->taskAttemptId mapping for the host specified.
+     * <p/>
+     * This method returns the internal structure used by the EntityTracker. Users must synchronize
+     * on the structure to ensure correct usage.
+     *
+     * @param llapNodeId the node whose mapping is being looked up
+     * @return the container->taskAttemptId mapping for the node, or null if none is registered
+     */
+    BiMap<ContainerId, TezTaskAttemptID> getContainerAttemptMapForNode(LlapNodeId llapNodeId) {
+      BiMap<ContainerId, TezTaskAttemptID> biMap = nodeMap.get(llapNodeId);
+      return biMap;
+    }
+
+  }
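
EntityTracker above keeps one Guava BiMap per LLAP node so a container can be resolved to its task attempt and back; the BiMap itself is not thread-safe, so every access synchronizes on the map instance, and putIfAbsent re-inserts the map in case a concurrent unregister dropped it. A simplified illustration of that pattern with plain String identifiers standing in for ContainerId and TezTaskAttemptID (this class is an example, not part of the patch):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;

// Simplified illustration of the per-node BiMap pattern used by EntityTracker above.
class PerNodeBiMapSketch {
  private final ConcurrentMap<String, BiMap<String, String>> nodeMap = new ConcurrentHashMap<>();

  void register(String node, String containerId, String attemptId) {
    BiMap<String, String> fresh = HashBiMap.create();
    BiMap<String, String> existing = nodeMap.putIfAbsent(node, fresh);
    BiMap<String, String> used = existing == null ? fresh : existing;
    synchronized (used) {                      // BiMap itself is not thread-safe
      used.put(containerId, attemptId);
    }
    nodeMap.putIfAbsent(node, used);           // re-insert in case a concurrent unregister removed it
  }

  String attemptForContainer(String node, String containerId) {
    BiMap<String, String> map = nodeMap.get(node);
    if (map == null) {
      return null;
    }
    synchronized (map) {
      return map.get(containerId);
    }
  }

  String containerForAttempt(String node, String attemptId) {
    BiMap<String, String> map = nodeMap.get(node);
    if (map == null) {
      return null;
    }
    synchronized (map) {
      return map.inverse().get(attemptId);     // reverse lookup via BiMap#inverse
    }
  }

  public static void main(String[] args) {
    PerNodeBiMapSketch tracker = new PerNodeBiMapSketch();
    tracker.register("host1:15001", "container_1", "attempt_1");
    System.out.println(tracker.attemptForContainer("host1:15001", "container_1"));  // attempt_1
    System.out.println(tracker.containerForAttempt("host1:15001", "attempt_1"));    // container_1
  }
}
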
+
+  private QueryIdentifierProto constructQueryIdentifierProto(int dagIdentifier) {
+    return QueryIdentifierProto.newBuilder()
+        .setAppIdentifier(getContext().getCurrentAppIdentifier()).setDagIdentifier(dagIdentifier)
+        .build();
+  }
+}
