[ https://issues.apache.org/jira/browse/DRILL-6255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481026#comment-16481026 ]
ASF GitHub Bot commented on DRILL-6255: --------------------------------------- asfgit closed pull request #1253: DRILL-6255: Drillbit while sending control message to itself creates … URL: https://github.com/apache/drill/pull/1253 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java index c3980336f7..0bc01f5367 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java @@ -103,6 +103,24 @@ void prepareSaslHandshake(final RpcConnectionHandler<CC> connectionHandler, List } } + /** + * Verifies if local and remote Drillbit Endpoint has same control server by using address and control port + * information. This method is used instead of equals in {@link DrillbitEndpoint} because DrillbitEndpoint stores + * state information in it. + * For local Drillbit a reference is stored in {@link org.apache.drill.exec.server.DrillbitContext} as soon as + * Drillbit is started in {@link org.apache.drill.exec.service.ServiceEngine#start} with state as STARTUP, but + * while planning minor fragment the assignment list is used from active list of Drillbits in which state for local + * Drillbit will not be STARTUP + * @param local - DrillbitEndpoint instance for local bit + * @param remote - DrillbitEndpoint instance for remote bit + * @return true if address and control port for local and remote are same. + * false - otherwise + */ + public static boolean isLocalControlServer(DrillbitEndpoint local, DrillbitEndpoint remote) { + return local.hasAddress() && local.hasControlPort() && remote.hasAddress() && remote.hasControlPort() && + local.getAddress().equals(remote.getAddress()) && local.getControlPort() == remote.getControlPort(); + } + // Suppress default constructor private BitRpcUtility() { } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java index 800cf3cc85..e27729c1f4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.google.common.collect.Maps; +import org.apache.drill.exec.rpc.BitRpcUtility; public class ConnectionManagerRegistry implements Iterable<ControlConnectionManager> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class); @@ -40,9 +41,12 @@ public ConnectionManagerRegistry(ControlConnectionConfig config) { public ControlConnectionManager getConnectionManager(DrillbitEndpoint remoteEndpoint) { assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager can be retrieved"; + + final boolean isLocalServer = BitRpcUtility.isLocalControlServer(localEndpoint, remoteEndpoint); ControlConnectionManager m = registry.get(remoteEndpoint); if (m == null) { - m = new ControlConnectionManager(config, localEndpoint, remoteEndpoint); + m = (isLocalServer) ? new LocalControlConnectionManager(config, remoteEndpoint) + : new RemoteControlConnectionManager(config, localEndpoint, remoteEndpoint); final ControlConnectionManager m2 = registry.putIfAbsent(remoteEndpoint, m); if (m2 != null) { m = m2; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java index b19fb8bb75..f114af44de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.control; +import com.google.common.annotations.VisibleForTesting; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.rpc.BitConnectionConfig; @@ -24,8 +25,8 @@ import org.apache.drill.exec.work.batch.ControlMessageHandler; // config for bit to bit connection -// package private -class ControlConnectionConfig extends BitConnectionConfig { +@VisibleForTesting +public class ControlConnectionConfig extends BitConnectionConfig { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionConfig.class); private final ControlMessageHandler handler; @@ -41,8 +42,8 @@ public String getName() { return "control"; // unused } - ControlMessageHandler getMessageHandler() { + @VisibleForTesting + public ControlMessageHandler getMessageHandler() { return handler; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java index 6bfcbd5d6e..240421ef70 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java @@ -25,14 +25,10 @@ /** * Maintains connection between two particular bits. */ -public class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{ +public abstract class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionManager.class); - private final ControlConnectionConfig config; - private final DrillbitEndpoint remoteEndpoint; - - public ControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint localEndpoint, - DrillbitEndpoint remoteEndpoint) { + public ControlConnectionManager(DrillbitEndpoint localEndpoint, DrillbitEndpoint remoteEndpoint) { super( BitControlHandshake.newBuilder() .setRpcVersion(ControlRpcConfig.RPC_VERSION) @@ -40,14 +36,8 @@ public ControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint .build(), remoteEndpoint.getAddress(), remoteEndpoint.getControlPort()); - - this.config = config; - this.remoteEndpoint = remoteEndpoint; } @Override - protected BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient() { - return new ControlClient(config, remoteEndpoint, new CloseHandlerCreator()); - } - + protected abstract BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java index 1a4af9054f..492d4de625 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java @@ -17,11 +17,20 @@ */ package org.apache.drill.exec.rpc.control; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; import io.netty.buffer.ByteBuf; import io.netty.buffer.DrillBuf; - -import java.util.concurrent.TimeUnit; - import org.apache.drill.exec.proto.BitControl.CustomMessage; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; @@ -38,19 +47,7 @@ import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.control.Controller.CustomSerDe; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; - +import java.util.concurrent.TimeUnit; public class ControlTunnel { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlTunnel.class); @@ -99,8 +96,7 @@ public void informReceiverFinished(RpcOutcomeListener<Ack> outcomeListener, Fini return b.getFuture(); } - - public static class SendFragmentStatus extends FutureBitCommand<Ack, ControlConnection> { + public static class SendFragmentStatus extends FutureBitCommand<Ack, ControlConnection, RpcType, FragmentStatus> { final FragmentStatus status; public SendFragmentStatus(FragmentStatus status) { @@ -110,13 +106,22 @@ public SendFragmentStatus(FragmentStatus status) { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.sendUnsafe(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class); + connection.sendUnsafe(outcomeListener, getRpcType(), status, Ack.class); } - } + @Override + public RpcType getRpcType() { + return RpcType.REQ_FRAGMENT_STATUS; + } + @Override + public FragmentStatus getMessage() { + return status; + } - public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection> { + } + + public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection, RpcType, FinishedReceiver> { final FinishedReceiver finishedReceiver; public ReceiverFinished(RpcOutcomeListener<Ack> listener, FinishedReceiver finishedReceiver) { @@ -126,11 +131,21 @@ public ReceiverFinished(RpcOutcomeListener<Ack> listener, FinishedReceiver finis @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_RECEIVER_FINISHED, finishedReceiver, Ack.class); + connection.send(outcomeListener, getRpcType(), finishedReceiver, Ack.class); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_RECEIVER_FINISHED; + } + + @Override + public FinishedReceiver getMessage() { + return finishedReceiver; } } - public static class SignalFragment extends ListeningCommand<Ack, ControlConnection> { + public static class SignalFragment extends ListeningCommand<Ack, ControlConnection, RpcType, FragmentHandle> { final FragmentHandle handle; final RpcType type; @@ -145,9 +160,18 @@ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection.sendUnsafe(outcomeListener, type, handle, Ack.class); } + @Override + public RpcType getRpcType() { + return type; + } + + @Override + public FragmentHandle getMessage() { + return handle; + } } - public static class SendFragment extends ListeningCommand<Ack, ControlConnection> { + public static class SendFragment extends ListeningCommand<Ack, ControlConnection, RpcType, InitializeFragments> { final InitializeFragments fragments; public SendFragment(RpcOutcomeListener<Ack> listener, InitializeFragments fragments) { @@ -157,12 +181,21 @@ public SendFragment(RpcOutcomeListener<Ack> listener, InitializeFragments fragme @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_INITIALIZE_FRAGMENTS, fragments, Ack.class); + connection.send(outcomeListener, getRpcType(), fragments, Ack.class); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_INITIALIZE_FRAGMENTS; } + @Override + public InitializeFragments getMessage() { + return fragments; + } } - public static class RequestProfile extends FutureBitCommand<QueryProfile, ControlConnection> { + public static class RequestProfile extends FutureBitCommand<QueryProfile, ControlConnection, RpcType, QueryId> { final QueryId queryId; public RequestProfile(QueryId queryId) { @@ -172,11 +205,21 @@ public RequestProfile(QueryId queryId) { @Override public void doRpcCall(RpcOutcomeListener<QueryProfile> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, QueryProfile.class); + connection.send(outcomeListener, getRpcType(), queryId, QueryProfile.class); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_QUERY_STATUS; + } + + @Override + public QueryId getMessage() { + return queryId; } } - public static class CancelQuery extends FutureBitCommand<Ack, ControlConnection> { + public static class CancelQuery extends FutureBitCommand<Ack, ControlConnection, RpcType, QueryId> { final QueryId queryId; public CancelQuery(QueryId queryId) { @@ -186,7 +229,17 @@ public CancelQuery(QueryId queryId) { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, Ack.class); + connection.send(outcomeListener, getRpcType(), queryId, Ack.class); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_QUERY_CANCEL; + } + + @Override + public QueryId getMessage() { + return queryId; } } @@ -204,8 +257,8 @@ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection return new CustomTunnel<SEND, RECEIVE>(messageTypeId, send, receive); } - - private static class CustomMessageSender extends ListeningCommand<CustomMessage, ControlConnection> { + public static class CustomMessageSender extends + ListeningCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> { private CustomMessage message; private ByteBuf[] dataBodies; @@ -218,12 +271,27 @@ public CustomMessageSender(RpcOutcomeListener<CustomMessage> listener, CustomMes @Override public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies); + connection.send(outcomeListener, getRpcType(), message, CustomMessage.class, dataBodies); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_CUSTOM; + } + + @Override + public CustomMessage getMessage() { + return message; + } + + public ByteBuf[] getDataBodies() { + return dataBodies; } } - private static class SyncCustomMessageSender extends FutureBitCommand<CustomMessage, ControlConnection> { + public static class SyncCustomMessageSender extends + FutureBitCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> { private CustomMessage message; private ByteBuf[] dataBodies; @@ -236,7 +304,21 @@ public SyncCustomMessageSender(CustomMessage message, ByteBuf[] dataBodies) { @Override public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies); + connection.send(outcomeListener, getRpcType(), message, CustomMessage.class, dataBodies); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_CUSTOM; + } + + @Override + public CustomMessage getMessage() { + return message; + } + + public ByteBuf[] getDataBodies() { + return dataBodies; } } @@ -261,8 +343,7 @@ public RECEIVE get() throws Exception { return serde.deserializeReceived(message.getMessage().toByteArray()); } - public RECEIVE get(long timeout, TimeUnit unit) throws Exception, - InvalidProtocolBufferException { + public RECEIVE get(long timeout, TimeUnit unit) throws Exception, InvalidProtocolBufferException { CustomMessage message = future.checkedGet(timeout, unit); return serde.deserializeReceived(message.getMessage().toByteArray()); } @@ -270,7 +351,6 @@ public RECEIVE get(long timeout, TimeUnit unit) throws Exception, public DrillBuf getBuffer() throws RpcException { return (DrillBuf) future.getBuffer(); } - } @@ -351,21 +431,15 @@ public void success(CustomMessage value, ByteBuf buffer) { } catch (Exception e) { innerListener.failed(new RpcException("Failure while parsing message locally.", e)); } - } @Override public void interrupted(InterruptedException e) { innerListener.interrupted(e); } - } - } - - - public static class ProtoSerDe<MSG extends MessageLite> implements CustomSerDe<MSG> { private final Parser<MSG> parser; @@ -420,7 +494,5 @@ public JacksonSerDe(Class<MSG> clazz, JsonSerializer<MSG> serializer, JsonDeseri public MSG deserializeReceived(byte[] bytes) throws Exception { return (MSG) reader.readValue(bytes); } - } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java new file mode 100644 index 0000000000..fa6a2e9a55 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.control; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.BasicClient; +import org.apache.drill.exec.rpc.Response; +import org.apache.drill.exec.rpc.RpcCommand; +import org.apache.drill.exec.rpc.RpcConstants; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.work.batch.ControlMessageHandler; + +public class LocalControlConnectionManager extends ControlConnectionManager { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalControlConnectionManager.class); + + private final ControlConnectionConfig config; + + public LocalControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint localEndpoint) { + super(localEndpoint, localEndpoint); + this.config = config; + } + + @Override + protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, ?> getNewClient() { + throw new UnsupportedOperationException("LocalControlConnectionManager doesn't support creating a control client"); + } + + @Override + public void runCommand(RpcCommand cmd) { + final int rpcType = cmd.getRpcType().getNumber(); + final ControlMessageHandler messageHandler = config.getMessageHandler(); + + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Received bit com message of type {} over local connection manager", rpcType); + } + + switch (rpcType) { + + case BitControl.RpcType.REQ_CANCEL_FRAGMENT_VALUE: { + final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd); + final RpcOutcomeListener<Ack> outcomeListener = signalFragment.getOutcomeListener(); + final Ack ackResponse = messageHandler.cancelFragment(signalFragment.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_CUSTOM_VALUE: { + final ByteBuf[] dataBodies; + final RpcOutcomeListener<BitControl.CustomMessage> outcomeListener; + + if (cmd instanceof ControlTunnel.CustomMessageSender) { + dataBodies = ((ControlTunnel.CustomMessageSender)cmd).getDataBodies(); + outcomeListener = ((ControlTunnel.CustomMessageSender)cmd).getOutcomeListener(); + } else if (cmd instanceof ControlTunnel.SyncCustomMessageSender) { + dataBodies = ((ControlTunnel.SyncCustomMessageSender)cmd).getDataBodies(); + outcomeListener = ((ControlTunnel.SyncCustomMessageSender)cmd).getOutcomeListener(); + } else { + throw new UnsupportedOperationException("Unknown Custom Type control message received"); + } + + DrillBuf reqDrillBuff; + try { + reqDrillBuff = convertToByteBuf(dataBodies); + } catch (Exception ex) { + outcomeListener.failed(new RpcException("Failed to allocate memory while sending request in " + + "LocalControlConnectionManager#convertToByteBuff", ex)); + return; + } finally { + releaseByteBuf(dataBodies); + } + + try { + BitControl.CustomMessage message = (BitControl.CustomMessage) cmd.getMessage(); + final Response response = messageHandler.getHandlerRegistry().handle(message, reqDrillBuff); + DrillBuf responseBuffer; + try { + responseBuffer = convertToByteBuf(response.dBodies); + } catch (Exception ex) { + outcomeListener.failed(new RpcException("Failed to allocate memory while sending response in " + + "LocalControlConnectionManager#convertToByteBuff", ex)); + return; + } finally { + releaseByteBuf(response.dBodies); + } + + // Passed responseBuffer will be owned by consumer + outcomeListener.success((BitControl.CustomMessage)response.pBody, responseBuffer); + } catch (RpcException ex) { + cmd.getOutcomeListener().failed(ex); + } finally { + // Release the reqDrillBuff passed into handler + releaseByteBuf(reqDrillBuff); + } + break; + } + + case BitControl.RpcType.REQ_RECEIVER_FINISHED_VALUE: { + final ControlTunnel.ReceiverFinished receiverFinished = ((ControlTunnel.ReceiverFinished) cmd); + final RpcOutcomeListener<Ack> outcomeListener = receiverFinished.getOutcomeListener(); + final Ack ackResponse = messageHandler.receivingFragmentFinished(receiverFinished.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_FRAGMENT_STATUS_VALUE: { + final ControlTunnel.SendFragmentStatus fragmentStatus = ((ControlTunnel.SendFragmentStatus) cmd); + final RpcOutcomeListener<Ack> outcomeListener = fragmentStatus.getOutcomeListener(); + final Ack ackResponse = messageHandler.requestFragmentStatus(fragmentStatus.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_QUERY_CANCEL_VALUE: { + final ControlTunnel.CancelQuery cancelQuery = ((ControlTunnel.CancelQuery) cmd); + final RpcOutcomeListener<Ack> outcomeListener = cancelQuery.getOutcomeListener(); + final Ack ackResponse = messageHandler.requestQueryCancel(cancelQuery.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: { + final ControlTunnel.SendFragment sendFragment = ((ControlTunnel.SendFragment) cmd); + final RpcOutcomeListener<Ack> outcomeListener = sendFragment.getOutcomeListener(); + + try { + final Ack ackResponse = messageHandler.initializeFragment(sendFragment.getMessage()); + outcomeListener.success(ackResponse, null); + } catch (RpcException ex) { + outcomeListener.failed(ex); + } + break; + } + + case BitControl.RpcType.REQ_QUERY_STATUS_VALUE: { + final ControlTunnel.RequestProfile requestProfile = ((ControlTunnel.RequestProfile) cmd); + final RpcOutcomeListener<UserBitShared.QueryProfile> outcomeListener = requestProfile.getOutcomeListener(); + + try { + final UserBitShared.QueryProfile profile = messageHandler.requestQueryStatus(requestProfile.getMessage()); + outcomeListener.success(profile, null); + } catch (RpcException ex) { + outcomeListener.failed(ex); + } + break; + } + + case BitControl.RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: { + final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd); + final RpcOutcomeListener<Ack> outcomeListener = signalFragment.getOutcomeListener(); + final Ack ackResponse = messageHandler.resumeFragment(signalFragment.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + default: + final RpcException rpcException = new RpcException(String.format("Unsupported control request type %s " + + "received on LocalControlConnectionManager", rpcType)); + cmd.getOutcomeListener().failed(rpcException); + } + } + + /** + * Copies ByteBuf in the input array into a single DrillBuf + * @param byteBuffArray - input array of ByteBuf's + * @return DrillBuf - output Drillbuf with all the input bytes. + * @throws OutOfMemoryException + */ + private DrillBuf convertToByteBuf(ByteBuf[] byteBuffArray) throws OutOfMemoryException { + + if (byteBuffArray == null) { + return null; + } + + int bytesToCopy = 0; + for (ByteBuf b : byteBuffArray) { + final int validBytes = b.readableBytes(); + if (0 == validBytes) { + b.release(); + } else { + bytesToCopy += validBytes; + } + } + final DrillBuf drillBuff = config.getAllocator().buffer(bytesToCopy); + + for (ByteBuf b : byteBuffArray) { + final int validBytes = b.readableBytes(); + drillBuff.writeBytes(b, 0, validBytes); + } + + return drillBuff; + } + + /** + * Releases all the ByteBuf inside the input ByteBuf array + * @param byteBuffArray - input array of ByteBuf's + */ + private void releaseByteBuf(ByteBuf[] byteBuffArray) { + if (byteBuffArray != null) { + for (ByteBuf b : byteBuffArray) { + b.release(); + } + } + } + + /** + * Releases the input ByteBuf + * @param byteBuf - input ByteBuf + */ + private void releaseByteBuf(ByteBuf byteBuf) { + if (byteBuf != null) { + byteBuf.release(); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java new file mode 100644 index 0000000000..6a4dc21b90 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.control; + +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.rpc.BasicClient; + +public class RemoteControlConnectionManager extends ControlConnectionManager { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteControlConnectionManager.class); + + private final ControlConnectionConfig config; + private final DrillbitEndpoint remoteEndpoint; + + public RemoteControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint + localEndpoint, DrillbitEndpoint remoteEndpoint) { + super(localEndpoint, remoteEndpoint); + this.config = config; + this.remoteEndpoint = remoteEndpoint; + } + + @Override + protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, ?> getNewClient() { + return new ControlClient(config, remoteEndpoint, new CloseHandlerCreator()); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java index 7e286fa320..7b05b81e3b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.data; +import com.google.protobuf.MessageLite; import io.netty.buffer.ByteBuf; import java.util.concurrent.Semaphore; @@ -43,7 +44,6 @@ private ExecutionControls testControls; private org.slf4j.Logger testLogger; - public DataTunnel(DataConnectionManager manager) { this.manager = manager; } @@ -69,7 +69,7 @@ public void setTestInjectionControls(final ControlsInjector testInjector, public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) { SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch); - try{ + try { if (isInjectionControlSet) { // Wait for interruption if set. Used to simulate the fragment interruption while the fragment is waiting for // semaphore acquire. We expect the @@ -78,9 +78,9 @@ public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWri sendingSemaphore.acquire(); manager.runCommand(b); - }catch(final InterruptedException e){ + } catch (final InterruptedException e) { // Release the buffers first before informing the listener about the interrupt. - for(ByteBuf buffer : batch.getBuffers()) { + for (ByteBuf buffer : batch.getBuffers()) { buffer.release(); } @@ -119,7 +119,7 @@ public void interrupted(InterruptedException e) { } } - private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> { + private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection, RpcType, MessageLite> { final FragmentWritableBatch batch; public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) { @@ -129,7 +129,18 @@ public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, FragmentWritableBa @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, DataClientConnection connection) { - connection.send(new ThrottlingOutcomeListener(outcomeListener), RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers()); + connection.send(new ThrottlingOutcomeListener(outcomeListener), getRpcType(), batch.getHeader(), + Ack.class, batch.getBuffers()); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_RECORD_BATCH; + } + + @Override + public MessageLite getMessage() { + return batch.getHeader(); } @Override @@ -139,7 +150,7 @@ public String toString() { @Override public void connectionFailed(FailureType type, Throwable t) { - for(ByteBuf buffer : batch.getBuffers()) { + for (ByteBuf buffer : batch.getBuffers()) { buffer.release(); } super.connectionFailed(type, t); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java index e562b167a2..963f53a9d5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java @@ -91,14 +91,16 @@ public void handle(ControlConnection connection, int rpcType, ByteBuf pBody, Byt } case RpcType.REQ_FRAGMENT_STATUS_VALUE: - bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER)); + final FragmentStatus status = get(pBody, FragmentStatus.PARSER); + requestFragmentStatus(status); // TODO: Support a type of message that has no response. sender.send(ControlRpcConfig.OK); break; case RpcType.REQ_QUERY_CANCEL_VALUE: { final QueryId queryId = get(pBody, QueryId.PARSER); - if (bee.cancelForeman(queryId, null)) { + final Ack cancelStatus = requestQueryCancel(queryId); + if (cancelStatus.getOk()) { sender.send(ControlRpcConfig.OK); } else { sender.send(ControlRpcConfig.FAIL); @@ -108,21 +110,14 @@ public void handle(ControlConnection connection, int rpcType, ByteBuf pBody, Byt case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: { final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER); - final DrillbitContext drillbitContext = bee.getContext(); - for(int i = 0; i < fragments.getFragmentCount(); i++) { - startNewFragment(fragments.getFragment(i), drillbitContext); - } + initializeFragment(fragments); sender.send(ControlRpcConfig.OK); break; } case RpcType.REQ_QUERY_STATUS_VALUE: { final QueryId queryId = get(pBody, QueryId.PARSER); - final Foreman foreman = bee.getForemanForQueryId(queryId); - if (foreman == null) { - throw new RpcException("Query not running on node."); - } - final QueryProfile profile = foreman.getQueryManager().getQueryProfile(); + final QueryProfile profile = requestQueryStatus(queryId); sender.send(new Response(RpcType.RESP_QUERY_STATUS, profile)); break; } @@ -145,7 +140,7 @@ public void handle(ControlConnection connection, int rpcType, ByteBuf pBody, Byt * @param fragment * @throws UserRpcException */ - private void startNewFragment(final PlanFragment fragment, final DrillbitContext drillbitContext) + public void startNewFragment(final PlanFragment fragment, final DrillbitContext drillbitContext) throws UserRpcException { logger.debug("Received remote fragment start instruction", fragment); @@ -182,7 +177,7 @@ private void startNewFragment(final PlanFragment fragment, final DrillbitContext /* (non-Javadoc) * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle) */ - private Ack cancelFragment(final FragmentHandle handle) { + public Ack cancelFragment(final FragmentHandle handle) { /** * For case 1, see {@link org.apache.drill.exec.work.foreman.QueryManager#cancelExecutingFragments}. * In comments below, "active" refers to fragment states: SENDING, AWAITING_ALLOCATION, RUNNING and @@ -213,7 +208,7 @@ private Ack cancelFragment(final FragmentHandle handle) { return Acks.OK; } - private Ack resumeFragment(final FragmentHandle handle) { + public Ack resumeFragment(final FragmentHandle handle) { // resume a pending fragment final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle); if (manager != null) { @@ -233,7 +228,7 @@ private Ack resumeFragment(final FragmentHandle handle) { return Acks.OK; } - private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) { + public Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) { final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender()); @@ -254,6 +249,32 @@ private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) { return Acks.OK; } + public Ack requestFragmentStatus(FragmentStatus status) { + bee.getContext().getWorkBus().statusUpdate( status); + return Acks.OK; + } + + public Ack requestQueryCancel(QueryId queryId) { + return bee.cancelForeman(queryId, null) ? Acks.OK : Acks.FAIL; + } + + public Ack initializeFragment(InitializeFragments fragments) throws RpcException { + final DrillbitContext drillbitContext = bee.getContext(); + for (int i = 0; i < fragments.getFragmentCount(); i++) { + startNewFragment(fragments.getFragment(i), drillbitContext); + } + + return Acks.OK; + } + + public QueryProfile requestQueryStatus(QueryId queryId) throws RpcException { + final Foreman foreman = bee.getForemanForQueryId(queryId); + if (foreman == null) { + throw new RpcException("Query not running on node."); + } + return foreman.getQueryManager().getQueryProfile(); + } + public CustomHandlerRegistry getHandlerRegistry() { return handlerRegistry; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java index 3d051bbdc3..91b2a2a41f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java @@ -44,11 +44,8 @@ import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.fragment.FragmentExecutor; import org.apache.drill.exec.work.fragment.FragmentStatusReporter; -import org.apache.drill.exec.work.fragment.NonRootFragmentManager; import org.apache.drill.exec.work.fragment.RootFragmentManager; - -import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -141,7 +138,6 @@ private void setupRootFragment(final PlanFragment rootFragment, final FragmentRo } } - /** * Set up the non-root fragments for execution. Some may be local, and some may be remote. * Messages are sent immediately, so they may start returning data even before we complete this. @@ -158,17 +154,11 @@ private void setupNonRootFragments(final Collection<PlanFragment> fragments) thr * executed there. We need to start up the intermediate fragments first so that they will be * ready once the leaf fragments start producing data. To satisfy both of these, we will * make a pass through the fragments and put them into the remote maps according to their - * leaf/intermediate state, as well as their target drillbit. Also filter the leaf/intermediate - * fragments which are assigned to run on local Drillbit node (or Foreman node) into separate lists. - * - * This will help to schedule local + * leaf/intermediate state, as well as their target drillbit. */ - final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = ArrayListMultimap.create(); - final List<PlanFragment> localLeafFragmentList = new ArrayList<>(); - final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = ArrayListMultimap.create(); - final List<PlanFragment> localIntFragmentList = new ArrayList<>(); + final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create(); + final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create(); - final DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint(); // record all fragments for status purposes. for (final PlanFragment planFragment : fragments) { @@ -180,44 +170,38 @@ private void setupNonRootFragments(final Collection<PlanFragment> fragments) thr foreman.getQueryManager().addFragmentStatusTracker(planFragment, false); if (planFragment.getLeafFragment()) { - updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap); + leafFragmentMap.put(planFragment.getAssignment(), planFragment); } else { - updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap); + intFragmentMap.put(planFragment.getAssignment(), planFragment); } } /* * We need to wait for the intermediates to be sent so that they'll be set up by the time - * the leaves start producing data. We'll use this latch to wait for the responses. + * the leaves start producing data. We'll use this latch to wait for the responses. All the local intermediate + * fragments are submitted locally without creating any actual control connection to itself. * * However, in order not to hang the process if any of the RPC requests fails, we always * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll * know if any submissions did fail. */ - scheduleRemoteIntermediateFragments(remoteIntFragmentMap); - - // Setup local intermediate fragments - for (final PlanFragment fragment : localIntFragmentList) { - startLocalFragment(fragment); - } + scheduleIntermediateFragments(intFragmentMap); injector.injectChecked(foreman.getQueryContext().getExecutionControls(), "send-fragments", ForemanException.class); /* * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through - * the regular sendListener event delivery. + * the regular sendListener event delivery˚. All the local leaf fragments are submitted locally without creating + * any actual control connection to itself. */ - for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) { - sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null); - } - - // Setup local leaf fragments - for (final PlanFragment fragment : localLeafFragmentList) { - startLocalFragment(fragment); + for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) { + sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null); } } /** - * Send all the remote fragments belonging to a single target drillbit in one request. + * Send all the remote fragments belonging to a single target drillbit in one request. If the assignment + * DrillbitEndpoint is local Drillbit then {@link Controller#getTunnel(DrillbitEndpoint)} takes care of submitting it + * locally without actually creating a Control Connection to itself. * * @param assignment the drillbit assigned to these fragments * @param fragments the set of fragments @@ -241,41 +225,21 @@ private void sendRemoteFragments(final DrillbitEndpoint assignment, final Collec } /** - * Add planFragment into either of local fragment list or remote fragment map based on assigned Drillbit Endpoint node - * and the local Drillbit Endpoint. + * Send intermediate fragment to the assigned Drillbit node. If the assignment DrillbitEndpoint is local Drillbit + * then {@link Controller#getTunnel(DrillbitEndpoint)} takes care of submitting it locally without actually creating + * a Control Connection to itself. Throws {@link UserException} in case of failure to send the fragment. * - * @param planFragment plan fragment - * @param localEndPoint local endpoint - * @param localFragmentList local fragment list - * @param remoteFragmentMap remote fragment map + * @param intermediateFragmentMap - Map of Drillbit Endpoint to list of intermediate PlanFragment's */ - private void updateFragmentCollection(final PlanFragment planFragment, final DrillbitEndpoint localEndPoint, - final List<PlanFragment> localFragmentList, - final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) { - final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment(); + private void scheduleIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> intermediateFragmentMap) { - if (assignedDrillbit.equals(localEndPoint)) { - localFragmentList.add(planFragment); - } else { - remoteFragmentMap.put(assignedDrillbit, planFragment); - } - } - - /** - * Send remote intermediate fragment to the assigned Drillbit node. - * Throw exception in case of failure to send the fragment. - * - * @param remoteFragmentMap - Map of Drillbit Endpoint to list of PlanFragment's - */ - private void scheduleRemoteIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) { - - final int numIntFragments = remoteFragmentMap.keySet().size(); + final int numIntFragments = intermediateFragmentMap.keySet().size(); final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments); final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures(); // send remote intermediate fragments - for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) { - sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures); + for (final DrillbitEndpoint ep : intermediateFragmentMap.keySet()) { + sendRemoteFragments(ep, intermediateFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures); } final long timeout = drillbitContext.getOptionManager().getLong(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT) * numIntFragments; @@ -313,29 +277,6 @@ private void scheduleRemoteIntermediateFragments(final Multimap<DrillbitEndpoint } } - - /** - * Start the locally assigned leaf or intermediate fragment - * - * @param fragment fragment - */ - private void startLocalFragment(final PlanFragment fragment) throws ExecutionSetupException { - logger.debug("Received local fragment start instruction", fragment); - - final FragmentContextImpl fragmentContext = new FragmentContextImpl(drillbitContext, fragment, drillbitContext.getFunctionImplementationRegistry()); - final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext); - final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter); - - // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf. - if (fragment.getLeafFragment()) { - bee.addFragmentRunner(fragmentExecutor); - } else { - // isIntermediate, store for incoming data. - final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter); - drillbitContext.getWorkBus().addFragmentManager(manager); - } - } - /** * Used by {@link FragmentSubmitListener} to track the number of submission failures. */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java index eb57658689..c0c5b04303 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java @@ -40,12 +40,9 @@ protected final AtomicReference<DrillbitEndpoint> foremanDrillbit; - protected final DrillbitEndpoint localDrillbit; - public FragmentStatusReporter(final ExecutorFragmentContext context) { this.context = context; this.foremanDrillbit = new AtomicReference<>(context.getForemanEndpoint()); - this.localDrillbit = context.getEndpoint(); } /** @@ -119,14 +116,10 @@ void sendStatus(final FragmentStatus status) { return; } - if (localDrillbit.equals(foremanNode)) { - // Update the status locally - context.getWorkEventbus().statusUpdate(status); - } else { - // Send the status via Control Tunnel to remote foreman node - final ControlTunnel tunnel = context.getController().getTunnel(foremanNode); - tunnel.sendFragmentStatus(status); - } + // Send status for both local and remote foreman node via Tunnel. For local there won't be any network connection + // created and it will be submitted locally using LocalControlConnectionManager + final ControlTunnel tunnel = context.getController().getTunnel(foremanNode); + tunnel.sendFragmentStatus(status); } /** diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java new file mode 100644 index 0000000000..667e440618 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistryTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.control; + +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class ConnectionManagerRegistryTest { + + private static final DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder() + .setAddress("10.0.0.1") + .setControlPort(31012) + .setDataPort(31011) + .setUserPort(31010) + .setState(DrillbitEndpoint.State.STARTUP) + .build(); + + private static final DrillbitEndpoint foremanEndpoint = DrillbitEndpoint.newBuilder() + .setAddress("10.0.0.2") + .setControlPort(31012) + .setDataPort(31011) + .setUserPort(31010) + .setState(DrillbitEndpoint.State.STARTUP) + .build(); + + private static ControlConnectionConfig config; + + @BeforeClass + public static void setup() { + config = mock(ControlConnectionConfig.class); + } + + @Test + public void testLocalConnectionManager() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + final ControlConnectionManager manager = registry.getConnectionManager(localEndpoint); + assertTrue(registry.iterator().hasNext()); + assertEquals(manager, registry.iterator().next()); + assertTrue(manager instanceof LocalControlConnectionManager); + } + + @Test + public void testLocalConnectionManager_differentState() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + + final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder().setState(DrillbitEndpoint.State.ONLINE).build(); + final ControlConnectionManager manager = registry.getConnectionManager(foremanEndpoint2); + assertTrue(registry.iterator().hasNext()); + assertEquals(manager, registry.iterator().next()); + assertTrue(manager instanceof LocalControlConnectionManager); + } + + @Test + public void testLocalConnectionManager_differentUserDataPort() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + + final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder() + .setState(DrillbitEndpoint.State.ONLINE) + .setUserPort(10000) + .setDataPort(11000) + .build(); + final ControlConnectionManager manager = registry.getConnectionManager(foremanEndpoint2); + assertTrue(registry.iterator().hasNext()); + assertEquals(manager, registry.iterator().next()); + assertTrue(manager instanceof LocalControlConnectionManager); + } + + @Test + public void testRemoteConnectionManager() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + final ControlConnectionManager manager = registry.getConnectionManager(foremanEndpoint); + assertTrue(registry.iterator().hasNext()); + assertTrue(manager instanceof RemoteControlConnectionManager); + } + + @Test + public void testRemoteConnectionManager_differentControlPort() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + + final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder() + .setControlPort(10000) + .build(); + final ControlConnectionManager manager = registry.getConnectionManager(foremanEndpoint2); + assertTrue(registry.iterator().hasNext()); + assertEquals(manager, registry.iterator().next()); + assertTrue(manager instanceof RemoteControlConnectionManager); + } + + @Test + public void testRemoteConnectionManager_differentAddress() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + + final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder() + .setAddress("10.0.0.0") + .build(); + final ControlConnectionManager manager = registry.getConnectionManager(foremanEndpoint2); + assertTrue(registry.iterator().hasNext()); + assertEquals(manager, registry.iterator().next()); + assertTrue(manager instanceof RemoteControlConnectionManager); + } + + @Test + public void testRemoteAndLocalConnectionManager() { + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(config); + registry.setLocalEndpoint(localEndpoint); + + final DrillbitEndpoint foremanEndpoint2 = localEndpoint.toBuilder() + .setAddress("10.0.0.0") + .build(); + + final ControlConnectionManager remoteManager = registry.getConnectionManager(foremanEndpoint2); + final ControlConnectionManager localManager = registry.getConnectionManager(localEndpoint); + + final ControlConnectionManager remoteManager_2 = registry.getConnectionManager(foremanEndpoint2); + final ControlConnectionManager localManager_2 = registry.getConnectionManager(localEndpoint); + + assertTrue(remoteManager instanceof RemoteControlConnectionManager); + assertEquals(remoteManager, remoteManager_2); + + assertTrue(localManager instanceof LocalControlConnectionManager); + assertEquals(localManager, localManager_2); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java new file mode 100644 index 0000000000..c0cf09be6b --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.control; + +import io.netty.buffer.ByteBuf; +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.Acks; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.work.batch.ControlMessageHandler; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestLocalControlConnectionManager { + + private static final DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder() + .setAddress("10.0.0.1") + .setControlPort(31011) + .setState(DrillbitEndpoint.State.STARTUP) + .build(); + + private static ControlConnectionConfig mockConfig; + + private static ControlMessageHandler mockHandler; + + private static ControlTunnel controlTunnel; + + private static CountDownLatch latch; + + private static final String NEGATIVE_ACK_MESSAGE = "Negative Ack received"; + + private static final RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener = + new RpcOutcomeListener<GeneralRPCProtos.Ack>() { + @Override + public void failed(RpcException ex) { + throw new IllegalStateException(ex); + } + + @Override + public void success(GeneralRPCProtos.Ack value, ByteBuf buffer) { + if (value.getOk()) { + latch.countDown(); + } else { + throw new IllegalStateException(NEGATIVE_ACK_MESSAGE); + } + } + + @Override + public void interrupted(InterruptedException e) { + // Do nothing + } + }; + + @Rule + public ExpectedException exceptionThrown = ExpectedException.none(); + + @BeforeClass + public static void setup() { + mockConfig = mock(ControlConnectionConfig.class); + final ConnectionManagerRegistry registry = new ConnectionManagerRegistry(mockConfig); + registry.setLocalEndpoint(localEndpoint); + ControlConnectionManager manager = registry.getConnectionManager(localEndpoint); + assertTrue(manager instanceof LocalControlConnectionManager); + controlTunnel = new ControlTunnel(manager); + } + + @Before + public void setupForTest() { + mockHandler = mock(ControlMessageHandler.class); + when(mockConfig.getMessageHandler()).thenReturn(mockHandler); + } + + /** + * Verify that SendFragmentStatus is handled correctly using ControlTunnel with LocalControlConnectionManager + */ + @Test + public void testLocalSendFragmentStatus_Success() throws Exception { + final UserBitShared.QueryId mockQueryId = UserBitShared.QueryId.getDefaultInstance(); + final UserBitShared.QueryProfile mockProfile = UserBitShared.QueryProfile.getDefaultInstance(); + when(mockHandler.requestQueryStatus(mockQueryId)).thenReturn(mockProfile); + final UserBitShared.QueryProfile returnedProfile = controlTunnel.requestQueryProfile(mockQueryId).checkedGet(); + assertEquals(returnedProfile, mockProfile); + } + + /** + * Verify that SendFragmentStatus failure scenario is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testLocalSendFragmentStatus_Failure() throws Exception { + final UserBitShared.QueryId mockQueryId = UserBitShared.QueryId.getDefaultInstance(); + final String exceptionMessage = "Testing failure case"; + exceptionThrown.expect(RpcException.class); + exceptionThrown.expectMessage(exceptionMessage); + when(mockHandler.requestQueryStatus(mockQueryId)).thenThrow(new RpcException(exceptionMessage)); + controlTunnel.requestQueryProfile(mockQueryId).checkedGet(); + } + + /** + * Verify that CancelFragment with positive ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testLocalCancelFragment_PositiveAck() throws Exception { + final ExecProtos.FragmentHandle mockHandle = ExecProtos.FragmentHandle.getDefaultInstance(); + latch = new CountDownLatch(1); + final GeneralRPCProtos.Ack mockResponse = Acks.OK; + when(mockHandler.cancelFragment(mockHandle)).thenReturn(mockResponse); + controlTunnel.cancelFragment(outcomeListener, mockHandle); + latch.await(); + } + + /** + * Verify that CancelFragment with negative ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testLocalCancelFragment_NegativeAck() throws Exception { + final ExecProtos.FragmentHandle mockHandle = ExecProtos.FragmentHandle.getDefaultInstance(); + latch = new CountDownLatch(1); + exceptionThrown.expect(IllegalStateException.class); + exceptionThrown.expectMessage(NEGATIVE_ACK_MESSAGE); + final GeneralRPCProtos.Ack mockResponse = Acks.FAIL; + when(mockHandler.cancelFragment(mockHandle)).thenReturn(mockResponse); + controlTunnel.cancelFragment(outcomeListener, mockHandle); + latch.await(); + } + + /** + * Verify that InitializeFragments with positive ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testLocalSendFragments_PositiveAck() throws Exception { + final BitControl.InitializeFragments mockFragments = BitControl.InitializeFragments.getDefaultInstance(); + latch = new CountDownLatch(1); + final GeneralRPCProtos.Ack mockResponse = Acks.OK; + when(mockHandler.initializeFragment(mockFragments)).thenReturn(mockResponse); + controlTunnel.sendFragments(outcomeListener, mockFragments); + latch.await(); + } + + /** + * Verify that InitializeFragments with negative ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testLocalSendFragments_NegativeAck() throws Exception { + final BitControl.InitializeFragments mockFragments = BitControl.InitializeFragments.getDefaultInstance(); + latch = new CountDownLatch(1); + exceptionThrown.expect(IllegalStateException.class); + exceptionThrown.expectMessage(NEGATIVE_ACK_MESSAGE); + final GeneralRPCProtos.Ack mockResponse = Acks.FAIL; + when(mockHandler.initializeFragment(mockFragments)).thenReturn(mockResponse); + controlTunnel.sendFragments(outcomeListener, mockFragments); + latch.await(); + } + + /** + * Verify that InitializeFragments failure case is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testLocalSendFragments_Failure() throws Exception { + final BitControl.InitializeFragments mockFragments = BitControl.InitializeFragments.getDefaultInstance(); + latch = new CountDownLatch(1); + exceptionThrown.expect(IllegalStateException.class); + exceptionThrown.expectCause(new TypeSafeMatcher<Throwable>(RpcException.class) { + @Override + protected boolean matchesSafely(Throwable throwable) { + return (throwable != null && throwable instanceof RpcException); + } + + @Override + public void describeTo(Description description) { + // Do nothing + } + }); + when(mockHandler.initializeFragment(mockFragments)).thenThrow(new RpcException("Failed to initialize")); + controlTunnel.sendFragments(outcomeListener, mockFragments); + latch.await(); + } + + /** + * Verify that UnpauseFragment is handled correctly using ControlTunnel with LocalControlConnectionManager + */ + @Test + public void testUnpauseFragments() throws Exception { + final ExecProtos.FragmentHandle mockHandle = ExecProtos.FragmentHandle.getDefaultInstance(); + latch = new CountDownLatch(1); + final GeneralRPCProtos.Ack mockResponse = Acks.OK; + when(mockHandler.resumeFragment(mockHandle)).thenReturn(mockResponse); + controlTunnel.unpauseFragment(outcomeListener, mockHandle); + latch.await(); + } + + /** + * Verify that RequestQueryStatus is handled correctly using ControlTunnel with LocalControlConnectionManager + */ + @Test + public void testRequestQueryStatus() throws Exception { + final UserBitShared.QueryId mockQueryId = UserBitShared.QueryId.getDefaultInstance(); + final UserBitShared.QueryProfile mockProfile = UserBitShared.QueryProfile.getDefaultInstance(); + when(mockHandler.requestQueryStatus(mockQueryId)).thenReturn(mockProfile); + final UserBitShared.QueryProfile returnedProfile = controlTunnel.requestQueryProfile(mockQueryId).checkedGet(); + assertEquals(returnedProfile, mockProfile); + } + + /** + * Verify that CancelQuery with positive ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testCancelQuery_PositiveAck() throws Exception { + final UserBitShared.QueryId mockQueryId = UserBitShared.QueryId.getDefaultInstance(); + final GeneralRPCProtos.Ack mockResponse = Acks.OK; + when(mockHandler.requestQueryCancel(mockQueryId)).thenReturn(mockResponse); + GeneralRPCProtos.Ack response = controlTunnel.requestCancelQuery(mockQueryId).checkedGet(); + assertEquals(response, mockResponse); + } + + /** + * Verify that CancelQuery with negative ack is handled correctly using ControlTunnel with + * LocalControlConnectionManager + */ + @Test + public void testCancelQuery_NegativeAck() throws Exception { + final UserBitShared.QueryId mockQueryId = UserBitShared.QueryId.getDefaultInstance(); + final GeneralRPCProtos.Ack mockResponse = Acks.FAIL; + when(mockHandler.requestQueryCancel(mockQueryId)).thenReturn(mockResponse); + GeneralRPCProtos.Ack response = controlTunnel.requestCancelQuery(mockQueryId).checkedGet(); + assertEquals(response, mockResponse); + } + + /** + * Verify that FinishedReceiver is handled correctly using ControlTunnel with LocalControlConnectionManager + */ + @Test + public void testInformReceiverFinished_success() throws Exception { + final BitControl.FinishedReceiver finishedReceiver = BitControl.FinishedReceiver.getDefaultInstance(); + latch = new CountDownLatch(1); + final GeneralRPCProtos.Ack mockResponse = Acks.OK; + when(mockHandler.receivingFragmentFinished(finishedReceiver)).thenReturn(mockResponse); + controlTunnel.informReceiverFinished(outcomeListener, finishedReceiver); + latch.await(); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java index aaff1544c5..04d54d54c9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java @@ -19,16 +19,16 @@ import com.google.common.collect.Lists; import com.typesafe.config.ConfigValueFactory; -import org.apache.drill.test.BaseTestQuery; import org.apache.drill.categories.SecurityTest; -import org.apache.drill.common.config.DrillProperties; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.config.DrillProperties; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.rpc.control.ControlRpcMetrics; import org.apache.drill.exec.rpc.data.DataRpcMetrics; import org.apache.drill.exec.rpc.security.KerberosHelper; import org.apache.drill.exec.rpc.user.UserRpcMetrics; import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl; +import org.apache.drill.test.BaseTestQuery; import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.security.authentication.util.KerberosUtil; import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil; @@ -178,6 +178,38 @@ public Void run() throws Exception { assertTrue(0 == DataRpcMetrics.getInstance().getUnEncryptedConnectionCount()); } + @Test + public void testUnecryptedConnectionCounter_LocalControlMessage() throws Exception { + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL); + connectionProps.setProperty(DrillProperties.KERBEROS_FROM_SUBJECT, "true"); + final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL, + krbHelper.clientKeytab.getAbsoluteFile()); + + Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + updateClient(connectionProps); + return null; + } + }); + + // Run query on memory system table this sends remote fragments to all Drillbit and Drillbits then send data + // using data channel. In this test we have only 1 Drillbit so there should not be any control connection but a + // local data connections + testSql("SELECT * FROM sys.memory"); + + // Check encrypted counters value + assertTrue(0 == UserRpcMetrics.getInstance().getEncryptedConnectionCount()); + assertTrue(0 == ControlRpcMetrics.getInstance().getEncryptedConnectionCount()); + assertTrue(0 == DataRpcMetrics.getInstance().getEncryptedConnectionCount()); + + // Check unencrypted counters value + assertTrue(1 == UserRpcMetrics.getInstance().getUnEncryptedConnectionCount()); + assertTrue(0 == ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount()); + assertTrue(2 == DataRpcMetrics.getInstance().getUnEncryptedConnectionCount()); + } + @AfterClass public static void cleanTest() throws Exception { krbHelper.stopKdc(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java index 83c1eca382..a119a8cf9a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/fragment/FragmentStatusReporterTest.java @@ -17,54 +17,44 @@ */ package org.apache.drill.exec.work.fragment; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.rpc.control.Controller; -import org.apache.drill.exec.rpc.control.WorkEventBus; -import org.junit.Before; -import org.junit.Test; - import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContextImpl; import org.apache.drill.exec.ops.FragmentStats; import org.apache.drill.exec.proto.BitControl.FragmentStatus; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.FragmentState; import org.apache.drill.exec.rpc.control.ControlTunnel; +import org.apache.drill.exec.rpc.control.Controller; +import org.junit.Before; +import org.junit.Test; -import static org.junit.Assert.assertTrue; +import static org.apache.drill.exec.proto.UserBitShared.FragmentState.CANCELLATION_REQUESTED; +import static org.apache.drill.exec.proto.UserBitShared.FragmentState.FAILED; +import static org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import static org.apache.drill.exec.proto.UserBitShared.FragmentState.CANCELLATION_REQUESTED; -import static org.apache.drill.exec.proto.UserBitShared.FragmentState.FAILED; -import static org.apache.drill.exec.proto.UserBitShared.FragmentState.RUNNING; - public class FragmentStatusReporterTest { private FragmentStatusReporter statusReporter; private ControlTunnel foremanTunnel; - private FragmentContextImpl context; - @Before public void setUp() throws Exception { - context = mock(FragmentContextImpl.class); + final FragmentContextImpl context = mock(FragmentContextImpl.class); Controller controller = mock(Controller.class); - // Create 2 different endpoint such that foremanEndpoint is different than - // localEndpoint - DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build(); + // Create Foreman Endpoint and it's tunnel DrillbitEndpoint foremanEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.2").build(); foremanTunnel = mock(ControlTunnel.class); - when(context.getEndpoint()).thenReturn(localEndpoint); when(context.getController()).thenReturn(controller); when(controller.getTunnel(foremanEndpoint)).thenReturn(foremanTunnel); @@ -120,49 +110,4 @@ public void testStateChangedAfterClose() throws Exception { statusReporter.stateChanged(CANCELLATION_REQUESTED); verify(foremanTunnel).sendFragmentStatus(any(FragmentStatus.class)); } - - - /** - * With LocalEndpoint and Foreman Endpoint being same node, test that status change - * message doesn't happen via Control Tunnel instead it happens locally via WorkEventBus - * - * @throws Exception - */ - @Test - public void testStateChangedLocalForeman() throws Exception { - - DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build(); - - when(context.getEndpoint()).thenReturn(localEndpoint); - when(context.getForemanEndpoint()).thenReturn(localEndpoint); - when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class)); - - statusReporter = new FragmentStatusReporter(context); - statusReporter.stateChanged(RUNNING); - verifyZeroInteractions(foremanTunnel); - verify(context.getWorkEventbus()).statusUpdate(any(FragmentStatus.class)); - } - - /** - * With LocalEndpoint and Foreman Endpoint being same node, test that after close of - * FragmentStatusReporter, status update doesn't happen either through Control Tunnel - * or through WorkEventBus. - * - * @throws Exception - */ - @Test - public void testCloseLocalForeman() throws Exception { - DrillbitEndpoint localEndpoint = DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").build(); - - when(context.getEndpoint()).thenReturn(localEndpoint); - when(context.getForemanEndpoint()).thenReturn(localEndpoint); - when(context.getWorkEventbus()).thenReturn(mock(WorkEventBus.class)); - statusReporter = new FragmentStatusReporter(context); - - statusReporter.close(); - assertTrue(statusReporter.foremanDrillbit.get() == null); - statusReporter.stateChanged(RUNNING); - verifyZeroInteractions(foremanTunnel); - verify(context.getWorkEventbus(), never()).statusUpdate(any(FragmentStatus.class)); - } } diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java index 90b2d19f86..2437ddb7a3 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java @@ -17,20 +17,24 @@ */ package org.apache.drill.exec.rpc; -import io.netty.buffer.ByteBuf; - import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.Internal.EnumLite; import com.google.protobuf.MessageLite; +import io.netty.buffer.ByteBuf; -public abstract class FutureBitCommand<T extends MessageLite, C extends RemoteConnection> implements RpcCommand<T,C> { +public abstract class FutureBitCommand<T extends MessageLite, C extends RemoteConnection, + E extends EnumLite, M extends MessageLite> implements RpcCommand<T, C, E, M> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FutureBitCommand.class); protected final SettableFuture<T> settableFuture; private final RpcCheckedFuture<T> parentFuture; + private final RpcOutcomeListener<T> outcomeListener; + public FutureBitCommand() { this.settableFuture = SettableFuture.create(); this.parentFuture = new RpcCheckedFuture<T>(settableFuture); + outcomeListener = new DeferredRpcOutcome(); } public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, C connection); @@ -38,7 +42,7 @@ public FutureBitCommand() { @Override public void connectionAvailable(C connection) { - doRpcCall(new DeferredRpcOutcome(), connection); + doRpcCall(outcomeListener, connection); } @Override @@ -71,10 +75,14 @@ public void interrupted(final InterruptedException e) { return parentFuture; } + @Override + public RpcOutcomeListener<T> getOutcomeListener() { + return outcomeListener; + } + @Override public void connectionFailed(FailureType type, Throwable t) { settableFuture.setException(RpcException.mapException( String.format("Command failed while establishing connection. Failure type %s.", type), t)); } - -} \ No newline at end of file +} diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java index c9b8b3a527..964abf44ed 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java @@ -17,11 +17,11 @@ */ package org.apache.drill.exec.rpc; -import io.netty.buffer.ByteBuf; - +import com.google.protobuf.Internal.EnumLite; import com.google.protobuf.MessageLite; -public abstract class ListeningCommand<T extends MessageLite, C extends RemoteConnection> implements RpcCommand<T, C> { +public abstract class ListeningCommand<T extends MessageLite, C extends RemoteConnection, + E extends EnumLite, M extends MessageLite> implements RpcCommand<T, C, E, M> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListeningCommand.class); private final RpcOutcomeListener<T> listener; @@ -35,7 +35,7 @@ public ListeningCommand(RpcOutcomeListener<T> listener) { @Override public void connectionAvailable(C connection) { - doRpcCall(new DeferredRpcOutcome(), connection); + doRpcCall(listener, connection); } @Override @@ -43,28 +43,15 @@ public void connectionSucceeded(C connection) { connectionAvailable(connection); } - private class DeferredRpcOutcome implements RpcOutcomeListener<T> { - - @Override - public void failed(RpcException ex) { - listener.failed(ex); - } - - @Override - public void success(T value, ByteBuf buf) { - listener.success(value, buf); - } - - @Override - public void interrupted(final InterruptedException e) { - listener.interrupted(e); - } - } - @Override public void connectionFailed(FailureType type, Throwable t) { listener.failed(RpcException.mapException( String.format("Command failed while establishing connection. Failure type %s.", type), t)); } -} \ No newline at end of file + @Override + public RpcOutcomeListener<T> getOutcomeListener() { + return listener; + } + +} diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java index fec3962b90..c648a169e2 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc; +import com.google.protobuf.Internal.EnumLite; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.GenericFutureListener; @@ -52,7 +53,8 @@ public ReconnectingConnection(HS handshake, String host, int port) { protected abstract BasicClient<?, C, HS, ?> getNewClient(); - public <T extends MessageLite, R extends RpcCommand<T, C>> void runCommand(R cmd) { + public <T extends MessageLite, E extends EnumLite, M extends MessageLite, + R extends RpcCommand<T, C, E, M>> void runCommand(R cmd) { // if(logger.isDebugEnabled()) logger.debug(String.format("Running command %s sending to host %s:%d", cmd, host, port)); C connection = connectionHolder.get(); if (connection != null) { @@ -78,7 +80,7 @@ public ReconnectingConnection(HS handshake, String host, int port) { } else { // logger.debug("No connection active, opening client connection."); BasicClient<?, C, HS, ?> client = getNewClient(); - ConnectionListeningFuture<T> future = new ConnectionListeningFuture<>(cmd); + ConnectionListeningFuture<T,E,M> future = new ConnectionListeningFuture<>(cmd); client.connectAsClient(future, handshake, host, port); future.waitAndRun(); // logger.debug("Connection available and active, command now being run inline."); @@ -88,13 +90,13 @@ public ReconnectingConnection(HS handshake, String host, int port) { } } - public class ConnectionListeningFuture<R extends MessageLite> + public class ConnectionListeningFuture<R extends MessageLite, E extends EnumLite, M extends MessageLite> extends AbstractFuture<C> implements RpcConnectionHandler<C> { - private RpcCommand<R, C> cmd; + private RpcCommand<R, C, E, M> cmd; - public ConnectionListeningFuture(RpcCommand<R, C> cmd) { + public ConnectionListeningFuture(RpcCommand<R, C, E, M> cmd) { super(); this.cmd = cmd; } @@ -173,9 +175,7 @@ public void connectionSucceeded(C incoming) { incoming.getChannel().close(); } set(connection); - } - } /** Factory for close handlers **/ @@ -204,7 +204,6 @@ public void operationComplete(ChannelFuture future) throws Exception { connectionHolder.compareAndSet(connection, null); parent.operationComplete(future); } - } public CloseHandlerCreator getCloseHandlerCreator() { @@ -223,5 +222,4 @@ public void close() { c.getChannel().close(); } } - } diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java index 1bb39ac505..821b8fbf85 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java @@ -17,10 +17,17 @@ */ package org.apache.drill.exec.rpc; +import com.google.protobuf.Internal.EnumLite; import com.google.protobuf.MessageLite; -public interface RpcCommand<T extends MessageLite, C extends RemoteConnection> extends RpcConnectionHandler<C>{ +public interface RpcCommand<T extends MessageLite, C extends RemoteConnection, + E extends EnumLite, M extends MessageLite> extends RpcConnectionHandler<C> { - public abstract void connectionAvailable(C connection); + void connectionAvailable(C connection); -} \ No newline at end of file + E getRpcType(); + + M getMessage(); + + RpcOutcomeListener<T> getOutcomeListener(); +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Drillbit while sending control message to itself creates a connection instead > of submitting locally > --------------------------------------------------------------------------------------------------- > > Key: DRILL-6255 > URL: https://issues.apache.org/jira/browse/DRILL-6255 > Project: Apache Drill > Issue Type: Bug > Components: Execution - Flow > Affects Versions: 1.12.0 > Reporter: Sorabh Hamirwasia > Assignee: Sorabh Hamirwasia > Priority: Major > Labels: ready-to-commit > Fix For: 1.14.0 > > > With the new shutdown feature introduced in 1.12, there is a state introduced > in DrillbitEndpoint. Due to this the equality check happening > [here|https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java#L256] > will result in false and hence the fragments supposed to be scheduled on > Foreman will be treated as remote fragments and a connection will be created > to schedule it. The equality check is false because localEndpoint state is > STARTUP whereas state in assigned Drillbit is ONLINE. > I guess now we should update the equality check to verify just for address > and control port to be same between assigned and local Drillbit endpoint. A > test can be added for this based on _sys.memory_ table since that guarantees > scheduling minor fragments on each Drillbit node. -- This message was sent by Atlassian JIRA (v7.6.3#76005)