Repository: hive Updated Branches: refs/heads/llap d28b6a53e -> b97a07688
HIVE-11148. LLAP: fix TestLlapTaskSchedulerService flakiness. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b97a0768 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b97a0768 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b97a0768 Branch: refs/heads/llap Commit: b97a0768885c052b25058885ad37ebc3d0b40503 Parents: d28b6a5 Author: Siddharth Seth <ss...@apache.org> Authored: Thu Aug 20 18:41:12 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Thu Aug 20 18:41:12 2015 -0700 ---------------------------------------------------------------------- .../registry/impl/LlapFixedRegistryImpl.java | 29 +- .../dag/app/rm/LlapTaskSchedulerService.java | 3 + .../app/rm/TestLlapTaskSchedulerService.java | 282 +++++++++---------- 3 files changed, 160 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b97a0768/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java index cdc3930..57aa1e7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; @@ -36,11 +37,16 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { private static final Logger LOG = Logger.getLogger(LlapFixedRegistryImpl.class); + @InterfaceAudience.Private + // This is primarily for testing to avoid the host lookup + public static final String FIXED_REGISTRY_RESOLVE_HOST_NAMES = "fixed.registry.resolve.host.names"; + private final int port; private final int shuffle; private final String[] hosts; private final int memory; private final int vcores; + private final boolean resolveHosts; private final Map<String, String> srv = new HashMap<String, String>(); @@ -52,6 +58,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { this.shuffle = conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); + this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true); for (Map.Entry<String, String> kv : conf) { if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) @@ -100,17 +107,19 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { private final String host; public FixedServiceInstance(String host) { - try { - InetAddress inetAddress = InetAddress.getByName(host); - if (NetUtils.isLocalAddress(inetAddress)) { - InetSocketAddress socketAddress = new InetSocketAddress(0); - socketAddress = NetUtils.getConnectAddress(socketAddress); - LOG.info("Adding host identified as local: " + host + " as " - + socketAddress.getHostName()); - host = socketAddress.getHostName(); + if (resolveHosts) { + try { + InetAddress inetAddress = InetAddress.getByName(host); + if (NetUtils.isLocalAddress(inetAddress)) { + InetSocketAddress socketAddress = new InetSocketAddress(0); + socketAddress = NetUtils.getConnectAddress(socketAddress); + LOG.info("Adding host identified as local: " + host + " as " + + socketAddress.getHostName()); + host = socketAddress.getHostName(); + } + } catch (UnknownHostException e) { + LOG.warn("Ignoring resolution issues for host: " + host, e); } - } catch (UnknownHostException e) { - LOG.warn("Ignoring resolution issues for host: " + host, e); } this.host = host; } http://git-wip-us.apache.org/repos/asf/hive/blob/b97a0768/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index f69a99b..38d42b9 100644 --- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -1111,6 +1111,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { int numRequestedAllocations = 0; int numRequestsWithLocation = 0; int numRequestsWithoutLocation = 0; + int numTotalAllocations = 0; int numLocalAllocations = 0; int numNonLocalAllocations = 0; int numAllocationsNoLocalityRequest = 0; @@ -1129,6 +1130,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { sb.append("NumRequestsWithlocation=").append(numRequestsWithLocation).append(", "); sb.append("NumLocalAllocations=").append(numLocalAllocations).append(","); sb.append("NumNonLocalAllocations=").append(numNonLocalAllocations).append(","); + sb.append("NumTotalAllocations=").append(numTotalAllocations).append(","); sb.append("NumRequestsWithoutLocation=").append(numRequestsWithoutLocation).append(", "); sb.append("NumRejectedTasks=").append(numRejectedTasks).append(", "); sb.append("NumCommFailures=").append(numCommFailures).append(", "); @@ -1163,6 +1165,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { } else { numAllocationsNoLocalityRequest++; } + numTotalAllocations++; _registerAllocationInHostMap(allocatedHost, numAllocationsPerHost); } http://git-wip-us.apache.org/repos/asf/hive/blob/b97a0768/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index 0d4d619..ce60e6e 100644 --- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -32,9 +32,9 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapFixedRegistryImpl; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -50,17 +50,19 @@ import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestLlapTaskSchedulerService { - // TODO Fix the races and the broken scheduler control in the tests + private static final Logger LOG = LoggerFactory.getLogger(TestLlapTaskSchedulerService.class); private static final String HOST1 = "host1"; private static final String HOST2 = "host2"; private static final String HOST3 = "host3"; @Test (timeout = 5000) - public void testSimpleLocalAllocation() throws IOException { + public void testSimpleLocalAllocation() throws IOException, InterruptedException { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(); @@ -70,9 +72,13 @@ public class TestLlapTaskSchedulerService { Object task1 = new Object(); Object clientCookie1 = new Object(); - int schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); + + tsWrapper.controlScheduler(true); tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); - tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1); + + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class)); // TODO Verify this is on host1. assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); @@ -82,7 +88,7 @@ public class TestLlapTaskSchedulerService { } @Test (timeout = 5000) - public void testSimpleNoLocalityAllocation() throws IOException { + public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(); try { @@ -90,9 +96,10 @@ public class TestLlapTaskSchedulerService { Object task1 = new Object(); Object clientCookie1 = new Object(); - int schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); + tsWrapper.controlScheduler(true); tsWrapper.allocateTask(task1, null, priority1, clientCookie1); - tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1); + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class)); assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); } finally { @@ -112,23 +119,26 @@ public class TestLlapTaskSchedulerService { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1); try { - Object task1 = new String("task1"); - Object clientCookie1 = new String("cookie1"); - Object task2 = new String("task2"); - Object clientCookie2 = new String("cookie1"); - Object task3 = new String("task3"); - Object clientCookie3 = new String("cookie1"); - Object task4 = new String("task4"); - Object clientCookie4 = new String("cookie1"); + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + Object task2 = "task2"; + Object clientCookie2 = "cookie1"; + Object task3 = "task3"; + Object clientCookie3 = "cookie1"; + Object task4 = "task4"; + Object clientCookie4 = "cookie1"; tsWrapper.controlScheduler(true); - int schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1); tsWrapper.allocateTask(task2, hosts, priority2, clientCookie2); tsWrapper.allocateTask(task3, hosts, priority2, clientCookie3); - tsWrapper.signalScheduler(); - tsWrapper.controlScheduler(false); - tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numLocalAllocations == 2) { + break; + } + } verify(tsWrapper.mockAppCallback, times(2)).taskAllocated(any(Object.class), any(Object.class), any(Container.class)); assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); @@ -136,18 +146,26 @@ public class TestLlapTaskSchedulerService { reset(tsWrapper.mockAppCallback); - tsWrapper.controlScheduler(true); - schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); tsWrapper.allocateTask(task4, hosts, priority1, clientCookie4); - tsWrapper.controlScheduler(false); - tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) { + break; + } + } verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class)); - schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); + tsWrapper.deallocateTask(task2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); - tsWrapper.signalScheduler(); - Thread.sleep(2000l); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { + break; + } + } verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4), eq(clientCookie4), any(Container.class)); @@ -158,19 +176,28 @@ public class TestLlapTaskSchedulerService { } @Test(timeout=5000) - public void testNodeDisabled() throws IOException { + public void testNodeDisabled() throws IOException, InterruptedException { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l); try { Priority priority1 = Priority.newInstance(1); String[] hosts1 = new String[]{HOST1}; Object task1 = new Object(); Object clientCookie1 = new Object(); - int schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); + tsWrapper.controlScheduler(true); tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); - tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1); - verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class)); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 1) { + break; + } + } + verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), + any(Container.class)); assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations); + assertEquals(1, tsWrapper.ts.dagStats.numTotalAllocations); tsWrapper.resetAppCallback(); @@ -188,21 +215,26 @@ public class TestLlapTaskSchedulerService { Object task2 = new Object(); Object clientCookie2 = new Object(); - schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); tsWrapper.allocateTask(task2, hosts1, priority1, clientCookie2); - tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 2) { + break; + } + } verify(tsWrapper.mockAppCallback).taskAllocated(eq(task2), eq(clientCookie2), any(Container.class)); assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations); + assertEquals(2, tsWrapper.ts.dagStats.numTotalAllocations); - // TODO Enhance this to verify unblacklisting of the node. } finally { tsWrapper.shutdown(); } } - // Flaky test disabled @Test(timeout=5000) + @Test(timeout=5000) public void testNodeReEnabled() throws InterruptedException, IOException { // Based on actual timing. TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l); @@ -220,16 +252,20 @@ public class TestLlapTaskSchedulerService { Object clientCookie3 = new Object(); tsWrapper.controlScheduler(true); - int schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); tsWrapper.allocateTask(task2, hosts2, priority1, clientCookie2); tsWrapper.allocateTask(task3, hosts3, priority1, clientCookie3); - tsWrapper.signalScheduler(); - tsWrapper.controlScheduler(false); - tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { + break; + } + } verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), any(Container.class)); assertEquals(3, tsWrapper.ts.dagStats.numLocalAllocations); assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + assertEquals(3, tsWrapper.ts.dagStats.numTotalAllocations); tsWrapper.resetAppCallback(); @@ -249,26 +285,24 @@ public class TestLlapTaskSchedulerService { Object clientCookie5 = new Object(); Object task6 = new Object(); Object clientCookie6 = new Object(); - tsWrapper.controlScheduler(true); - schedulerRunNumber = tsWrapper.getSchedulerRunNumber(); tsWrapper.allocateTask(task4, hosts1, priority1, clientCookie4); tsWrapper.allocateTask(task5, hosts2, priority1, clientCookie5); tsWrapper.allocateTask(task6, hosts3, priority1, clientCookie6); - tsWrapper.signalScheduler(); - tsWrapper.controlScheduler(false); - tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 2); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 6) { + break; + } + } ArgumentCaptor<Container> argumentCaptor = ArgumentCaptor.forClass(Container.class); verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), argumentCaptor.capture()); - // Limited allocations per node. So better locality when nodes come out of the blacklist - // TODO This is flaky, since multiple nodes can get enabled at roughly the same time, // which affects the locality matching - assertEquals(6, tsWrapper.ts.dagStats.numLocalAllocations); assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); - assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations); + assertEquals(6, tsWrapper.ts.dagStats.numTotalAllocations); - // TODO Enhance this to verify unblacklisting of the node. } finally { tsWrapper.shutdown(); } @@ -282,24 +316,25 @@ public class TestLlapTaskSchedulerService { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1); LlapTaskSchedulerServiceForTest ts; - TestTaskSchedulerServiceWrapper() throws IOException { + TestTaskSchedulerServiceWrapper() throws IOException, InterruptedException { this(2000l); } - TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException { + TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException, + InterruptedException { this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4, LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT); } TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws - IOException { + IOException, InterruptedException { conf = new Configuration(); conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, hosts); conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, numExecutors); conf.setInt(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE, waitQueueSize); conf.setLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS, disableTimeoutMillis); - conf.setBoolean(LlapTaskSchedulerServiceForTest.LLAP_TASK_SCHEDULER_IN_TEST, true); + conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false); doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId(); doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier(); @@ -310,24 +345,20 @@ public class TestLlapTaskSchedulerService { ts.initialize(); ts.start(); - // One shceduler pass from the nodes that are added at startup - awaitSchedulerRunNumber(1); - } - - int getSchedulerRunNumber() { - return ts.forTestGetSchedulerRunNumber(); + // One scheduler pass from the nodes that are added at startup + awaitSchedulerRun(); } - void awaitSchedulerRunNumber(int runNumber) { - ts.forTestAwaitSchedulingRun(runNumber); + void controlScheduler(boolean val) { + ts.forTestsetControlScheduling(val); } - void controlScheduler(boolean val) { - ts.forTestSetupSchedulerStartWait(val); + void signalSchedulerRun() throws InterruptedException { + ts.forTestSignalSchedulingRun(); } - void signalScheduler() { - ts.forTestSignalSchedulerStart(); + void awaitSchedulerRun() throws InterruptedException { + ts.forTestAwaitSchedulingRun(); } void resetAppCallback() { reset(mockAppCallback); @@ -352,106 +383,69 @@ public class TestLlapTaskSchedulerService { private static class LlapTaskSchedulerServiceForTest extends LlapTaskSchedulerService { - // For Unit Testing - static final String LLAP_TASK_SCHEDULER_IN_TEST = "llap.task.scheduler.in-test"; - private final boolean inTest; - private final Lock forTestSchedulerLock = new ReentrantLock(); - private final Condition forTestSchedulerRunCondition = forTestSchedulerLock.newCondition(); - private final Condition forTestSchedulerRunStartCondition = forTestSchedulerLock.newCondition(); - private final AtomicInteger forTestNumSchedulerRuns = new AtomicInteger(0); - private final AtomicBoolean forTestControlledScheduleStart = new AtomicBoolean(false); - private boolean forTestSchedulerGoSignal = false; + private AtomicBoolean controlScheduling = new AtomicBoolean(false); + private final Lock testLock = new ReentrantLock(); + private final Condition schedulingCompleteCondition = testLock.newCondition(); + private final AtomicBoolean schedulingComplete = new AtomicBoolean(false); + private final Condition triggerSchedulingCondition = testLock.newCondition(); + private final AtomicBoolean schedulingTriggered = new AtomicBoolean(false); + private final AtomicInteger numSchedulerRuns = new AtomicInteger(0); + public LlapTaskSchedulerServiceForTest( TaskSchedulerContext appClient, Clock clock) { super(appClient, clock); - Configuration conf; - try { - conf = TezUtils.createConfFromUserPayload(appClient.getInitialUserPayload()); - } catch (IOException e) { - throw new RuntimeException(e); - } - this.inTest = conf.getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false); } + @Override protected void schedulePendingTasks() { + testLock.lock(); try { - forTestAwaitSchedulerStartSignal(); + if (controlScheduling.get()) { + while (!schedulingTriggered.get()) { + try { + triggerSchedulingCondition.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + numSchedulerRuns.incrementAndGet(); super.schedulePendingTasks(); + schedulingTriggered.set(false); + schedulingComplete.set(true); + schedulingCompleteCondition.signal(); } finally { - forTestMaybeSignalSchedulerRun(); - } - } - - - private void forTestMaybeSignalSchedulerRun() { - if (inTest) { - forTestSchedulerLock.lock(); - try { - forTestNumSchedulerRuns.incrementAndGet(); - forTestSchedulerRunCondition.signal(); - } finally { - forTestSchedulerLock.unlock(); - } + testLock.unlock(); } } - int forTestGetSchedulerRunNumber() { - return forTestNumSchedulerRuns.get(); + // Enable or disable test scheduling control. + void forTestsetControlScheduling(boolean control) { + this.controlScheduling.set(control); } - @VisibleForTesting - void forTestAwaitSchedulingRun(int runNumber) { - if (inTest) { - forTestSchedulerLock.lock(); - try { - while (forTestNumSchedulerRuns.get() != runNumber) { - forTestSchedulerRunCondition.await(); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - forTestSchedulerLock.unlock(); - } - } - } - - void forTestSetupSchedulerStartWait(boolean val) { - if (inTest) { - forTestControlledScheduleStart.set(val); - forTestSchedulerGoSignal = false; + void forTestSignalSchedulingRun() throws InterruptedException { + testLock.lock(); + try { + schedulingTriggered.set(true); + triggerSchedulingCondition.signal(); + } finally { + testLock.unlock(); } } - void forTestSignalSchedulerStart() { - if (inTest) { - forTestSchedulerLock.lock(); - try { - forTestSchedulerGoSignal = true; - forTestSchedulerRunStartCondition.signal(); - } finally { - forTestSchedulerLock.unlock(); + void forTestAwaitSchedulingRun() throws InterruptedException { + testLock.lock(); + try { + while (!schedulingComplete.get()) { + schedulingCompleteCondition.await(); } + schedulingComplete.set(false); + } finally { + testLock.unlock(); } } - private void forTestAwaitSchedulerStartSignal() { - if (inTest) { - forTestSchedulerLock.lock(); - try { - if (forTestControlledScheduleStart.get()) { - if (forTestSchedulerGoSignal) { - forTestSchedulerGoSignal = false; - return; - } - forTestSchedulerRunStartCondition.await(); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - forTestSchedulerLock.unlock(); - } - } - } } }