GEODE-3406: Address PR feedback * Rename ExecutionContext -> MessageExecutionContext * Properly close socket when processing ProtoBuf request in TcpServer * GetAvailableServersRequestHandler guards against `null` servers * minor style changes
Signed-off-by: Alexander Murmann <amurm...@pivotal.io> Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0eb320fa Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0eb320fa Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0eb320fa Branch: refs/heads/develop Commit: 0eb320fad2071c0ebb8cb1e74576ed4a231a9e3b Parents: 530f48f Author: Hitesh Khamesra <hkhame...@pivotal.io> Authored: Fri Aug 18 10:44:57 2017 -0700 Committer: Hitesh Khamesra <hkhame...@pivotal.io> Committed: Tue Aug 22 10:55:12 2017 -0700 ---------------------------------------------------------------------- .../distributed/internal/InternalLocator.java | 5 +- .../internal/tcpserver/TcpServer.java | 171 ++++++++++--------- .../geode/internal/cache/InternalCache.java | 4 +- .../sockets/ClientProtocolMessageHandler.java | 2 +- .../ClientProtocolMessageHandlerLoader.java | 64 ------- .../cache/tier/sockets/ExecutionContext.java | 54 ------ .../GenericProtocolServerConnection.java | 2 +- .../tier/sockets/MessageExecutionContext.java | 56 ++++++ .../tier/sockets/MessageHandlerFactory.java | 19 +++ .../tier/sockets/ServerConnectionFactory.java | 22 +-- .../cache/tier/sockets/TcpServerFactory.java | 39 +++++ .../AutoConnectionSourceImplJUnitTest.java | 10 +- .../tcpserver/TCPServerSSLJUnitTest.java | 3 +- .../internal/tcpserver/TcpServerJUnitTest.java | 5 +- .../tier/sockets/TcpServerFactoryTest.java | 19 +++ .../test/dunit/standalone/DUnitLauncher.java | 2 + .../protocol/operations/OperationHandler.java | 4 +- .../protocol/protobuf/ProtobufOpsProcessor.java | 5 +- .../protobuf/ProtobufStreamProcessor.java | 7 +- .../GetAllRequestOperationHandler.java | 4 +- .../GetAvailableServersOperationHandler.java | 14 +- .../GetRegionNamesRequestOperationHandler.java | 4 +- .../GetRegionRequestOperationHandler.java | 4 +- .../operations/GetRequestOperationHandler.java | 4 +- .../PutAllRequestOperationHandler.java | 4 +- .../operations/PutRequestOperationHandler.java | 4 +- .../RemoveRequestOperationHandler.java | 4 +- .../RoundTripLocatorConnectionJUnitTest.java | 19 ++- .../protobuf/ProtobufStreamProcessorTest.java | 4 +- .../GetAllRequestOperationHandlerJUnitTest.java | 6 +- ...ailableServersOperationHandlerJUnitTest.java | 42 +++-- ...onNamesRequestOperationHandlerJUnitTest.java | 6 +- ...tRegionRequestOperationHandlerJUnitTest.java | 7 +- .../GetRequestOperationHandlerJUnitTest.java | 12 +- .../PutAllRequestOperationHandlerJUnitTest.java | 8 +- .../PutRequestOperationHandlerJUnitTest.java | 10 +- .../RemoveRequestOperationHandlerJUnitTest.java | 10 +- 37 files changed, 361 insertions(+), 298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java index 8d2daf6..06603cc 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java @@ -62,6 +62,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpServer; import org.apache.geode.internal.admin.remote.DistributionLocatorId; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory; import org.apache.geode.internal.cache.wan.WANServiceProvider; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.InternalLogWriter; @@ -498,8 +499,8 @@ public class InternalLocator extends Locator implements ConnectListener { ThreadGroup group = LoggingThreadGroup.createThreadGroup("Distribution locators", logger); this.stats = new LocatorStats(); - this.server = new TcpServer(port, this.bindAddress, null, this.config, this.handler, - new DelayedPoolStatHelper(), group, this.toString(), this); + this.server = new TcpServerFactory().makeTcpServer(port, this.bindAddress, null, this.config, + this.handler, new DelayedPoolStatHelper(), group, this.toString(), this); } // Reset the file names with the correct port number if startLocatorAndDS was called with port http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java index c3d51c1..d471062 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java @@ -14,6 +14,32 @@ */ package org.apache.geode.distributed.internal.tcpserver; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.StreamCorruptedException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.URL; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.net.ssl.SSLException; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; import org.apache.geode.SystemFailure; @@ -32,39 +58,13 @@ import org.apache.geode.internal.VersionedDataInputStream; import org.apache.geode.internal.VersionedDataOutputStream; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Acceptor; -import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; -import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandlerLoader; import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; import org.apache.geode.internal.cache.tier.sockets.HandShake; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.net.SocketCreator; import org.apache.geode.internal.net.SocketCreatorFactory; import org.apache.geode.internal.security.SecurableCommunicationChannel; -import org.apache.logging.log4j.Logger; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.File; -import java.io.IOException; -import java.io.StreamCorruptedException; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.URL; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.net.ssl.SSLException; /** * TCP server which listens on a port and delegates requests to a request handler. The server uses @@ -99,6 +99,11 @@ public class TcpServer { private static/* GemStoneAddition */ final Map GOSSIP_TO_GEMFIRE_VERSION_MAP = new HashMap(); + /** + * For the new client-server protocol, which ignores the usual handshake mechanism. + */ + public static final byte PROTOBUF_CLIENT_SERVER_PROTOCOL = (byte) 110; + // For test purpose only public static boolean isTesting = false; // Non-final field for testing to avoid any security holes in system. @@ -126,8 +131,10 @@ public class TcpServer { private InetAddress bind_address; private volatile boolean shuttingDown = false; // GemStoneAddition private final PoolStatHelper poolHelper; - private InternalLocator internalLocator; + private final InternalLocator internalLocator; private final TcpHandler handler; + private ClientProtocolMessageHandler messageHandler; + private PooledExecutorWithDMStats executor; private final ThreadGroup threadGroup; @@ -150,12 +157,14 @@ public class TcpServer { public TcpServer(int port, InetAddress bind_address, Properties sslConfig, DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper, - ThreadGroup threadGroup, String threadName, InternalLocator internalLocator) { + ThreadGroup threadGroup, String threadName, InternalLocator internalLocator, + ClientProtocolMessageHandler messageHandler) { this.port = port; this.bind_address = bind_address; this.handler = handler; this.poolHelper = poolHelper; this.internalLocator = internalLocator; + this.messageHandler = messageHandler; // register DSFID types first; invoked explicitly so that all message type // initializations do not happen in first deserialization on a possibly // "precious" thread @@ -365,69 +374,67 @@ public class TcpServer { short versionOrdinal; if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) { - if (input.readUnsignedByte() == AcceptorImpl.PROTOBUF_CLIENT_SERVER_PROTOCOL + if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL && Boolean.getBoolean("geode.feature-protobuf-protocol")) { - ClientProtocolMessageHandler messageHandler = ClientProtocolMessageHandlerLoader.load(); messageHandler.receiveMessage(input, socket.getOutputStream(), - new ExecutionContext(internalLocator)); + new MessageExecutionContext(internalLocator)); } else { rejectUnknownProtocolConnection(socket, gossipVersion); - return; } - } - if (gossipVersion <= getCurrentGossipVersion() - && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) { - // Create a versioned stream to remember sender's GemFire version - versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion); } else { - // Close the socket. We can not accept requests from a newer version - rejectUnknownProtocolConnection(socket, gossipVersion); - return; - } - if (Version.GFE_71.compareTo(versionOrdinal) <= 0) { - // Recent versions of TcpClient will send the version ordinal - versionOrdinal = input.readShort(); - } - - if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) { - log.debug("Locator reading request from " + socket.getInetAddress() + " with version " - + Version.fromOrdinal(versionOrdinal, false)); - } - input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false)); - request = DataSerializer.readObject(input); - if (log.isDebugEnabled()) { - log.debug("Locator received request " + request + " from " + socket.getInetAddress()); - } - if (request instanceof ShutdownRequest) { - shuttingDown = true; - // Don't call shutdown from within the worker thread, see java bug #6576792. - // Closing the socket will cause our acceptor thread to shutdown the executor - this.serverSocketPortAtClose = srv_sock.getLocalPort(); - srv_sock.close(); - response = new ShutdownResponse(); - } else if (request instanceof InfoRequest) { - response = handleInfoRequest(request); - } else if (request instanceof VersionRequest) { - response = handleVersionRequest(request); - } else { - response = handler.processRequest(request); - } - - handler.endRequest(request, startTime); + if (gossipVersion <= getCurrentGossipVersion() + && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) { + // Create a versioned stream to remember sender's GemFire version + versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion); + } else { + // Close the socket. We can not accept requests from a newer version + rejectUnknownProtocolConnection(socket, gossipVersion); + return; + } + if (Version.GFE_71.compareTo(versionOrdinal) <= 0) { + // Recent versions of TcpClient will send the version ordinal + versionOrdinal = input.readShort(); + } - startTime = DistributionStats.getStatTime(); - if (response != null) { - DataOutputStream output = new DataOutputStream(socket.getOutputStream()); - if (versionOrdinal != Version.CURRENT_ORDINAL) { - output = - new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false)); + if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) { + log.debug("Locator reading request from " + socket.getInetAddress() + " with version " + + Version.fromOrdinal(versionOrdinal, false)); + } + input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false)); + request = DataSerializer.readObject(input); + if (log.isDebugEnabled()) { + log.debug("Locator received request " + request + " from " + socket.getInetAddress()); + } + if (request instanceof ShutdownRequest) { + shuttingDown = true; + // Don't call shutdown from within the worker thread, see java bug #6576792. + // Closing the socket will cause our acceptor thread to shutdown the executor + this.serverSocketPortAtClose = srv_sock.getLocalPort(); + srv_sock.close(); + response = new ShutdownResponse(); + } else if (request instanceof InfoRequest) { + response = handleInfoRequest(request); + } else if (request instanceof VersionRequest) { + response = handleVersionRequest(request); + } else { + response = handler.processRequest(request); } - DataSerializer.writeObject(response, output); - output.flush(); - } - handler.endResponse(request, startTime); + handler.endRequest(request, startTime); + + startTime = DistributionStats.getStatTime(); + if (response != null) { + DataOutputStream output = new DataOutputStream(socket.getOutputStream()); + if (versionOrdinal != Version.CURRENT_ORDINAL) { + output = + new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false)); + } + DataSerializer.writeObject(response, output); + output.flush(); + } + handler.endResponse(request, startTime); + } } catch (EOFException ignore) { // client went away - ignore } catch (CancelException ignore) { http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java index 4c7a6ef..84aa66e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java @@ -75,9 +75,7 @@ import org.apache.geode.pdx.internal.TypeRegistry; */ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime { - default InternalDistributedMember getMyId() { - return null; - } + InternalDistributedMember getMyId(); Collection<DiskStore> listDiskStores(); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java index 38ab73e..0ced3aa 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java @@ -30,5 +30,5 @@ import java.io.OutputStream; */ public interface ClientProtocolMessageHandler { void receiveMessage(InputStream inputStream, OutputStream outputStream, - ExecutionContext executionContext) throws IOException; + MessageExecutionContext executionContext) throws IOException; } http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java deleted file mode 100644 index 1dc6129..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.internal.cache.tier.sockets; - -import java.io.IOException; -import java.net.Socket; -import java.util.Iterator; -import java.util.ServiceLoader; - -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.cache.tier.Acceptor; -import org.apache.geode.internal.cache.tier.CachedRegionHelper; -import org.apache.geode.internal.security.SecurityService; - -/** - * Creates instances of ServerConnection based on the connection mode provided. - */ -public class ClientProtocolMessageHandlerLoader { - private static ClientProtocolMessageHandler protobufProtocolHandler; - private static final Object protocolLoadLock = new Object(); - - public static ClientProtocolMessageHandler load() { - if (protobufProtocolHandler != null) { - return protobufProtocolHandler; - } - - synchronized (protocolLoadLock) { - if (protobufProtocolHandler != null) { - return protobufProtocolHandler; - } - - ServiceLoader<ClientProtocolMessageHandler> loader = - ServiceLoader.load(ClientProtocolMessageHandler.class); - Iterator<ClientProtocolMessageHandler> iterator = loader.iterator(); - - if (!iterator.hasNext()) { - throw new ServiceLoadingFailureException( - "ClientProtocolMessageHandler implementation not found in JVM"); - } - - ClientProtocolMessageHandler returnValue = iterator.next(); - - if (iterator.hasNext()) { - throw new ServiceLoadingFailureException( - "Multiple service implementations found for ClientProtocolMessageHandler"); - } - - return returnValue; - } - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java deleted file mode 100644 index 27da205..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.internal.cache.tier.sockets; - -import org.apache.geode.cache.Cache; -import org.apache.geode.distributed.internal.InternalLocator; - -public class ExecutionContext { - private Cache cache; - private InternalLocator locator; - - public ExecutionContext(Cache cache) { - this.cache = cache; - } - - public ExecutionContext(InternalLocator locator) { - this.locator = locator; - } - - // This throws if the cache isn't present because we know that non of the callers can take any - // reasonable action if the cache is not present - public Cache getCache() throws InvalidExecutionContextException { - if (cache != null) { - return cache; - } else { - throw new InvalidExecutionContextException( - "Execution context's cache was accessed but isn't present. Did this happen on a locator? Operations on the locator should not try to operate on a cache"); - } - } - - // This throws if the locator isn't present because we know that non of the callers can take any - // reasonable action if the locator is not present - public InternalLocator getLocator() throws InvalidExecutionContextException { - if (locator != null) { - return locator; - } else { - throw new InvalidExecutionContextException( - "Execution context's locator was accessed but isn't present. Did this happen on a server? Operations on the locator should not try to operate on a cache"); - } - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java index 8f6720e..cd1647a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java @@ -62,7 +62,7 @@ public class GenericProtocolServerConnection extends ServerConnection { authenticator.receiveMessage(inputStream, outputStream, securityManager); } else { messageHandler.receiveMessage(inputStream, outputStream, - new ExecutionContext(this.getCache())); + new MessageExecutionContext(this.getCache())); } } catch (IOException e) { logger.warn(e); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java new file mode 100644 index 0000000..1cb8c9d --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.annotations.Experimental; +import org.apache.geode.cache.Cache; +import org.apache.geode.distributed.internal.InternalLocator; + +@Experimental +public class MessageExecutionContext { + private Cache cache; + private InternalLocator locator; + + public MessageExecutionContext(Cache cache) { + this.cache = cache; + } + + public MessageExecutionContext(InternalLocator locator) { + this.locator = locator; + } + + // This throws if the cache isn't present because we know that non of the callers can take any + // reasonable action if the cache is not present + public Cache getCache() throws InvalidExecutionContextException { + if (cache != null) { + return cache; + } else { + throw new InvalidExecutionContextException( + "Operations on the locator should not to try to operate on a cache"); + } + } + + // This throws if the locator isn't present because we know that non of the callers can take any + // reasonable action if the locator is not present + public InternalLocator getLocator() throws InvalidExecutionContextException { + if (locator != null) { + return locator; + } else { + throw new InvalidExecutionContextException( + "Operations on the server should not to try to operate on a locator"); + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java new file mode 100644 index 0000000..fd261d7 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java @@ -0,0 +1,19 @@ +package org.apache.geode.internal.cache.tier.sockets; + +import java.util.Iterator; +import java.util.ServiceLoader; + +public class MessageHandlerFactory { + public ClientProtocolMessageHandler makeMessageHandler() { + ServiceLoader<ClientProtocolMessageHandler> loader = + ServiceLoader.load(ClientProtocolMessageHandler.class); + Iterator<ClientProtocolMessageHandler> iterator = loader.iterator(); + + if (!iterator.hasNext()) { + throw new ServiceLoadingFailureException( + "There is no ClientProtocolMessageHandler implementation found in JVM"); + } + + return iterator.next(); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java index 9173f6a..d2d85f6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java @@ -24,7 +24,6 @@ import org.apache.geode.security.StreamAuthenticator; import java.io.IOException; import java.net.Socket; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.ServiceLoader; @@ -32,7 +31,7 @@ import java.util.ServiceLoader; * Creates instances of ServerConnection based on the connection mode provided. */ public class ServerConnectionFactory { - private ClientProtocolMessageHandler protobufProtocolHandler; + private ClientProtocolMessageHandler protocolHandler; private Map<String, Class<? extends StreamAuthenticator>> authenticators = null; public ServerConnectionFactory() {} @@ -49,20 +48,13 @@ public class ServerConnectionFactory { } private synchronized ClientProtocolMessageHandler initializeMessageHandler() { - if (protobufProtocolHandler != null) { - return protobufProtocolHandler; + if (protocolHandler != null) { + return protocolHandler; } - ServiceLoader<ClientProtocolMessageHandler> loader = - ServiceLoader.load(ClientProtocolMessageHandler.class); - Iterator<ClientProtocolMessageHandler> iterator = loader.iterator(); - if (!iterator.hasNext()) { - throw new ServiceLoadingFailureException( - "There is no ClientProtocolMessageHandler implementation found in JVM"); - } + protocolHandler = new MessageHandlerFactory().makeMessageHandler(); - protobufProtocolHandler = iterator.next(); - return protobufProtocolHandler; + return protocolHandler; } private StreamAuthenticator findStreamAuthenticator(String implementationID) { @@ -86,10 +78,10 @@ public class ServerConnectionFactory { } private ClientProtocolMessageHandler getClientProtocolMessageHandler() { - if (protobufProtocolHandler == null) { + if (protocolHandler == null) { initializeMessageHandler(); } - return protobufProtocolHandler; + return protocolHandler; } public ServerConnection makeServerConnection(Socket s, InternalCache c, CachedRegionHelper helper, http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java new file mode 100644 index 0000000..991ed75 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java @@ -0,0 +1,39 @@ +package org.apache.geode.internal.cache.tier.sockets; + +import java.net.InetAddress; +import java.util.Properties; + +import org.apache.geode.distributed.internal.DistributionConfigImpl; +import org.apache.geode.distributed.internal.InternalLocator; +import org.apache.geode.distributed.internal.PoolStatHelper; +import org.apache.geode.distributed.internal.tcpserver.TcpHandler; +import org.apache.geode.distributed.internal.tcpserver.TcpServer; + +public class TcpServerFactory { + private ClientProtocolMessageHandler protocolHandler; + + public TcpServerFactory() { + initializeMessageHandler(); + } + + public TcpServer makeTcpServer(int port, InetAddress bind_address, Properties sslConfig, + DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper, + ThreadGroup threadGroup, String threadName, InternalLocator internalLocator) { + + return new TcpServer(port, bind_address, sslConfig, cfg, handler, poolHelper, threadGroup, + threadName, internalLocator, protocolHandler); + } + + public synchronized ClientProtocolMessageHandler initializeMessageHandler() { + if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) { + return null; + } + if (protocolHandler != null) { + return protocolHandler; + } + + protocolHandler = new MessageHandlerFactory().makeMessageHandler(); + + return protocolHandler; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java index 802620c..e57ca83 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java @@ -35,6 +35,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpServer; import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.internal.cache.PoolStats; import org.apache.geode.internal.cache.tier.InternalClientMembership; +import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory; import org.apache.geode.management.membership.ClientMembershipEvent; import org.apache.geode.management.membership.ClientMembershipListener; import org.apache.geode.test.junit.categories.ClientServerTest; @@ -303,8 +304,9 @@ public class AutoConnectionSourceImplJUnitTest { public void test_DiscoverLocators_whenOneLocatorWasShutdown() throws Exception { startFakeLocator(); int secondPort = AvailablePortHelper.getRandomAvailableTCPPort(); - TcpServer server2 = new TcpServer(secondPort, InetAddress.getLocalHost(), null, null, handler, - new FakeHelper(), Thread.currentThread().getThreadGroup(), "tcp server", null); + TcpServer server2 = + new TcpServerFactory().makeTcpServer(secondPort, InetAddress.getLocalHost(), null, null, + handler, new FakeHelper(), Thread.currentThread().getThreadGroup(), "tcp server", null); server2.start(); try { @@ -387,8 +389,8 @@ public class AutoConnectionSourceImplJUnitTest { } private void startFakeLocator() throws UnknownHostException, IOException, InterruptedException { - server = new TcpServer(port, InetAddress.getLocalHost(), null, null, handler, new FakeHelper(), - Thread.currentThread().getThreadGroup(), "Tcp Server", null); + server = new TcpServerFactory().makeTcpServer(port, InetAddress.getLocalHost(), null, null, + handler, new FakeHelper(), Thread.currentThread().getThreadGroup(), "Tcp Server", null); server.start(); Thread.sleep(500); } http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java index 229fbb9..c58eb31 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java @@ -138,7 +138,8 @@ public class TCPServerSSLJUnitTest { public DummyTcpServer(int port, InetAddress bind_address, Properties sslConfig, DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper, ThreadGroup threadGroup, String threadName) { - super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadGroup, threadName, null); + super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadGroup, threadName, null, + null); if (cfg == null) { cfg = new DistributionConfigImpl(sslConfig); } http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java index 9d20e8c..d02051f 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java @@ -26,6 +26,7 @@ import org.apache.geode.distributed.internal.ClusterConfigurationService; import org.apache.geode.distributed.internal.DistributionConfigImpl; import org.apache.geode.distributed.internal.PoolStatHelper; import org.apache.geode.internal.AvailablePort; +import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory; import org.apache.geode.internal.net.SocketCreatorFactory; import org.apache.geode.test.junit.categories.IntegrationTest; import org.apache.geode.test.junit.categories.MembershipTest; @@ -68,8 +69,8 @@ public class TcpServerJUnitTest { port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); stats = new SimpleStats(); - server = new TcpServer(port, localhost, new Properties(), null, handler, stats, - Thread.currentThread().getThreadGroup(), "server thread", null); + server = new TcpServerFactory().makeTcpServer(port, localhost, new Properties(), null, handler, + stats, Thread.currentThread().getThreadGroup(), "server thread", null); server.start(); } http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactoryTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactoryTest.java new file mode 100644 index 0000000..7d40d01 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactoryTest.java @@ -0,0 +1,19 @@ +package org.apache.geode.internal.cache.tier.sockets; + +import static org.junit.Assert.*; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.distributed.internal.tcpserver.TcpServer; +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class TcpServerFactoryTest { + @Test + public void createsATcpServer() { + TcpServerFactory factory = new TcpServerFactory(); + TcpServer server = factory.makeTcpServer(80, null, null, null, null, null, null, null, null); + assertTrue(server != null); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java b/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java index b35270e..fd88abf 100644 --- a/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java +++ b/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java @@ -297,6 +297,8 @@ public class DUnitLauncher { // able to do so successfully anyway p.setProperty(DISABLE_AUTO_RECONNECT, "true"); + System.setProperty("geode.feature-protobuf-protocol", "true"); + try { Locator.startLocatorAndDS(0, locatorLogFile, p); InternalLocator internalLocator = (InternalLocator) Locator.getLocator(); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java index 5d9012f..ca3548b 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java @@ -15,7 +15,7 @@ package org.apache.geode.protocol.operations; import org.apache.geode.annotations.Experimental; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.protobuf.ProtobufOpsProcessor; import org.apache.geode.protocol.protobuf.Result; @@ -34,6 +34,6 @@ public interface OperationHandler<Req, Resp> { * indicated on the provided cache, and return a response. */ Result<Resp> process(SerializationService serializationService, Req request, - ExecutionContext executionContext) throws InvalidExecutionContextException; + MessageExecutionContext executionContext) throws InvalidExecutionContextException; } http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java index 76f81e7..3619e0d 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java @@ -15,7 +15,7 @@ package org.apache.geode.protocol.protobuf; import org.apache.geode.annotations.Experimental; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.protobuf.registry.OperationContextRegistry; import org.apache.geode.protocol.protobuf.utilities.ProtobufResponseUtilities; @@ -37,7 +37,8 @@ public class ProtobufOpsProcessor { this.operationContextRegistry = operationContextRegistry; } - public ClientProtocol.Response process(ClientProtocol.Request request, ExecutionContext context) { + public ClientProtocol.Response process(ClientProtocol.Request request, + MessageExecutionContext context) { ClientProtocol.Request.RequestAPICase requestType = request.getRequestAPICase(); OperationContext operationContext = operationContextRegistry.getOperationContext(requestType); ClientProtocol.Response.Builder builder; http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java index d04e49e..accb899 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java @@ -21,7 +21,7 @@ import java.io.OutputStream; import org.apache.geode.annotations.Experimental; import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.protocol.exception.InvalidProtocolMessageException; import org.apache.geode.protocol.protobuf.registry.OperationContextRegistry; import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer; @@ -46,7 +46,7 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler { @Override public void receiveMessage(InputStream inputStream, OutputStream outputStream, - ExecutionContext executionContext) throws IOException { + MessageExecutionContext executionContext) throws IOException { try { processOneMessage(inputStream, outputStream, executionContext); } catch (InvalidProtocolMessageException e) { @@ -55,7 +55,8 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler { } private void processOneMessage(InputStream inputStream, OutputStream outputStream, - ExecutionContext executionContext) throws InvalidProtocolMessageException, IOException { + MessageExecutionContext executionContext) + throws InvalidProtocolMessageException, IOException { ClientProtocol.Message message = protobufProtocolSerializer.deserialize(inputStream); if (message == null) { throw new EOFException("Tried to deserialize protobuf message at EOF"); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java index 75274c1..77cef67 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java @@ -20,7 +20,7 @@ import java.util.Set; import org.apache.geode.annotations.Experimental; import org.apache.geode.cache.Region; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; @@ -41,7 +41,7 @@ public class GetAllRequestOperationHandler @Override public Result<RegionAPI.GetAllResponse> process(SerializationService serializationService, - RegionAPI.GetAllRequest request, ExecutionContext executionContext) + RegionAPI.GetAllRequest request, MessageExecutionContext executionContext) throws InvalidExecutionContextException { String regionName = request.getRegionName(); Region region = executionContext.getCache().getRegion(regionName); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java index e7c18cd..c1c3e99 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java @@ -21,7 +21,7 @@ import java.util.stream.Collectors; import org.apache.geode.annotations.Experimental; import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.distributed.internal.ServerLocation; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; @@ -37,13 +37,17 @@ public class GetAvailableServersOperationHandler implements @Override public Result<ServerAPI.GetAvailableServersResponse> process( SerializationService serializationService, ServerAPI.GetAvailableServersRequest request, - ExecutionContext executionContext) throws InvalidExecutionContextException { + MessageExecutionContext executionContext) throws InvalidExecutionContextException { InternalLocator locator = executionContext.getLocator(); - ArrayList servers2 = locator.getServerLocatorAdvisee().getLoadSnapshot().getServers(null); + ArrayList serversFromSnapshot = + locator.getServerLocatorAdvisee().getLoadSnapshot().getServers(null); + if (serversFromSnapshot == null) { + serversFromSnapshot = new ArrayList(); + } - Collection<BasicTypes.Server> servers = (Collection<BasicTypes.Server>) servers2.stream() - .map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation)) + Collection<BasicTypes.Server> servers = (Collection<BasicTypes.Server>) serversFromSnapshot + .stream().map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation)) .collect(Collectors.toList()); ServerAPI.GetAvailableServersResponse.Builder builder = ServerAPI.GetAvailableServersResponse.newBuilder().addAllServers(servers); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java index 53898ed..e2edfed 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java @@ -18,7 +18,7 @@ import java.util.Set; import org.apache.geode.annotations.Experimental; import org.apache.geode.cache.Region; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.RegionAPI; @@ -33,7 +33,7 @@ public class GetRegionNamesRequestOperationHandler @Override public Result<RegionAPI.GetRegionNamesResponse> process(SerializationService serializationService, - RegionAPI.GetRegionNamesRequest request, ExecutionContext executionContext) + RegionAPI.GetRegionNamesRequest request, MessageExecutionContext executionContext) throws InvalidExecutionContextException { Set<Region<?, ?>> regions = executionContext.getCache().rootRegions(); return Success.of(ProtobufResponseUtilities.createGetRegionNamesResponse(regions)); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java index 007f96b..5a8d4d3 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java @@ -16,7 +16,7 @@ package org.apache.geode.protocol.protobuf.operations; import org.apache.geode.annotations.Experimental; import org.apache.geode.cache.Region; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; @@ -35,7 +35,7 @@ public class GetRegionRequestOperationHandler @Override public Result<RegionAPI.GetRegionResponse> process(SerializationService serializationService, - RegionAPI.GetRegionRequest request, ExecutionContext executionContext) + RegionAPI.GetRegionRequest request, MessageExecutionContext executionContext) throws InvalidExecutionContextException { String regionName = request.getRegionName(); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java index 8f0fef7..504189e 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java @@ -16,7 +16,7 @@ package org.apache.geode.protocol.protobuf.operations; import org.apache.geode.annotations.Experimental; import org.apache.geode.cache.Region; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; @@ -37,7 +37,7 @@ public class GetRequestOperationHandler @Override public Result<RegionAPI.GetResponse> process(SerializationService serializationService, - RegionAPI.GetRequest request, ExecutionContext executionContext) + RegionAPI.GetRequest request, MessageExecutionContext executionContext) throws InvalidExecutionContextException { String regionName = request.getRegionName(); Region region = executionContext.getCache().getRegion(regionName); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java index e0ebc41..99c7766 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java @@ -22,7 +22,7 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.annotations.Experimental; import org.apache.geode.cache.Region; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; @@ -44,7 +44,7 @@ public class PutAllRequestOperationHandler @Override public Result<RegionAPI.PutAllResponse> process(SerializationService serializationService, - RegionAPI.PutAllRequest putAllRequest, ExecutionContext executionContext) + RegionAPI.PutAllRequest putAllRequest, MessageExecutionContext executionContext) throws InvalidExecutionContextException { Region region = executionContext.getCache().getRegion(putAllRequest.getRegionName()); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java index cf5afb4..e94127b 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java @@ -16,7 +16,7 @@ package org.apache.geode.protocol.protobuf.operations; import org.apache.geode.annotations.Experimental; import org.apache.geode.cache.Region; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; @@ -37,7 +37,7 @@ public class PutRequestOperationHandler @Override public Result<RegionAPI.PutResponse> process(SerializationService serializationService, - RegionAPI.PutRequest request, ExecutionContext executionContext) + RegionAPI.PutRequest request, MessageExecutionContext executionContext) throws InvalidExecutionContextException { String regionName = request.getRegionName(); Region region = executionContext.getCache().getRegion(regionName); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java index 052efcf..94e3504 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java @@ -19,7 +19,7 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.annotations.Experimental; import org.apache.geode.cache.Region; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.Failure; @@ -40,7 +40,7 @@ public class RemoveRequestOperationHandler @Override public Result<RegionAPI.RemoveResponse> process(SerializationService serializationService, - RegionAPI.RemoveRequest request, ExecutionContext executionContext) + RegionAPI.RemoveRequest request, MessageExecutionContext executionContext) throws InvalidExecutionContextException { String regionName = request.getRegionName(); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java index 799c55c..14d8c44 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java @@ -15,12 +15,22 @@ package org.apache.geode.protocol; +import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_AUTO_RECONNECT; +import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION; +import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION; +import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; +import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT; +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.net.Socket; +import java.util.Properties; import org.junit.Before; import org.junit.Rule; @@ -29,7 +39,11 @@ import org.junit.contrib.java.lang.system.RestoreSystemProperties; import org.junit.experimental.categories.Category; import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.distributed.Locator; +import org.apache.geode.distributed.internal.InternalLocator; +import org.apache.geode.internal.AvailablePort; import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; import org.apache.geode.protocol.exception.InvalidProtocolMessageException; import org.apache.geode.protocol.protobuf.ClientProtocol; import org.apache.geode.protocol.protobuf.ProtocolErrorCode; @@ -39,6 +53,7 @@ import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities; import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; import org.apache.geode.test.dunit.DistributedTestUtils; import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; import org.apache.geode.test.junit.categories.DistributedTest; @@ -48,6 +63,7 @@ public class RoundTripLocatorConnectionJUnitTest extends JUnit4CacheTestCase { private Socket socket; private DataOutputStream dataOutputStream; + private Locator locator; @Rule public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); @@ -63,7 +79,8 @@ public class RoundTripLocatorConnectionJUnitTest extends JUnit4CacheTestCase { socket = new Socket(host.getHostName(), locatorPort); dataOutputStream = new DataOutputStream(socket.getOutputStream()); dataOutputStream.writeInt(0); - dataOutputStream.writeByte(110); + // Using the constant from AcceptorImpl to ensure that magic byte is the same + dataOutputStream.writeByte(AcceptorImpl.PROTOBUF_CLIENT_SERVER_PROTOCOL); } @Test http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java index 2185b15..16eb48b 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java @@ -26,7 +26,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.test.junit.categories.UnitTest; @Category(UnitTest.class) @@ -39,6 +39,6 @@ public class ProtobufStreamProcessorTest { ProtobufStreamProcessor protobufStreamProcessor = new ProtobufStreamProcessor(); InternalCache mockInternalCache = mock(InternalCache.class); protobufStreamProcessor.receiveMessage(inputStream, outputStream, - new ExecutionContext(mockInternalCache)); + new MessageExecutionContext(mockInternalCache)); } } http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java index f4d098c..64ee50b 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java @@ -27,7 +27,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.cache.Region; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.RegionAPI; @@ -82,7 +82,7 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni throws CodecAlreadyRegisteredForTypeException, UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException, InvalidExecutionContextException { Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub, - generateTestRequest(true), new ExecutionContext(cacheStub)); + generateTestRequest(true), new MessageExecutionContext(cacheStub)); Assert.assertTrue(result instanceof Success); @@ -102,7 +102,7 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni public void processReturnsNoEntriesForNoKeysRequested() throws UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException, InvalidExecutionContextException { Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub, - generateTestRequest(false), new ExecutionContext(cacheStub)); + generateTestRequest(false), new MessageExecutionContext(cacheStub)); Assert.assertTrue(result instanceof Success); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java index cff6ddc..406beea 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java @@ -18,7 +18,8 @@ import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.distributed.internal.LocatorLoadSnapshot; import org.apache.geode.distributed.internal.ServerLocation; import org.apache.geode.distributed.internal.ServerLocator; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.Result; import org.apache.geode.protocol.protobuf.ServerAPI; @@ -31,6 +32,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.util.ArrayList; +import java.util.HashMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -41,13 +43,14 @@ import static org.mockito.Mockito.when; @Category(UnitTest.class) public class GetAvailableServersOperationHandlerJUnitTest extends OperationHandlerJUnitTest { - public static final String HOSTNAME_1 = "hostname1"; - public static final int PORT_1 = 12345; + private final String HOSTNAME_1 = "hostname1"; + private final int PORT_1 = 12345; - public static final String HOSTNAME_2 = "hostname2"; - public static final int PORT_2 = 23456; + private final String HOSTNAME_2 = "hostname2"; + private final int PORT_2 = 23456; private InternalLocator internalLocatorMock; + private LocatorLoadSnapshot locatorLoadSnapshot; @Before public void setUp() throws Exception { @@ -56,27 +59,44 @@ public class GetAvailableServersOperationHandlerJUnitTest extends OperationHandl operationHandler = new GetAvailableServersOperationHandler(); internalLocatorMock = mock(InternalLocator.class); ServerLocator serverLocatorAdviseeMock = mock(ServerLocator.class); - LocatorLoadSnapshot locatorLoadSnapshot = mock(LocatorLoadSnapshot.class); - ArrayList<Object> serverList = new ArrayList<>(); - serverList.add(new ServerLocation(HOSTNAME_1, PORT_1)); - serverList.add(new ServerLocation(HOSTNAME_2, PORT_2)); + locatorLoadSnapshot = mock(LocatorLoadSnapshot.class); + when(internalLocatorMock.getServerLocatorAdvisee()).thenReturn(serverLocatorAdviseeMock); when(serverLocatorAdviseeMock.getLoadSnapshot()).thenReturn(locatorLoadSnapshot); - when(locatorLoadSnapshot.getServers(null)).thenReturn(serverList); } @Test public void testServerReturnedFromHandler() throws Exception { + ArrayList<Object> serverList = new ArrayList<>(); + serverList.add(new ServerLocation(HOSTNAME_1, PORT_1)); + serverList.add(new ServerLocation(HOSTNAME_2, PORT_2)); + when(locatorLoadSnapshot.getServers(null)).thenReturn(serverList); + ServerAPI.GetAvailableServersRequest getAvailableServersRequest = ProtobufRequestUtilities.createGetAvailableServersRequest(); Result operationHandlerResult = operationHandler.process(serializationServiceStub, - getAvailableServersRequest, new ExecutionContext(internalLocatorMock)); + getAvailableServersRequest, new MessageExecutionContext(internalLocatorMock)); assertTrue(operationHandlerResult instanceof Success); ValidateGetAvailableServersResponse( (GetAvailableServersResponse) operationHandlerResult.getMessage()); } + @Test + public void testWhenServersFromSnapshotAreNullReturnsEmtpy() + throws InvalidExecutionContextException { + when(locatorLoadSnapshot.getServers(any())).thenReturn(null); + + ServerAPI.GetAvailableServersRequest getAvailableServersRequest = + ProtobufRequestUtilities.createGetAvailableServersRequest(); + Result operationHandlerResult = operationHandler.process(serializationServiceStub, + getAvailableServersRequest, new MessageExecutionContext(internalLocatorMock)); + assertTrue(operationHandlerResult instanceof Success); + GetAvailableServersResponse availableServersResponse = + (GetAvailableServersResponse) operationHandlerResult.getMessage(); + assertEquals(0, availableServersResponse.getServersCount()); + } + private void ValidateGetAvailableServersResponse( GetAvailableServersResponse getAvailableServersResponse) { assertEquals(2, getAvailableServersResponse.getServersCount()); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java index fd84d41..2fcf575 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java @@ -27,7 +27,7 @@ import org.junit.experimental.categories.Category; import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.protobuf.RegionAPI; import org.apache.geode.protocol.protobuf.Result; @@ -67,7 +67,7 @@ public class GetRegionNamesRequestOperationHandlerJUnitTest extends OperationHan CodecNotRegisteredForTypeException, InvalidExecutionContextException { Result<RegionAPI.GetRegionNamesResponse> result = operationHandler.process( serializationServiceStub, ProtobufRequestUtilities.createGetRegionNamesRequest(), - new ExecutionContext(cacheStub)); + new MessageExecutionContext(cacheStub)); Assert.assertTrue(result instanceof Success); RegionAPI.GetRegionNamesResponse getRegionsResponse = result.getMessage(); @@ -93,7 +93,7 @@ public class GetRegionNamesRequestOperationHandlerJUnitTest extends OperationHan .thenReturn(Collections.unmodifiableSet(new HashSet<Region<String, String>>())); Result<RegionAPI.GetRegionNamesResponse> result = operationHandler.process( serializationServiceStub, ProtobufRequestUtilities.createGetRegionNamesRequest(), - new ExecutionContext(emptyCache)); + new MessageExecutionContext(emptyCache)); Assert.assertTrue(result instanceof Success); RegionAPI.GetRegionNamesResponse getRegionsResponse = result.getMessage(); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java index 6762f66..60d4985 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java @@ -19,7 +19,7 @@ import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.Scope; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.MessageUtil; import org.apache.geode.protocol.protobuf.BasicTypes; @@ -75,7 +75,7 @@ public class GetRegionRequestOperationHandlerJUnitTest extends OperationHandlerJ Result<RegionAPI.GetRegionResponse> result = operationHandler.process(serializationServiceStub, - MessageUtil.makeGetRegionRequest(TEST_REGION1), new ExecutionContext(cacheStub)); + MessageUtil.makeGetRegionRequest(TEST_REGION1), new MessageExecutionContext(cacheStub)); RegionAPI.GetRegionResponse response = result.getMessage(); BasicTypes.Region region = response.getRegion(); Assert.assertEquals(TEST_REGION1, region.getName()); @@ -100,7 +100,8 @@ public class GetRegionRequestOperationHandlerJUnitTest extends OperationHandlerJ .thenReturn(Collections.unmodifiableSet(new HashSet<Region<String, String>>())); String unknownRegionName = "UNKNOWN_REGION"; Result<RegionAPI.GetRegionResponse> result = operationHandler.process(serializationServiceStub, - MessageUtil.makeGetRegionRequest(unknownRegionName), new ExecutionContext(emptyCache)); + MessageUtil.makeGetRegionRequest(unknownRegionName), + new MessageExecutionContext(emptyCache)); Assert.assertTrue(result instanceof Failure); Assert.assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, result.getErrorMessage().getError().getErrorCode()); http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java index af35f6b..6885666 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java @@ -16,7 +16,7 @@ package org.apache.geode.protocol.protobuf.operations; import com.google.protobuf.ByteString; import org.apache.geode.cache.Region; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.Failure; @@ -75,7 +75,7 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe CodecNotRegisteredForTypeException, InvalidExecutionContextException { RegionAPI.GetRequest getRequest = generateTestRequest(false, false, false); Result<RegionAPI.GetResponse> result = operationHandler.process(serializationServiceStub, - getRequest, new ExecutionContext(cacheStub)); + getRequest, new MessageExecutionContext(cacheStub)); Assert.assertTrue(result instanceof Success); Assert.assertEquals(BasicTypes.EncodedValue.ValueCase.STRINGRESULT, @@ -90,7 +90,7 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe CodecNotRegisteredForTypeException, InvalidExecutionContextException { RegionAPI.GetRequest getRequest = generateTestRequest(true, false, false); Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub, - getRequest, new ExecutionContext(cacheStub)); + getRequest, new MessageExecutionContext(cacheStub)); Assert.assertTrue(response instanceof Failure); Assert.assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, @@ -103,7 +103,7 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe CodecNotRegisteredForTypeException, InvalidExecutionContextException { RegionAPI.GetRequest getRequest = generateTestRequest(false, true, false); Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub, - getRequest, new ExecutionContext(cacheStub)); + getRequest, new MessageExecutionContext(cacheStub)); Assert.assertTrue(response instanceof Success); } @@ -114,7 +114,7 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe CodecNotRegisteredForTypeException, InvalidExecutionContextException { RegionAPI.GetRequest getRequest = generateTestRequest(false, false, true); Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub, - getRequest, new ExecutionContext(cacheStub)); + getRequest, new MessageExecutionContext(cacheStub)); Assert.assertTrue(response instanceof Success); } @@ -136,7 +136,7 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe RegionAPI.GetRequest getRequest = ProtobufRequestUtilities.createGetRequest(TEST_REGION, encodedKey).getGetRequest(); Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub, - getRequest, new ExecutionContext(cacheStub)); + getRequest, new MessageExecutionContext(cacheStub)); Assert.assertTrue(response instanceof Failure); Assert.assertEquals(ProtocolErrorCode.VALUE_ENCODING_ERROR.codeValue, http://git-wip-us.apache.org/repos/asf/geode/blob/0eb320fa/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java index d3fff49..955013f 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java @@ -15,7 +15,7 @@ package org.apache.geode.protocol.protobuf.operations; import org.apache.geode.cache.Region; -import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.RegionAPI; @@ -74,7 +74,7 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler(); Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub, - generateTestRequest(false, true), new ExecutionContext(cacheStub)); + generateTestRequest(false, true), new MessageExecutionContext(cacheStub)); Assert.assertTrue(result instanceof Success); @@ -88,7 +88,7 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler(); Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub, - generateTestRequest(true, true), new ExecutionContext(cacheStub)); + generateTestRequest(true, true), new MessageExecutionContext(cacheStub)); assertTrue(result instanceof Success); verify(regionMock).put(TEST_KEY1, TEST_VALUE1); @@ -107,7 +107,7 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler(); Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub, - generateTestRequest(false, false), new ExecutionContext(cacheStub)); + generateTestRequest(false, false), new MessageExecutionContext(cacheStub)); assertTrue(result instanceof Success);