This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 5c108dc HIVE-23746 : Send task attempts async from AM to daemons (Mustafa Iman via Rajesh Balamohan) 5c108dc is described below commit 5c108dc2c49f70228231099fce1a2032f33f9efd Author: Mustafa Iman <mustafai...@gmail.com> AuthorDate: Tue Jul 21 12:02:48 2020 -0700 HIVE-23746 : Send task attempts async from AM to daemons (Mustafa Iman via Rajesh Balamohan) Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 2 + .../hive/llap/tez/LlapProtocolClientProxy.java | 8 +- .../apache/hadoop/hive/llap/AsyncPbRpcProxy.java | 128 +++++++++++- .../hadoop/hive/llap/AsyncResponseHandler.java | 112 +++++++++++ .../hadoop/hive/llap/AsyncResponseHandlerTest.java | 222 +++++++++++++++++++++ 5 files changed, 466 insertions(+), 6 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9b637fb..1d64f6b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4825,6 +4825,8 @@ public class HiveConf extends Configuration { LLAP_TASK_COMMUNICATOR_LISTENER_THREAD_COUNT( "hive.llap.task.communicator.listener.thread-count", 30, "The number of task communicator listener threads."), + LLAP_MAX_CONCURRENT_REQUESTS_PER_NODE("hive.llap.max.concurrent.requests.per.daemon", 12, + "Maximum number of concurrent requests to one daemon from Tez AM"), LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS( "hive.llap.task.communicator.connection.sleep.between.retries.ms", "2000ms", new TimeValidator(TimeUnit.MILLISECONDS), diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java index bc74c55..6702531 100644 --- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java @@ -54,7 +54,7 @@ public class LlapProtocolClientProxy HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS), HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS, - TimeUnit.MILLISECONDS), -1, 1); + TimeUnit.MILLISECONDS), -1, HiveConf.getIntVar(conf, ConfVars.LLAP_MAX_CONCURRENT_REQUESTS_PER_NODE)); } public void registerDag(RegisterDagRequestProto request, String host, int port, @@ -108,7 +108,7 @@ public class LlapProtocolClientProxy } } - private class SubmitWorkCallable extends NodeCallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> { + private class SubmitWorkCallable extends AsyncCallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> { protected SubmitWorkCallable(LlapNodeId nodeId, SubmitWorkRequestProto submitWorkRequestProto, @@ -117,8 +117,8 @@ public class LlapProtocolClientProxy } @Override - public SubmitWorkResponseProto call() throws Exception { - return getProxy(nodeId, null).submitWork(null, request); + public void callInternal() throws Exception { + getProxy(nodeId, null).submitWork(null, request); } } diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java index 25e10f4..5041c66 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java @@ -39,16 +39,22 @@ import java.util.concurrent.locks.ReentrantLock; import javax.net.SocketFactory; +import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; // TODO: LlapNodeId is just a host+port pair; we could make this class more generic. 
+import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ipc.AsyncCallLimitExceededException; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.AsyncGet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,11 +106,15 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent // Tracks completed requests pre node private final LinkedList<LlapNodeId> completedNodes = new LinkedList<>(); + private final AsyncResponseHandler asyncResponseHandler; + public RequestManager(int numThreads, int maxPerNode) { ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build()); maxConcurrentRequestsPerNode = maxPerNode; executor = MoreExecutors.listeningDecorator(localExecutor); + asyncResponseHandler = new AsyncResponseHandler(this); + asyncResponseHandler.start(); } @VisibleForTesting @@ -163,6 +173,7 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent public void shutdown() { if (!isShutdown.getAndSet(true)) { + asyncResponseHandler.shutdownNow(); executor.shutdownNow(); notifyRunLoop(); } @@ -172,8 +183,14 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent <T extends Message , U extends Message> void submitToExecutor( CallableRequest<T, U> request, LlapNodeId nodeId) { ListenableFuture<U> future = executor.submit(request); - Futures.addCallback(future, new ResponseCallback<U>( - request.getCallback(), 
nodeId, this)); + if (request instanceof AsyncCallableRequest) { + Futures.addCallback(future, new AsyncResponseCallback( + request.getCallback(), nodeId, this, + (AsyncCallableRequest) request, asyncResponseHandler), MoreExecutors.directExecutor()); + } else { + Futures.addCallback(future, new ResponseCallback<U>( + request.getCallback(), nodeId, this), MoreExecutors.directExecutor()); + } } @VisibleForTesting @@ -334,6 +351,41 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent } } + private static final class AsyncResponseCallback<TYPE extends Message> + implements FutureCallback<TYPE> { + + private final AsyncPbRpcProxy.ExecuteRequestCallback<TYPE> callback; + private final LlapNodeId nodeId; + private final AsyncPbRpcProxy.RequestManager requestManager; + private final AsyncPbRpcProxy.AsyncCallableRequest request; + private final AsyncResponseHandler asyncResponseHandler; + + public AsyncResponseCallback(AsyncPbRpcProxy.ExecuteRequestCallback<TYPE> callback, LlapNodeId nodeId, + AsyncPbRpcProxy.RequestManager requestManager, + AsyncPbRpcProxy.AsyncCallableRequest request, + AsyncResponseHandler asyncResponseHandler) { + this.callback = callback; + this.nodeId = nodeId; + this.requestManager = requestManager; + this.request = request; + this.asyncResponseHandler = asyncResponseHandler; + } + + @Override + public void onSuccess(TYPE result) { + asyncResponseHandler.addToAsyncResponseFutureQueue(request); + } + + @Override + public void onFailure(Throwable t) { + try { + callback.indicateError(t); + } finally { + requestManager.requestFinished(nodeId); + } + } + } + @VisibleForTesting protected static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message> implements Callable<RESPONSE> { @@ -351,9 +403,81 @@ public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdent return callback; } + /** + * Override this method to make a synchronous request and wait for response. 
+ * @return + * @throws Exception + */ public abstract RESPONSE call() throws Exception; } + /** + * Asynchronous request to a node. The request must override {@link #callInternal()} + * @param <REQUEST> + * @param <RESPONSE> + */ + protected static abstract class AsyncCallableRequest<REQUEST extends Message, RESPONSE extends Message> + extends NodeCallableRequest<REQUEST, RESPONSE> { + + private final long TIMEOUT = 60000; + private final long BACKOFF_START = 10; + private final int FAST_RETRIES = 5; + private AsyncGet<Message, Exception> responseFuture; + + protected AsyncCallableRequest(LlapNodeId nodeId, REQUEST request, + ExecuteRequestCallback<RESPONSE> callback) { + super(nodeId, request, callback); + } + + @Override + public RESPONSE call() throws Exception { + boolean asyncMode = Client.isAsynchronousMode(); + long deadline = System.currentTimeMillis() + TIMEOUT; + int numRetries = 0; + long nextBackoffMs = BACKOFF_START; + try { + Client.setAsynchronousMode(true); + boolean sent = false; + while (!sent) { + try { + callInternal(); + sent = true; + } catch (Exception ex) { + if (ex instanceof ServiceException && ex.getCause() != null + && ex.getCause() instanceof AsyncCallLimitExceededException) { + numRetries++; + if (numRetries >= FAST_RETRIES) { + Thread.sleep(nextBackoffMs); + if (System.currentTimeMillis() > deadline) { + throw new HiveException("Async request timed out in " + TIMEOUT + " ms.", ex.getCause()); + } + numRetries = 0; + nextBackoffMs = nextBackoffMs * 2; + } + if (LOG.isTraceEnabled()) { + LOG.trace("Async call limit exceeded.", ex.getCause()); + } + } else { + throw ex; + } + } + } + responseFuture = ProtobufRpcEngine.getAsyncReturnMessage(); + return null; + } finally { + Client.setAsynchronousMode(asyncMode); + } + } + + public void callInternal() throws Exception { + // override if async response + } + + public AsyncGet<Message, Exception> getResponseFuture() { + return responseFuture; + } + } + @VisibleForTesting protected static 
abstract class NodeCallableRequest< diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncResponseHandler.java b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncResponseHandler.java new file mode 100644 index 0000000..1399b65 --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncResponseHandler.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import com.google.protobuf.Message; +import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.AsyncCallableRequest; +import org.apache.hadoop.util.concurrent.AsyncGet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class AsyncResponseHandler { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncResponseHandler.class); + + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + + private final AsyncPbRpcProxy.RequestManager requestManager; + private final ExecutorService responseWaitingService = Executors.newSingleThreadExecutor(); + private final LinkedBlockingDeque<AsyncCallableRequest<Message, Message>> + incomingResponseFutures = new LinkedBlockingDeque<>(); + + 
public AsyncResponseHandler(AsyncPbRpcProxy.RequestManager requestManager) { + this.requestManager = requestManager; + } + + public void start() { + responseWaitingService.submit(new AsyncResponseHandlerRunnable()); + } + + public void addToAsyncResponseFutureQueue(AsyncCallableRequest<Message, Message> request) { + incomingResponseFutures.add(request); + } + + public void shutdownNow() { + isShutdown.set(true); + responseWaitingService.shutdownNow(); + } + + private final class AsyncResponseHandlerRunnable implements Runnable { + + private final List<AsyncCallableRequest<Message, Message>> responseFuturesQueue = new ArrayList<>(); + + @Override + public void run() { + while (!isShutdown.get()) { + try { + if (responseFuturesQueue.isEmpty()) { + // there are no more futures to hear from, just block on incoming futures + AsyncCallableRequest<Message, Message> request = incomingResponseFutures.take(); + responseFuturesQueue.add(request); + } + } catch (InterruptedException e) { + LOG.warn("Async response handler was interrupted", e); + } + Iterator<AsyncCallableRequest<Message, Message>> iterator = responseFuturesQueue.iterator(); + while (iterator.hasNext()) { + AsyncCallableRequest<Message, Message> request = iterator.next(); + AsyncGet<Message, Exception> responseFuture = request.getResponseFuture(); + if (responseFuture != null && responseFuture.isDone()) { + try { + iterator.remove(); + LlapNodeId nodeId = request.getNodeId(); + // since isDone is true, getFuture.get should return immediately + try { + Message remoteValue = responseFuture.get(-1, TimeUnit.MILLISECONDS); + if (remoteValue instanceof Throwable) { + request.getCallback().indicateError((Throwable) remoteValue); + } else { + request.getCallback().setResponse(remoteValue); + } + } catch (Exception e) { + request.getCallback().indicateError(e); + } finally { + requestManager.requestFinished(nodeId); + } + } catch (Throwable e) { + LOG.warn("ResponseDispatcher caught", e); + } + } + } + // check if 
there are more futures to add but do not block as we still + // have futures that we are waiting to hear from + while (!incomingResponseFutures.isEmpty()) { + AsyncCallableRequest<Message, Message> request = incomingResponseFutures.poll(); + responseFuturesQueue.add(request); + } + } + LOG.info("Async response handler exiting"); + } + } +} diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java b/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java new file mode 100644 index 0000000..3d7bd90 --- /dev/null +++ b/llap-common/src/test/org/apache/hadoop/hive/llap/AsyncResponseHandlerTest.java @@ -0,0 +1,222 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap; + +import com.google.protobuf.Message; +import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback; +import org.apache.hadoop.util.concurrent.AsyncGet; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static java.lang.Thread.sleep; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class AsyncResponseHandlerTest { + + private AsyncResponseHandler responseHandler; + + @Before + public void setup() { + AsyncPbRpcProxy.RequestManager requestManager = mock(AsyncPbRpcProxy.RequestManager.class); + responseHandler = new AsyncResponseHandler(requestManager); + responseHandler.start(); + } + + @After + public void teardown() { + responseHandler.shutdownNow(); + } + + @Test + public void testAck() throws InterruptedException { + ExecuteRequestCallback<Message> callback = mock(ExecuteRequestCallback.class); + Message returnMessage = mock(Message.class); + DummyAsyncRequest asyncRequest = createAsyncRequest(returnMessage, null, callback); + + responseHandler.addToAsyncResponseFutureQueue(asyncRequest); + + verify(callback, times(0)).setResponse(any()); + verify(callback, times(0)).indicateError(any()); + + asyncRequest.finish(); + + assertTrueEventually(() -> { + verify(callback, times(1)).setResponse(returnMessage); + verify(callback, times(0)).indicateError(any()); + }); + } + + @Test + public void testRemoteFail() throws InterruptedException { + ExecuteRequestCallback<Message> callback = mock(ExecuteRequestCallback.class); + Exception returnException = new Exception(); + DummyAsyncRequest asyncRequest = createAsyncRequest(null, returnException, callback); + + responseHandler.addToAsyncResponseFutureQueue(asyncRequest); + + verify(callback, times(0)).setResponse(any()); + verify(callback, 
times(0)).indicateError(any()); + + asyncRequest.finish(); + + assertTrueEventually(() -> { + verify(callback, times(1)).indicateError(returnException); + verify(callback, times(0)).setResponse(any()); + }); + + } + + @Test + public void testStress() throws InterruptedException { + final int numCommunicators = 10; + final int totalCallbacks = 200000; + + ExecuteRequestCallback<Message> callbacks[] = new ExecuteRequestCallback[totalCallbacks]; + DummyAsyncRequest asyncRequests[] = new DummyAsyncRequest[totalCallbacks]; + for (int i = 0; i < totalCallbacks; i++) { + callbacks[i] = mock(ExecuteRequestCallback.class); + asyncRequests[i] = createAsyncRequest(null, null, callbacks[i]); + } + + Thread[] communicators = new Thread[numCommunicators]; + for (int i = 0; i < numCommunicators; i++) { + int communicatorStart = i * (totalCallbacks / numCommunicators); + int communicatorEnd = (i + 1) * (totalCallbacks / numCommunicators); + communicators[i] = new Thread(new Runnable() { + @Override + public void run() { + for (int j = communicatorStart; j < communicatorEnd; j++) { + responseHandler.addToAsyncResponseFutureQueue(asyncRequests[j]); + } + } + }); + } + + Thread ackerThread = new Thread(new Runnable() { + @Override + public void run() { + Random random = new Random(); + int[] ackOrder = new int[totalCallbacks]; + for (int i = 0; i < totalCallbacks; i++) { + ackOrder[i] = i; + } + for (int i = 0; i < totalCallbacks; i++) { + int swapx = random.nextInt(totalCallbacks); + int swapy = random.nextInt(totalCallbacks); + int temp = ackOrder[swapx]; + ackOrder[swapx] = ackOrder[swapy]; + ackOrder[swapy] = temp; + } + for (int i = 0; i < totalCallbacks; i++) { + asyncRequests[i].finish(); + } + } + }); + for (int i = 0; i < numCommunicators; i++) { + communicators[i].start(); + } + ackerThread.start(); + + for (int i = 0; i < numCommunicators; i++) { + communicators[i].join(); + } + ackerThread.join(); + + assertTrueEventually(() -> { + for (int i = 0; i < totalCallbacks; i++) 
{ + verify(callbacks[i], times(1)).setResponse(null); + } + }); + } + + private DummyAsyncRequest createAsyncRequest(Message returnValue, Exception returnException, ExecuteRequestCallback<Message> callback) { + return new DummyAsyncRequest(returnValue, returnException, callback); + } + + private final class DummyAsyncRequest extends AsyncPbRpcProxy.AsyncCallableRequest<Message, Message> { + + private volatile boolean isFinished = false; + private Message returnValue; + private Exception remoteException; + + protected DummyAsyncRequest(Message returnValue, Exception remoteException, ExecuteRequestCallback<Message> callback) { + super(mock(LlapNodeId.class), mock(Message.class), callback); + this.returnValue = returnValue; + this.remoteException = remoteException; + } + + @Override + public void callInternal() throws Exception { + //do nothing + } + + @Override + public AsyncGet<Message, Exception> getResponseFuture() { + AsyncGet<Message, Exception> asyncGet = new AsyncGet<Message, Exception>() { + @Override + public Message get(long timeout, TimeUnit unit) throws Exception { + if (remoteException != null) { + throw remoteException; + } + return returnValue; + } + + @Override + public boolean isDone() { + return isFinished; + } + }; + return asyncGet; + } + + public void finish() { + isFinished = true; + } + } + + private void assertTrueEventually(AssertTask assertTask) throws InterruptedException { + assertTrueEventually(assertTask, 10000); + } + + private void assertTrueEventually(AssertTask assertTask, int timeoutMillis) throws InterruptedException { + long endTime = System.currentTimeMillis() + timeoutMillis; + AssertionError assertionError = null; + + while (System.currentTimeMillis() < endTime) { + try { + assertTask.call(); + return; + } catch (AssertionError e) { + assertionError = e; + long millisUntilTimeout = endTime - System.currentTimeMillis(); + sleep(millisUntilTimeout < 50 ? 
millisUntilTimeout : 50 ); + continue; + } + } + throw assertionError; + } + + private static interface AssertTask { + void call() throws AssertionError; + } + +}