Repository: hive Updated Branches: refs/heads/master 11f1e47eb -> 4185d9b8e
http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java new file mode 100644 index 0000000..36d8ffd --- /dev/null +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -0,0 +1,684 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl; +import org.apache.hadoop.hive.llap.testhelpers.ControlledClock; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestLlapTaskSchedulerService { + + private static final Logger LOG = LoggerFactory.getLogger(TestLlapTaskSchedulerService.class); + + private static final String HOST1 = "host1"; + private static final String HOST2 = "host2"; + private static final String HOST3 = "host3"; + + @Test (timeout = 5000) + public void testSimpleLocalAllocation() throws IOException, InterruptedException { + + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(); + + try { + Priority priority1 = Priority.newInstance(1); + String[] hosts1 = new String[]{HOST1}; + + Object task1 = new Object(); + Object clientCookie1 = new Object(); + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); + + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + + verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class)); + // TODO Verify this is on host1. + assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); + } finally { + tsWrapper.shutdown(); + } + } + + @Test (timeout = 5000) + public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException { + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(); + + try { + Priority priority1 = Priority.newInstance(1); + + Object task1 = new Object(); + Object clientCookie1 = new Object(); + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, null, priority1, clientCookie1); + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class)); + assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + } finally { + tsWrapper.shutdown(); + } + } + + + @Test(timeout=5000) + public void testPreemption() throws InterruptedException, IOException { + + Priority priority1 = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + String [] hosts = new String[] {HOST1}; + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1); + try { + + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + Object task4 = "task4"; + Object clientCookie4 = "cookie4"; + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1); + tsWrapper.allocateTask(task2, hosts, priority2, clientCookie2); + tsWrapper.allocateTask(task3, hosts, priority2, clientCookie3); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numLocalAllocations == 2) { + break; + } + } + verify(tsWrapper.mockAppCallback, times(2)).taskAllocated(any(Object.class), + any(Object.class), any(Container.class)); + assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + + reset(tsWrapper.mockAppCallback); + + tsWrapper.allocateTask(task4, hosts, priority1, clientCookie4); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) { + break; + } + } + verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class)); + + + tsWrapper.deallocateTask(task2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { + break; + } + } + verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4), + eq(clientCookie4), any(Container.class)); + + } finally { + tsWrapper.shutdown(); + } + + } + + @Test(timeout=5000) + public void testNodeDisabled() throws IOException, InterruptedException { + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l); + try { + Priority priority1 = Priority.newInstance(1); + String[] hosts1 = new String[]{HOST1}; + Object task1 = new Object(); + Object clientCookie1 = new Object(); + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 1) { + break; + } + } + verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), + any(Container.class)); + assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations); + assertEquals(1, tsWrapper.ts.dagStats.numTotalAllocations); + + tsWrapper.resetAppCallback(); + + tsWrapper.clock.setTime(10000l); + tsWrapper.rejectExecution(task1); + + // Verify that the node is blacklisted + assertEquals(1, tsWrapper.ts.dagStats.numRejectedTasks); + assertEquals(3, tsWrapper.ts.instanceToNodeMap.size()); + LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodesQueue.peek(); + assertNotNull(disabledNodeInfo); + assertEquals(HOST1, disabledNodeInfo.serviceInstance.getHost()); + assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.MILLISECONDS)); + assertEquals((10000l + 10000l), disabledNodeInfo.expireTimeMillis); + + Object task2 = new Object(); + Object clientCookie2 = new Object(); + tsWrapper.allocateTask(task2, hosts1, priority1, clientCookie2); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 2) { + break; + } + } + verify(tsWrapper.mockAppCallback).taskAllocated(eq(task2), eq(clientCookie2), any(Container.class)); + assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations); + assertEquals(2, tsWrapper.ts.dagStats.numTotalAllocations); + + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout=5000) + public void testNodeReEnabled() throws InterruptedException, IOException { + // Based on actual timing. + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l); + try { + Priority priority1 = Priority.newInstance(1); + String[] hosts1 = new String[]{HOST1}; + String[] hosts2 = new String[]{HOST2}; + String[] hosts3 = new String[]{HOST3}; + + Object task1 = new Object(); + Object clientCookie1 = new Object(); + Object task2 = new Object(); + Object clientCookie2 = new Object(); + Object task3 = new Object(); + Object clientCookie3 = new Object(); + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); + tsWrapper.allocateTask(task2, hosts2, priority1, clientCookie2); + tsWrapper.allocateTask(task3, hosts3, priority1, clientCookie3); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { + break; + } + } + verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), any(Container.class)); + assertEquals(3, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + assertEquals(3, tsWrapper.ts.dagStats.numTotalAllocations); + + tsWrapper.resetAppCallback(); + + tsWrapper.rejectExecution(task1); + tsWrapper.rejectExecution(task2); + tsWrapper.rejectExecution(task3); + + // Verify that the node is blacklisted + assertEquals(3, tsWrapper.ts.dagStats.numRejectedTasks); + assertEquals(3, tsWrapper.ts.instanceToNodeMap.size()); + assertEquals(3, tsWrapper.ts.disabledNodesQueue.size()); + + + Object task4 = new Object(); + Object clientCookie4 = new Object(); + Object task5 = new Object(); + Object clientCookie5 = new Object(); + Object task6 = new Object(); + Object clientCookie6 = new Object(); + tsWrapper.allocateTask(task4, hosts1, priority1, clientCookie4); + tsWrapper.allocateTask(task5, hosts2, priority1, clientCookie5); + tsWrapper.allocateTask(task6, hosts3, priority1, clientCookie6); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 6) { + break; + } + } + + ArgumentCaptor<Container> argumentCaptor = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), argumentCaptor.capture()); + + // which affects the locality matching + assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + assertEquals(6, tsWrapper.ts.dagStats.numTotalAllocations); + + } finally { + tsWrapper.shutdown(); + } + } + + @Test (timeout = 5000) + public void testForceLocalityTest1() throws IOException, InterruptedException { + // 2 hosts. 2 per host. 5 requests at the same priority. + // First 3 on host1, Next at host2, Last with no host. + // Third request on host1 should not be allocated immediately. + forceLocalityTest1(true); + + } + + @Test (timeout = 5000) + public void testNoForceLocalityCounterTest1() throws IOException, InterruptedException { + // 2 hosts. 2 per host. 5 requests at the same priority. + // First 3 on host1, Next at host2, Last with no host. + // Third should allocate on host2, 4th on host2, 5th will wait. + + forceLocalityTest1(false); + } + + private void forceLocalityTest1(boolean forceLocality) throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + + String[] hosts = new String[] {HOST1, HOST2}; + + String[] hostsH1 = new String[] {HOST1}; + String[] hostsH2 = new String[] {HOST2}; + + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, (forceLocality ? -1l : 0l)); + + try { + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + Object task4 = "task4"; + Object clientCookie4 = "cookie4"; + Object task5 = "task5"; + Object clientCookie5 = "cookie5"; + + tsWrapper.controlScheduler(true); + //H1 - should allocate + tsWrapper.allocateTask(task1, hostsH1, priority1, clientCookie1); + //H1 - should allocate + tsWrapper.allocateTask(task2, hostsH1, priority1, clientCookie2); + //H1 - no capacity if force, should allocate otherwise + tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3); + //H2 - should allocate + tsWrapper.allocateTask(task4, hostsH2, priority1, clientCookie4); + //No location - should allocate if force, no capacity otherwise + tsWrapper.allocateTask(task5, null, priority1, clientCookie5); + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 4) { + break; + } + } + + // Verify no preemption requests - since everything is at the same priority + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(4)).taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(4, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + if (forceLocality) { + // task3 not allocated + assertEquals(task4, argumentCaptor.getAllValues().get(2)); + assertEquals(task5, argumentCaptor.getAllValues().get(3)); + } else { + assertEquals(task3, argumentCaptor.getAllValues().get(2)); + assertEquals(task4, argumentCaptor.getAllValues().get(3)); + } + + //Complete one task on host1. + tsWrapper.deallocateTask(task1, true, null); + + reset(tsWrapper.mockAppCallback); + + // Try scheduling again. + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 5) { + break; + } + } + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(1, argumentCaptor.getAllValues().size()); + if (forceLocality) { + assertEquals(task3, argumentCaptor.getAllValues().get(0)); + } else { + assertEquals(task5, argumentCaptor.getAllValues().get(0)); + } + + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout = 5000) + public void testForcedLocalityUnknownHost() throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + + String[] hostsKnown = new String[]{HOST1}; + String[] hostsUnknown = new String[]{HOST2}; + + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 1, -1l); + try { + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + + tsWrapper.controlScheduler(true); + // Should allocate since H2 is not known. + tsWrapper.allocateTask(task1, hostsUnknown, priority1, clientCookie1); + tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2); + + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 2) { + break; + } + } + + ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + + + } finally { + tsWrapper.shutdown(); + } + } + + + @Test(timeout = 5000) + public void testForcedLocalityPreemption() throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + String [] hosts = new String[] {HOST1, HOST2}; + + String [] hostsH1 = new String[] {HOST1}; + String [] hostsH2 = new String[] {HOST2}; + + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l); + + // Fill up host1 with p2 tasks. + // Leave host2 empty + // Try running p1 task on host1 - should preempt + + try { + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + Object task4 = "task4"; + Object clientCookie4 = "cookie4"; + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1); + tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2); + // This request at a lower priority should not affect anything. + tsWrapper.allocateTask(task3, hostsH1, priority2, clientCookie3); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numLocalAllocations == 2) { + break; + } + } + + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + + reset(tsWrapper.mockAppCallback); + // Allocate t4 at higher priority. t3 should not be allocated, + // and a preemption should be attempted on host1, despite host2 having available capacity + tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4); + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) { + break; + } + } + verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class)); + + tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { + break; + } + } + verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4), + eq(clientCookie4), any(Container.class)); + + } finally { + tsWrapper.shutdown(); + } + } + + private static class TestTaskSchedulerServiceWrapper { + static final Resource resource = Resource.newInstance(1024, 1); + Configuration conf; + TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class); + ControlledClock clock = new ControlledClock(new SystemClock()); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1); + LlapTaskSchedulerServiceForTest ts; + + TestTaskSchedulerServiceWrapper() throws IOException, InterruptedException { + this(2000l); + } + + TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException, + InterruptedException { + this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4, + ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.defaultIntVal); + } + + TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws + IOException, InterruptedException { + this(disableTimeoutMillis, hosts, numExecutors, waitQueueSize, 0l); + } + + TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize, long localityDelayMs) throws + IOException, InterruptedException { + conf = new Configuration(); + conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts); + conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors); + conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize); + conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname, + disableTimeoutMillis + "ms"); + conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false); + conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs); + + doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId(); + doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier(); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); + doReturn(userPayload).when(mockAppCallback).getInitialUserPayload(); + + ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock); + + controlScheduler(true); + ts.initialize(); + ts.start(); + // One scheduler pass from the nodes that are added at startup + signalSchedulerRun(); + controlScheduler(false); + awaitSchedulerRun(); + } + + void controlScheduler(boolean val) { + ts.forTestsetControlScheduling(val); + } + + void signalSchedulerRun() throws InterruptedException { + ts.forTestSignalSchedulingRun(); + } + + void awaitSchedulerRun() throws InterruptedException { + ts.forTestAwaitSchedulingRun(); + } + void resetAppCallback() { + reset(mockAppCallback); + } + + void shutdown() { + ts.shutdown(); + } + + void allocateTask(Object task, String[] hosts, Priority priority, Object clientCookie) { + ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie); + } + + void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) { + ts.deallocateTask(task, succeeded, endReason, null); + } + + void rejectExecution(Object task) { + ts.deallocateTask(task, false, TaskAttemptEndReason.EXECUTOR_BUSY, null); + } + } + + private static class LlapTaskSchedulerServiceForTest extends LlapTaskSchedulerService { + + private AtomicBoolean controlScheduling = new AtomicBoolean(false); + private final Lock testLock = new ReentrantLock(); + private final Condition schedulingCompleteCondition = testLock.newCondition(); + private boolean schedulingComplete = false; + private final Condition triggerSchedulingCondition = testLock.newCondition(); + private boolean schedulingTriggered = false; + private final AtomicInteger numSchedulerRuns = new AtomicInteger(0); + + + public LlapTaskSchedulerServiceForTest( + TaskSchedulerContext appClient, Clock clock) { + super(appClient, clock); + } + + @Override + protected void schedulePendingTasks() { + testLock.lock(); + try { + if (controlScheduling.get()) { + while (!schedulingTriggered) { + try { + triggerSchedulingCondition.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + numSchedulerRuns.incrementAndGet(); + super.schedulePendingTasks(); + schedulingTriggered = false; + schedulingComplete = true; + schedulingCompleteCondition.signal(); + } finally { + testLock.unlock(); + } + } + + // Enable or disable test scheduling control. + void forTestsetControlScheduling(boolean control) { + this.controlScheduling.set(control); + } + + void forTestSignalSchedulingRun() throws InterruptedException { + testLock.lock(); + try { + schedulingTriggered = true; + triggerSchedulingCondition.signal(); + } finally { + testLock.unlock(); + } + } + + void forTestAwaitSchedulingRun() throws InterruptedException { + testLock.lock(); + try { + while (!schedulingComplete) { + schedulingCompleteCondition.await(); + } + schedulingComplete = false; + } finally { + testLock.unlock(); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 802d3d4..8c2257f 100644 --- a/pom.xml +++ b/pom.xml @@ -49,8 +49,10 @@ <module>serde</module> <module>service-rpc</module> <module>service</module> - <module>llap-server</module> + <module>llap-common</module> <module>llap-client</module> + <module>llap-tez</module> + <module>llap-server</module> <module>shims</module> <module>spark-client</module> <module>storage-api</module> http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/ql/pom.xml ---------------------------------------------------------------------- diff --git a/ql/pom.xml b/ql/pom.xml index f19a225..330e449 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -75,6 +75,11 @@ </dependency> <dependency> <groupId>org.apache.hive</groupId> + <artifactId>hive-llap-tez</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> <artifactId>hive-shims</artifactId> <version>${project.version}</version> </dependency> @@ -603,6 +608,55 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + <version>${tez.version}</version> + <optional>true</optional> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 0bc6e2a..6a7d035 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.util.Collection; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -44,7 +42,6 @@ import java.util.concurrent.TimeoutException; import javax.security.auth.login.LoginException; -import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.fs.FileStatus; @@ -53,9 +50,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.security.LlapTokenProvider; +import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; +import org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher; +import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator; +import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; @@ -88,9 +90,9 @@ public class TezSessionState { private static final Logger LOG = LoggerFactory.getLogger(TezSessionState.class.getName()); private static final String TEZ_DIR = "_tez_session_dir"; public static final String LLAP_SERVICE = "LLAP"; - private static final String LLAP_SCHEDULER = "org.apache.tez.dag.app.rm.LlapTaskSchedulerService"; - private static final String LLAP_LAUNCHER = "org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher"; - private static final String LLAP_TASK_COMMUNICATOR = "org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator"; + private static final String LLAP_SCHEDULER = LlapTaskSchedulerService.class.getName(); + private static final String LLAP_LAUNCHER = LlapContainerLauncher.class.getName(); + private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName(); private HiveConf conf; private Path tezScratchDir; @@ -251,27 +253,11 @@ public class TezSessionState { // add configs for llap-daemon-site.xml + localize llap jars // they cannot be referred to directly as it would be a circular depedency conf.addResource("llap-daemon-site.xml"); - try { - final File daemonJar = - new File(Utilities.jarFinderGetJar(Class - .forName("org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat"))); - final LocalResource daemonLr = - createJarLocalResource(daemonJar.toURI().toURL().toExternalForm()); - commonLocalResources.put(utils.getBaseName(daemonLr), daemonLr); - } catch (ClassNotFoundException ce) { - throw new IOException("Cannot find LlapInputFormat in the classpath", ce); - } - try { - final File registryJar = - new File(Utilities.jarFinderGetJar(Class - .forName("org.apache.hadoop.registry.client.api.RegistryOperations"))); - final LocalResource registryLr = - createJarLocalResource(registryJar.toURI().toURL().toExternalForm()); - commonLocalResources.put(utils.getBaseName(registryLr), registryLr); - } catch (ClassNotFoundException ce) { - throw new IOException("Cannot find Hadoop Registry in the classpath", ce); - } + addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources); + addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources); + addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources); + addJarLRByClassName("org.apache.hadoop.registry.client.api.RegistryOperations", commonLocalResources); } // Create environment for AM. @@ -587,6 +573,26 @@ public class TezSessionState { return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf); } + private void addJarLRByClassName(String className, final Map<String, LocalResource> lrMap) throws + IOException, LoginException { + Class<?> clazz; + try { + clazz = Class.forName(className); + } catch (ClassNotFoundException e) { + throw new IOException("Cannot find " + className + " in classpath", e); + } + addJarLRByClass(clazz, lrMap); + } + + private void addJarLRByClass(Class<?> clazz, final Map<String, LocalResource> lrMap) throws IOException, + LoginException { + final File jar = + new File(Utilities.jarFinderGetJar(clazz)); + final LocalResource jarLr = + createJarLocalResource(jar.toURI().toURL().toExternalForm()); + lrMap.put(utils.getBaseName(jarLr), jarLr); + } + private String getSha(Path localFile) throws IOException, IllegalArgumentException { InputStream is = null; try {