This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 5dd8a6f60e006c2dc707f241b7619634e4e82bbd Author: Sorabh Hamirwasia <shamirwa...@maprtech.com> AuthorDate: Tue May 8 13:06:20 2018 -0700 DRILL-6255: Drillbit while sending control message to itself creates a connection instead of submitting locally closes #1253 --- .../org/apache/drill/exec/rpc/BitRpcUtility.java | 18 ++ .../rpc/control/ConnectionManagerRegistry.java | 6 +- .../exec/rpc/control/ControlConnectionConfig.java | 9 +- .../exec/rpc/control/ControlConnectionManager.java | 16 +- .../drill/exec/rpc/control/ControlTunnel.java | 162 ++++++++---- .../rpc/control/LocalControlConnectionManager.java | 236 +++++++++++++++++ ...er.java => RemoteControlConnectionManager.java} | 26 +- .../org/apache/drill/exec/rpc/data/DataTunnel.java | 25 +- .../exec/work/batch/ControlMessageHandler.java | 51 ++-- .../drill/exec/work/foreman/FragmentsRunner.java | 105 ++------ .../exec/work/fragment/FragmentStatusReporter.java | 15 +- .../rpc/control/ConnectionManagerRegistryTest.java | 149 +++++++++++ .../control/TestLocalControlConnectionManager.java | 278 +++++++++++++++++++++ .../rpc/user/security/TestUserBitKerberos.java | 36 ++- .../work/fragment/FragmentStatusReporterTest.java | 73 +----- .../apache/drill/exec/rpc/FutureBitCommand.java | 20 +- .../apache/drill/exec/rpc/ListeningCommand.java | 33 +-- .../drill/exec/rpc/ReconnectingConnection.java | 16 +- .../java/org/apache/drill/exec/rpc/RpcCommand.java | 13 +- 19 files changed, 983 insertions(+), 304 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java index c398033..0bc01f5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java @@ -103,6 +103,24 @@ public final class BitRpcUtility { } } + /** + * Verifies if local and remote Drillbit Endpoint has same control server by using address and 
control port + * information. This method is used instead of equals in {@link DrillbitEndpoint} because DrillbitEndpoint stores + * state information in it. + * For local Drillbit a reference is stored in {@link org.apache.drill.exec.server.DrillbitContext} as soon as + * Drillbit is started in {@link org.apache.drill.exec.service.ServiceEngine#start} with state as STARTUP, but + * while planning minor fragment the assignment list is used from active list of Drillbits in which state for local + * Drillbit will not be STARTUP + * @param local - DrillbitEndpoint instance for local bit + * @param remote - DrillbitEndpoint instance for remote bit + * @return true if address and control port for local and remote are same. + * false - otherwise + */ + public static boolean isLocalControlServer(DrillbitEndpoint local, DrillbitEndpoint remote) { + return local.hasAddress() && local.hasControlPort() && remote.hasAddress() && remote.hasControlPort() && + local.getAddress().equals(remote.getAddress()) && local.getControlPort() == remote.getControlPort(); + } + // Suppress default constructor private BitRpcUtility() { } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java index 800cf3c..e27729c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.google.common.collect.Maps; +import org.apache.drill.exec.rpc.BitRpcUtility; public class ConnectionManagerRegistry implements Iterable<ControlConnectionManager> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class); @@ -40,9 +41,12 @@ public 
class ConnectionManagerRegistry implements Iterable<ControlConnectionMana public ControlConnectionManager getConnectionManager(DrillbitEndpoint remoteEndpoint) { assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager can be retrieved"; + + final boolean isLocalServer = BitRpcUtility.isLocalControlServer(localEndpoint, remoteEndpoint); ControlConnectionManager m = registry.get(remoteEndpoint); if (m == null) { - m = new ControlConnectionManager(config, localEndpoint, remoteEndpoint); + m = (isLocalServer) ? new LocalControlConnectionManager(config, remoteEndpoint) + : new RemoteControlConnectionManager(config, localEndpoint, remoteEndpoint); final ControlConnectionManager m2 = registry.putIfAbsent(remoteEndpoint, m); if (m2 != null) { m = m2; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java index b19fb8b..f114af4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.control; +import com.google.common.annotations.VisibleForTesting; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.rpc.BitConnectionConfig; @@ -24,8 +25,8 @@ import org.apache.drill.exec.server.BootStrapContext; import org.apache.drill.exec.work.batch.ControlMessageHandler; // config for bit to bit connection -// package private -class ControlConnectionConfig extends BitConnectionConfig { +@VisibleForTesting +public class ControlConnectionConfig extends BitConnectionConfig { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionConfig.class); private final ControlMessageHandler 
handler; @@ -41,8 +42,8 @@ class ControlConnectionConfig extends BitConnectionConfig { return "control"; // unused } - ControlMessageHandler getMessageHandler() { + @VisibleForTesting + public ControlMessageHandler getMessageHandler() { return handler; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java index 6bfcbd5..240421e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java @@ -25,14 +25,10 @@ import org.apache.drill.exec.rpc.ReconnectingConnection; /** * Maintains connection between two particular bits. */ -public class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{ +public abstract class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionManager.class); - private final ControlConnectionConfig config; - private final DrillbitEndpoint remoteEndpoint; - - public ControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint localEndpoint, - DrillbitEndpoint remoteEndpoint) { + public ControlConnectionManager(DrillbitEndpoint localEndpoint, DrillbitEndpoint remoteEndpoint) { super( BitControlHandshake.newBuilder() .setRpcVersion(ControlRpcConfig.RPC_VERSION) @@ -40,14 +36,8 @@ public class ControlConnectionManager extends ReconnectingConnection<ControlConn .build(), remoteEndpoint.getAddress(), remoteEndpoint.getControlPort()); - - this.config = config; - this.remoteEndpoint = remoteEndpoint; } @Override - protected BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient() { - return new ControlClient(config, remoteEndpoint, new CloseHandlerCreator()); - 
} - + protected abstract BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java index 1a4af90..492d4de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java @@ -17,11 +17,20 @@ */ package org.apache.drill.exec.rpc.control; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; import io.netty.buffer.ByteBuf; import io.netty.buffer.DrillBuf; - -import java.util.concurrent.TimeUnit; - import org.apache.drill.exec.proto.BitControl.CustomMessage; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; @@ -38,19 +47,7 @@ import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.control.Controller.CustomSerDe; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.ObjectWriter; -import 
com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; - +import java.util.concurrent.TimeUnit; public class ControlTunnel { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlTunnel.class); @@ -99,8 +96,7 @@ public class ControlTunnel { return b.getFuture(); } - - public static class SendFragmentStatus extends FutureBitCommand<Ack, ControlConnection> { + public static class SendFragmentStatus extends FutureBitCommand<Ack, ControlConnection, RpcType, FragmentStatus> { final FragmentStatus status; public SendFragmentStatus(FragmentStatus status) { @@ -110,13 +106,22 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.sendUnsafe(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class); + connection.sendUnsafe(outcomeListener, getRpcType(), status, Ack.class); } - } + @Override + public RpcType getRpcType() { + return RpcType.REQ_FRAGMENT_STATUS; + } + @Override + public FragmentStatus getMessage() { + return status; + } - public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection> { + } + + public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection, RpcType, FinishedReceiver> { final FinishedReceiver finishedReceiver; public ReceiverFinished(RpcOutcomeListener<Ack> listener, FinishedReceiver finishedReceiver) { @@ -126,11 +131,21 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_RECEIVER_FINISHED, finishedReceiver, Ack.class); + connection.send(outcomeListener, getRpcType(), finishedReceiver, Ack.class); + } + + @Override + public RpcType 
getRpcType() { + return RpcType.REQ_RECEIVER_FINISHED; + } + + @Override + public FinishedReceiver getMessage() { + return finishedReceiver; } } - public static class SignalFragment extends ListeningCommand<Ack, ControlConnection> { + public static class SignalFragment extends ListeningCommand<Ack, ControlConnection, RpcType, FragmentHandle> { final FragmentHandle handle; final RpcType type; @@ -145,9 +160,18 @@ public class ControlTunnel { connection.sendUnsafe(outcomeListener, type, handle, Ack.class); } + @Override + public RpcType getRpcType() { + return type; + } + + @Override + public FragmentHandle getMessage() { + return handle; + } } - public static class SendFragment extends ListeningCommand<Ack, ControlConnection> { + public static class SendFragment extends ListeningCommand<Ack, ControlConnection, RpcType, InitializeFragments> { final InitializeFragments fragments; public SendFragment(RpcOutcomeListener<Ack> listener, InitializeFragments fragments) { @@ -157,12 +181,21 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_INITIALIZE_FRAGMENTS, fragments, Ack.class); + connection.send(outcomeListener, getRpcType(), fragments, Ack.class); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_INITIALIZE_FRAGMENTS; } + @Override + public InitializeFragments getMessage() { + return fragments; + } } - public static class RequestProfile extends FutureBitCommand<QueryProfile, ControlConnection> { + public static class RequestProfile extends FutureBitCommand<QueryProfile, ControlConnection, RpcType, QueryId> { final QueryId queryId; public RequestProfile(QueryId queryId) { @@ -172,11 +205,21 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<QueryProfile> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, 
QueryProfile.class); + connection.send(outcomeListener, getRpcType(), queryId, QueryProfile.class); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_QUERY_STATUS; + } + + @Override + public QueryId getMessage() { + return queryId; } } - public static class CancelQuery extends FutureBitCommand<Ack, ControlConnection> { + public static class CancelQuery extends FutureBitCommand<Ack, ControlConnection, RpcType, QueryId> { final QueryId queryId; public CancelQuery(QueryId queryId) { @@ -186,7 +229,17 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, Ack.class); + connection.send(outcomeListener, getRpcType(), queryId, Ack.class); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_QUERY_CANCEL; + } + + @Override + public QueryId getMessage() { + return queryId; } } @@ -204,8 +257,8 @@ public class ControlTunnel { return new CustomTunnel<SEND, RECEIVE>(messageTypeId, send, receive); } - - private static class CustomMessageSender extends ListeningCommand<CustomMessage, ControlConnection> { + public static class CustomMessageSender extends + ListeningCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> { private CustomMessage message; private ByteBuf[] dataBodies; @@ -218,12 +271,27 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies); + connection.send(outcomeListener, getRpcType(), message, CustomMessage.class, dataBodies); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_CUSTOM; + } + + @Override + public CustomMessage getMessage() { + return message; + } + + public ByteBuf[] getDataBodies() { + return dataBodies; } } - private static class 
SyncCustomMessageSender extends FutureBitCommand<CustomMessage, ControlConnection> { + public static class SyncCustomMessageSender extends + FutureBitCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> { private CustomMessage message; private ByteBuf[] dataBodies; @@ -236,7 +304,21 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies); + connection.send(outcomeListener, getRpcType(), message, CustomMessage.class, dataBodies); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_CUSTOM; + } + + @Override + public CustomMessage getMessage() { + return message; + } + + public ByteBuf[] getDataBodies() { + return dataBodies; } } @@ -261,8 +343,7 @@ public class ControlTunnel { return serde.deserializeReceived(message.getMessage().toByteArray()); } - public RECEIVE get(long timeout, TimeUnit unit) throws Exception, - InvalidProtocolBufferException { + public RECEIVE get(long timeout, TimeUnit unit) throws Exception, InvalidProtocolBufferException { CustomMessage message = future.checkedGet(timeout, unit); return serde.deserializeReceived(message.getMessage().toByteArray()); } @@ -270,7 +351,6 @@ public class ControlTunnel { public DrillBuf getBuffer() throws RpcException { return (DrillBuf) future.getBuffer(); } - } @@ -351,21 +431,15 @@ public class ControlTunnel { } catch (Exception e) { innerListener.failed(new RpcException("Failure while parsing message locally.", e)); } - } @Override public void interrupted(InterruptedException e) { innerListener.interrupted(e); } - } - } - - - public static class ProtoSerDe<MSG extends MessageLite> implements CustomSerDe<MSG> { private final Parser<MSG> parser; @@ -420,7 +494,5 @@ public class ControlTunnel { public MSG deserializeReceived(byte[] bytes) throws Exception { return (MSG) reader.readValue(bytes); } - 
} - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java new file mode 100644 index 0000000..fa6a2e9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.rpc.control; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.BasicClient; +import org.apache.drill.exec.rpc.Response; +import org.apache.drill.exec.rpc.RpcCommand; +import org.apache.drill.exec.rpc.RpcConstants; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.work.batch.ControlMessageHandler; + +public class LocalControlConnectionManager extends ControlConnectionManager { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalControlConnectionManager.class); + + private final ControlConnectionConfig config; + + public LocalControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint localEndpoint) { + super(localEndpoint, localEndpoint); + this.config = config; + } + + @Override + protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, ?> getNewClient() { + throw new UnsupportedOperationException("LocalControlConnectionManager doesn't support creating a control client"); + } + + @Override + public void runCommand(RpcCommand cmd) { + final int rpcType = cmd.getRpcType().getNumber(); + final ControlMessageHandler messageHandler = config.getMessageHandler(); + + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Received bit com message of type {} over local connection manager", rpcType); + } + + switch (rpcType) { + + case BitControl.RpcType.REQ_CANCEL_FRAGMENT_VALUE: { + final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd); + final RpcOutcomeListener<Ack> outcomeListener = 
signalFragment.getOutcomeListener(); + final Ack ackResponse = messageHandler.cancelFragment(signalFragment.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_CUSTOM_VALUE: { + final ByteBuf[] dataBodies; + final RpcOutcomeListener<BitControl.CustomMessage> outcomeListener; + + if (cmd instanceof ControlTunnel.CustomMessageSender) { + dataBodies = ((ControlTunnel.CustomMessageSender)cmd).getDataBodies(); + outcomeListener = ((ControlTunnel.CustomMessageSender)cmd).getOutcomeListener(); + } else if (cmd instanceof ControlTunnel.SyncCustomMessageSender) { + dataBodies = ((ControlTunnel.SyncCustomMessageSender)cmd).getDataBodies(); + outcomeListener = ((ControlTunnel.SyncCustomMessageSender)cmd).getOutcomeListener(); + } else { + throw new UnsupportedOperationException("Unknown Custom Type control message received"); + } + + DrillBuf reqDrillBuff; + try { + reqDrillBuff = convertToByteBuf(dataBodies); + } catch (Exception ex) { + outcomeListener.failed(new RpcException("Failed to allocate memory while sending request in " + + "LocalControlConnectionManager#convertToByteBuff", ex)); + return; + } finally { + releaseByteBuf(dataBodies); + } + + try { + BitControl.CustomMessage message = (BitControl.CustomMessage) cmd.getMessage(); + final Response response = messageHandler.getHandlerRegistry().handle(message, reqDrillBuff); + DrillBuf responseBuffer; + try { + responseBuffer = convertToByteBuf(response.dBodies); + } catch (Exception ex) { + outcomeListener.failed(new RpcException("Failed to allocate memory while sending response in " + + "LocalControlConnectionManager#convertToByteBuff", ex)); + return; + } finally { + releaseByteBuf(response.dBodies); + } + + // Passed responseBuffer will be owned by consumer + outcomeListener.success((BitControl.CustomMessage)response.pBody, responseBuffer); + } catch (RpcException ex) { + cmd.getOutcomeListener().failed(ex); + } finally { + // Release the reqDrillBuff passed into 
handler + releaseByteBuf(reqDrillBuff); + } + break; + } + + case BitControl.RpcType.REQ_RECEIVER_FINISHED_VALUE: { + final ControlTunnel.ReceiverFinished receiverFinished = ((ControlTunnel.ReceiverFinished) cmd); + final RpcOutcomeListener<Ack> outcomeListener = receiverFinished.getOutcomeListener(); + final Ack ackResponse = messageHandler.receivingFragmentFinished(receiverFinished.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_FRAGMENT_STATUS_VALUE: { + final ControlTunnel.SendFragmentStatus fragmentStatus = ((ControlTunnel.SendFragmentStatus) cmd); + final RpcOutcomeListener<Ack> outcomeListener = fragmentStatus.getOutcomeListener(); + final Ack ackResponse = messageHandler.requestFragmentStatus(fragmentStatus.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_QUERY_CANCEL_VALUE: { + final ControlTunnel.CancelQuery cancelQuery = ((ControlTunnel.CancelQuery) cmd); + final RpcOutcomeListener<Ack> outcomeListener = cancelQuery.getOutcomeListener(); + final Ack ackResponse = messageHandler.requestQueryCancel(cancelQuery.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: { + final ControlTunnel.SendFragment sendFragment = ((ControlTunnel.SendFragment) cmd); + final RpcOutcomeListener<Ack> outcomeListener = sendFragment.getOutcomeListener(); + + try { + final Ack ackResponse = messageHandler.initializeFragment(sendFragment.getMessage()); + outcomeListener.success(ackResponse, null); + } catch (RpcException ex) { + outcomeListener.failed(ex); + } + break; + } + + case BitControl.RpcType.REQ_QUERY_STATUS_VALUE: { + final ControlTunnel.RequestProfile requestProfile = ((ControlTunnel.RequestProfile) cmd); + final RpcOutcomeListener<UserBitShared.QueryProfile> outcomeListener = requestProfile.getOutcomeListener(); + + try { + final UserBitShared.QueryProfile profile = 
messageHandler.requestQueryStatus(requestProfile.getMessage()); + outcomeListener.success(profile, null); + } catch (RpcException ex) { + outcomeListener.failed(ex); + } + break; + } + + case BitControl.RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: { + final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd); + final RpcOutcomeListener<Ack> outcomeListener = signalFragment.getOutcomeListener(); + final Ack ackResponse = messageHandler.resumeFragment(signalFragment.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + default: + final RpcException rpcException = new RpcException(String.format("Unsupported control request type %s " + + "received on LocalControlConnectionManager", rpcType)); + cmd.getOutcomeListener().failed(rpcException); + } + } + + /** + * Copies ByteBuf in the input array into a single DrillBuf + * @param byteBuffArray - input array of ByteBuf's + * @return DrillBuf - output Drillbuf with all the input bytes. + * @throws OutOfMemoryException + */ + private DrillBuf convertToByteBuf(ByteBuf[] byteBuffArray) throws OutOfMemoryException { + + if (byteBuffArray == null) { + return null; + } + + int bytesToCopy = 0; + for (ByteBuf b : byteBuffArray) { + final int validBytes = b.readableBytes(); + if (0 == validBytes) { + b.release(); + } else { + bytesToCopy += validBytes; + } + } + final DrillBuf drillBuff = config.getAllocator().buffer(bytesToCopy); + + for (ByteBuf b : byteBuffArray) { + final int validBytes = b.readableBytes(); + drillBuff.writeBytes(b, 0, validBytes); + } + + return drillBuff; + } + + /** + * Releases all the ByteBuf inside the input ByteBuf array + * @param byteBuffArray - input array of ByteBuf's + */ + private void releaseByteBuf(ByteBuf[] byteBuffArray) { + if (byteBuffArray != null) { + for (ByteBuf b : byteBuffArray) { + b.release(); + } + } + } + + /** + * Releases the input ByteBuf + * @param byteBuf - input ByteBuf + */ + private void releaseByteBuf(ByteBuf byteBuf) { + if 
(byteBuf != null) { + byteBuf.release(); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java similarity index 57% copy from exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java copy to exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java index 6bfcbd5..6a4dc21 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java @@ -17,37 +17,25 @@ */ package org.apache.drill.exec.rpc.control; -import org.apache.drill.exec.proto.BitControl.BitControlHandshake; +import org.apache.drill.exec.proto.BitControl; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.rpc.BasicClient; -import org.apache.drill.exec.rpc.ReconnectingConnection; -/** - * Maintains connection between two particular bits. 
- */ -public class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionManager.class); +public class RemoteControlConnectionManager extends ControlConnectionManager { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteControlConnectionManager.class); private final ControlConnectionConfig config; private final DrillbitEndpoint remoteEndpoint; - public ControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint localEndpoint, - DrillbitEndpoint remoteEndpoint) { - super( - BitControlHandshake.newBuilder() - .setRpcVersion(ControlRpcConfig.RPC_VERSION) - .setEndpoint(localEndpoint) - .build(), - remoteEndpoint.getAddress(), - remoteEndpoint.getControlPort()); - + public RemoteControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint + localEndpoint, DrillbitEndpoint remoteEndpoint) { + super(localEndpoint, remoteEndpoint); this.config = config; this.remoteEndpoint = remoteEndpoint; } @Override - protected BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient() { + protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, ?> getNewClient() { return new ControlClient(config, remoteEndpoint, new CloseHandlerCreator()); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java index 7e286fa..7b05b81 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.data; +import com.google.protobuf.MessageLite; import io.netty.buffer.ByteBuf; import java.util.concurrent.Semaphore; @@ -43,7 +44,6 @@ public class DataTunnel { private ExecutionControls testControls; 
private org.slf4j.Logger testLogger; - public DataTunnel(DataConnectionManager manager) { this.manager = manager; } @@ -69,7 +69,7 @@ public class DataTunnel { public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) { SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch); - try{ + try { if (isInjectionControlSet) { // Wait for interruption if set. Used to simulate the fragment interruption while the fragment is waiting for // semaphore acquire. We expect the @@ -78,9 +78,9 @@ public class DataTunnel { sendingSemaphore.acquire(); manager.runCommand(b); - }catch(final InterruptedException e){ + } catch (final InterruptedException e) { // Release the buffers first before informing the listener about the interrupt. - for(ByteBuf buffer : batch.getBuffers()) { + for (ByteBuf buffer : batch.getBuffers()) { buffer.release(); } @@ -119,7 +119,7 @@ public class DataTunnel { } } - private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> { + private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection, RpcType, MessageLite> { final FragmentWritableBatch batch; public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) { @@ -129,7 +129,18 @@ public class DataTunnel { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, DataClientConnection connection) { - connection.send(new ThrottlingOutcomeListener(outcomeListener), RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers()); + connection.send(new ThrottlingOutcomeListener(outcomeListener), getRpcType(), batch.getHeader(), + Ack.class, batch.getBuffers()); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_RECORD_BATCH; + } + + @Override + public MessageLite getMessage() { + return batch.getHeader(); } @Override @@ -139,7 +150,7 @@ public class DataTunnel { @Override public void connectionFailed(FailureType type, Throwable 
t) { - for(ByteBuf buffer : batch.getBuffers()) { + for (ByteBuf buffer : batch.getBuffers()) { buffer.release(); } super.connectionFailed(type, t); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java index e562b16..963f53a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java @@ -91,14 +91,16 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> } case RpcType.REQ_FRAGMENT_STATUS_VALUE: - bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER)); + final FragmentStatus status = get(pBody, FragmentStatus.PARSER); + requestFragmentStatus(status); // TODO: Support a type of message that has no response. sender.send(ControlRpcConfig.OK); break; case RpcType.REQ_QUERY_CANCEL_VALUE: { final QueryId queryId = get(pBody, QueryId.PARSER); - if (bee.cancelForeman(queryId, null)) { + final Ack cancelStatus = requestQueryCancel(queryId); + if (cancelStatus.getOk()) { sender.send(ControlRpcConfig.OK); } else { sender.send(ControlRpcConfig.FAIL); @@ -108,21 +110,14 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: { final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER); - final DrillbitContext drillbitContext = bee.getContext(); - for(int i = 0; i < fragments.getFragmentCount(); i++) { - startNewFragment(fragments.getFragment(i), drillbitContext); - } + initializeFragment(fragments); sender.send(ControlRpcConfig.OK); break; } case RpcType.REQ_QUERY_STATUS_VALUE: { final QueryId queryId = get(pBody, QueryId.PARSER); - final Foreman foreman = bee.getForemanForQueryId(queryId); - if (foreman == null) { - throw new RpcException("Query not running on 
node."); - } - final QueryProfile profile = foreman.getQueryManager().getQueryProfile(); + final QueryProfile profile = requestQueryStatus(queryId); sender.send(new Response(RpcType.RESP_QUERY_STATUS, profile)); break; } @@ -145,7 +140,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> * @param fragment * @throws UserRpcException */ - private void startNewFragment(final PlanFragment fragment, final DrillbitContext drillbitContext) + public void startNewFragment(final PlanFragment fragment, final DrillbitContext drillbitContext) throws UserRpcException { logger.debug("Received remote fragment start instruction", fragment); @@ -182,7 +177,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> /* (non-Javadoc) * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle) */ - private Ack cancelFragment(final FragmentHandle handle) { + public Ack cancelFragment(final FragmentHandle handle) { /** * For case 1, see {@link org.apache.drill.exec.work.foreman.QueryManager#cancelExecutingFragments}. 
* In comments below, "active" refers to fragment states: SENDING, AWAITING_ALLOCATION, RUNNING and @@ -213,7 +208,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> return Acks.OK; } - private Ack resumeFragment(final FragmentHandle handle) { + public Ack resumeFragment(final FragmentHandle handle) { // resume a pending fragment final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle); if (manager != null) { @@ -233,7 +228,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> return Acks.OK; } - private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) { + public Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) { final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender()); @@ -254,6 +249,32 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> return Acks.OK; } + public Ack requestFragmentStatus(FragmentStatus status) { + bee.getContext().getWorkBus().statusUpdate( status); + return Acks.OK; + } + + public Ack requestQueryCancel(QueryId queryId) { + return bee.cancelForeman(queryId, null) ? 
Acks.OK : Acks.FAIL; + } + + public Ack initializeFragment(InitializeFragments fragments) throws RpcException { + final DrillbitContext drillbitContext = bee.getContext(); + for (int i = 0; i < fragments.getFragmentCount(); i++) { + startNewFragment(fragments.getFragment(i), drillbitContext); + } + + return Acks.OK; + } + + public QueryProfile requestQueryStatus(QueryId queryId) throws RpcException { + final Foreman foreman = bee.getForemanForQueryId(queryId); + if (foreman == null) { + throw new RpcException("Query not running on node."); + } + return foreman.getQueryManager().getQueryProfile(); + } + public CustomHandlerRegistry getHandlerRegistry() { return handlerRegistry; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java index 3d051bb..91b2a2a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java @@ -44,11 +44,8 @@ import org.apache.drill.exec.work.EndpointListener; import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.fragment.FragmentExecutor; import org.apache.drill.exec.work.fragment.FragmentStatusReporter; -import org.apache.drill.exec.work.fragment.NonRootFragmentManager; import org.apache.drill.exec.work.fragment.RootFragmentManager; - -import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -141,7 +138,6 @@ public class FragmentsRunner { } } - /** * Set up the non-root fragments for execution. Some may be local, and some may be remote. * Messages are sent immediately, so they may start returning data even before we complete this. @@ -158,17 +154,11 @@ public class FragmentsRunner { * executed there. 
We need to start up the intermediate fragments first so that they will be * ready once the leaf fragments start producing data. To satisfy both of these, we will * make a pass through the fragments and put them into the remote maps according to their - * leaf/intermediate state, as well as their target drillbit. Also filter the leaf/intermediate - * fragments which are assigned to run on local Drillbit node (or Foreman node) into separate lists. - * - * This will help to schedule local + * leaf/intermediate state, as well as their target drillbit. */ - final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = ArrayListMultimap.create(); - final List<PlanFragment> localLeafFragmentList = new ArrayList<>(); - final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = ArrayListMultimap.create(); - final List<PlanFragment> localIntFragmentList = new ArrayList<>(); + final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create(); + final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create(); - final DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint(); // record all fragments for status purposes. for (final PlanFragment planFragment : fragments) { @@ -180,44 +170,38 @@ public class FragmentsRunner { foreman.getQueryManager().addFragmentStatusTracker(planFragment, false); if (planFragment.getLeafFragment()) { - updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap); + leafFragmentMap.put(planFragment.getAssignment(), planFragment); } else { - updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap); + intFragmentMap.put(planFragment.getAssignment(), planFragment); } } /* * We need to wait for the intermediates to be sent so that they'll be set up by the time - * the leaves start producing data. We'll use this latch to wait for the responses. 
+ * the leaves start producing data. We'll use this latch to wait for the responses. All the local intermediate + * fragments are submitted locally without creating any actual control connection to itself. * * However, in order not to hang the process if any of the RPC requests fails, we always * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll * know if any submissions did fail. */ - scheduleRemoteIntermediateFragments(remoteIntFragmentMap); - - // Setup local intermediate fragments - for (final PlanFragment fragment : localIntFragmentList) { - startLocalFragment(fragment); - } + scheduleIntermediateFragments(intFragmentMap); injector.injectChecked(foreman.getQueryContext().getExecutionControls(), "send-fragments", ForemanException.class); /* * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through - * the regular sendListener event delivery. + * the regular sendListener event delivery. All the local leaf fragments are submitted locally without creating + * any actual control connection to itself. */ - for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) { - sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null); - } - - // Setup local leaf fragments - for (final PlanFragment fragment : localLeafFragmentList) { - startLocalFragment(fragment); + for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) { + sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null); } } /** - * Send all the remote fragments belonging to a single target drillbit in one request. + * Send all the remote fragments belonging to a single target drillbit in one request. If the assignment + * DrillbitEndpoint is local Drillbit then {@link Controller#getTunnel(DrillbitEndpoint)} takes care of submitting it + * locally without actually creating a Control Connection to itself. 
* * @param assignment the drillbit assigned to these fragments * @param fragments the set of fragments @@ -241,41 +225,21 @@ public class FragmentsRunner { } /** - * Add planFragment into either of local fragment list or remote fragment map based on assigned Drillbit Endpoint node - * and the local Drillbit Endpoint. + * Send intermediate fragment to the assigned Drillbit node. If the assignment DrillbitEndpoint is local Drillbit + * then {@link Controller#getTunnel(DrillbitEndpoint)} takes care of submitting it locally without actually creating + * a Control Connection to itself. Throws {@link UserException} in case of failure to send the fragment. * - * @param planFragment plan fragment - * @param localEndPoint local endpoint - * @param localFragmentList local fragment list - * @param remoteFragmentMap remote fragment map + * @param intermediateFragmentMap - Map of Drillbit Endpoint to list of intermediate PlanFragment's */ - private void updateFragmentCollection(final PlanFragment planFragment, final DrillbitEndpoint localEndPoint, - final List<PlanFragment> localFragmentList, - final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) { - final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment(); + private void scheduleIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> intermediateFragmentMap) { - if (assignedDrillbit.equals(localEndPoint)) { - localFragmentList.add(planFragment); - } else { - remoteFragmentMap.put(assignedDrillbit, planFragment); - } - } - - /** - * Send remote intermediate fragment to the assigned Drillbit node. - * Throw exception in case of failure to send the fragment. 
- * - * @param remoteFragmentMap - Map of Drillbit Endpoint to list of PlanFragment's - */ - private void scheduleRemoteIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) { - - final int numIntFragments = remoteFragmentMap.keySet().size(); + final int numIntFragments = intermediateFragmentMap.keySet().size(); final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments); final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures(); // send remote intermediate fragments - for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) { - sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures); + for (final DrillbitEndpoint ep : intermediateFragmentMap.keySet()) { + sendRemoteFragments(ep, intermediateFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures); } final long timeout = drillbitContext.getOptionManager().getLong(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT) * numIntFragments; @@ -313,29 +277,6 @@ public class FragmentsRunner { } } - - /** - * Start the locally assigned leaf or intermediate fragment - * - * @param fragment fragment - */ - private void startLocalFragment(final PlanFragment fragment) throws ExecutionSetupException { - logger.debug("Received local fragment start instruction", fragment); - - final FragmentContextImpl fragmentContext = new FragmentContextImpl(drillbitContext, fragment, drillbitContext.getFunctionImplementationRegistry()); - final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext); - final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter); - - // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf. - if (fragment.getLeafFragment()) { - bee.addFragmentRunner(fragmentExecutor); - } else { - // isIntermediate, store for incoming data. 
- final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter); - drillbitContext.getWorkBus().addFragmentManager(manager); - } - } - /** * Used by {@link FragmentSubmitListener} to track the number of submission failures. */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java index eb57658..c0c5b04 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java @@ -40,12 +40,9 @@ public class FragmentStatusReporter implements AutoCloseable { protected final AtomicReference<DrillbitEndpoint> foremanDrillbit; - protected final DrillbitEndpoint localDrillbit; - public FragmentStatusReporter(final ExecutorFragmentContext context) { this.context = context; this.foremanDrillbit = new AtomicReference<>(context.getForemanEndpoint()); - this.localDrillbit = context.getEndpoint(); } /** @@ -119,14 +116,10 @@ public class FragmentStatusReporter implements AutoCloseable { return; } - if (localDrillbit.equals(foremanNode)) { - // Update the status locally - context.getWorkEventbus().statusUpdate(status); - } else { - // Send the status via Control Tunnel to remote foreman node - final ControlTunnel tunnel = context.getController().getTunnel(foremanNode); - tunnel.sendFragmentStatus(status); - } + // Send status for both local and remote foreman node via Tunnel. 
For local there won't be any network connection + // created and it will be submitted locally using LocalControlConnectionManager + final ControlTunnel tunnel = context.getController().getTunnel(foremanNode); + tunnel.sendFragmentStatus(status); } /** diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java new file mode 100644 index 0000000..667e440 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.rpc.control; + +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class ConnectionManagerRegistryTest { + + private static final DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder() + .setAddress("10.0.0.1") + .setControlPort(31012) + .setDataPort(31011) + .setUserPort(31010) + .setState(DrillbitEndpoint.State.STARTUP) + .build(); + + private static final DrillbitEndpoint foremanEndpoint = DrillbitEndpoint.newBuilder() + .setAddress("10.0.0.2") + .setControlPort(31012) + .setDataPort(31011) + .setUserPort(31010) + .setState(DrillbitEndpoint.State.STARTUP) + .build(); + + private static ControlConnectionConfig config; + + @BeforeClass + public static void setup() { + config = mock(ControlConnectionConfig.class); + } + + @Test + public void testLocalConnectionManager() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + final ControlConnectionManager manager = registry.getConnectionManager(localEndpoint); + assertTrue(registry.iterator().hasNext()); + assertEquals(manager, registry.iterator().next()); + assertTrue(manager instanceof LocalControlConnectionManager); + } + + @Test + public void testLocalConnectionManager_differentState() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + + final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder().setState(DrillbitEndpoint.State.ONLINE).build(); + final ControlConnectionManager manager = registry.getConnectionManager(foremanEndpoint2); + assertTrue(registry.iterator().hasNext()); + assertEquals(manager, registry.iterator().next()); + assertTrue(manager instanceof 
LocalControlConnectionManager); + } + + @Test + public void testLocalConnectionManager_differentUserDataPort() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + + final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder() + .setState(DrillbitEndpoint.State.ONLINE) + .setUserPort(10000) + .setDataPort(11000) + .build(); + final ControlConnectionManager manager = registry.getConnectionManager(foremanEndpoint2); + assertTrue(registry.iterator().hasNext()); + assertEquals(manager, registry.iterator().next()); + assertTrue(manager instanceof LocalControlConnectionManager); + } + + @Test + public void testRemoteConnectionManager() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + final ControlConnectionManager manager = registry.getConnectionManager(foremanEndpoint); + assertTrue(registry.iterator().hasNext()); + assertTrue(manager instanceof RemoteControlConnectionManager); + } + + @Test + public void testRemoteConnectionManager_differentControlPort() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + + final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder() + .setControlPort(10000) + .build(); + final ControlConnectionManager manager = registry.getConnectionManager(foremanEndpoint2); + assertTrue(registry.iterator().hasNext()); + assertEquals(manager, registry.iterator().next()); + assertTrue(manager instanceof RemoteControlConnectionManager); + } + + @Test + public void testRemoteConnectionManager_differentAddress() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + + final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder() + .setAddress("10.0.0.0") + .build(); + final ControlConnectionManager manager = 
registry.getConnectionManager(foremanEndpoint2); + assertTrue(registry.iterator().hasNext()); + assertEquals(manager, registry.iterator().next()); + assertTrue(manager instanceof RemoteControlConnectionManager); + } + + @Test + public void testRemoteAndLocalConnectionManager() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + + final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder() + .setAddress("10.0.0.0") + .build(); + + final ControlConnectionManager remoteManager = registry.getConnectionManager(foremanEndpoint2); + final ControlConnectionManager localManager = registry.getConnectionManager(localEndpoint); + + final ControlConnectionManager remoteManager_2 = registry.getConnectionManager(foremanEndpoint2); + final ControlConnectionManager localManager_2 = registry.getConnectionManager(localEndpoint); + + assertTrue(remoteManager instanceof RemoteControlConnectionManager); + assertEquals(remoteManager, remoteManager_2); + + assertTrue(localManager instanceof LocalControlConnectionManager); + assertEquals(localManager, localManager_2); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java new file mode 100644 index 0000000..c0cf09b --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.control; + +import io.netty.buffer.ByteBuf; +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.Acks; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.work.batch.ControlMessageHandler; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestLocalControlConnectionManager { + + private static final DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder() + .setAddress("10.0.0.1") + .setControlPort(31011) + .setState(DrillbitEndpoint.State.STARTUP) + .build(); + + private static ControlConnectionConfig mockConfig; + + private static ControlMessageHandler mockHandler; + + private static ControlTunnel controlTunnel; + + private static CountDownLatch latch; + + private static final String NEGATIVE_ACK_MESSAGE = "Negative Ack received"; + + private static final 
RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener = + new RpcOutcomeListener<GeneralRPCProtos.Ack>() { + @Override + public void failed(RpcException ex) { + throw new IllegalStateException(ex); + } + + @Override + public void success(GeneralRPCProtos.Ack value, ByteBuf buffer) { + if (value.getOk()) { + latch.countDown(); + } else { + throw new IllegalStateException(NEGATIVE_ACK_MESSAGE); + } + } + + @Override + public void interrupted(InterruptedException e) { + // Do nothing + } + }; + + @Rule + public ExpectedException exceptionThrown = ExpectedException.none(); + + @BeforeClass + public static void setup() { + mockConfig = mock(ControlConnectionConfig.class); + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(mockConfig); + registry.setLocalEndpoint(localEndpoint); + ControlConnectionManager manager = registry.getConnectionManager(localEndpoint); + assertTrue(manager instanceof LocalControlConnectionManager); + controlTunnel = new ControlTunnel(manager); + } + + @Before + public void setupForTest() { + mockHandler = mock(ControlMessageHandler.class); + when(mockConfig.getMessageHandler()).thenReturn(mockHandler); + } + + /** + * Verify that SendFragmentStatus is handled correctly using ControlTunnel with LocalControlConnectionManager + */ + @Test + public void testLocalSendFragmentStatus_Success() throws Exception { + final UserBitShared.QueryId mockQueryId = UserBitShared.QueryId.getDefaultInstance(); + final UserBitShared.QueryProfile mockProfile = UserBitShared.QueryProfile.getDefaultInstance(); + when(mockHandler.requestQueryStatus(mockQueryId)).thenReturn(mockProfile); + final UserBitShared.QueryProfile returnedProfile = controlTunnel.requestQueryProfile(mockQueryId).checkedGet(); + assertEquals(returnedProfile, mockProfile); + } + + /** + * Verify that SendFragmentStatus failure scenario is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void 
testLocalSendFragmentStatus_Failure() throws Exception { + final UserBitShared.QueryId mockQueryId = UserBitShared.QueryId.getDefaultInstance(); + final String exceptionMessage = "Testing failure case"; + exceptionThrown.expect(RpcException.class); + exceptionThrown.expectMessage(exceptionMessage); + when(mockHandler.requestQueryStatus(mockQueryId)).thenThrow(new RpcException(exceptionMessage)); + controlTunnel.requestQueryProfile(mockQueryId).checkedGet(); + } + + /** + * Verify that CancelFragment with positive ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testLocalCancelFragment_PositiveAck() throws Exception { + final ExecProtos.FragmentHandle mockHandle = ExecProtos.FragmentHandle.getDefaultInstance(); + latch = new CountDownLatch(1); + final GeneralRPCProtos.Ack mockResponse = Acks.OK; + when(mockHandler.cancelFragment(mockHandle)).thenReturn(mockResponse); + controlTunnel.cancelFragment(outcomeListener, mockHandle); + latch.await(); + } + + /** + * Verify that CancelFragment with negative ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testLocalCancelFragment_NegativeAck() throws Exception { + final ExecProtos.FragmentHandle mockHandle = ExecProtos.FragmentHandle.getDefaultInstance(); + latch = new CountDownLatch(1); + exceptionThrown.expect(IllegalStateException.class); + exceptionThrown.expectMessage(NEGATIVE_ACK_MESSAGE); + final GeneralRPCProtos.Ack mockResponse = Acks.FAIL; + when(mockHandler.cancelFragment(mockHandle)).thenReturn(mockResponse); + controlTunnel.cancelFragment(outcomeListener, mockHandle); + latch.await(); + } + + /** + * Verify that InitializeFragments with positive ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testLocalSendFragments_PositiveAck() throws Exception { + final BitControl.InitializeFragments mockFragments = 
BitControl.InitializeFragments.getDefaultInstance(); + latch = new CountDownLatch(1); + final GeneralRPCProtos.Ack mockResponse = Acks.OK; + when(mockHandler.initializeFragment(mockFragments)).thenReturn(mockResponse); + controlTunnel.sendFragments(outcomeListener, mockFragments); + latch.await(); + } + + /** + * Verify that InitializeFragments with negative ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testLocalSendFragments_NegativeAck() throws Exception { + final BitControl.InitializeFragments mockFragments = BitControl.InitializeFragments.getDefaultInstance(); + latch = new CountDownLatch(1); + exceptionThrown.expect(IllegalStateException.class); + exceptionThrown.expectMessage(NEGATIVE_ACK_MESSAGE); + final GeneralRPCProtos.Ack mockResponse = Acks.FAIL; + when(mockHandler.initializeFragment(mockFragments)).thenReturn(mockResponse); + controlTunnel.sendFragments(outcomeListener, mockFragments); + latch.await(); + } + + /** + * Verify that InitializeFragments failure case is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testLocalSendFragments_Failure() throws Exception { + final BitControl.InitializeFragments mockFragments = BitControl.InitializeFragments.getDefaultInstance(); + latch = new CountDownLatch(1); + exceptionThrown.expect(IllegalStateException.class); + exceptionThrown.expectCause(new TypeSafeMatcher<Throwable>(RpcException.class) { + @Override + protected boolean matchesSafely(Throwable throwable) { + return (throwable != null && throwable instanceof RpcException); + } + + @Override + public void describeTo(Description description) { + // Do nothing + } + }); + when(mockHandler.initializeFragment(mockFragments)).thenThrow(new RpcException("Failed to initialize")); + controlTunnel.sendFragments(outcomeListener, mockFragments); + latch.await(); + } + + /** + * Verify that UnpauseFragment is handled correctly using ControlTunnel 
with LocalControlConnectionManager + */ + @Test + public void testUnpauseFragments() throws Exception { + final ExecProtos.FragmentHandle mockHandle = ExecProtos.FragmentHandle.getDefaultInstance(); + latch = new CountDownLatch(1); + final GeneralRPCProtos.Ack mockResponse = Acks.OK; + when(mockHandler.resumeFragment(mockHandle)).thenReturn(mockResponse); + controlTunnel.unpauseFragment(outcomeListener, mockHandle); + latch.await(); + } + + /** + * Verify that RequestQueryStatus is handled correctly using ControlTunnel with LocalControlConnectionManager + */ + @Test + public void testRequestQueryStatus() throws Exception { + final UserBitShared.QueryId mockQueryId = UserBitShared.QueryId.getDefaultInstance(); + final UserBitShared.QueryProfile mockProfile = UserBitShared.QueryProfile.getDefaultInstance(); + when(mockHandler.requestQueryStatus(mockQueryId)).thenReturn(mockProfile); + final UserBitShared.QueryProfile returnedProfile = controlTunnel.requestQueryProfile(mockQueryId).checkedGet(); + assertEquals(returnedProfile, mockProfile); + } + + /** + * Verify that CancelQuery with positive ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testCancelQuery_PositiveAck() throws Exception { + final UserBitShared.QueryId mockQueryId = UserBitShared.QueryId.getDefaultInstance(); + final GeneralRPCProtos.Ack mockResponse = Acks.OK; + when(mockHandler.requestQueryCancel(mockQueryId)).thenReturn(mockResponse); + GeneralRPCProtos.Ack response = controlTunnel.requestCancelQuery(mockQueryId).checkedGet(); + assertEquals(response, mockResponse); + } + + /** + * Verify that CancelQuery with negative ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testCancelQuery_NegativeAck() throws Exception { + final UserBitShared.QueryId mockQueryId = UserBitShared.QueryId.getDefaultInstance(); + final GeneralRPCProtos.Ack mockResponse = Acks.FAIL; + 
when(mockHandler.requestQueryCancel(mockQueryId)).thenReturn(mockResponse); + GeneralRPCProtos.Ack response = controlTunnel.requestCancelQuery(mockQueryId).checkedGet(); + assertEquals(response, mockResponse); + } + + /** + * Verify that FinishedReceiver is handled correctly using ControlTunnel with LocalControlConnectionManager + */ + @Test + public void testInformReceiverFinished_success() throws Exception { + final BitControl.FinishedReceiver finishedReceiver = BitControl.FinishedReceiver.getDefaultInstance(); + latch = new CountDownLatch(1); + final GeneralRPCProtos.Ack mockResponse = Acks.OK; + when(mockHandler.receivingFragmentFinished(finishedReceiver)).thenReturn(mockResponse); + controlTunnel.informReceiverFinished(outcomeListener, finishedReceiver); + latch.await(); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java index aaff154..04d54d5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java @@ -19,16 +19,16 @@ package org.apache.drill.exec.rpc.user.security; import com.google.common.collect.Lists; import com.typesafe.config.ConfigValueFactory; -import org.apache.drill.test.BaseTestQuery; import org.apache.drill.categories.SecurityTest; -import org.apache.drill.common.config.DrillProperties; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.config.DrillProperties; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.rpc.control.ControlRpcMetrics; import org.apache.drill.exec.rpc.data.DataRpcMetrics; import org.apache.drill.exec.rpc.security.KerberosHelper; import org.apache.drill.exec.rpc.user.UserRpcMetrics; import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl; +import 
org.apache.drill.test.BaseTestQuery; import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.security.authentication.util.KerberosUtil; import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil; @@ -178,6 +178,38 @@ public class TestUserBitKerberos extends BaseTestQuery { assertTrue(0 == DataRpcMetrics.getInstance().getUnEncryptedConnectionCount()); } + @Test + public void testUnecryptedConnectionCounter_LocalControlMessage() throws Exception { + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL); + connectionProps.setProperty(DrillProperties.KERBEROS_FROM_SUBJECT, "true"); + final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL, + krbHelper.clientKeytab.getAbsoluteFile()); + + Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + updateClient(connectionProps); + return null; + } + }); + + // Run query on memory system table this sends remote fragments to all Drillbit and Drillbits then send data + // using data channel. 
In this test we have only 1 Drillbit so there should not be any control connection but a + // local data connections + testSql("SELECT * FROM sys.memory"); + + // Check encrypted counters value + assertTrue(0 == UserRpcMetrics.getInstance().getEncryptedConnectionCount()); + assertTrue(0 == ControlRpcMetrics.getInstance().getEncryptedConnectionCount()); + assertTrue(0 == DataRpcMetrics.getInstance().getEncryptedConnectionCount()); + + // Check unencrypted counters value + assertTrue(1 == UserRpcMetrics.getInstance().getUnEncryptedConnectionCount()); + assertTrue(0 == ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount()); + assertTrue(2 == DataRpcMetrics.getInstance().getUnEncryptedConnectionCount()); + } + @AfterClass public static void cleanTest() throws Exception { krbHelper.stopKdc(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java index 83c1eca..a119a8c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java @@ -17,54 +17,44 @@ */ package org.apache.drill.exec.work.fragment; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.rpc.control.Controller; -import org.apache.drill.exec.rpc.control.WorkEventBus; -import org.junit.Before; -import org.junit.Test; - import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContextImpl; import org.apache.drill.exec.ops.FragmentStats; import org.apache.drill.exec.proto.BitControl.FragmentStatus; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.FragmentState; import 
org.apache.drill.exec.rpc.control.ControlTunnel; +import org.apache.drill.exec.rpc.control.Controller; +import org.junit.Before; +import org.junit.Test; -import static org.junit.Assert.assertTrue; +import static org.apache.drill.exec.proto.UserBitShared.FragmentState.CANCELLATION_REQUESTED; +import static org.apache.drill.exec.proto.UserBitShared.FragmentState.FAILED; +import static org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import static org.apache.drill.exec.proto.UserBitShared.FragmentState.CANCELLATION_REQUESTED; -import static org.apache.drill.exec.proto.UserBitShared.FragmentState.FAILED; -import static org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING; - public class FragmentStatusReporterTest { private FragmentStatusReporter statusReporter; private ControlTunnel foremanTunnel; - private FragmentContextImpl context; - @Before public void setUp() throws Exception { - context = mock(FragmentContextImpl.class); + final FragmentContextImpl context = mock(FragmentContextImpl.class); Controller controller = mock(Controller.class); - // Create 2 different endpoint such that foremanEndpoint is different than - // localEndpoint - DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build(); + // Create Foreman Endpoint and it's tunnel DrillbitEndpoint foremanEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.2").build(); foremanTunnel = mock(ControlTunnel.class); - when(context.getEndpoint()).thenReturn(localEndpoint); when(context.getController()).thenReturn(controller); when(controller.getTunnel(foremanEndpoint)).thenReturn(foremanTunnel); @@ -120,49 
+110,4 @@ public class FragmentStatusReporterTest { statusReporter.stateChanged(CANCELLATION_REQUESTED); verify(foremanTunnel).sendFragmentStatus(any(FragmentStatus.class)); } - - - /** - * With LocalEndpoint and Foreman Endpoint being same node, test that status change - * message doesn't happen via Control Tunnel instead it happens locally via WorkEventBus - * - * @throws Exception - */ - @Test - public void testStateChangedLocalForeman() throws Exception { - - DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build(); - - when(context.getEndpoint()).thenReturn(localEndpoint); - when(context.getForemanEndpoint()).thenReturn(localEndpoint); - when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class)); - - statusReporter = new FragmentStatusReporter(context); - statusReporter.stateChanged(RUNNING); - verifyZeroInteractions(foremanTunnel); - verify(context.getWorkEventbus()).statusUpdate(any(FragmentStatus.class)); - } - - /** - * With LocalEndpoint and Foreman Endpoint being same node, test that after close of - * FragmentStatusReporter, status update doesn't happen either through Control Tunnel - * or through WorkEventBus. 
- * - * @throws Exception - */ - @Test - public void testCloseLocalForeman() throws Exception { - DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build(); - - when(context.getEndpoint()).thenReturn(localEndpoint); - when(context.getForemanEndpoint()).thenReturn(localEndpoint); - when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class)); - statusReporter = new FragmentStatusReporter(context); - - statusReporter.close(); - assertTrue(statusReporter.foremanDrillbit.get() == null); - statusReporter.stateChanged(RUNNING); - verifyZeroInteractions(foremanTunnel); - verify(context.getWorkEventbus(), never()).statusUpdate(any(FragmentStatus.class)); - } } diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java index 90b2d19..2437ddb 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java @@ -17,20 +17,24 @@ */ package org.apache.drill.exec.rpc; -import io.netty.buffer.ByteBuf; - import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.Internal.EnumLite; import com.google.protobuf.MessageLite; +import io.netty.buffer.ByteBuf; -public abstract class FutureBitCommand<T extends MessageLite, C extends RemoteConnection> implements RpcCommand<T,C> { +public abstract class FutureBitCommand<T extends MessageLite, C extends RemoteConnection, + E extends EnumLite, M extends MessageLite> implements RpcCommand<T, C, E, M> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FutureBitCommand.class); protected final SettableFuture<T> settableFuture; private final RpcCheckedFuture<T> parentFuture; + private final RpcOutcomeListener<T> outcomeListener; + public FutureBitCommand() { this.settableFuture = SettableFuture.create(); this.parentFuture = new RpcCheckedFuture<T>(settableFuture); + 
outcomeListener = new DeferredRpcOutcome(); } public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, C connection); @@ -38,7 +42,7 @@ public abstract class FutureBitCommand<T extends MessageLite, C extends RemoteCo @Override public void connectionAvailable(C connection) { - doRpcCall(new DeferredRpcOutcome(), connection); + doRpcCall(outcomeListener, connection); } @Override @@ -72,9 +76,13 @@ public abstract class FutureBitCommand<T extends MessageLite, C extends RemoteCo } @Override + public RpcOutcomeListener<T> getOutcomeListener() { + return outcomeListener; + } + + @Override public void connectionFailed(FailureType type, Throwable t) { settableFuture.setException(RpcException.mapException( String.format("Command failed while establishing connection. Failure type %s.", type), t)); } - -} \ No newline at end of file +} diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java index c9b8b3a..964abf4 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java @@ -17,11 +17,11 @@ */ package org.apache.drill.exec.rpc; -import io.netty.buffer.ByteBuf; - +import com.google.protobuf.Internal.EnumLite; import com.google.protobuf.MessageLite; -public abstract class ListeningCommand<T extends MessageLite, C extends RemoteConnection> implements RpcCommand<T, C> { +public abstract class ListeningCommand<T extends MessageLite, C extends RemoteConnection, + E extends EnumLite, M extends MessageLite> implements RpcCommand<T, C, E, M> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListeningCommand.class); private final RpcOutcomeListener<T> listener; @@ -35,7 +35,7 @@ public abstract class ListeningCommand<T extends MessageLite, C extends RemoteCo @Override public void connectionAvailable(C connection) { - doRpcCall(new 
DeferredRpcOutcome(), connection); + doRpcCall(listener, connection); } @Override @@ -43,28 +43,15 @@ public abstract class ListeningCommand<T extends MessageLite, C extends RemoteCo connectionAvailable(connection); } - private class DeferredRpcOutcome implements RpcOutcomeListener<T> { - - @Override - public void failed(RpcException ex) { - listener.failed(ex); - } - - @Override - public void success(T value, ByteBuf buf) { - listener.success(value, buf); - } - - @Override - public void interrupted(final InterruptedException e) { - listener.interrupted(e); - } - } - @Override public void connectionFailed(FailureType type, Throwable t) { listener.failed(RpcException.mapException( String.format("Command failed while establishing connection. Failure type %s.", type), t)); } -} \ No newline at end of file + @Override + public RpcOutcomeListener<T> getOutcomeListener() { + return listener; + } + +} diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java index fec3962..c648a16 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc; +import com.google.protobuf.Internal.EnumLite; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.GenericFutureListener; @@ -52,7 +53,8 @@ public abstract class ReconnectingConnection<C extends ClientConnection, HS exte protected abstract BasicClient<?, C, HS, ?> getNewClient(); - public <T extends MessageLite, R extends RpcCommand<T, C>> void runCommand(R cmd) { + public <T extends MessageLite, E extends EnumLite, M extends MessageLite, + R extends RpcCommand<T, C, E, M>> void runCommand(R cmd) { // if(logger.isDebugEnabled()) logger.debug(String.format("Running command %s sending to host %s:%d", cmd, host, port)); C connection = 
connectionHolder.get(); if (connection != null) { @@ -78,7 +80,7 @@ public abstract class ReconnectingConnection<C extends ClientConnection, HS exte } else { // logger.debug("No connection active, opening client connection."); BasicClient<?, C, HS, ?> client = getNewClient(); - ConnectionListeningFuture<T> future = new ConnectionListeningFuture<>(cmd); + ConnectionListeningFuture<T,E,M> future = new ConnectionListeningFuture<>(cmd); client.connectAsClient(future, handshake, host, port); future.waitAndRun(); // logger.debug("Connection available and active, command now being run inline."); @@ -88,13 +90,13 @@ public abstract class ReconnectingConnection<C extends ClientConnection, HS exte } } - public class ConnectionListeningFuture<R extends MessageLite> + public class ConnectionListeningFuture<R extends MessageLite, E extends EnumLite, M extends MessageLite> extends AbstractFuture<C> implements RpcConnectionHandler<C> { - private RpcCommand<R, C> cmd; + private RpcCommand<R, C, E, M> cmd; - public ConnectionListeningFuture(RpcCommand<R, C> cmd) { + public ConnectionListeningFuture(RpcCommand<R, C, E, M> cmd) { super(); this.cmd = cmd; } @@ -173,9 +175,7 @@ public abstract class ReconnectingConnection<C extends ClientConnection, HS exte incoming.getChannel().close(); } set(connection); - } - } /** Factory for close handlers **/ @@ -204,7 +204,6 @@ public abstract class ReconnectingConnection<C extends ClientConnection, HS exte connectionHolder.compareAndSet(connection, null); parent.operationComplete(future); } - } public CloseHandlerCreator getCloseHandlerCreator() { @@ -223,5 +222,4 @@ public abstract class ReconnectingConnection<C extends ClientConnection, HS exte c.getChannel().close(); } } - } diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java index 1bb39ac..821b8fb 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java +++ 
b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java @@ -17,10 +17,17 @@ */ package org.apache.drill.exec.rpc; +import com.google.protobuf.Internal.EnumLite; import com.google.protobuf.MessageLite; -public interface RpcCommand<T extends MessageLite, C extends RemoteConnection> extends RpcConnectionHandler<C>{ +public interface RpcCommand<T extends MessageLite, C extends RemoteConnection, + E extends EnumLite, M extends MessageLite> extends RpcConnectionHandler<C> { - public abstract void connectionAvailable(C connection); + void connectionAvailable(C connection); -} \ No newline at end of file + E getRpcType(); + + M getMessage(); + + RpcOutcomeListener<T> getOutcomeListener(); +} -- To stop receiving notification emails like this one, please contact ar...@apache.org.