Repository: hive
Updated Branches:
  refs/heads/llap d28b6a53e -> b97a07688


HIVE-11148. LLAP: fix TestLlapTaskSchedulerService flakiness. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b97a0768
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b97a0768
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b97a0768

Branch: refs/heads/llap
Commit: b97a0768885c052b25058885ad37ebc3d0b40503
Parents: d28b6a5
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Aug 20 18:41:12 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 20 18:41:12 2015 -0700

----------------------------------------------------------------------
 .../registry/impl/LlapFixedRegistryImpl.java    |  29 +-
 .../dag/app/rm/LlapTaskSchedulerService.java    |   3 +
 .../app/rm/TestLlapTaskSchedulerService.java    | 282 +++++++++----------
 3 files changed, 160 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b97a0768/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
index cdc3930..57aa1e7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
@@ -36,11 +37,16 @@ public class LlapFixedRegistryImpl implements 
ServiceRegistry {
 
   private static final Logger LOG = 
Logger.getLogger(LlapFixedRegistryImpl.class);
 
+  @InterfaceAudience.Private
+  // This is primarily for testing to avoid the host lookup
+  public static final String FIXED_REGISTRY_RESOLVE_HOST_NAMES = 
"fixed.registry.resolve.host.names";
+
   private final int port;
   private final int shuffle;
   private final String[] hosts;
   private final int memory;
   private final int vcores;
+  private final boolean resolveHosts;
 
   private final Map<String, String> srv = new HashMap<String, String>();
 
@@ -52,6 +58,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry 
{
     this.shuffle =
         conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
             LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+    this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, 
true);
 
     for (Map.Entry<String, String> kv : conf) {
       if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
@@ -100,17 +107,19 @@ public class LlapFixedRegistryImpl implements 
ServiceRegistry {
     private final String host;
 
     public FixedServiceInstance(String host) {
-      try {
-        InetAddress inetAddress = InetAddress.getByName(host);
-        if (NetUtils.isLocalAddress(inetAddress)) {
-          InetSocketAddress socketAddress = new InetSocketAddress(0);
-          socketAddress = NetUtils.getConnectAddress(socketAddress);
-          LOG.info("Adding host identified as local: " + host + " as "
-              + socketAddress.getHostName());
-          host = socketAddress.getHostName();
+      if (resolveHosts) {
+        try {
+          InetAddress inetAddress = InetAddress.getByName(host);
+          if (NetUtils.isLocalAddress(inetAddress)) {
+            InetSocketAddress socketAddress = new InetSocketAddress(0);
+            socketAddress = NetUtils.getConnectAddress(socketAddress);
+            LOG.info("Adding host identified as local: " + host + " as "
+                + socketAddress.getHostName());
+            host = socketAddress.getHostName();
+          }
+        } catch (UnknownHostException e) {
+          LOG.warn("Ignoring resolution issues for host: " + host, e);
         }
-      } catch (UnknownHostException e) {
-        LOG.warn("Ignoring resolution issues for host: " + host, e);
       }
       this.host = host;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/b97a0768/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index f69a99b..38d42b9 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -1111,6 +1111,7 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     int numRequestedAllocations = 0;
     int numRequestsWithLocation = 0;
     int numRequestsWithoutLocation = 0;
+    int numTotalAllocations = 0;
     int numLocalAllocations = 0;
     int numNonLocalAllocations = 0;
     int numAllocationsNoLocalityRequest = 0;
@@ -1129,6 +1130,7 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
       
sb.append("NumRequestsWithlocation=").append(numRequestsWithLocation).append(", 
");
       
sb.append("NumLocalAllocations=").append(numLocalAllocations).append(",");
       
sb.append("NumNonLocalAllocations=").append(numNonLocalAllocations).append(",");
+      
sb.append("NumTotalAllocations=").append(numTotalAllocations).append(",");
       
sb.append("NumRequestsWithoutLocation=").append(numRequestsWithoutLocation).append(",
 ");
       sb.append("NumRejectedTasks=").append(numRejectedTasks).append(", ");
       sb.append("NumCommFailures=").append(numCommFailures).append(", ");
@@ -1163,6 +1165,7 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
       } else {
         numAllocationsNoLocalityRequest++;
       }
+      numTotalAllocations++;
       _registerAllocationInHostMap(allocatedHost, numAllocationsPerHost);
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b97a0768/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index 0d4d619..ce60e6e 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -32,9 +32,9 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapFixedRegistryImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -50,17 +50,19 @@ import 
org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestLlapTaskSchedulerService {
 
-  // TODO Fix the races and the broken scheduler control in the tests
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestLlapTaskSchedulerService.class);
 
   private static final String HOST1 = "host1";
   private static final String HOST2 = "host2";
   private static final String HOST3 = "host3";
 
   @Test (timeout = 5000)
-  public void testSimpleLocalAllocation() throws IOException {
+  public void testSimpleLocalAllocation() throws IOException, 
InterruptedException {
 
     TestTaskSchedulerServiceWrapper tsWrapper = new 
TestTaskSchedulerServiceWrapper();
 
@@ -70,9 +72,13 @@ public class TestLlapTaskSchedulerService {
 
       Object task1 = new Object();
       Object clientCookie1 = new Object();
-      int schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
+
+      tsWrapper.controlScheduler(true);
       tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
-      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
+
+      tsWrapper.signalSchedulerRun();
+      tsWrapper.awaitSchedulerRun();
+
       verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), 
eq(clientCookie1), any(Container.class));
       // TODO Verify this is on host1.
       assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
@@ -82,7 +88,7 @@ public class TestLlapTaskSchedulerService {
   }
 
   @Test (timeout = 5000)
-  public void testSimpleNoLocalityAllocation() throws IOException {
+  public void testSimpleNoLocalityAllocation() throws IOException, 
InterruptedException {
     TestTaskSchedulerServiceWrapper tsWrapper = new 
TestTaskSchedulerServiceWrapper();
 
     try {
@@ -90,9 +96,10 @@ public class TestLlapTaskSchedulerService {
 
       Object task1 = new Object();
       Object clientCookie1 = new Object();
-      int schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
+      tsWrapper.controlScheduler(true);
       tsWrapper.allocateTask(task1, null, priority1, clientCookie1);
-      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
+      tsWrapper.signalSchedulerRun();
+      tsWrapper.awaitSchedulerRun();
       verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), 
eq(clientCookie1), any(Container.class));
       assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
     } finally {
@@ -112,23 +119,26 @@ public class TestLlapTaskSchedulerService {
     TestTaskSchedulerServiceWrapper tsWrapper = new 
TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1);
     try {
 
-      Object task1 = new String("task1");
-      Object clientCookie1 = new String("cookie1");
-      Object task2 = new String("task2");
-      Object clientCookie2 = new String("cookie1");
-      Object task3 = new String("task3");
-      Object clientCookie3 = new String("cookie1");
-      Object task4 = new String("task4");
-      Object clientCookie4 = new String("cookie1");
+      Object task1 = "task1";
+      Object clientCookie1 = "cookie1";
+      Object task2 = "task2";
+      Object clientCookie2 = "cookie1";
+      Object task3 = "task3";
+      Object clientCookie3 = "cookie1";
+      Object task4 = "task4";
+      Object clientCookie4 = "cookie1";
 
       tsWrapper.controlScheduler(true);
-      int schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
       tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1);
       tsWrapper.allocateTask(task2, hosts, priority2, clientCookie2);
       tsWrapper.allocateTask(task3, hosts, priority2, clientCookie3);
-      tsWrapper.signalScheduler();
-      tsWrapper.controlScheduler(false);
-      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numLocalAllocations == 2) {
+          break;
+        }
+      }
       verify(tsWrapper.mockAppCallback, 
times(2)).taskAllocated(any(Object.class),
           any(Object.class), any(Container.class));
       assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
@@ -136,18 +146,26 @@ public class TestLlapTaskSchedulerService {
 
       reset(tsWrapper.mockAppCallback);
 
-      tsWrapper.controlScheduler(true);
-      schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
       tsWrapper.allocateTask(task4, hosts, priority1, clientCookie4);
-      tsWrapper.controlScheduler(false);
-      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) {
+          break;
+        }
+      }
       
verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class));
 
-      schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
+
       tsWrapper.deallocateTask(task2, false, 
TaskAttemptEndReason.INTERNAL_PREEMPTION);
-      tsWrapper.signalScheduler();
-      Thread.sleep(2000l);
 
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
+          break;
+        }
+      }
       verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
           eq(clientCookie4), any(Container.class));
 
@@ -158,19 +176,28 @@ public class TestLlapTaskSchedulerService {
   }
 
   @Test(timeout=5000)
-  public void testNodeDisabled() throws IOException {
+  public void testNodeDisabled() throws IOException, InterruptedException {
     TestTaskSchedulerServiceWrapper tsWrapper = new 
TestTaskSchedulerServiceWrapper(10000l);
     try {
       Priority priority1 = Priority.newInstance(1);
       String[] hosts1 = new String[]{HOST1};
       Object task1 = new Object();
       Object clientCookie1 = new Object();
-      int schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
+      tsWrapper.controlScheduler(true);
       tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
-      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
-      verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), 
eq(clientCookie1), any(Container.class));
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 1) {
+          break;
+        }
+      }
+      verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), 
eq(clientCookie1),
+          any(Container.class));
       assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
       assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
+      assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numTotalAllocations);
 
       tsWrapper.resetAppCallback();
 
@@ -188,21 +215,26 @@ public class TestLlapTaskSchedulerService {
 
       Object task2 = new Object();
       Object clientCookie2 = new Object();
-      schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
       tsWrapper.allocateTask(task2, hosts1, priority1, clientCookie2);
-      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 2) {
+          break;
+        }
+      }
       verify(tsWrapper.mockAppCallback).taskAllocated(eq(task2), 
eq(clientCookie2), any(Container.class));
       assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
       assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
       assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+      assertEquals(2, tsWrapper.ts.dagStats.numTotalAllocations);
 
-      // TODO Enhance this to verify unblacklisting of the node.
     } finally {
       tsWrapper.shutdown();
     }
   }
 
-  // Flaky test disabled @Test(timeout=5000)
+  @Test(timeout=5000)
   public void testNodeReEnabled() throws InterruptedException, IOException {
     // Based on actual timing.
     TestTaskSchedulerServiceWrapper tsWrapper = new 
TestTaskSchedulerServiceWrapper(1000l);
@@ -220,16 +252,20 @@ public class TestLlapTaskSchedulerService {
       Object clientCookie3 = new Object();
 
       tsWrapper.controlScheduler(true);
-      int schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
       tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
       tsWrapper.allocateTask(task2, hosts2, priority1, clientCookie2);
       tsWrapper.allocateTask(task3, hosts3, priority1, clientCookie3);
-      tsWrapper.signalScheduler();
-      tsWrapper.controlScheduler(false);
-      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 1);
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
+          break;
+        }
+      }
       verify(tsWrapper.mockAppCallback, 
times(3)).taskAllocated(any(Object.class), any(Object.class), 
any(Container.class));
       assertEquals(3, tsWrapper.ts.dagStats.numLocalAllocations);
       assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
+      assertEquals(3, tsWrapper.ts.dagStats.numTotalAllocations);
 
       tsWrapper.resetAppCallback();
 
@@ -249,26 +285,24 @@ public class TestLlapTaskSchedulerService {
       Object clientCookie5 = new Object();
       Object task6 = new Object();
       Object clientCookie6 = new Object();
-      tsWrapper.controlScheduler(true);
-      schedulerRunNumber = tsWrapper.getSchedulerRunNumber();
       tsWrapper.allocateTask(task4, hosts1, priority1, clientCookie4);
       tsWrapper.allocateTask(task5, hosts2, priority1, clientCookie5);
       tsWrapper.allocateTask(task6, hosts3, priority1, clientCookie6);
-      tsWrapper.signalScheduler();
-      tsWrapper.controlScheduler(false);
-      tsWrapper.awaitSchedulerRunNumber(schedulerRunNumber + 2);
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 6) {
+          break;
+        }
+      }
 
       ArgumentCaptor<Container> argumentCaptor = 
ArgumentCaptor.forClass(Container.class);
       verify(tsWrapper.mockAppCallback, 
times(3)).taskAllocated(any(Object.class), any(Object.class), 
argumentCaptor.capture());
 
-      // Limited allocations per node. So better locality when nodes come out 
of the blacklist
-      // TODO This is flaky, since multiple nodes can get enabled at roughly 
the same time,
       // which affects the locality matching
-      assertEquals(6, tsWrapper.ts.dagStats.numLocalAllocations);
       assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
-      assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations);
+      assertEquals(6, tsWrapper.ts.dagStats.numTotalAllocations);
 
-      // TODO Enhance this to verify unblacklisting of the node.
     } finally {
       tsWrapper.shutdown();
     }
@@ -282,24 +316,25 @@ public class TestLlapTaskSchedulerService {
     ApplicationAttemptId appAttemptId = 
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
     LlapTaskSchedulerServiceForTest ts;
 
-    TestTaskSchedulerServiceWrapper() throws IOException {
+    TestTaskSchedulerServiceWrapper() throws IOException, InterruptedException 
{
       this(2000l);
     }
 
-    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws 
IOException {
+    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws 
IOException,
+        InterruptedException {
       this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4,
           
LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT);
     }
 
     TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, 
int numExecutors, int waitQueueSize) throws
-        IOException {
+        IOException, InterruptedException {
       conf = new Configuration();
       conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, hosts);
       conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, numExecutors);
       
conf.setInt(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE, 
waitQueueSize);
       
conf.setLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS,
           disableTimeoutMillis);
-      
conf.setBoolean(LlapTaskSchedulerServiceForTest.LLAP_TASK_SCHEDULER_IN_TEST, 
true);
+      conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, 
false);
 
       doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();
       doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier();
@@ -310,24 +345,20 @@ public class TestLlapTaskSchedulerService {
 
       ts.initialize();
       ts.start();
-      // One shceduler pass from the nodes that are added at startup
-      awaitSchedulerRunNumber(1);
-    }
-
-    int getSchedulerRunNumber() {
-      return ts.forTestGetSchedulerRunNumber();
+      // One scheduler pass from the nodes that are added at startup
+      awaitSchedulerRun();
     }
 
-    void awaitSchedulerRunNumber(int runNumber) {
-      ts.forTestAwaitSchedulingRun(runNumber);
+    void controlScheduler(boolean val) {
+      ts.forTestsetControlScheduling(val);
     }
 
-    void controlScheduler(boolean val) {
-      ts.forTestSetupSchedulerStartWait(val);
+    void signalSchedulerRun() throws InterruptedException {
+      ts.forTestSignalSchedulingRun();
     }
 
-    void signalScheduler() {
-      ts.forTestSignalSchedulerStart();
+    void awaitSchedulerRun() throws InterruptedException {
+      ts.forTestAwaitSchedulingRun();
     }
     void resetAppCallback() {
       reset(mockAppCallback);
@@ -352,106 +383,69 @@ public class TestLlapTaskSchedulerService {
 
   private static class LlapTaskSchedulerServiceForTest extends 
LlapTaskSchedulerService {
 
-    // For Unit Testing
-    static final String LLAP_TASK_SCHEDULER_IN_TEST = 
"llap.task.scheduler.in-test";
-    private final boolean inTest;
-    private final Lock forTestSchedulerLock = new ReentrantLock();
-    private final Condition forTestSchedulerRunCondition = 
forTestSchedulerLock.newCondition();
-    private final Condition forTestSchedulerRunStartCondition = 
forTestSchedulerLock.newCondition();
-    private final AtomicInteger forTestNumSchedulerRuns = new AtomicInteger(0);
-    private final AtomicBoolean forTestControlledScheduleStart = new 
AtomicBoolean(false);
-    private boolean forTestSchedulerGoSignal = false;
+    private AtomicBoolean controlScheduling = new AtomicBoolean(false);
+    private final Lock testLock = new ReentrantLock();
+    private final Condition schedulingCompleteCondition = 
testLock.newCondition();
+    private final AtomicBoolean schedulingComplete = new AtomicBoolean(false);
+    private final Condition triggerSchedulingCondition = 
testLock.newCondition();
+    private final AtomicBoolean schedulingTriggered = new AtomicBoolean(false);
+    private final AtomicInteger numSchedulerRuns = new AtomicInteger(0);
+
 
     public LlapTaskSchedulerServiceForTest(
         TaskSchedulerContext appClient, Clock clock) {
       super(appClient, clock);
-      Configuration conf;
-      try {
-        conf = 
TezUtils.createConfFromUserPayload(appClient.getInitialUserPayload());
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      this.inTest = conf.getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false);
     }
 
+    @Override
     protected void schedulePendingTasks() {
+      testLock.lock();
       try {
-        forTestAwaitSchedulerStartSignal();
+        if (controlScheduling.get()) {
+          while (!schedulingTriggered.get()) {
+            try {
+              triggerSchedulingCondition.await();
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+        numSchedulerRuns.incrementAndGet();
         super.schedulePendingTasks();
+        schedulingTriggered.set(false);
+        schedulingComplete.set(true);
+        schedulingCompleteCondition.signal();
       } finally {
-        forTestMaybeSignalSchedulerRun();
-      }
-    }
-
-
-    private void forTestMaybeSignalSchedulerRun() {
-      if (inTest) {
-        forTestSchedulerLock.lock();
-        try {
-          forTestNumSchedulerRuns.incrementAndGet();
-          forTestSchedulerRunCondition.signal();
-        } finally {
-          forTestSchedulerLock.unlock();
-        }
+        testLock.unlock();
       }
     }
 
-    int forTestGetSchedulerRunNumber() {
-      return forTestNumSchedulerRuns.get();
+    // Enable or disable test scheduling control.
+    void forTestsetControlScheduling(boolean control) {
+      this.controlScheduling.set(control);
     }
 
-    @VisibleForTesting
-    void forTestAwaitSchedulingRun(int runNumber) {
-      if (inTest) {
-        forTestSchedulerLock.lock();
-        try {
-          while (forTestNumSchedulerRuns.get() != runNumber) {
-            forTestSchedulerRunCondition.await();
-          }
-        } catch (InterruptedException e) {
-          throw new RuntimeException(e);
-        } finally {
-          forTestSchedulerLock.unlock();
-        }
-      }
-    }
-
-    void forTestSetupSchedulerStartWait(boolean val) {
-      if (inTest) {
-        forTestControlledScheduleStart.set(val);
-        forTestSchedulerGoSignal = false;
+    void forTestSignalSchedulingRun() throws InterruptedException {
+      testLock.lock();
+      try {
+        schedulingTriggered.set(true);
+        triggerSchedulingCondition.signal();
+      } finally {
+        testLock.unlock();
       }
     }
 
-    void forTestSignalSchedulerStart() {
-      if (inTest) {
-        forTestSchedulerLock.lock();
-        try {
-          forTestSchedulerGoSignal = true;
-          forTestSchedulerRunStartCondition.signal();
-        } finally {
-          forTestSchedulerLock.unlock();
+    void forTestAwaitSchedulingRun() throws InterruptedException {
+      testLock.lock();
+      try {
+        while (!schedulingComplete.get()) {
+          schedulingCompleteCondition.await();
         }
+        schedulingComplete.set(false);
+      } finally {
+        testLock.unlock();
       }
     }
 
-    private void forTestAwaitSchedulerStartSignal() {
-      if (inTest) {
-        forTestSchedulerLock.lock();
-        try {
-          if (forTestControlledScheduleStart.get()) {
-            if (forTestSchedulerGoSignal) {
-              forTestSchedulerGoSignal = false;
-              return;
-            }
-            forTestSchedulerRunStartCondition.await();
-          }
-        } catch (InterruptedException e) {
-          throw new RuntimeException(e);
-        } finally {
-          forTestSchedulerLock.unlock();
-        }
-      }
-    }
   }
 }

Reply via email to