Repository: hive Updated Branches: refs/heads/master b340ecb5e -> 0cbf45cfc
HIVE-12577: NPE in LlapTaskCommunicator when unregistering containers (Siddarth Seth, reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0cbf45cf Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0cbf45cf Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0cbf45cf Branch: refs/heads/master Commit: 0cbf45cfc046f39bed4533ab83542002e79b4f5b Parents: b340ecb Author: Gunther Hagleitner <gunt...@apache.org> Authored: Wed Dec 23 11:18:10 2015 -0800 Committer: Gunther Hagleitner <gunt...@apache.org> Committed: Wed Dec 23 11:18:10 2015 -0800 ---------------------------------------------------------------------- .../LlapDaemonProtocolClientProxy.java | 509 ++++++++++++++++++ .../llap/tezplugins/LlapTaskCommunicator.java | 187 +++++-- .../hive/llap/tezplugins/TaskCommunicator.java | 512 ------------------- .../TestLlapDaemonProtocolClientProxy.java | 143 ++++++ .../tezplugins/TestLlapTaskCommunicator.java | 100 ++++ .../llap/tezplugins/TestTaskCommunicator.java | 143 ------ 6 files changed, 895 insertions(+), 699 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java new file mode 100644 index 0000000..2884e40 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java @@ -0,0 +1,509 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins; + +import javax.net.SocketFactory; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.Message; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapNodeId; +import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; +import 
org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; +import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.service.AbstractService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapDaemonProtocolClientProxy extends AbstractService { + + private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolClientProxy.class); + + private final ConcurrentMap<String, LlapDaemonProtocolBlockingPB> hostProxies; + + private final RequestManager requestManager; + private final RetryPolicy retryPolicy; + private final SocketFactory socketFactory; + + private final ListeningExecutorService requestManagerExecutor; + private volatile ListenableFuture<Void> requestManagerFuture; + private final Token<LlapTokenIdentifier> llapToken; + + public LlapDaemonProtocolClientProxy( + int numThreads, Configuration 
conf, Token<LlapTokenIdentifier> llapToken) { + super(LlapDaemonProtocolClientProxy.class.getSimpleName()); + this.hostProxies = new ConcurrentHashMap<>(); + this.socketFactory = NetUtils.getDefaultSocketFactory(conf); + this.llapToken = llapToken; + + long connectionTimeout = HiveConf.getTimeVar(conf, + ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + long retrySleep = HiveConf.getTimeVar(conf, + ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS, + TimeUnit.MILLISECONDS); + this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep( + connectionTimeout, retrySleep, TimeUnit.MILLISECONDS); + + this.requestManager = new RequestManager(numThreads); + ExecutorService localExecutor = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build()); + this.requestManagerExecutor = MoreExecutors.listeningDecorator(localExecutor); + + LOG.info("Setting up taskCommunicator with" + + "numThreads=" + numThreads + + "retryTime(millis)=" + connectionTimeout + + "retrySleep(millis)=" + retrySleep); + } + + @Override + public void serviceStart() { + requestManagerFuture = requestManagerExecutor.submit(requestManager); + Futures.addCallback(requestManagerFuture, new FutureCallback<Void>() { + @Override + public void onSuccess(Void result) { + LOG.info("RequestManager shutdown"); + } + + @Override + public void onFailure(Throwable t) { + LOG.warn("RequestManager shutdown with error", t); + } + }); + } + + @Override + public void serviceStop() { + if (requestManagerFuture != null) { + requestManager.shutdown(); + requestManagerFuture.cancel(true); + } + requestManagerExecutor.shutdown(); + } + + public void sendSubmitWork(SubmitWorkRequestProto request, String host, int port, + final ExecuteRequestCallback<SubmitWorkResponseProto> callback) { + LlapNodeId nodeId = LlapNodeId.getInstance(host, port); + requestManager.queueRequest(new SubmitWorkCallable(nodeId, request, callback)); 
+ } + + public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host, + final int port, + final ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) { + LlapNodeId nodeId = LlapNodeId.getInstance(host, port); + requestManager.queueRequest( + new SendSourceStateUpdateCallable(nodeId, request, callback)); + } + + public void sendQueryComplete(final QueryCompleteRequestProto request, final String host, + final int port, + final ExecuteRequestCallback<QueryCompleteResponseProto> callback) { + LlapNodeId nodeId = LlapNodeId.getInstance(host, port); + requestManager.queueRequest(new SendQueryCompleteCallable(nodeId, request, callback)); + } + + public void sendTerminateFragment(final TerminateFragmentRequestProto request, final String host, + final int port, + final ExecuteRequestCallback<TerminateFragmentResponseProto> callback) { + LlapNodeId nodeId = LlapNodeId.getInstance(host, port); + requestManager.queueRequest(new SendTerminateFragmentCallable(nodeId, request, callback)); + } + + @VisibleForTesting + static class RequestManager implements Callable<Void> { + + private final Lock lock = new ReentrantLock(); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final Condition queueCondition = lock.newCondition(); + private final AtomicBoolean shouldRun = new AtomicBoolean(false); + + private final int maxConcurrentRequestsPerNode = 1; + private final ListeningExecutorService executor; + + + // Tracks new additions via add, while the loop is processing existing ones. + private final LinkedList<CallableRequest> newRequestList = new LinkedList<>(); + + // Tracks existing requests which are cycled through. 
+ private final LinkedList<CallableRequest> pendingRequests = new LinkedList<>(); + + // Tracks requests executing per node + private final ConcurrentMap<LlapNodeId, AtomicInteger> runningRequests = new ConcurrentHashMap<>(); + + // Tracks completed requests per node + private final LinkedList<LlapNodeId> completedNodes = new LinkedList<>(); + + public RequestManager(int numThreads) { + ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build()); + executor = MoreExecutors.listeningDecorator(localExecutor); + } + + + @VisibleForTesting + Set<LlapNodeId> currentLoopDisabledNodes = new HashSet<>(); + @VisibleForTesting + List<CallableRequest> currentLoopSkippedRequests = new LinkedList<>(); + @Override + public Void call() { + // Caches disabled nodes for quicker lookups and ensures a request on a node which was skipped + // does not go out of order. + while (!isShutdown.get()) { + lock.lock(); + try { + while (!shouldRun.get()) { + queueCondition.await(); + break; // Break out and try executing. 
+ } + boolean shouldBreak = process(); + if (shouldBreak) { + break; + } + } catch (InterruptedException e) { + if (isShutdown.get()) { + break; + } else { + LOG.warn("RunLoop interrupted without being shutdown first"); + throw new RuntimeException(e); + } + } finally { + lock.unlock(); + } + } + LOG.info("CallScheduler loop exiting"); + return null; + } + + /* Add a new request to be executed */ + public void queueRequest(CallableRequest request) { + synchronized (newRequestList) { + newRequestList.add(request); + shouldRun.set(true); + } + notifyRunLoop(); + } + + /* Indicates a request has completed on a node */ + public void requestFinished(LlapNodeId nodeId) { + synchronized (completedNodes) { + completedNodes.add(nodeId); + shouldRun.set(true); + } + notifyRunLoop(); + } + + public void shutdown() { + if (!isShutdown.getAndSet(true)) { + executor.shutdownNow(); + notifyRunLoop(); + } + } + + @VisibleForTesting + void submitToExecutor(CallableRequest request, LlapNodeId nodeId) { + ListenableFuture<SourceStateUpdatedResponseProto> future = + executor.submit(request); + Futures.addCallback(future, new ResponseCallback(request.getCallback(), nodeId, this)); + } + + @VisibleForTesting + boolean process() { + if (isShutdown.get()) { + return true; + } + currentLoopDisabledNodes.clear(); + currentLoopSkippedRequests.clear(); + + // Set to false to block the next loop. This must be called before draining the lists, + // otherwise an add/completion after draining the lists but before setting it to false, + // will not trigger a run. May cause one unnecessary run if an add comes in before drain. + // drain list. add request (setTrue). setFalse needs to be avoided. + shouldRun.compareAndSet(true, false); + // Drain any calls which may have come in during the last execution of the loop. 
+ drainNewRequestList(); // Locks newRequestList + drainCompletedNodes(); // Locks completedNodes + + + Iterator<CallableRequest> iterator = pendingRequests.iterator(); + while (iterator.hasNext()) { + CallableRequest request = iterator.next(); + iterator.remove(); + LlapNodeId nodeId = request.getNodeId(); + if (canRunForNode(nodeId, currentLoopDisabledNodes)) { + submitToExecutor(request, nodeId); + } else { + currentLoopDisabledNodes.add(nodeId); + currentLoopSkippedRequests.add(request); + } + } + // Tried scheduling everything that could be scheduled in this loop. + pendingRequests.addAll(0, currentLoopSkippedRequests); + return false; + } + + private void drainNewRequestList() { + synchronized (newRequestList) { + if (!newRequestList.isEmpty()) { + pendingRequests.addAll(newRequestList); + newRequestList.clear(); + } + } + } + + private void drainCompletedNodes() { + synchronized (completedNodes) { + if (!completedNodes.isEmpty()) { + for (LlapNodeId nodeId : completedNodes) { + runningRequests.get(nodeId).decrementAndGet(); + } + } + completedNodes.clear(); + } + } + + private boolean canRunForNode(LlapNodeId nodeId, Set<LlapNodeId> currentRunDisabledNodes) { + if (currentRunDisabledNodes.contains(nodeId)) { + return false; + } else { + AtomicInteger count = runningRequests.get(nodeId); + if (count == null) { + count = new AtomicInteger(0); + AtomicInteger old = runningRequests.putIfAbsent(nodeId, count); + count = old != null ? 
old : count; + } + if (count.incrementAndGet() <= maxConcurrentRequestsPerNode) { + return true; + } else { + count.decrementAndGet(); + return false; + } + } + } + + private void notifyRunLoop() { + lock.lock(); + try { + queueCondition.signal(); + } finally { + lock.unlock(); + } + } + } + + + private static final class ResponseCallback<TYPE extends Message> + implements FutureCallback<TYPE> { + + private final ExecuteRequestCallback<TYPE> callback; + private final LlapNodeId nodeId; + private final RequestManager requestManager; + + public ResponseCallback(ExecuteRequestCallback<TYPE> callback, LlapNodeId nodeId, + RequestManager requestManager) { + this.callback = callback; + this.nodeId = nodeId; + this.requestManager = requestManager; + } + + @Override + public void onSuccess(TYPE result) { + try { + callback.setResponse(result); + } finally { + requestManager.requestFinished(nodeId); + } + } + + @Override + public void onFailure(Throwable t) { + try { + callback.indicateError(t); + } finally { + requestManager.requestFinished(nodeId); + } + } + } + + @VisibleForTesting + static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message> + implements Callable { + + final LlapNodeId nodeId; + final ExecuteRequestCallback<RESPONSE> callback; + final REQUEST request; + + + protected CallableRequest(LlapNodeId nodeId, REQUEST request, ExecuteRequestCallback<RESPONSE> callback) { + this.nodeId = nodeId; + this.request = request; + this.callback = callback; + } + + public LlapNodeId getNodeId() { + return nodeId; + } + + public ExecuteRequestCallback<RESPONSE> getCallback() { + return callback; + } + + public abstract RESPONSE call() throws Exception; + } + + private class SubmitWorkCallable extends CallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> { + + protected SubmitWorkCallable(LlapNodeId nodeId, + SubmitWorkRequestProto submitWorkRequestProto, + ExecuteRequestCallback<SubmitWorkResponseProto> callback) { + super(nodeId, 
submitWorkRequestProto, callback); + } + + @Override + public SubmitWorkResponseProto call() throws Exception { + return getProxy(nodeId).submitWork(null, request); + } + } + + private class SendSourceStateUpdateCallable + extends CallableRequest<SourceStateUpdatedRequestProto, SourceStateUpdatedResponseProto> { + + public SendSourceStateUpdateCallable(LlapNodeId nodeId, + SourceStateUpdatedRequestProto request, + ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) { + super(nodeId, request, callback); + } + + @Override + public SourceStateUpdatedResponseProto call() throws Exception { + return getProxy(nodeId).sourceStateUpdated(null, request); + } + } + + private class SendQueryCompleteCallable + extends CallableRequest<QueryCompleteRequestProto, QueryCompleteResponseProto> { + + protected SendQueryCompleteCallable(LlapNodeId nodeId, + QueryCompleteRequestProto queryCompleteRequestProto, + ExecuteRequestCallback<QueryCompleteResponseProto> callback) { + super(nodeId, queryCompleteRequestProto, callback); + } + + @Override + public QueryCompleteResponseProto call() throws Exception { + return getProxy(nodeId).queryComplete(null, request); + } + } + + private class SendTerminateFragmentCallable + extends CallableRequest<TerminateFragmentRequestProto, TerminateFragmentResponseProto> { + + protected SendTerminateFragmentCallable(LlapNodeId nodeId, + TerminateFragmentRequestProto terminateFragmentRequestProto, + ExecuteRequestCallback<TerminateFragmentResponseProto> callback) { + super(nodeId, terminateFragmentRequestProto, callback); + } + + @Override + public TerminateFragmentResponseProto call() throws Exception { + return getProxy(nodeId).terminateFragment(null, request); + } + } + + public interface ExecuteRequestCallback<T extends Message> { + void setResponse(T response); + void indicateError(Throwable t); + } + + private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) { + String hostId = getHostIdentifier(nodeId.getHostname(), 
nodeId.getPort()); + + LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId); + if (proxy == null) { + if (llapToken == null) { + proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(), + nodeId.getPort(), retryPolicy, socketFactory); + } else { + UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new RuntimeException(e); + } + Token<LlapTokenIdentifier> nodeToken = new Token<LlapTokenIdentifier>(llapToken); + SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost( + nodeId.getHostname(), nodeId.getPort())); + ugi.addToken(nodeToken); + proxy = ugi.doAs(new PrivilegedAction<LlapDaemonProtocolBlockingPB>() { + @Override + public LlapDaemonProtocolBlockingPB run() { + return new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(), + nodeId.getPort(), retryPolicy, socketFactory); + } + }); + } + + LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy); + if (proxyOld != null) { + // TODO Shutdown the new proxy. 
+ proxy = proxyOld; + } + } + return proxy; + } + + private String getHostIdentifier(String hostname, int port) { + return hostname + ":" + port; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 9d47940..5c370ee 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -22,8 +22,10 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; @@ -79,6 +81,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class); + private static final boolean isInfoEnabled = LOG.isInfoEnabled(); + private static final boolean isDebugEnabed = LOG.isDebugEnabled(); + private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST; private final ConcurrentMap<String, ByteBuffer> credentialMap; @@ -88,11 +93,17 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { private final SourceStateTracker sourceStateTracker; private final Set<LlapNodeId> nodesForQuery = new HashSet<>(); - private TaskCommunicator communicator; + private LlapDaemonProtocolClientProxy communicator; private long deleteDelayOnDagComplete; 
private final LlapTaskUmbilicalProtocol umbilical; private final Token<LlapTokenIdentifier> token; + // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats. + // Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed. + private final ConcurrentMap<LlapNodeId, Long> knownNodeMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap = new ConcurrentHashMap<>(); + + private volatile String currentDagName; public LlapTaskCommunicator( @@ -131,7 +142,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { super.initialize(); Configuration conf = getConf(); int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS); - this.communicator = new TaskCommunicator(numThreads, conf, token); + this.communicator = new LlapDaemonProtocolClientProxy(numThreads, conf, token); this.deleteDelayOnDagComplete = HiveConf.getTimeVar( conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS); LOG.info("Running LlapTaskCommunicator with " @@ -235,6 +246,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } LlapNodeId nodeId = LlapNodeId.getInstance(host, port); + registerKnownNode(nodeId); entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port); nodesForQuery.add(nodeId); @@ -254,7 +266,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { getContext() .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); communicator.sendSubmitWork(requestProto, host, port, - new TaskCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() { + new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto>() { @Override public void setResponse(SubmitWorkResponseProto response) { if (response.hasSubmissionState()) { @@ -333,14 +345,14 @@ public class LlapTaskCommunicator 
extends TezTaskCommunicatorImpl { LOG.info( "DBG: Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}", taskAttemptId.toString(), invokedByContainerEnd ? "containerEnd" : "taskEnd"); - LlapNodeId nodeId = entityTracker.getNodeIfForTaskAttempt(taskAttemptId); + LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(taskAttemptId); // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself if (nodeId != null) { TerminateFragmentRequestProto request = TerminateFragmentRequestProto.newBuilder().setDagName(currentDagName) .setFragmentIdentifierString(taskAttemptId.toString()).build(); communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(), - new TaskCommunicator.ExecuteRequestCallback<TerminateFragmentResponseProto>() { + new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() { @Override public void setResponse(TerminateFragmentResponseProto response) { } @@ -365,7 +377,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { for (final LlapNodeId llapNodeId : nodesForQuery) { LOG.info("Sending dagComplete message for {}, to {}", dagName, llapNodeId); communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(), - new TaskCommunicator.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() { + new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() { @Override public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) { } @@ -391,7 +403,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { public void sendStateUpdate(final String host, final int port, final SourceStateUpdatedRequestProto request) { communicator.sendSourceStateUpdate(request, host, port, - new TaskCommunicator.ExecuteRequestCallback<SourceStateUpdatedResponseProto>() { + new 
LlapDaemonProtocolClientProxy.ExecuteRequestCallback<SourceStateUpdatedResponseProto>() { @Override public void setResponse(SourceStateUpdatedResponseProto response) { } @@ -409,6 +421,79 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } + private static class PingingNodeInfo { + final AtomicLong logTimestamp; + final AtomicInteger pingCount; + + PingingNodeInfo(long currentTs) { + logTimestamp = new AtomicLong(currentTs); + pingCount = new AtomicInteger(1); + } + } + + public void registerKnownNode(LlapNodeId nodeId) { + Long old = knownNodeMap.putIfAbsent(nodeId, + TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS)); + if (old == null) { + if (isInfoEnabled) { + LOG.info("Added new known node: {}", nodeId); + } + } + } + + public void registerPingingNode(LlapNodeId nodeId) { + long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + PingingNodeInfo ni = new PingingNodeInfo(currentTs); + PingingNodeInfo old = pingedNodeMap.put(nodeId, ni); + if (old == null) { + if (isInfoEnabled) { + LOG.info("Added new pinging node: [{}]", nodeId); + } + } else { + old.pingCount.incrementAndGet(); + } + // The node should always be known by this point. Log occasionally if it is not known. + if (!knownNodeMap.containsKey(nodeId)) { + if (old == null) { + // First time this is seen. Log it. + LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, ni.pingCount.get()); + } else { + // Pinged before. Log only occasionally. + if (currentTs > old.logTimestamp.get() + 5000l) { // 5 seconds elapsed. Log again. 
+ LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, old.pingCount.get()); + old.logTimestamp.set(currentTs); + } + } + + } + } + + + private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0); + + void nodePinged(String hostname, int port) { + LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port); + registerPingingNode(nodeId); + BiMap<ContainerId, TezTaskAttemptID> biMap = + entityTracker.getContainerAttemptMapForNode(nodeId); + if (biMap != null) { + synchronized (biMap) { + for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) { + getContext().taskAlive(entry.getValue()); + getContext().containerAlive(entry.getKey()); + } + } + } else { + long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + if (currentTs > nodeNotFoundLogTime.get() + 5000l) { + LOG.warn("Received ping from node without any registered tasks or containers: " + hostname + + ":" + port + + ". Could be caused by pre-emption by the AM," + + " or a mismatched hostname. Enable debug logging for mismatched host names"); + nodeNotFoundLogTime.set(currentTs); + } + } + } private void resetCurrentDag(String newDagName) { // Working on the assumption that a single DAG runs at a time per AM. 
@@ -454,6 +539,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength()); } + + protected class LlapTaskUmbilicalProtocolImpl implements LlapTaskUmbilicalProtocol { private final TezTaskUmbilicalProtocol tezUmbilical; @@ -475,7 +562,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { @Override public void nodeHeartbeat(Text hostname, int port) throws IOException { - entityTracker.nodePinged(hostname.toString(), port); + nodePinged(hostname.toString(), port); if (LOG.isDebugEnabled()) { LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]"); } @@ -502,10 +589,17 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } } - private final class EntityTracker { - private final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<>(); - private final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<>(); - private final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<>(); + /** + * Track the association between known containers and taskAttempts, along with the nodes they are assigned to. 
+ */ + @VisibleForTesting + static final class EntityTracker { + @VisibleForTesting + final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<>(); + @VisibleForTesting + final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<>(); + @VisibleForTesting + final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<>(); void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId, String host, int port) { if (LOG.isDebugEnabled()) { @@ -513,6 +607,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } LlapNodeId llapNodeId = LlapNodeId.getInstance(host, port); attemptToNodeMap.putIfAbsent(taskAttemptId, llapNodeId); + + registerContainer(containerId, host, port); + + // nodeMap registration. BiMap<ContainerId, TezTaskAttemptID> tmpMap = HashBiMap.create(); BiMap<ContainerId, TezTaskAttemptID> old = nodeMap.putIfAbsent(llapNodeId, tmpMap); BiMap<ContainerId, TezTaskAttemptID> usedInstance; @@ -538,10 +636,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { synchronized(bMap) { matched = bMap.inverse().remove(attemptId); } - } - // Removing here. Registration into the map has to make sure to put - if (bMap.isEmpty()) { - nodeMap.remove(llapNodeId); + if (bMap.isEmpty()) { + nodeMap.remove(llapNodeId); + } } // Remove the container mapping @@ -552,23 +649,29 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } void registerContainer(ContainerId containerId, String hostname, int port) { + if (LOG.isDebugEnabled()) { + LOG.debug("Registering " + containerId + " for node: " + hostname + ":" + port); + } containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port)); + // nodeMap registration is not required, since there's no taskId association. 
} LlapNodeId getNodeIdForContainer(ContainerId containerId) { return containerToNodeMap.get(containerId); } - LlapNodeId getNodeIfForTaskAttempt(TezTaskAttemptID taskAttemptId) { + LlapNodeId getNodeIdForTaskAttempt(TezTaskAttemptID taskAttemptId) { return attemptToNodeMap.get(taskAttemptId); } ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) { - LlapNodeId llapNodeId = getNodeIfForTaskAttempt(taskAttemptId); + LlapNodeId llapNodeId = getNodeIdForTaskAttempt(taskAttemptId); if (llapNodeId != null) { BiMap<TezTaskAttemptID, ContainerId> bMap = nodeMap.get(llapNodeId).inverse(); if (bMap != null) { - return bMap.get(taskAttemptId); + synchronized (bMap) { + return bMap.get(taskAttemptId); + } } else { return null; } @@ -582,7 +685,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { if (llapNodeId != null) { BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId); if (bMap != null) { - return bMap.get(containerId); + synchronized (bMap) { + return bMap.get(containerId); + } } else { return null; } @@ -604,10 +709,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { synchronized(bMap) { matched = bMap.remove(containerId); } - } - // Removing here. 
Registration into the map has to make sure to put - if (bMap.isEmpty()) { - nodeMap.remove(llapNodeId); + if (bMap.isEmpty()) { + nodeMap.remove(llapNodeId); + } } // Remove the container mapping @@ -616,25 +720,20 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } } - private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0); - void nodePinged(String hostname, int port) { - LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port); - BiMap<ContainerId, TezTaskAttemptID> biMap = nodeMap.get(nodeId); - if (biMap != null) { - synchronized(biMap) { - for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) { - getContext().taskAlive(entry.getValue()); - getContext().containerAlive(entry.getKey()); - } - } - } else { - if (System.currentTimeMillis() > nodeNotFoundLogTime.get() + 5000l) { - LOG.warn("Received ping from unknown node: " + hostname + ":" + port + - ". Could be caused by pre-emption by the AM," + - " or a mismatched hostname. Enable debug logging for mismatched host names"); - nodeNotFoundLogTime.set(System.currentTimeMillis()); - } - } + /** + * Return a {@link BiMap} containing container->taskAttemptId mapping for the host specified. + * </p> + * <p/> + * This method returns the internal structure used by the EntityTracker. Users must synchronize + * on the structure to ensure correct usage. 
+ * + * @param llapNodeId + * @return + */ + BiMap<ContainerId, TezTaskAttemptID> getContainerAttemptMapForNode(LlapNodeId llapNodeId) { + BiMap<ContainerId, TezTaskAttemptID> biMap = nodeMap.get(llapNodeId); + return biMap; } + } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java deleted file mode 100644 index f9ca677..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java +++ /dev/null @@ -1,512 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.llap.tezplugins; - -import javax.net.SocketFactory; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.security.PrivilegedAction; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.Message; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.LlapNodeId; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; -import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; -import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.service.AbstractService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TaskCommunicator extends AbstractService { - - private static final Logger LOG = LoggerFactory.getLogger(TaskCommunicator.class); - - private final ConcurrentMap<String, LlapDaemonProtocolBlockingPB> hostProxies; - - private final RequestManager requestManager; - private final RetryPolicy retryPolicy; - private final SocketFactory socketFactory; - - private final ListeningExecutorService requestManagerExecutor; - private volatile ListenableFuture<Void> requestManagerFuture; - private final Token<LlapTokenIdentifier> llapToken; - - public TaskCommunicator( - int numThreads, Configuration conf, Token<LlapTokenIdentifier> llapToken) { - super(TaskCommunicator.class.getSimpleName()); - this.hostProxies = new ConcurrentHashMap<>(); - this.socketFactory = NetUtils.getDefaultSocketFactory(conf); - this.llapToken = llapToken; - - long connectionTimeout = HiveConf.getTimeVar(conf, - 
ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); - long retrySleep = HiveConf.getTimeVar(conf, - ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS, - TimeUnit.MILLISECONDS); - this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep( - connectionTimeout, retrySleep, TimeUnit.MILLISECONDS); - - this.requestManager = new RequestManager(numThreads); - ExecutorService localExecutor = Executors.newFixedThreadPool(1, - new ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build()); - this.requestManagerExecutor = MoreExecutors.listeningDecorator(localExecutor); - - LOG.info("Setting up taskCommunicator with" + - "numThreads=" + numThreads + - "retryTime(millis)=" + connectionTimeout + - "retrySleep(millis)=" + retrySleep); - } - - @Override - public void serviceStart() { - requestManagerFuture = requestManagerExecutor.submit(requestManager); - Futures.addCallback(requestManagerFuture, new FutureCallback<Void>() { - @Override - public void onSuccess(Void result) { - LOG.info("RequestManager shutdown"); - } - - @Override - public void onFailure(Throwable t) { - LOG.warn("RequestManager shutdown with error", t); - } - }); - } - - @Override - public void serviceStop() { - if (requestManagerFuture != null) { - requestManager.shutdown(); - requestManagerFuture.cancel(true); - } - requestManagerExecutor.shutdown(); - } - - public void sendSubmitWork(SubmitWorkRequestProto request, String host, int port, - final ExecuteRequestCallback<SubmitWorkResponseProto> callback) { - LlapNodeId nodeId = LlapNodeId.getInstance(host, port); - requestManager.queueRequest(new SubmitWorkCallable(nodeId, request, callback)); - } - - public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host, - final int port, - final ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) { - LlapNodeId nodeId = LlapNodeId.getInstance(host, port); - requestManager.queueRequest( - new 
SendSourceStateUpdateCallable(nodeId, request, callback)); - } - - public void sendQueryComplete(final QueryCompleteRequestProto request, final String host, - final int port, - final ExecuteRequestCallback<QueryCompleteResponseProto> callback) { - LlapNodeId nodeId = LlapNodeId.getInstance(host, port); - requestManager.queueRequest(new SendQueryCompleteCallable(nodeId, request, callback)); - } - - public void sendTerminateFragment(final TerminateFragmentRequestProto request, final String host, - final int port, - final ExecuteRequestCallback<TerminateFragmentResponseProto> callback) { - LlapNodeId nodeId = LlapNodeId.getInstance(host, port); - requestManager.queueRequest(new SendTerminateFragmentCallable(nodeId, request, callback)); - } - - @VisibleForTesting - static class RequestManager implements Callable<Void> { - - private final Lock lock = new ReentrantLock(); - private final AtomicBoolean isShutdown = new AtomicBoolean(false); - private final Condition queueCondition = lock.newCondition(); - private final AtomicBoolean shouldRun = new AtomicBoolean(false); - - private final int maxConcurrentRequestsPerNode = 1; - private final ListeningExecutorService executor; - - - // Tracks new additions via add, while the loop is processing existing ones. - private final LinkedList<CallableRequest> newRequestList = new LinkedList<>(); - - // Tracks existing requests which are cycled through. 
- private final LinkedList<CallableRequest> pendingRequests = new LinkedList<>(); - - // Tracks requests executing per node - private final ConcurrentMap<LlapNodeId, AtomicInteger> runningRequests = new ConcurrentHashMap<>(); - - // Tracks completed requests per node - private final LinkedList<LlapNodeId> completedNodes = new LinkedList<>(); - - public RequestManager(int numThreads) { - ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads, - new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build()); - executor = MoreExecutors.listeningDecorator(localExecutor); - } - - - @VisibleForTesting - Set<LlapNodeId> currentLoopDisabledNodes = new HashSet<>(); - @VisibleForTesting - List<CallableRequest> currentLoopSkippedRequests = new LinkedList<>(); - @Override - public Void call() { - // Caches disabled nodes for quicker lookups and ensures a request on a node which was skipped - // does not go out of order. - while (!isShutdown.get()) { - lock.lock(); - try { - while (!shouldRun.get()) { - queueCondition.await(); - break; // Break out and try executing. 
- } - boolean shouldBreak = process(); - if (shouldBreak) { - break; - } - } catch (InterruptedException e) { - if (isShutdown.get()) { - break; - } else { - LOG.warn("RunLoop interrupted without being shutdown first"); - throw new RuntimeException(e); - } - } finally { - lock.unlock(); - } - } - LOG.info("CallScheduler loop exiting"); - return null; - } - - /* Add a new request to be executed */ - public void queueRequest(CallableRequest request) { - synchronized (newRequestList) { - newRequestList.add(request); - shouldRun.set(true); - } - notifyRunLoop(); - } - - /* Indicates a request has completed on a node */ - public void requestFinished(LlapNodeId nodeId) { - synchronized (completedNodes) { - completedNodes.add(nodeId); - shouldRun.set(true); - } - notifyRunLoop(); - } - - public void shutdown() { - if (!isShutdown.getAndSet(true)) { - executor.shutdownNow(); - notifyRunLoop(); - } - } - - @VisibleForTesting - void submitToExecutor(CallableRequest request, LlapNodeId nodeId) { - ListenableFuture<SourceStateUpdatedResponseProto> future = - executor.submit(request); - Futures.addCallback(future, new ResponseCallback(request.getCallback(), nodeId, this)); - } - - @VisibleForTesting - boolean process() { - if (isShutdown.get()) { - return true; - } - currentLoopDisabledNodes.clear(); - currentLoopSkippedRequests.clear(); - - // Set to false to block the next loop. This must be called before draining the lists, - // otherwise an add/completion after draining the lists but before setting it to false, - // will not trigger a run. May cause one unnecessary run if an add comes in before drain. - // drain list. add request (setTrue). setFalse needs to be avoided. - shouldRun.compareAndSet(true, false); - // Drain any calls which may have come in during the last execution of the loop. 
- drainNewRequestList(); // Locks newRequestList - drainCompletedNodes(); // Locks completedNodes - - - Iterator<CallableRequest> iterator = pendingRequests.iterator(); - while (iterator.hasNext()) { - CallableRequest request = iterator.next(); - iterator.remove(); - LlapNodeId nodeId = request.getNodeId(); - if (canRunForNode(nodeId, currentLoopDisabledNodes)) { - submitToExecutor(request, nodeId); - } else { - currentLoopDisabledNodes.add(nodeId); - currentLoopSkippedRequests.add(request); - } - } - // Tried scheduling everything that could be scheduled in this loop. - pendingRequests.addAll(0, currentLoopSkippedRequests); - return false; - } - - private void drainNewRequestList() { - synchronized (newRequestList) { - if (!newRequestList.isEmpty()) { - pendingRequests.addAll(newRequestList); - newRequestList.clear(); - } - } - } - - private void drainCompletedNodes() { - synchronized (completedNodes) { - if (!completedNodes.isEmpty()) { - for (LlapNodeId nodeId : completedNodes) { - runningRequests.get(nodeId).decrementAndGet(); - } - } - completedNodes.clear(); - } - } - - private boolean canRunForNode(LlapNodeId nodeId, Set<LlapNodeId> currentRunDisabledNodes) { - if (currentRunDisabledNodes.contains(nodeId)) { - return false; - } else { - AtomicInteger count = runningRequests.get(nodeId); - if (count == null) { - count = new AtomicInteger(0); - AtomicInteger old = runningRequests.putIfAbsent(nodeId, count); - count = old != null ? 
old : count; - } - if (count.incrementAndGet() <= maxConcurrentRequestsPerNode) { - return true; - } else { - count.decrementAndGet(); - return false; - } - } - } - - private void notifyRunLoop() { - lock.lock(); - try { - queueCondition.signal(); - } finally { - lock.unlock(); - } - } - } - - - private static final class ResponseCallback<TYPE extends Message> - implements FutureCallback<TYPE> { - - private final ExecuteRequestCallback<TYPE> callback; - private final LlapNodeId nodeId; - private final RequestManager requestManager; - - public ResponseCallback(ExecuteRequestCallback<TYPE> callback, LlapNodeId nodeId, - RequestManager requestManager) { - this.callback = callback; - this.nodeId = nodeId; - this.requestManager = requestManager; - } - - @Override - public void onSuccess(TYPE result) { - try { - callback.setResponse(result); - } finally { - requestManager.requestFinished(nodeId); - } - } - - @Override - public void onFailure(Throwable t) { - try { - callback.indicateError(t); - } finally { - requestManager.requestFinished(nodeId); - } - } - } - - @VisibleForTesting - static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message> - implements Callable { - - final LlapNodeId nodeId; - final ExecuteRequestCallback<RESPONSE> callback; - final REQUEST request; - - - protected CallableRequest(LlapNodeId nodeId, REQUEST request, ExecuteRequestCallback<RESPONSE> callback) { - this.nodeId = nodeId; - this.request = request; - this.callback = callback; - } - - public LlapNodeId getNodeId() { - return nodeId; - } - - public ExecuteRequestCallback<RESPONSE> getCallback() { - return callback; - } - - public abstract RESPONSE call() throws Exception; - } - - private class SubmitWorkCallable extends CallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> { - - protected SubmitWorkCallable(LlapNodeId nodeId, - SubmitWorkRequestProto submitWorkRequestProto, - ExecuteRequestCallback<SubmitWorkResponseProto> callback) { - super(nodeId, 
submitWorkRequestProto, callback); - } - - @Override - public SubmitWorkResponseProto call() throws Exception { - return getProxy(nodeId).submitWork(null, request); - } - } - - private class SendSourceStateUpdateCallable - extends CallableRequest<SourceStateUpdatedRequestProto, SourceStateUpdatedResponseProto> { - - public SendSourceStateUpdateCallable(LlapNodeId nodeId, - SourceStateUpdatedRequestProto request, - ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) { - super(nodeId, request, callback); - } - - @Override - public SourceStateUpdatedResponseProto call() throws Exception { - return getProxy(nodeId).sourceStateUpdated(null, request); - } - } - - private class SendQueryCompleteCallable - extends CallableRequest<QueryCompleteRequestProto, QueryCompleteResponseProto> { - - protected SendQueryCompleteCallable(LlapNodeId nodeId, - QueryCompleteRequestProto queryCompleteRequestProto, - ExecuteRequestCallback<QueryCompleteResponseProto> callback) { - super(nodeId, queryCompleteRequestProto, callback); - } - - @Override - public QueryCompleteResponseProto call() throws Exception { - return getProxy(nodeId).queryComplete(null, request); - } - } - - private class SendTerminateFragmentCallable - extends CallableRequest<TerminateFragmentRequestProto, TerminateFragmentResponseProto> { - - protected SendTerminateFragmentCallable(LlapNodeId nodeId, - TerminateFragmentRequestProto terminateFragmentRequestProto, - ExecuteRequestCallback<TerminateFragmentResponseProto> callback) { - super(nodeId, terminateFragmentRequestProto, callback); - } - - @Override - public TerminateFragmentResponseProto call() throws Exception { - return getProxy(nodeId).terminateFragment(null, request); - } - } - - public interface ExecuteRequestCallback<T extends Message> { - void setResponse(T response); - void indicateError(Throwable t); - } - - private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) { - String hostId = getHostIdentifier(nodeId.getHostname(), 
nodeId.getPort()); - - LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId); - if (proxy == null) { - if (llapToken == null) { - proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(), - nodeId.getPort(), retryPolicy, socketFactory); - } else { - UserGroupInformation ugi; - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new RuntimeException(e); - } - Token<LlapTokenIdentifier> nodeToken = new Token<LlapTokenIdentifier>(llapToken); - SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost( - nodeId.getHostname(), nodeId.getPort())); - ugi.addToken(nodeToken); - proxy = ugi.doAs(new PrivilegedAction<LlapDaemonProtocolBlockingPB>() { - @Override - public LlapDaemonProtocolBlockingPB run() { - return new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(), - nodeId.getPort(), retryPolicy, socketFactory); - } - }); - } - - LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy); - if (proxyOld != null) { - // TODO Shutdown the new proxy. - proxy = proxyOld; - } - } - return proxy; - } - - private String getHostIdentifier(String hostname, int port) { - return hostname + ":" + port; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java new file mode 100644 index 0000000..a6af8c2 --- /dev/null +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java @@ -0,0 +1,143 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.HashMap; +import java.util.Map; + +import com.google.protobuf.Message; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.hive.llap.LlapNodeId; +import org.junit.Test; + +public class TestLlapDaemonProtocolClientProxy { + + @Test (timeout = 5000) + public void testMultipleNodes() { + RequestManagerForTest requestManager = new RequestManagerForTest(1); + + LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025); + LlapNodeId nodeId2 = LlapNodeId.getInstance("host2", 1025); + + Message mockMessage = mock(Message.class); + LlapDaemonProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock( + LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class); + + // Request two messages + requestManager.queueRequest( + new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback)); + requestManager.queueRequest( + new CallableRequestForTest(nodeId2, mockMessage, mockExecuteRequestCallback)); + + // Should go through in a single process call + requestManager.process(); + assertEquals(2, requestManager.numSubmissionsCounters); + assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); + assertNotNull(requestManager.numInvocationsPerNode.get(nodeId2)); + assertEquals(1, 
requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); + assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue()); + assertEquals(0, requestManager.currentLoopSkippedRequests.size()); + assertEquals(0, requestManager.currentLoopSkippedRequests.size()); + assertEquals(0, requestManager.currentLoopDisabledNodes.size()); + } + + @Test(timeout = 5000) + public void testSingleInvocationPerNode() { + RequestManagerForTest requestManager = new RequestManagerForTest(1); + + LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025); + + Message mockMessage = mock(Message.class); + LlapDaemonProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock( + LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class); + + // First request for host. + requestManager.queueRequest( + new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback)); + requestManager.process(); + assertEquals(1, requestManager.numSubmissionsCounters); + assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); + assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); + assertEquals(0, requestManager.currentLoopSkippedRequests.size()); + + // Second request for host. Single invocation since the last has not completed. + requestManager.queueRequest( + new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback)); + requestManager.process(); + assertEquals(1, requestManager.numSubmissionsCounters); + assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); + assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); + assertEquals(1, requestManager.currentLoopSkippedRequests.size()); + assertEquals(1, requestManager.currentLoopDisabledNodes.size()); + assertTrue(requestManager.currentLoopDisabledNodes.contains(nodeId1)); + + // Complete first request. Second pending request should go through. 
+ requestManager.requestFinished(nodeId1); + requestManager.process(); + assertEquals(2, requestManager.numSubmissionsCounters); + assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); + assertEquals(2, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); + assertEquals(0, requestManager.currentLoopSkippedRequests.size()); + assertEquals(0, requestManager.currentLoopDisabledNodes.size()); + assertFalse(requestManager.currentLoopDisabledNodes.contains(nodeId1)); + } + + + static class RequestManagerForTest extends LlapDaemonProtocolClientProxy.RequestManager { + + int numSubmissionsCounters = 0; + private Map<LlapNodeId, MutableInt> numInvocationsPerNode = new HashMap<>(); + + public RequestManagerForTest(int numThreads) { + super(numThreads); + } + + protected void submitToExecutor(LlapDaemonProtocolClientProxy.CallableRequest request, LlapNodeId nodeId) { + numSubmissionsCounters++; + MutableInt nodeCount = numInvocationsPerNode.get(nodeId); + if (nodeCount == null) { + nodeCount = new MutableInt(0); + numInvocationsPerNode.put(nodeId, nodeCount); + } + nodeCount.increment(); + } + + void reset() { + numSubmissionsCounters = 0; + numInvocationsPerNode.clear(); + } + + } + + static class CallableRequestForTest extends LlapDaemonProtocolClientProxy.CallableRequest<Message, Message> { + + protected CallableRequestForTest(LlapNodeId nodeId, Message message, + LlapDaemonProtocolClientProxy.ExecuteRequestCallback<Message> callback) { + super(nodeId, message, callback); + } + + @Override + public Message call() throws Exception { + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java 
b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java new file mode 100644 index 0000000..8f3d104 --- /dev/null +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java @@ -0,0 +1,100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import org.apache.hadoop.hive.llap.LlapNodeId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.junit.Test; + /** Tests for the container and task-attempt bookkeeping done by LlapTaskCommunicator.EntityTracker. */ +public class TestLlapTaskCommunicator { + + /** Runs three scenarios against one EntityTracker, all on host1 with port 1451: (1) register then unregister a container, (2) register then unregister a task attempt, (3) register a task attempt then unregister its container. After each scenario the node/container/attempt lookups must return null and nodeMap, attemptToNodeMap and containerToNodeMap must be empty. A final unregisterTaskAttempt for the attempt from scenario 3 must complete without an exception. */ @Test (timeout = 5000) + public void testEntityTracker1() { + LlapTaskCommunicator.EntityTracker entityTracker = new LlapTaskCommunicator.EntityTracker(); + + String host1 = "host1"; + String host2 = "host2"; + String host3 = "host3"; /* NOTE(review): host2 and host3 are declared but not referenced anywhere else in this test. */ + int port = 1451; + + + // Simple container registration and un-registration without any task attempt being involved. 
+ ContainerId containerId101 = constructContainerId(101); + entityTracker.registerContainer(containerId101, host1, port); + assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId101)); + + entityTracker.unregisterContainer(containerId101); + assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port))); + assertNull(entityTracker.getNodeIdForContainer(containerId101)); + assertEquals(0, entityTracker.nodeMap.size()); + assertEquals(0, entityTracker.attemptToNodeMap.size()); + assertEquals(0, entityTracker.containerToNodeMap.size()); + + + // Simple task registration and un-registration. + ContainerId containerId1 = constructContainerId(1); + TezTaskAttemptID taskAttemptId1 = constructTaskAttemptId(1); + entityTracker.registerTaskAttempt(containerId1, taskAttemptId1, host1, port); + assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId1)); + assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForTaskAttempt(taskAttemptId1)); + + entityTracker.unregisterTaskAttempt(taskAttemptId1); + assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port))); + assertNull(entityTracker.getNodeIdForContainer(containerId1)); + assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId1)); + assertEquals(0, entityTracker.nodeMap.size()); + assertEquals(0, entityTracker.attemptToNodeMap.size()); + assertEquals(0, entityTracker.containerToNodeMap.size()); + + // Register taskAttempt, unregister container. 
TaskAttempt should also be unregistered + ContainerId containerId201 = constructContainerId(201); + TezTaskAttemptID taskAttemptId201 = constructTaskAttemptId(201); + entityTracker.registerTaskAttempt(containerId201, taskAttemptId201, host1, port); + assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId201)); + assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForTaskAttempt(taskAttemptId201)); + + entityTracker.unregisterContainer(containerId201); + assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port))); + assertNull(entityTracker.getNodeIdForContainer(containerId201)); + assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId201)); + assertEquals(0, entityTracker.nodeMap.size()); + assertEquals(0, entityTracker.attemptToNodeMap.size()); + assertEquals(0, entityTracker.containerToNodeMap.size()); + + /* The assertions above show the attempt is no longer tracked; unregistering it again must not throw. */ entityTracker.unregisterTaskAttempt(taskAttemptId201); // No errors + } + + + /** Builds a Mockito mock ContainerId whose getId() returns id and whose getContainerId() returns id widened to long. */ private ContainerId constructContainerId(int id) { + ContainerId containerId = mock(ContainerId.class); + doReturn(id).when(containerId).getId(); + doReturn((long)id).when(containerId).getContainerId(); + return containerId; + } + + /** Builds a Mockito mock TezTaskAttemptID whose getId() returns id. */ private TezTaskAttemptID constructTaskAttemptId(int id) { + TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class); + doReturn(id).when(taskAttemptId).getId(); + return taskAttemptId; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/0cbf45cf/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestTaskCommunicator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestTaskCommunicator.java deleted file mode 100644 index 2aef4ed..0000000 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestTaskCommunicator.java +++ /dev/null @@ 
-1,143 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.tezplugins; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - -import java.util.HashMap; -import java.util.Map; - -import com.google.protobuf.Message; -import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.hadoop.hive.llap.LlapNodeId; -import org.junit.Test; - /** Tests for request dispatching in TaskCommunicator.RequestManager, driven through a subclass that counts submissions. */ -public class TestTaskCommunicator { - - /** Queues one request for each of two different nodes and expects a single process() call to submit both, leaving no skipped requests and no disabled nodes. */ @Test (timeout = 5000) - public void testMultipleNodes() { - RequestManagerForTest requestManager = new RequestManagerForTest(1); - - LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025); - LlapNodeId nodeId2 = LlapNodeId.getInstance("host2", 1025); - - Message mockMessage = mock(Message.class); - TaskCommunicator.ExecuteRequestCallback mockExecuteRequestCallback = mock( - TaskCommunicator.ExecuteRequestCallback.class); - - // Request two messages - requestManager.queueRequest( - new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback)); - requestManager.queueRequest( - new CallableRequestForTest(nodeId2, mockMessage, mockExecuteRequestCallback)); - - // Should go through in a single process call - requestManager.process(); - assertEquals(2, requestManager.numSubmissionsCounters); - 
assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); - assertNotNull(requestManager.numInvocationsPerNode.get(nodeId2)); - assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); - assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue()); - assertEquals(0, requestManager.currentLoopSkippedRequests.size()); - /* NOTE(review): the next assertion repeats the preceding one verbatim. */ assertEquals(0, requestManager.currentLoopSkippedRequests.size()); - assertEquals(0, requestManager.currentLoopDisabledNodes.size()); - } - - /** Checks that a second request for the same node is not submitted while the first is outstanding (it is counted as skipped and the node appears in currentLoopDisabledNodes), and that it is submitted by the next process() call after requestFinished(nodeId1). */ @Test(timeout = 5000) - public void testSingleInvocationPerNode() { - RequestManagerForTest requestManager = new RequestManagerForTest(1); - - LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025); - - Message mockMessage = mock(Message.class); - TaskCommunicator.ExecuteRequestCallback mockExecuteRequestCallback = mock( - TaskCommunicator.ExecuteRequestCallback.class); - - // First request for host. - requestManager.queueRequest( - new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback)); - requestManager.process(); - assertEquals(1, requestManager.numSubmissionsCounters); - assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); - assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); - assertEquals(0, requestManager.currentLoopSkippedRequests.size()); - - // Second request for host. Single invocation since the last has not completed. 
- requestManager.queueRequest( - new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback)); - requestManager.process(); - assertEquals(1, requestManager.numSubmissionsCounters); - assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); - assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); - assertEquals(1, requestManager.currentLoopSkippedRequests.size()); - assertEquals(1, requestManager.currentLoopDisabledNodes.size()); - assertTrue(requestManager.currentLoopDisabledNodes.contains(nodeId1)); - - // Complete first request. Second pending request should go through. - requestManager.requestFinished(nodeId1); - requestManager.process(); - assertEquals(2, requestManager.numSubmissionsCounters); - assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1)); - assertEquals(2, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue()); - assertEquals(0, requestManager.currentLoopSkippedRequests.size()); - assertEquals(0, requestManager.currentLoopDisabledNodes.size()); - assertFalse(requestManager.currentLoopDisabledNodes.contains(nodeId1)); - } - - - /** Test subclass of TaskCommunicator.RequestManager whose submitToExecutor only records how many requests were handed over, in total and per node. */ static class RequestManagerForTest extends TaskCommunicator.RequestManager { - - int numSubmissionsCounters = 0; - private Map<LlapNodeId, MutableInt> numInvocationsPerNode = new HashMap<>(); - - public RequestManagerForTest(int numThreads) { - super(numThreads); - } - - /** Records the submission: increments the total counter and the counter for nodeId, creating the per-node counter on first use. The request argument is not touched and super is not called. */ protected void submitToExecutor(TaskCommunicator.CallableRequest request, LlapNodeId nodeId) { - numSubmissionsCounters++; - MutableInt nodeCount = numInvocationsPerNode.get(nodeId); - if (nodeCount == null) { - nodeCount = new MutableInt(0); - numInvocationsPerNode.put(nodeId, nodeCount); - } - nodeCount.increment(); - } - - /** Sets the total submission counter back to zero and clears all per-node counters. */ void reset() { - numSubmissionsCounters = 0; - numInvocationsPerNode.clear(); - } - - } - - /** Minimal CallableRequest of Message to Message for tests: the constructor forwards its arguments to super and call() returns null. */ static class CallableRequestForTest extends TaskCommunicator.CallableRequest<Message, Message> { - - protected CallableRequestForTest(LlapNodeId nodeId, 
Message message, - TaskCommunicator.ExecuteRequestCallback<Message> callback) { - super(nodeId, message, callback); - } - - /** Always returns null. */ @Override - public Message call() throws Exception { - return null; - } - } - -}