http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java deleted file mode 100644 index 4c1cbb3..0000000 --- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ /dev/null @@ -1,685 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.rm; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.dag.app.ControlledClock; -import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; -import org.apache.tez.serviceplugins.api.TaskSchedulerContext; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestLlapTaskSchedulerService { - - private static final Logger LOG = LoggerFactory.getLogger(TestLlapTaskSchedulerService.class); - - private static final String HOST1 = "host1"; - private static final String HOST2 = "host2"; - private static final String HOST3 = "host3"; - - @Test (timeout = 5000) - public void testSimpleLocalAllocation() throws IOException, InterruptedException { - - TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(); - - try { - Priority priority1 = Priority.newInstance(1); - String[] hosts1 = new String[]{HOST1}; - - 
Object task1 = new Object(); - Object clientCookie1 = new Object(); - - tsWrapper.controlScheduler(true); - tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); - - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - - verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class)); - // TODO Verify this is on host1. - assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); - } finally { - tsWrapper.shutdown(); - } - } - - @Test (timeout = 5000) - public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException { - TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(); - - try { - Priority priority1 = Priority.newInstance(1); - - Object task1 = new Object(); - Object clientCookie1 = new Object(); - tsWrapper.controlScheduler(true); - tsWrapper.allocateTask(task1, null, priority1, clientCookie1); - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class)); - assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); - } finally { - tsWrapper.shutdown(); - } - } - - - @Test(timeout=5000) - public void testPreemption() throws InterruptedException, IOException { - - Priority priority1 = Priority.newInstance(1); - Priority priority2 = Priority.newInstance(2); - String [] hosts = new String[] {HOST1}; - TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1); - try { - - Object task1 = "task1"; - Object clientCookie1 = "cookie1"; - Object task2 = "task2"; - Object clientCookie2 = "cookie2"; - Object task3 = "task3"; - Object clientCookie3 = "cookie3"; - Object task4 = "task4"; - Object clientCookie4 = "cookie4"; - - tsWrapper.controlScheduler(true); - tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1); - tsWrapper.allocateTask(task2, hosts, priority2, clientCookie2); - tsWrapper.allocateTask(task3, hosts, priority2, clientCookie3); - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numLocalAllocations == 2) { - break; - } - } - verify(tsWrapper.mockAppCallback, times(2)).taskAllocated(any(Object.class), - any(Object.class), any(Container.class)); - assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); - assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); - - reset(tsWrapper.mockAppCallback); - - tsWrapper.allocateTask(task4, hosts, priority1, clientCookie4); - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) { - break; - } - } - verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class)); - - - tsWrapper.deallocateTask(task2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); - - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { - break; - } - } - verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4), - eq(clientCookie4), any(Container.class)); - - } finally { - tsWrapper.shutdown(); - } - - } - - @Test(timeout=5000) - public void testNodeDisabled() throws IOException, InterruptedException { - TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l); - try { - Priority priority1 = Priority.newInstance(1); - String[] hosts1 = new String[]{HOST1}; - Object task1 = new Object(); - Object 
clientCookie1 = new Object(); - tsWrapper.controlScheduler(true); - tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numTotalAllocations == 1) { - break; - } - } - verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), - any(Container.class)); - assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); - assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); - assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations); - assertEquals(1, tsWrapper.ts.dagStats.numTotalAllocations); - - tsWrapper.resetAppCallback(); - - tsWrapper.clock.setTime(10000l); - tsWrapper.rejectExecution(task1); - - // Verify that the node is blacklisted - assertEquals(1, tsWrapper.ts.dagStats.numRejectedTasks); - assertEquals(3, tsWrapper.ts.instanceToNodeMap.size()); - LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodesQueue.peek(); - assertNotNull(disabledNodeInfo); - assertEquals(HOST1, disabledNodeInfo.serviceInstance.getHost()); - assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.MILLISECONDS)); - assertEquals((10000l + 10000l), disabledNodeInfo.expireTimeMillis); - - Object task2 = new Object(); - Object clientCookie2 = new Object(); - tsWrapper.allocateTask(task2, hosts1, priority1, clientCookie2); - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numTotalAllocations == 2) { - break; - } - } - verify(tsWrapper.mockAppCallback).taskAllocated(eq(task2), eq(clientCookie2), any(Container.class)); - assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); - assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); - assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations); - assertEquals(2, tsWrapper.ts.dagStats.numTotalAllocations); - - } finally { - tsWrapper.shutdown(); - } - } - - @Test(timeout=5000) - public void testNodeReEnabled() throws InterruptedException, IOException { - // Based on actual timing. 
- TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l); - try { - Priority priority1 = Priority.newInstance(1); - String[] hosts1 = new String[]{HOST1}; - String[] hosts2 = new String[]{HOST2}; - String[] hosts3 = new String[]{HOST3}; - - Object task1 = new Object(); - Object clientCookie1 = new Object(); - Object task2 = new Object(); - Object clientCookie2 = new Object(); - Object task3 = new Object(); - Object clientCookie3 = new Object(); - - tsWrapper.controlScheduler(true); - tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); - tsWrapper.allocateTask(task2, hosts2, priority1, clientCookie2); - tsWrapper.allocateTask(task3, hosts3, priority1, clientCookie3); - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { - break; - } - } - verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), any(Container.class)); - assertEquals(3, tsWrapper.ts.dagStats.numLocalAllocations); - assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); - assertEquals(3, tsWrapper.ts.dagStats.numTotalAllocations); - - tsWrapper.resetAppCallback(); - - tsWrapper.rejectExecution(task1); - tsWrapper.rejectExecution(task2); - tsWrapper.rejectExecution(task3); - - // Verify that the node is blacklisted - assertEquals(3, tsWrapper.ts.dagStats.numRejectedTasks); - assertEquals(3, tsWrapper.ts.instanceToNodeMap.size()); - assertEquals(3, tsWrapper.ts.disabledNodesQueue.size()); - - - Object task4 = new Object(); - Object clientCookie4 = new Object(); - Object task5 = new Object(); - Object clientCookie5 = new Object(); - Object task6 = new Object(); - Object clientCookie6 = new Object(); - tsWrapper.allocateTask(task4, hosts1, priority1, clientCookie4); - tsWrapper.allocateTask(task5, hosts2, priority1, clientCookie5); - tsWrapper.allocateTask(task6, hosts3, priority1, clientCookie6); - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numTotalAllocations == 6) { - break; - } - } - - ArgumentCaptor<Container> argumentCaptor = ArgumentCaptor.forClass(Container.class); - verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), argumentCaptor.capture()); - - // which affects the locality matching - assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); - assertEquals(6, tsWrapper.ts.dagStats.numTotalAllocations); - - } finally { - tsWrapper.shutdown(); - } - } - - @Test (timeout = 5000) - public void testForceLocalityTest1() throws IOException, InterruptedException { - // 2 hosts. 2 per host. 5 requests at the same priority. - // First 3 on host1, Next at host2, Last with no host. - // Third request on host1 should not be allocated immediately. - forceLocalityTest1(true); - - } - - @Test (timeout = 5000) - public void testNoForceLocalityCounterTest1() throws IOException, InterruptedException { - // 2 hosts. 2 per host. 5 requests at the same priority. - // First 3 on host1, Next at host2, Last with no host. - // Third should allocate on host2, 4th on host2, 5th will wait. 
- - forceLocalityTest1(false); - } - - private void forceLocalityTest1(boolean forceLocality) throws IOException, InterruptedException { - Priority priority1 = Priority.newInstance(1); - - String[] hosts = new String[] {HOST1, HOST2}; - - String[] hostsH1 = new String[] {HOST1}; - String[] hostsH2 = new String[] {HOST2}; - - TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, (forceLocality ? -1l : 0l)); - - try { - Object task1 = "task1"; - Object clientCookie1 = "cookie1"; - Object task2 = "task2"; - Object clientCookie2 = "cookie2"; - Object task3 = "task3"; - Object clientCookie3 = "cookie3"; - Object task4 = "task4"; - Object clientCookie4 = "cookie4"; - Object task5 = "task5"; - Object clientCookie5 = "cookie5"; - - tsWrapper.controlScheduler(true); - //H1 - should allocate - tsWrapper.allocateTask(task1, hostsH1, priority1, clientCookie1); - //H1 - should allocate - tsWrapper.allocateTask(task2, hostsH1, priority1, clientCookie2); - //H1 - no capacity if force, should allocate otherwise - tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3); - //H2 - should allocate - tsWrapper.allocateTask(task4, hostsH2, priority1, clientCookie4); - //No location - should allocate if force, no capacity otherwise - tsWrapper.allocateTask(task5, null, priority1, clientCookie5); - - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numTotalAllocations == 4) { - break; - } - } - - // Verify no preemption requests - since everything is at the same priority - verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); - ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class); - verify(tsWrapper.mockAppCallback, times(4)).taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); - assertEquals(4, argumentCaptor.getAllValues().size()); - assertEquals(task1, argumentCaptor.getAllValues().get(0)); - assertEquals(task2, argumentCaptor.getAllValues().get(1)); - if (forceLocality) { - // task3 not allocated - assertEquals(task4, argumentCaptor.getAllValues().get(2)); - assertEquals(task5, argumentCaptor.getAllValues().get(3)); - } else { - assertEquals(task3, argumentCaptor.getAllValues().get(2)); - assertEquals(task4, argumentCaptor.getAllValues().get(3)); - } - - //Complete one task on host1. - tsWrapper.deallocateTask(task1, true, null); - - reset(tsWrapper.mockAppCallback); - - // Try scheduling again. 
- while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numTotalAllocations == 5) { - break; - } - } - verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); - argumentCaptor = ArgumentCaptor.forClass(Object.class); - verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); - assertEquals(1, argumentCaptor.getAllValues().size()); - if (forceLocality) { - assertEquals(task3, argumentCaptor.getAllValues().get(0)); - } else { - assertEquals(task5, argumentCaptor.getAllValues().get(0)); - } - - } finally { - tsWrapper.shutdown(); - } - } - - @Test(timeout = 5000) - public void testForcedLocalityUnknownHost() throws IOException, InterruptedException { - Priority priority1 = Priority.newInstance(1); - - String[] hostsKnown = new String[]{HOST1}; - String[] hostsUnknown = new String[]{HOST2}; - - TestTaskSchedulerServiceWrapper tsWrapper = - new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 1, -1l); - try { - Object task1 = "task1"; - Object clientCookie1 = "cookie1"; - - Object task2 = "task2"; - Object clientCookie2 = "cookie2"; - - tsWrapper.controlScheduler(true); - // Should allocate since H2 is not known. - tsWrapper.allocateTask(task1, hostsUnknown, priority1, clientCookie1); - tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2); - - - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numTotalAllocations == 2) { - break; - } - } - - ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class); - verify(tsWrapper.mockAppCallback, times(2)) - .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); - assertEquals(2, argumentCaptor.getAllValues().size()); - assertEquals(task1, argumentCaptor.getAllValues().get(0)); - assertEquals(task2, argumentCaptor.getAllValues().get(1)); - - - } finally { - tsWrapper.shutdown(); - } - } - - - @Test(timeout = 5000) - public void testForcedLocalityPreemption() throws IOException, InterruptedException { - Priority priority1 = Priority.newInstance(1); - Priority priority2 = Priority.newInstance(2); - String [] hosts = new String[] {HOST1, HOST2}; - - String [] hostsH1 = new String[] {HOST1}; - String [] hostsH2 = new String[] {HOST2}; - - TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l); - - // Fill up host1 with p2 tasks. - // Leave host2 empty - // Try running p1 task on host1 - should preempt - - try { - Object task1 = "task1"; - Object clientCookie1 = "cookie1"; - Object task2 = "task2"; - Object clientCookie2 = "cookie2"; - Object task3 = "task3"; - Object clientCookie3 = "cookie3"; - Object task4 = "task4"; - Object clientCookie4 = "cookie4"; - - tsWrapper.controlScheduler(true); - tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1); - tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2); - // This request at a lower priority should not affect anything. 
- tsWrapper.allocateTask(task3, hostsH1, priority2, clientCookie3); - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numLocalAllocations == 2) { - break; - } - } - - verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); - ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class); - verify(tsWrapper.mockAppCallback, times(2)) - .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); - assertEquals(2, argumentCaptor.getAllValues().size()); - assertEquals(task1, argumentCaptor.getAllValues().get(0)); - assertEquals(task2, argumentCaptor.getAllValues().get(1)); - - reset(tsWrapper.mockAppCallback); - // Allocate t4 at higher priority. t3 should not be allocated, - // and a preemption should be attempted on host1, despite host2 having available capacity - tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4); - - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) { - break; - } - } - verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class)); - - tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); - - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { - break; - } - } - verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4), - eq(clientCookie4), any(Container.class)); - - } finally { - tsWrapper.shutdown(); - } - } - - private static class TestTaskSchedulerServiceWrapper { - static final Resource resource = Resource.newInstance(1024, 1); - Configuration conf; - TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class); - ControlledClock clock = new ControlledClock(new SystemClock()); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1); - LlapTaskSchedulerServiceForTest ts; - - TestTaskSchedulerServiceWrapper() throws IOException, InterruptedException { - this(2000l); - } - - TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException, - InterruptedException { - this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4, - ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.defaultIntVal); - } - - TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws - IOException, InterruptedException { - this(disableTimeoutMillis, hosts, numExecutors, waitQueueSize, 0l); - } - - TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize, long localityDelayMs) throws - IOException, InterruptedException { - conf = new Configuration(); - conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts); - conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors); - conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize); - conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname, - disableTimeoutMillis + "ms"); - conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false); - conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs); - - doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId(); - doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier(); - UserPayload userPayload = 
TezUtils.createUserPayloadFromConf(conf); - doReturn(userPayload).when(mockAppCallback).getInitialUserPayload(); - - ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock); - - controlScheduler(true); - ts.initialize(); - ts.start(); - // One scheduler pass from the nodes that are added at startup - signalSchedulerRun(); - controlScheduler(false); - awaitSchedulerRun(); - } - - void controlScheduler(boolean val) { - ts.forTestsetControlScheduling(val); - } - - void signalSchedulerRun() throws InterruptedException { - ts.forTestSignalSchedulingRun(); - } - - void awaitSchedulerRun() throws InterruptedException { - ts.forTestAwaitSchedulingRun(); - } - void resetAppCallback() { - reset(mockAppCallback); - } - - void shutdown() { - ts.shutdown(); - } - - void allocateTask(Object task, String[] hosts, Priority priority, Object clientCookie) { - ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie); - } - - void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) { - ts.deallocateTask(task, succeeded, endReason, null); - } - - void rejectExecution(Object task) { - ts.deallocateTask(task, false, TaskAttemptEndReason.EXECUTOR_BUSY, null); - } - } - - private static class LlapTaskSchedulerServiceForTest extends LlapTaskSchedulerService { - - private AtomicBoolean controlScheduling = new AtomicBoolean(false); - private final Lock testLock = new ReentrantLock(); - private final Condition schedulingCompleteCondition = testLock.newCondition(); - private boolean schedulingComplete = false; - private final Condition triggerSchedulingCondition = testLock.newCondition(); - private boolean schedulingTriggered = false; - private final AtomicInteger numSchedulerRuns = new AtomicInteger(0); - - - public LlapTaskSchedulerServiceForTest( - TaskSchedulerContext appClient, Clock clock) { - super(appClient, clock); - } - - @Override - protected void schedulePendingTasks() { - testLock.lock(); - try { - if (controlScheduling.get()) { - while (!schedulingTriggered) { - try { - triggerSchedulingCondition.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - numSchedulerRuns.incrementAndGet(); - super.schedulePendingTasks(); - schedulingTriggered = false; - schedulingComplete = true; - schedulingCompleteCondition.signal(); - } finally { - testLock.unlock(); - } - } - - // Enable or disable test scheduling control. - void forTestsetControlScheduling(boolean control) { - this.controlScheduling.set(control); - } - - void forTestSignalSchedulingRun() throws InterruptedException { - testLock.lock(); - try { - schedulingTriggered = true; - triggerSchedulingCondition.signal(); - } finally { - testLock.unlock(); - } - } - - void forTestAwaitSchedulingRun() throws InterruptedException { - testLock.lock(); - try { - while (!schedulingComplete) { - schedulingCompleteCondition.await(); - } - schedulingComplete = false; - } finally { - testLock.unlock(); - } - } - - } -}
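[Editor's note] The heart of the deleted test harness above is the pair forTestSignalSchedulingRun()/forTestAwaitSchedulingRun(), which step the scheduler one pass at a time so the assertions never race the scheduling thread. A minimal, self-contained sketch of that handshake follows; the class and method names are illustrative, not Hive code:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// One lock, two conditions: the test thread requests a pass and then waits
// for its completion; the worker thread waits for a request, runs the pass,
// and signals completion. This mirrors the controlScheduling pattern above.
class SteppableLoop {
  private final Lock lock = new ReentrantLock();
  private final Condition runRequestedCond = lock.newCondition();
  private final Condition runCompleteCond = lock.newCondition();
  private boolean runRequested = false;
  private boolean runComplete = false;

  // Worker thread: block until a pass is requested, run it, signal completion.
  void runOnePass(Runnable work) throws InterruptedException {
    lock.lock();
    try {
      while (!runRequested) {
        runRequestedCond.await();
      }
      work.run();
      runRequested = false;
      runComplete = true;
      runCompleteCond.signal();
    } finally {
      lock.unlock();
    }
  }

  // Test thread: request one pass (the wrapper's signalSchedulerRun()).
  void signalRun() {
    lock.lock();
    try {
      runRequested = true;
      runRequestedCond.signal();
    } finally {
      lock.unlock();
    }
  }

  // Test thread: wait until the pass finishes (the wrapper's awaitSchedulerRun()).
  void awaitRun() throws InterruptedException {
    lock.lock();
    try {
      while (!runComplete) {
        runCompleteCond.await();
      }
      runComplete = false;
    } finally {
      lock.unlock();
    }
  }
}

Because the request is latched in a boolean rather than carried by the signal itself, a signalRun() issued before the worker reaches await() is not lost; that is what lets the tests loop on signal/await until a dagStats counter reaches the expected value.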
http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-tez/pom.xml
----------------------------------------------------------------------
diff --git a/llap-tez/pom.xml b/llap-tez/pom.xml
new file mode 100644
index 0000000..ce020da
--- /dev/null
+++ b/llap-tez/pom.xml
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-llap-tez</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Llap Tez</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+    <!-- intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!-- inter-project -->
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- test intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- test inter-project -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-all.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>

+  <build>
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/gen/protobuf/gen-java</source>
+                <source>src/gen/thrift/gen-javabean</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
new file mode 100644
index 0000000..a314391
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.llap.tezplugins; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; + +class ContainerFactory { + final ApplicationAttemptId customAppAttemptId; + AtomicLong nextId; + + public ContainerFactory(ApplicationAttemptId appAttemptId, long appIdLong) { + this.nextId = new AtomicLong(1); + ApplicationId appId = + ApplicationId.newInstance(appIdLong, appAttemptId.getApplicationId().getId()); + this.customAppAttemptId = + ApplicationAttemptId.newInstance(appId, appAttemptId.getAttemptId()); + } + + public Container createContainer(Resource capability, Priority priority, String hostname, + int port) { + ContainerId containerId = + ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement()); + NodeId nodeId = NodeId.newInstance(hostname, port); + String nodeHttpAddress = "hostname:0"; // TODO: include UI ports + + Container container = + Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null); + + return container; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java new file mode 100644 index 0000000..07703a2 --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.tezplugins; + +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapContainerLauncher extends ContainerLauncher { + private static final Logger LOG = LoggerFactory.getLogger(LlapContainerLauncher.class); + + public LlapContainerLauncher(ContainerLauncherContext containerLauncherContext) { + super(containerLauncherContext); + } + + @Override + public void launchContainer(ContainerLaunchRequest containerLaunchRequest) { + LOG.info("No-op launch for container: " + containerLaunchRequest.getContainerId() + + " succeeded on host: " + containerLaunchRequest.getNodeId()); + getContext().containerLaunched(containerLaunchRequest.getContainerId()); + } + + @Override + public void stopContainer(ContainerStopRequest containerStopRequest) { + LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + containerStopRequest); + getContext().containerStopRequested(containerStopRequest.getContainerId()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/9886414b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java new file mode 100644 index 0000000..76d095a --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -0,0 +1,757 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.tezplugins; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapNodeId; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; +import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; +import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.common.TezTaskUmbilicalProtocol; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.app.TezTaskCommunicatorImpl; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import 
org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { + + private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class); + + private static final boolean isInfoEnabled = LOG.isInfoEnabled(); + private static final boolean isDebugEnabed = LOG.isDebugEnabled(); + + private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST; + + private final ConcurrentMap<QueryIdentifierProto, ByteBuffer> credentialMap; + + // Tracks containerIds and taskAttemptIds, so can be kept independent of the running DAG. + // When DAG specific cleanup happens, it'll be better to link this to a DAG though. + private final EntityTracker entityTracker = new EntityTracker(); + private final SourceStateTracker sourceStateTracker; + private final Set<LlapNodeId> nodesForQuery = new HashSet<>(); + + private LlapProtocolClientProxy communicator; + private long deleteDelayOnDagComplete; + private final LlapTaskUmbilicalProtocol umbilical; + private final Token<LlapTokenIdentifier> token; + + // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats. + // Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed. + private final ConcurrentMap<LlapNodeId, Long> knownNodeMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap = new ConcurrentHashMap<>(); + + + private volatile int currentDagId; + private volatile QueryIdentifierProto currentQueryIdentifierProto; + + public LlapTaskCommunicator( + TaskCommunicatorContext taskCommunicatorContext) { + super(taskCommunicatorContext); + Credentials credentials = taskCommunicatorContext.getCredentials(); + if (credentials != null) { + @SuppressWarnings("unchecked") + Token<LlapTokenIdentifier> llapToken = + (Token<LlapTokenIdentifier>)credentials.getToken(LlapTokenIdentifier.KIND_NAME); + this.token = llapToken; + } else { + this.token = null; + } + Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled()); + + umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical()); + SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder(); + + // TODO Avoid reading this from the environment + baseBuilder.setUser(System.getenv(ApplicationConstants.Environment.USER.name())); + baseBuilder.setApplicationIdString( + taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString()); + baseBuilder + .setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId()); + baseBuilder.setTokenIdentifier(getTokenIdentifier()); + + BASE_SUBMIT_WORK_REQUEST = baseBuilder.build(); + + credentialMap = new ConcurrentHashMap<>(); + sourceStateTracker = new SourceStateTracker(getContext(), this); + } + + @Override + public void initialize() throws Exception { + super.initialize(); + Configuration conf = getConf(); + int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS); + this.communicator = new LlapProtocolClientProxy(numThreads, conf, token); + this.deleteDelayOnDagComplete = HiveConf.getTimeVar( + conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS); + LOG.info("Running LlapTaskCommunicator with " + + "fileCleanupDelay=" + deleteDelayOnDagComplete + + ", numCommunicatorThreads=" + 
numThreads); + this.communicator.init(conf); + } + + @Override + public void start() { + super.start(); + this.communicator.start(); + } + + @Override + public void shutdown() { + super.shutdown(); + if (this.communicator != null) { + this.communicator.stop(); + } + } + + @Override + protected void startRpcServer() { + Configuration conf = getConf(); + try { + JobTokenSecretManager jobTokenSecretManager = + new JobTokenSecretManager(); + jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken); + + int numHandlers = conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT, + TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT); + server = new RPC.Builder(conf) + .setProtocol(LlapTaskUmbilicalProtocol.class) + .setBindAddress("0.0.0.0") + .setPort(0) + .setInstance(umbilical) + .setNumHandlers(numHandlers) + .setSecretManager(jobTokenSecretManager).build(); + + if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { + server.refreshServiceAcl(conf, new LlapUmbilicalPolicyProvider()); + } + + server.start(); + this.address = NetUtils.getConnectAddress(server); + LOG.info( + "Started LlapUmbilical: " + umbilical.getClass().getName() + " at address: " + address + + " with numHandlers=" + numHandlers); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + } + + @Override + public void registerRunningContainer(ContainerId containerId, String hostname, int port) { + super.registerRunningContainer(containerId, hostname, port); + entityTracker.registerContainer(containerId, hostname, port); + + } + + @Override + public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) { + super.registerContainerEnd(containerId, endReason, diagnostics); + if (endReason == ContainerEndReason.INTERNAL_PREEMPTION) { + LOG.info("Processing containerEnd for container {} caused by internal preemption", containerId); + TezTaskAttemptID taskAttemptId = entityTracker.getTaskAttemptIdForContainer(containerId); + if (taskAttemptId != null) { + sendTaskTerminated(taskAttemptId, true); + } + } + entityTracker.unregisterContainer(containerId); + } + + + @Override + public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec, + Map<String, LocalResource> additionalResources, + Credentials credentials, + boolean credentialsChanged, + int priority) { + super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, + credentialsChanged, priority); + int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId(); + if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIdentifier())) { + resetCurrentDag(dagId); + } + + + ContainerInfo containerInfo = getContainerInfo(containerId); + String host; + int port; + if (containerInfo != null) { + synchronized (containerInfo) { + host = containerInfo.host; + port = containerInfo.port; + } + } else { + // TODO Handle this properly + throw new RuntimeException("ContainerInfo not found for container: " + containerId + + ", while trying to launch task: " + taskSpec.getTaskAttemptID()); + } + + LlapNodeId nodeId = LlapNodeId.getInstance(host, port); + registerKnownNode(nodeId); + entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port); + nodesForQuery.add(nodeId); + + sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs()); + FragmentRuntimeInfo fragmentRuntimeInfo = 
sourceStateTracker.getFragmentRuntimeInfo( + taskSpec.getVertexName(), taskSpec.getTaskAttemptID().getTaskID().getId(), priority); + SubmitWorkRequestProto requestProto; + + try { + requestProto = constructSubmitWorkRequest(containerId, taskSpec, fragmentRuntimeInfo); + } catch (IOException e) { + throw new RuntimeException("Failed to construct request", e); + } + + // Have to register this up front right now. Otherwise, it's possible for the task to start + // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them. + getContext() + .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); + communicator.sendSubmitWork(requestProto, host, port, + new LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto>() { + @Override + public void setResponse(SubmitWorkResponseProto response) { + if (response.hasSubmissionState()) { + LlapDaemonProtocolProtos.SubmissionStateProto ss = response.getSubmissionState(); + if (ss.equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) { + LOG.info( + "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + + containerId + ", Service Busy"); + getContext().taskKilled(taskSpec.getTaskAttemptID(), + TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy"); + return; + } + } else { + // TODO: Provide support for reporting errors + // This should never happen as server always returns a valid status on success + throw new RuntimeException("SubmissionState in response is expected!"); + } + LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID()); + } + + @Override + public void indicateError(Throwable t) { + if (t instanceof ServiceException) { + ServiceException se = (ServiceException) t; + t = se.getCause(); + } + if (t instanceof RemoteException) { + RemoteException re = (RemoteException) t; + // All others from the remote service cause the task to FAIL. + LOG.info( + "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + + containerId, t); + getContext() + .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, + t.toString()); + } else { + // Exception from the RPC layer - communication failure, consider as KILLED / service down. + if (t instanceof IOException) { + LOG.info( + "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + + containerId + ", Communication Error"); + getContext().taskKilled(taskSpec.getTaskAttemptID(), + TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error"); + } else { + // Anything else is a FAIL. + LOG.info( + "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + + containerId, t); + getContext() + .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, + t.getMessage()); + } + } + } + }); + } + + @Override + public void unregisterRunningTaskAttempt(final TezTaskAttemptID taskAttemptId, + TaskAttemptEndReason endReason, + String diagnostics) { + super.unregisterRunningTaskAttempt(taskAttemptId, endReason, diagnostics); + + if (endReason == TaskAttemptEndReason.INTERNAL_PREEMPTION) { + LOG.info("Processing taskEnd for task {} caused by internal preemption", taskAttemptId); + sendTaskTerminated(taskAttemptId, false); + } + entityTracker.unregisterTaskAttempt(taskAttemptId); + // This will also be invoked for tasks which have been KILLED / rejected by the daemon. + // Informing the daemon becomes necessary once the LlapScheduler supports preemption + // and/or starts attempting to kill tasks which may be running on a node. 
+  }
+
+  private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
+                                  boolean invokedByContainerEnd) {
+    LOG.info(
+        "DBG: Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}",
+        taskAttemptId.toString(), invokedByContainerEnd ? "containerEnd" : "taskEnd");
+    LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(taskAttemptId);
+    // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself
+    if (nodeId != null) {
+      TerminateFragmentRequestProto request =
+          TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifierProto)
+              .setFragmentIdentifierString(taskAttemptId.toString()).build();
+      communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
+          new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
+            @Override
+            public void setResponse(TerminateFragmentResponseProto response) {
+            }
+
+            @Override
+            public void indicateError(Throwable t) {
+              LOG.warn("Failed to send terminate fragment request for {}",
+                  taskAttemptId.toString());
+            }
+          });
+    } else {
+      LOG.info(
+          "Not sending terminate request for fragment {} since its node is not known. Already unregistered",
+          taskAttemptId.toString());
+    }
+  }
+
+
+  @Override
+  public void dagComplete(final int dagIdentifier) {
+    QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder()
+        .setQueryIdentifier(constructQueryIdentifierProto(dagIdentifier))
+        .setDeleteDelay(deleteDelayOnDagComplete).build();
+    for (final LlapNodeId llapNodeId : nodesForQuery) {
+      LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, llapNodeId);
+      communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(),
+          new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() {
+            @Override
+            public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) {
+            }
+
+            @Override
+            public void indicateError(Throwable t) {
+              LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagIdentifier, llapNodeId);
+            }
+          });
+    }
+
+    nodesForQuery.clear();
+    // TODO Ideally move some of the other cleanup code from resetCurrentDag over here
+  }
+
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
+    // Delegate updates over to the source state tracker.
+    sourceStateTracker
+        .sourceStateUpdated(vertexStateUpdate.getVertexName(), vertexStateUpdate.getVertexState());
+  }
+
+  public void sendStateUpdate(final String host, final int port,
+                              final SourceStateUpdatedRequestProto request) {
+    communicator.sendSourceStateUpdate(request, host, port,
+        new LlapProtocolClientProxy.ExecuteRequestCallback<SourceStateUpdatedResponseProto>() {
+          @Override
+          public void setResponse(SourceStateUpdatedResponseProto response) {
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            // TODO HIVE-10280.
+            // Ideally, this should be retried for a while, after which the node should be marked as failed.
+            // Since tasks are expected to run quickly, failing the task immediately may be a good option.
+ LOG.error( + "Failed to send state update to node: " + host + ":" + port + ", StateUpdate=" + + request, t); + } + }); + } + + + private static class PingingNodeInfo { + final AtomicLong logTimestamp; + final AtomicInteger pingCount; + + PingingNodeInfo(long currentTs) { + logTimestamp = new AtomicLong(currentTs); + pingCount = new AtomicInteger(1); + } + } + + public void registerKnownNode(LlapNodeId nodeId) { + Long old = knownNodeMap.putIfAbsent(nodeId, + TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS)); + if (old == null) { + if (isInfoEnabled) { + LOG.info("Added new known node: {}", nodeId); + } + } + } + + public void registerPingingNode(LlapNodeId nodeId) { + long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + PingingNodeInfo ni = new PingingNodeInfo(currentTs); + PingingNodeInfo old = pingedNodeMap.put(nodeId, ni); + if (old == null) { + if (isInfoEnabled) { + LOG.info("Added new pinging node: [{}]", nodeId); + } + } else { + old.pingCount.incrementAndGet(); + } + // The node should always be known by this point. Log occasionally if it is not known. + if (!knownNodeMap.containsKey(nodeId)) { + if (old == null) { + // First time this is seen. Log it. + LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, ni.pingCount.get()); + } else { + // Pinged before. Log only occasionally. + if (currentTs > old.logTimestamp.get() + 5000l) { // 5 seconds elapsed. Log again. + LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, old.pingCount.get()); + old.logTimestamp.set(currentTs); + } + } + + } + } + + + private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0); + + void nodePinged(String hostname, int port) { + LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port); + registerPingingNode(nodeId); + BiMap<ContainerId, TezTaskAttemptID> biMap = + entityTracker.getContainerAttemptMapForNode(nodeId); + if (biMap != null) { + synchronized (biMap) { + for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) { + getContext().taskAlive(entry.getValue()); + getContext().containerAlive(entry.getKey()); + } + } + } else { + long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + if (currentTs > nodeNotFoundLogTime.get() + 5000l) { + LOG.warn("Received ping from node without any registered tasks or containers: " + hostname + + ":" + port + + ". Could be caused by pre-emption by the AM," + + " or a mismatched hostname. Enable debug logging for mismatched host names"); + nodeNotFoundLogTime.set(currentTs); + } + } + } + + private void resetCurrentDag(int newDagId) { + // Working on the assumption that a single DAG runs at a time per AM. + currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId); + sourceStateTracker.resetState(newDagId); + nodesForQuery.clear(); + LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagName()); + // TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which + // is likely already happening. 
+ } + + private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId, + TaskSpec taskSpec, + FragmentRuntimeInfo fragmentRuntimeInfo) throws + IOException { + SubmitWorkRequestProto.Builder builder = + SubmitWorkRequestProto.newBuilder(BASE_SUBMIT_WORK_REQUEST); + builder.setContainerIdString(containerId.toString()); + builder.setAmHost(getAddress().getHostName()); + builder.setAmPort(getAddress().getPort()); + Credentials taskCredentials = new Credentials(); + // Credentials can change across DAGs. Ideally construct only once per DAG. + taskCredentials.addAll(getContext().getCredentials()); + + Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() == + taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); + ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); + if (credentialsBinary == null) { + credentialsBinary = serializeCredentials(getContext().getCredentials()); + credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); + } else { + credentialsBinary = credentialsBinary.duplicate(); + } + builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); + builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec)); + builder.setFragmentRuntimeInfo(fragmentRuntimeInfo); + return builder.build(); + } + + private ByteBuffer serializeCredentials(Credentials credentials) throws IOException { + Credentials containerCredentials = new Credentials(); + containerCredentials.addAll(credentials); + DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); + containerCredentials.writeTokenStorageToStream(containerTokens_dob); + return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength()); + } + + + + protected class LlapTaskUmbilicalProtocolImpl implements LlapTaskUmbilicalProtocol { + + private final TezTaskUmbilicalProtocol tezUmbilical; + + public LlapTaskUmbilicalProtocolImpl(TezTaskUmbilicalProtocol tezUmbilical) { + this.tezUmbilical = tezUmbilical; + } + + @Override + public boolean canCommit(TezTaskAttemptID taskid) throws IOException { + return tezUmbilical.canCommit(taskid); + } + + @Override + public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, + TezException { + return tezUmbilical.heartbeat(request); + } + + @Override + public void nodeHeartbeat(Text hostname, int port) throws IOException { + nodePinged(hostname.toString(), port); + if (LOG.isDebugEnabled()) { + LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]"); + } + } + + @Override + public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException { + // TODO Unregister the task for state updates, which could in turn unregister the node. + getContext().taskKilled(taskAttemptId, + TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted"); + entityTracker.unregisterTaskAttempt(taskAttemptId); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return versionID; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int clientMethodsHash) throws IOException { + return ProtocolSignature.getProtocolSignature(this, protocol, + clientVersion, clientMethodsHash); + } + } + + /** + * Track the association between known containers and taskAttempts, along with the nodes they are assigned to. 
+  /**
+   * Track the association between known containers and taskAttempts, along with the nodes they
+   * are assigned to.
+   */
+  @VisibleForTesting
+  static final class EntityTracker {
+    @VisibleForTesting
+    final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<>();
+
+    void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId, String host, int port) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registering " + containerId + ", " + taskAttemptId + " for node: " + host + ":" + port);
+      }
+      LlapNodeId llapNodeId = LlapNodeId.getInstance(host, port);
+      attemptToNodeMap.putIfAbsent(taskAttemptId, llapNodeId);
+
+      registerContainer(containerId, host, port);
+
+      // nodeMap registration.
+      BiMap<ContainerId, TezTaskAttemptID> tmpMap = HashBiMap.create();
+      BiMap<ContainerId, TezTaskAttemptID> old = nodeMap.putIfAbsent(llapNodeId, tmpMap);
+      BiMap<ContainerId, TezTaskAttemptID> usedInstance = old == null ? tmpMap : old;
+      synchronized (usedInstance) {
+        usedInstance.put(containerId, taskAttemptId);
+      }
+      // Make sure to put the instance back again, in case it was removed as part of a
+      // containerEnd/taskEnd invocation.
+      nodeMap.putIfAbsent(llapNodeId, usedInstance);
+    }
+
+    void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
+      LlapNodeId llapNodeId = attemptToNodeMap.remove(attemptId);
+      if (llapNodeId == null) {
+        // Possible since either container / task can be unregistered.
+        return;
+      }
+
+      BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+      ContainerId matched = null;
+      if (bMap != null) {
+        synchronized (bMap) {
+          matched = bMap.inverse().remove(attemptId);
+        }
+        if (bMap.isEmpty()) {
+          nodeMap.remove(llapNodeId);
+        }
+      }
+
+      // Remove the container mapping
+      if (matched != null) {
+        containerToNodeMap.remove(matched);
+      }
+    }
+
+    void registerContainer(ContainerId containerId, String hostname, int port) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registering " + containerId + " for node: " + hostname + ":" + port);
+      }
+      containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port));
+      // nodeMap registration is not required, since there's no taskId association.
+    }
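+
+    // Lookup helpers. Each of these can return null - registration and unregistration of
+    // containers and attempts happen independently, so a lookup can race with removal.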
+    LlapNodeId getNodeIdForContainer(ContainerId containerId) {
+      return containerToNodeMap.get(containerId);
+    }
+
+    LlapNodeId getNodeIdForTaskAttempt(TezTaskAttemptID taskAttemptId) {
+      return attemptToNodeMap.get(taskAttemptId);
+    }
+
+    ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) {
+      LlapNodeId llapNodeId = getNodeIdForTaskAttempt(taskAttemptId);
+      if (llapNodeId != null) {
+        // Null-check the forward map before taking the inverse view; calling inverse() on the
+        // result of nodeMap.get() directly would NPE when the node has already been removed.
+        BiMap<ContainerId, TezTaskAttemptID> nodeBiMap = nodeMap.get(llapNodeId);
+        if (nodeBiMap != null) {
+          // Synchronize on the forward map - the instance all other accessors lock on.
+          synchronized (nodeBiMap) {
+            return nodeBiMap.inverse().get(taskAttemptId);
+          }
+        } else {
+          return null;
+        }
+      } else {
+        return null;
+      }
+    }
+
+    TezTaskAttemptID getTaskAttemptIdForContainer(ContainerId containerId) {
+      LlapNodeId llapNodeId = getNodeIdForContainer(containerId);
+      if (llapNodeId != null) {
+        BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+        if (bMap != null) {
+          synchronized (bMap) {
+            return bMap.get(containerId);
+          }
+        } else {
+          return null;
+        }
+      } else {
+        return null;
+      }
+    }
+
+    void unregisterContainer(ContainerId containerId) {
+      LlapNodeId llapNodeId = containerToNodeMap.remove(containerId);
+      if (llapNodeId == null) {
+        // Possible since either container / task can be unregistered.
+        return;
+      }
+
+      BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+      TezTaskAttemptID matched = null;
+      if (bMap != null) {
+        synchronized (bMap) {
+          matched = bMap.remove(containerId);
+        }
+        if (bMap.isEmpty()) {
+          nodeMap.remove(llapNodeId);
+        }
+      }
+
+      // Remove the task attempt mapping
+      if (matched != null) {
+        attemptToNodeMap.remove(matched);
+      }
+    }
+
+    /**
+     * Return a {@link BiMap} containing the container -> taskAttemptId mapping for the node
+     * specified.
+     * <p/>
+     * This method returns the internal structure used by the EntityTracker. Users must
+     * synchronize on the returned structure to ensure correct usage.
+     *
+     * @param llapNodeId the node whose mappings are being looked up
+     * @return the mapping for the node, or null if the node is not tracked
+     */
+    BiMap<ContainerId, TezTaskAttemptID> getContainerAttemptMapForNode(LlapNodeId llapNodeId) {
+      return nodeMap.get(llapNodeId);
+    }
+  }
+
+  private QueryIdentifierProto constructQueryIdentifierProto(int dagIdentifier) {
+    return QueryIdentifierProto.newBuilder()
+        .setAppIdentifier(getContext().getCurrentAppIdentifier()).setDagIdentifier(dagIdentifier)
+        .build();
+  }
+}
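
A minimal sketch of the caller-side contract for getContainerAttemptMapForNode(), mirroring the
synchronized iteration already done in nodePinged() above. The tracker/nodeId names are assumed
to be an EntityTracker and LlapNodeId in scope; the point is that the monitor of the returned
BiMap must be held for the whole iteration:

    BiMap<ContainerId, TezTaskAttemptID> biMap = tracker.getContainerAttemptMapForNode(nodeId);
    if (biMap != null) {
      synchronized (biMap) { // hold the lock across the entire iteration
        for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
          // Safe to read entries here; register/unregister calls synchronize on the same
          // BiMap instance before mutating it.
          ContainerId containerId = entry.getKey();
          TezTaskAttemptID attemptId = entry.getValue();
        }
      }
    }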