This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c108dc  HIVE-23746 : Send task attempts async from AM to daemons 
(Mustafa Iman via Rajesh Balamohan)
5c108dc is described below

commit 5c108dc2c49f70228231099fce1a2032f33f9efd
Author: Mustafa Iman <mustafai...@gmail.com>
AuthorDate: Tue Jul 21 12:02:48 2020 -0700

    HIVE-23746 : Send task attempts async from AM to daemons (Mustafa Iman via 
Rajesh Balamohan)
    
    Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   2 +
 .../hive/llap/tez/LlapProtocolClientProxy.java     |   8 +-
 .../apache/hadoop/hive/llap/AsyncPbRpcProxy.java   | 128 +++++++++++-
 .../hadoop/hive/llap/AsyncResponseHandler.java     | 112 +++++++++++
 .../hadoop/hive/llap/AsyncResponseHandlerTest.java | 222 +++++++++++++++++++++
 5 files changed, 466 insertions(+), 6 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9b637fb..1d64f6b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4825,6 +4825,8 @@ public class HiveConf extends Configuration {
     LLAP_TASK_COMMUNICATOR_LISTENER_THREAD_COUNT(
         "hive.llap.task.communicator.listener.thread-count", 30,
         "The number of task communicator listener threads."),
+    
LLAP_MAX_CONCURRENT_REQUESTS_PER_NODE("hive.llap.max.concurrent.requests.per.daemon",
 12,
+        "Maximum number of concurrent requests to one daemon from Tez AM"),
     LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS(
       "hive.llap.task.communicator.connection.sleep.between.retries.ms", 
"2000ms",
       new TimeValidator(TimeUnit.MILLISECONDS),
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
index bc74c55..6702531 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
@@ -54,7 +54,7 @@ public class LlapProtocolClientProxy
         HiveConf.getTimeVar(conf, 
ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS,
             TimeUnit.MILLISECONDS),
         HiveConf.getTimeVar(conf, 
ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
-            TimeUnit.MILLISECONDS), -1, 1);
+            TimeUnit.MILLISECONDS), -1, HiveConf.getIntVar(conf, 
ConfVars.LLAP_MAX_CONCURRENT_REQUESTS_PER_NODE));
   }
 
   public void registerDag(RegisterDagRequestProto request, String host, int 
port,
@@ -108,7 +108,7 @@ public class LlapProtocolClientProxy
     }
   }
 
-  private class SubmitWorkCallable extends 
NodeCallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> {
+  private class SubmitWorkCallable extends 
AsyncCallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> {
 
     protected SubmitWorkCallable(LlapNodeId nodeId,
                           SubmitWorkRequestProto submitWorkRequestProto,
@@ -117,8 +117,8 @@ public class LlapProtocolClientProxy
     }
 
     @Override
-    public SubmitWorkResponseProto call() throws Exception {
-      return getProxy(nodeId, null).submitWork(null, request);
+    public void callInternal() throws Exception {
+      getProxy(nodeId, null).submitWork(null, request);
     }
   }
 
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java 
b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
index 25e10f4..5041c66 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
@@ -39,16 +39,22 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.SocketFactory;
 
+import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
 // TODO: LlapNodeId is just a host+port pair; we could make this class more 
generic.
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,11 +106,15 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
       // Tracks completed requests pre node
       private final LinkedList<LlapNodeId> completedNodes = new LinkedList<>();
 
+      private final AsyncResponseHandler asyncResponseHandler;
+
       public RequestManager(int numThreads, int maxPerNode) {
         ExecutorService localExecutor = 
Executors.newFixedThreadPool(numThreads,
             new ThreadFactoryBuilder().setNameFormat("TaskCommunicator 
#%2d").build());
         maxConcurrentRequestsPerNode = maxPerNode;
         executor = MoreExecutors.listeningDecorator(localExecutor);
+        asyncResponseHandler = new AsyncResponseHandler(this);
+        asyncResponseHandler.start();
       }
 
       @VisibleForTesting
@@ -163,6 +173,7 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
 
       public void shutdown() {
         if (!isShutdown.getAndSet(true)) {
+          asyncResponseHandler.shutdownNow();
           executor.shutdownNow();
           notifyRunLoop();
         }
@@ -172,8 +183,14 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
       <T extends Message , U extends Message> void submitToExecutor(
           CallableRequest<T, U> request, LlapNodeId nodeId) {
         ListenableFuture<U> future = executor.submit(request);
-        Futures.addCallback(future, new ResponseCallback<U>(
-            request.getCallback(), nodeId, this));
+        if (request instanceof AsyncCallableRequest) {
+          Futures.addCallback(future, new AsyncResponseCallback(
+                  request.getCallback(), nodeId, this,
+                  (AsyncCallableRequest) request, asyncResponseHandler), 
MoreExecutors.directExecutor());
+        } else {
+          Futures.addCallback(future, new ResponseCallback<U>(
+                  request.getCallback(), nodeId, this), 
MoreExecutors.directExecutor());
+        }
       }
 
       @VisibleForTesting
@@ -334,6 +351,41 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
       }
     }
 
+  private static final class AsyncResponseCallback<TYPE extends Message>
+          implements FutureCallback<TYPE> {
+
+    private final AsyncPbRpcProxy.ExecuteRequestCallback<TYPE> callback;
+    private final LlapNodeId nodeId;
+    private final AsyncPbRpcProxy.RequestManager requestManager;
+    private final AsyncPbRpcProxy.AsyncCallableRequest request;
+    private final AsyncResponseHandler asyncResponseHandler;
+
+    public AsyncResponseCallback(AsyncPbRpcProxy.ExecuteRequestCallback<TYPE> 
callback, LlapNodeId nodeId,
+                                 AsyncPbRpcProxy.RequestManager requestManager,
+                                 AsyncPbRpcProxy.AsyncCallableRequest request,
+                                 AsyncResponseHandler asyncResponseHandler) {
+      this.callback = callback;
+      this.nodeId = nodeId;
+      this.requestManager = requestManager;
+      this.request = request;
+      this.asyncResponseHandler = asyncResponseHandler;
+    }
+
+    @Override
+    public void onSuccess(TYPE result) {
+      asyncResponseHandler.addToAsyncResponseFutureQueue(request);
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      try {
+        callback.indicateError(t);
+      } finally {
+        requestManager.requestFinished(nodeId);
+      }
+    }
+  }
+
   @VisibleForTesting
   protected static abstract class CallableRequest<REQUEST extends Message, 
RESPONSE extends Message>
         implements Callable<RESPONSE> {
@@ -351,9 +403,81 @@ public abstract class AsyncPbRpcProxy<ProtocolType, 
TokenType extends TokenIdent
       return callback;
     }
 
+    /**
+     * Override this method to make a synchronous request and wait for 
response.
+     * @return
+     * @throws Exception
+     */
     public abstract RESPONSE call() throws Exception;
   }
 
+  /**
+   * Asynchronous request to a node. The request must override {@link 
#callInternal()}
+   * @param <REQUEST>
+   * @param <RESPONSE>
+   */
+  protected static abstract class AsyncCallableRequest<REQUEST extends 
Message, RESPONSE extends Message>
+          extends NodeCallableRequest<REQUEST, RESPONSE> {
+
+    private final long TIMEOUT = 60000;
+    private final long BACKOFF_START = 10;
+    private final int FAST_RETRIES = 5;
+    private AsyncGet<Message, Exception> responseFuture;
+
+    protected AsyncCallableRequest(LlapNodeId nodeId, REQUEST request,
+                                   ExecuteRequestCallback<RESPONSE> callback) {
+      super(nodeId, request, callback);
+    }
+
+    @Override
+    public RESPONSE call() throws Exception {
+      boolean asyncMode = Client.isAsynchronousMode();
+      long deadline = System.currentTimeMillis() + TIMEOUT;
+      int numRetries = 0;
+      long nextBackoffMs = BACKOFF_START;
+      try {
+        Client.setAsynchronousMode(true);
+        boolean sent = false;
+        while (!sent) {
+          try {
+            callInternal();
+            sent = true;
+          } catch (Exception ex) {
+            if (ex instanceof ServiceException && ex.getCause() != null
+                    && ex.getCause() instanceof 
AsyncCallLimitExceededException) {
+              numRetries++;
+              if (numRetries >= FAST_RETRIES) {
+                Thread.sleep(nextBackoffMs);
+                if (System.currentTimeMillis() > deadline) {
+                  throw new HiveException("Async request timed out in  " + 
TIMEOUT + " ms.", ex.getCause());
+                }
+                numRetries = 0;
+                nextBackoffMs = nextBackoffMs * 2;
+              }
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Async call limit exceeded.", ex.getCause());
+              }
+            } else {
+              throw ex;
+            }
+          }
+        }
+        responseFuture = ProtobufRpcEngine.getAsyncReturnMessage();
+        return null;
+      } finally {
+        Client.setAsynchronousMode(asyncMode);
+      }
+    }
+
+    public void callInternal() throws Exception {
+      // override if async response
+    }
+
+    public AsyncGet<Message, Exception> getResponseFuture() {
+      return responseFuture;
+    }
+  }
+
 
   @VisibleForTesting
   protected static abstract class NodeCallableRequest<
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncResponseHandler.java 
b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncResponseHandler.java
new file mode 100644
index 0000000..1399b65
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncResponseHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import com.google.protobuf.Message;
+import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.AsyncCallableRequest;
+import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class AsyncResponseHandler {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncResponseHandler.class);
+
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+  private final AsyncPbRpcProxy.RequestManager requestManager;
+  private final ExecutorService responseWaitingService = 
Executors.newSingleThreadExecutor();
+  private final LinkedBlockingDeque<AsyncCallableRequest<Message, Message>>
+          incomingResponseFutures = new LinkedBlockingDeque<>();
+
+  public AsyncResponseHandler(AsyncPbRpcProxy.RequestManager requestManager) {
+    this.requestManager = requestManager;
+  }
+
+  public void start() {
+    responseWaitingService.submit(new AsyncResponseHandlerRunnable());
+  }
+
+  public void addToAsyncResponseFutureQueue(AsyncCallableRequest<Message, 
Message> request) {
+    incomingResponseFutures.add(request);
+  }
+
+  public void shutdownNow() {
+    isShutdown.set(true);
+    responseWaitingService.shutdownNow();
+  }
+
+  private final class AsyncResponseHandlerRunnable implements Runnable {
+
+    private final List<AsyncCallableRequest<Message, Message>> 
responseFuturesQueue = new ArrayList<>();
+
+    @Override
+    public void run() {
+      while (!isShutdown.get()) {
+        try {
+          if (responseFuturesQueue.isEmpty()) {
+            // there are no more futures to hear from, just block on incoming 
futures
+            AsyncCallableRequest<Message, Message> request = 
incomingResponseFutures.take();
+            responseFuturesQueue.add(request);
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Async response handler was interrupted", e);
+        }
+        Iterator<AsyncCallableRequest<Message, Message>> iterator = 
responseFuturesQueue.iterator();
+        while (iterator.hasNext()) {
+          AsyncCallableRequest<Message, Message> request = iterator.next();
+          AsyncGet<Message, Exception> responseFuture = 
request.getResponseFuture();
+          if (responseFuture != null && responseFuture.isDone()) {
+            try {
+              iterator.remove();
+              LlapNodeId nodeId = request.getNodeId();
+              // since isDone is true, getFuture.get should return immediately
+              try {
+                Message remoteValue = responseFuture.get(-1, 
TimeUnit.MILLISECONDS);
+                if (remoteValue instanceof Throwable) {
+                  request.getCallback().indicateError((Throwable) remoteValue);
+                } else {
+                  request.getCallback().setResponse(remoteValue);
+                }
+              } catch (Exception e) {
+                request.getCallback().indicateError(e);
+              } finally {
+                requestManager.requestFinished(nodeId);
+              }
+            } catch (Throwable e) {
+              LOG.warn("ResponseDispatcher caught", e);
+            }
+          }
+        }
+        // check if there are more futures to add but do not block as we still
+        // have futures that we are waiting to hear from
+        while (!incomingResponseFutures.isEmpty()) {
+          AsyncCallableRequest<Message, Message> request = 
incomingResponseFutures.poll();
+          responseFuturesQueue.add(request);
+        }
+      }
+      LOG.info("Async response handler exiting");
+    }
+  }
+}
diff --git 
a/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java
 
b/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java
new file mode 100644
index 0000000..3d7bd90
--- /dev/null
+++ 
b/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import com.google.protobuf.Message;
+import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback;
+import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Thread.sleep;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class AsyncResponseHandlerTest {
+
+  private AsyncResponseHandler responseHandler;
+
+  @Before
+  public void setup() {
+    AsyncPbRpcProxy.RequestManager requestManager = 
mock(AsyncPbRpcProxy.RequestManager.class);
+    responseHandler = new AsyncResponseHandler(requestManager);
+    responseHandler.start();
+  }
+
+  @After
+  public void teardown() {
+    responseHandler.shutdownNow();
+  }
+
+  @Test
+  public void testAck() throws InterruptedException {
+    ExecuteRequestCallback<Message> callback = 
mock(ExecuteRequestCallback.class);
+    Message returnMessage = mock(Message.class);
+    DummyAsyncRequest asyncRequest = createAsyncRequest(returnMessage, null, 
callback);
+
+    responseHandler.addToAsyncResponseFutureQueue(asyncRequest);
+
+    verify(callback, times(0)).setResponse(any());
+    verify(callback, times(0)).indicateError(any());
+
+    asyncRequest.finish();
+
+    assertTrueEventually(() -> {
+      verify(callback, times(1)).setResponse(returnMessage);
+      verify(callback, times(0)).indicateError(any());
+    });
+  }
+
+  @Test
+  public void testRemoteFail() throws InterruptedException {
+    ExecuteRequestCallback<Message> callback = 
mock(ExecuteRequestCallback.class);
+    Exception returnException = new Exception();
+    DummyAsyncRequest asyncRequest = createAsyncRequest(null, returnException, 
callback);
+
+    responseHandler.addToAsyncResponseFutureQueue(asyncRequest);
+
+    verify(callback, times(0)).setResponse(any());
+    verify(callback, times(0)).indicateError(any());
+
+    asyncRequest.finish();
+
+    assertTrueEventually(() -> {
+      verify(callback, times(1)).indicateError(returnException);
+      verify(callback, times(0)).setResponse(any());
+    });
+
+  }
+
+  @Test
+  public void testStress() throws InterruptedException {
+    final int numCommunicators = 10;
+    final int totalCallbacks = 200000;
+
+    ExecuteRequestCallback<Message> callbacks[] = new 
ExecuteRequestCallback[totalCallbacks];
+    DummyAsyncRequest asyncRequests[] = new DummyAsyncRequest[totalCallbacks];
+    for (int i = 0; i < totalCallbacks; i++) {
+      callbacks[i] = mock(ExecuteRequestCallback.class);
+      asyncRequests[i] = createAsyncRequest(null, null, callbacks[i]);
+    }
+
+    Thread[] communicators = new Thread[numCommunicators];
+    for (int i = 0; i < numCommunicators; i++) {
+      int communicatorStart = i * (totalCallbacks / numCommunicators);
+      int communicatorEnd = (i + 1) * (totalCallbacks / numCommunicators);
+      communicators[i] = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          for (int j = communicatorStart; j < communicatorEnd; j++) {
+            responseHandler.addToAsyncResponseFutureQueue(asyncRequests[j]);
+          }
+        }
+      });
+    }
+
+    Thread ackerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        Random random = new Random();
+        int[] ackOrder = new int[totalCallbacks];
+        for (int i = 0; i < totalCallbacks; i++) {
+          ackOrder[i] = i;
+        }
+        for (int i = 0; i < totalCallbacks; i++) {
+          int swapx = random.nextInt(totalCallbacks);
+          int swapy = random.nextInt(totalCallbacks);
+          int temp = ackOrder[swapx];
+          ackOrder[swapx] = ackOrder[swapy];
+          ackOrder[swapy] = temp;
+        }
+        for (int i = 0; i < totalCallbacks; i++) {
+          asyncRequests[i].finish();
+        }
+      }
+    });
+    for (int i = 0; i < numCommunicators; i++) {
+      communicators[i].start();
+    }
+    ackerThread.start();
+
+    for (int i = 0; i < numCommunicators; i++) {
+      communicators[i].join();
+    }
+    ackerThread.join();
+
+    assertTrueEventually(() -> {
+      for (int i = 0; i < totalCallbacks; i++) {
+        verify(callbacks[i], times(1)).setResponse(null);
+      }
+    });
+  }
+
+  private DummyAsyncRequest createAsyncRequest(Message returnValue, Exception 
returnException, ExecuteRequestCallback<Message> callback) {
+    return new DummyAsyncRequest(returnValue, returnException, callback);
+  }
+
+  private final class DummyAsyncRequest extends 
AsyncPbRpcProxy.AsyncCallableRequest<Message, Message> {
+
+    private volatile boolean isFinished = false;
+    private Message returnValue;
+    private Exception remoteException;
+
+    protected DummyAsyncRequest(Message returnValue, Exception 
remoteException, ExecuteRequestCallback<Message> callback) {
+      super(mock(LlapNodeId.class), mock(Message.class), callback);
+      this.returnValue = returnValue;
+      this.remoteException = remoteException;
+    }
+
+    @Override
+    public void callInternal() throws Exception {
+      //do nothing
+    }
+
+    @Override
+    public AsyncGet<Message, Exception> getResponseFuture() {
+      AsyncGet<Message, Exception> asyncGet = new AsyncGet<Message, 
Exception>() {
+        @Override
+        public Message get(long timeout, TimeUnit unit) throws Exception {
+          if (remoteException != null) {
+            throw remoteException;
+          }
+          return returnValue;
+        }
+
+        @Override
+        public boolean isDone() {
+          return isFinished;
+        }
+      };
+      return asyncGet;
+    }
+
+    public void finish() {
+      isFinished = true;
+    }
+  }
+
+  private void assertTrueEventually(AssertTask assertTask) throws 
InterruptedException {
+    assertTrueEventually(assertTask, 10000);
+  }
+
+  private void assertTrueEventually(AssertTask assertTask, int timeoutMillis) 
throws InterruptedException {
+    long endTime = System.currentTimeMillis() + timeoutMillis;
+    AssertionError assertionError = null;
+
+    while (System.currentTimeMillis() < endTime) {
+      try {
+        assertTask.call();
+        return;
+      } catch (AssertionError e) {
+        assertionError = e;
+        long millisUntilTimeout = endTime - System.currentTimeMillis();
+        sleep(millisUntilTimeout < 50 ? millisUntilTimeout : 50 );
+        continue;
+      }
+    }
+    throw assertionError;
+  }
+
+  private static interface AssertTask {
+    void call() throws AssertionError;
+  }
+
+}

Reply via email to