GEODE-3406: Locator accepts Protobuf requests Also addresses GEODE-3400, GEODE-3399 This allows the locator to respond to Protobuf requests. Currently it will only be able to respond to getAvailableServers.
To enable this we are introducing a new value of "0" that will be sent in place of the Gossip version. After it we expect the same magic byte ("110") as in AcceptorImpl. This also is gated by the `geode.feature-protobuf-protocol` system property. The getAvailableServers request handler now uses the locator directly, since we are on the locator. Signed-off-by: Brian Rowe <br...@pivotal.io> Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/530f48f3 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/530f48f3 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/530f48f3 Branch: refs/heads/develop Commit: 530f48f35a96c4f8af7e51ed03b1ee2e5e150ebd Parents: be45511 Author: Alexander Murmann <amurm...@pivotal.io> Authored: Mon Aug 14 15:08:14 2017 -0700 Committer: Hitesh Khamesra <hkhame...@pivotal.io> Committed: Tue Aug 22 10:52:50 2017 -0700 ---------------------------------------------------------------------- .../distributed/internal/InternalLocator.java | 4 +- .../distributed/internal/ServerLocator.java | 4 + .../internal/tcpserver/TcpServer.java | 77 +++++++---- .../geode/internal/cache/InternalCache.java | 4 +- .../ClientProtoclMessageHandlerLoader.java | 64 +++++++++ .../sockets/ClientProtocolMessageHandler.java | 7 +- .../ClientProtocolMessageHandlerLoader.java | 64 +++++++++ .../cache/tier/sockets/ExecutionContext.java | 54 ++++++++ .../GenericProtocolServerConnection.java | 3 +- .../InvalidExecutionContextException.java | 33 +++++ .../AutoConnectionSourceImplJUnitTest.java | 8 +- .../tcpserver/TCPServerSSLJUnitTest.java | 2 +- .../internal/tcpserver/TcpServerJUnitTest.java | 2 +- .../protocol/operations/OperationHandler.java | 6 +- .../protocol/protobuf/ProtobufOpsProcessor.java | 17 ++- .../protobuf/ProtobufStreamProcessor.java | 29 ++-- .../protocol/protobuf/ProtocolErrorCode.java | 1 + .../GetAllRequestOperationHandler.java | 8 +- .../GetAvailableServersOperationHandler.java | 65 ++------- .../GetRegionNamesRequestOperationHandler.java | 8 +- .../GetRegionRequestOperationHandler.java | 8 +- .../operations/GetRequestOperationHandler.java | 8 +- .../PutAllRequestOperationHandler.java | 8 +- .../operations/PutRequestOperationHandler.java | 8 +- .../RemoveRequestOperationHandler.java | 9 +- .../protocol/GetAvailableServersDUnitTest.java | 108 --------------- .../RoundTripLocatorConnectionJUnitTest.java | 132 +++++++++++++++++++ .../protobuf/ProtobufStreamProcessorTest.java | 4 +- .../GetAllRequestOperationHandlerJUnitTest.java | 18 +-- ...ailableServersOperationHandlerJUnitTest.java | 97 ++++---------- ...onNamesRequestOperationHandlerJUnitTest.java | 26 ++-- ...tRegionRequestOperationHandlerJUnitTest.java | 16 ++- .../GetRequestOperationHandlerJUnitTest.java | 33 ++--- .../PutAllRequestOperationHandlerJUnitTest.java | 13 +- .../PutRequestOperationHandlerJUnitTest.java | 39 +++--- .../RemoveRequestOperationHandlerJUnitTest.java | 27 ++-- 36 files changed, 618 insertions(+), 396 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java index 4725518..8d2daf6 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java @@ -315,7 +315,6 @@ public class InternalLocator extends Locator implements ConnectListener { // TODO:GEODE-1243: this.server is now a TcpServer and it should store or return its non-zero // port in a variable to use here - try { newLocator.startPeerLocation(startDistributedSystem); if (startDistributedSystem) { @@ -500,7 +499,7 @@ public class InternalLocator extends Locator implements ConnectListener { this.stats = new LocatorStats(); this.server = new TcpServer(port, this.bindAddress, null, this.config, this.handler, - new DelayedPoolStatHelper(), group, this.toString()); + new DelayedPoolStatHelper(), group, this.toString(), this); } // Reset the file names with the correct port number if startLocatorAndDS was called with port @@ -636,7 +635,6 @@ public class InternalLocator extends Locator implements ConnectListener { */ private void startDistributedSystem() throws UnknownHostException { InternalDistributedSystem existing = InternalDistributedSystem.getConnectedInstance(); - if (existing != null) { // LOG: changed from config to info logger.info(LocalizedMessage http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java index fb66b4c..27c557c 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java @@ -103,6 +103,10 @@ public class ServerLocator implements TcpHandler, DistributionAdvisee { this.stats = null; } + public LocatorLoadSnapshot getLoadSnapshot() { + return loadSnapshot; + } + public ServerLocator(int port, InetAddress bindAddress, String hostNameForClients, File logFile, ProductUseLog productUseLogWriter, String memberName, InternalDistributedSystem ds, LocatorStats stats) throws IOException { http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java index 976f504..c3d51c1 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java @@ -22,6 +22,7 @@ import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionConfigImpl; import org.apache.geode.distributed.internal.DistributionStats; import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.distributed.internal.PoolStatHelper; import org.apache.geode.distributed.internal.PooledExecutorWithDMStats; import org.apache.geode.internal.DSFIDFactory; @@ -31,6 +32,10 @@ import org.apache.geode.internal.VersionedDataInputStream; import org.apache.geode.internal.VersionedDataOutputStream; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Acceptor; +import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; +import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandlerLoader; +import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; import org.apache.geode.internal.cache.tier.sockets.HandShake; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.net.SocketCreator; @@ -49,7 +54,6 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.net.URL; -import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -78,6 +82,7 @@ public class TcpServer { * <p> * This should be incremented if the gossip message structures change * <p> + * 0 - special indicator of a non-gossip message from a client<br> * 1000 - gemfire 5.5 - using java serialization<br> * 1001 - 5.7 - using DataSerializable and supporting server locator messages.<br> * 1002 - 7.1 - sending GemFire version along with GOSSIP_VERSION in each request. @@ -86,6 +91,7 @@ public class TcpServer { * version number */ public final static int GOSSIPVERSION = 1002; + public final static int NON_GOSSIP_REQUEST_VERSION = 0; // Don't change it ever. We did NOT send GemFire version in a Gossip request till 1001 version. // This GOSSIPVERSION is used in _getVersionForAddress request for getting GemFire version of a // GossipServer. @@ -120,6 +126,7 @@ public class TcpServer { private InetAddress bind_address; private volatile boolean shuttingDown = false; // GemStoneAddition private final PoolStatHelper poolHelper; + private InternalLocator internalLocator; private final TcpHandler handler; private PooledExecutorWithDMStats executor; @@ -143,11 +150,12 @@ public class TcpServer { public TcpServer(int port, InetAddress bind_address, Properties sslConfig, DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper, - ThreadGroup threadGroup, String threadName) { + ThreadGroup threadGroup, String threadName, InternalLocator internalLocator) { this.port = port; this.bind_address = bind_address; this.handler = handler; this.poolHelper = poolHelper; + this.internalLocator = internalLocator; // register DSFID types first; invoked explicitly so that all message type // initializations do not happen in first deserialization on a possibly // "precious" thread @@ -334,42 +342,46 @@ public class TcpServer { * fix for bug 33711 - client requests are spun off to another thread for processing. Requests are * synchronized in processGossip. */ - private void processRequest(final Socket sock) { + private void processRequest(final Socket socket) { executor.execute(() -> { long startTime = DistributionStats.getStatTime(); DataInputStream input = null; Object request, response; try { - sock.setSoTimeout(READ_TIMEOUT); - getSocketCreator().configureServerSSLSocket(sock); + socket.setSoTimeout(READ_TIMEOUT); + getSocketCreator().configureServerSSLSocket(socket); try { - input = new DataInputStream(sock.getInputStream()); + input = new DataInputStream(socket.getInputStream()); } catch (StreamCorruptedException e) { // Some garbage can be left on the socket stream // if a peer disappears at exactly the wrong moment. log.debug("Discarding illegal request from " - + (sock.getInetAddress().getHostAddress() + ":" + sock.getPort()), e); + + (socket.getInetAddress().getHostAddress() + ":" + socket.getPort()), e); return; } - int gossipVersion = readGossipVersion(sock, input); + int gossipVersion = readGossipVersion(socket, input); short versionOrdinal; + if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) { + if (input.readUnsignedByte() == AcceptorImpl.PROTOBUF_CLIENT_SERVER_PROTOCOL + && Boolean.getBoolean("geode.feature-protobuf-protocol")) { + ClientProtocolMessageHandler messageHandler = ClientProtocolMessageHandlerLoader.load(); + messageHandler.receiveMessage(input, socket.getOutputStream(), + new ExecutionContext(internalLocator)); + } else { + rejectUnknownProtocolConnection(socket, gossipVersion); + return; + } + } if (gossipVersion <= getCurrentGossipVersion() && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) { // Create a versioned stream to remember sender's GemFire version versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion); } else { // Close the socket. We can not accept requests from a newer version - try { - sock.getOutputStream().write("unknown protocol version".getBytes()); - sock.getOutputStream().flush(); - } catch (IOException e) { - log.debug( - "exception in sending reply to process using unknown protocol " + gossipVersion, e); - } - sock.close(); + rejectUnknownProtocolConnection(socket, gossipVersion); return; } if (Version.GFE_71.compareTo(versionOrdinal) <= 0) { @@ -378,13 +390,13 @@ public class TcpServer { } if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) { - log.debug("Locator reading request from " + sock.getInetAddress() + " with version " + log.debug("Locator reading request from " + socket.getInetAddress() + " with version " + Version.fromOrdinal(versionOrdinal, false)); } input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false)); request = DataSerializer.readObject(input); if (log.isDebugEnabled()) { - log.debug("Locator received request " + request + " from " + sock.getInetAddress()); + log.debug("Locator received request " + request + " from " + socket.getInetAddress()); } if (request instanceof ShutdownRequest) { shuttingDown = true; @@ -405,7 +417,7 @@ public class TcpServer { startTime = DistributionStats.getStatTime(); if (response != null) { - DataOutputStream output = new DataOutputStream(sock.getOutputStream()); + DataOutputStream output = new DataOutputStream(socket.getOutputStream()); if (versionOrdinal != Version.CURRENT_ORDINAL) { output = new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false)); @@ -422,19 +434,19 @@ public class TcpServer { // ignore } catch (ClassNotFoundException ex) { String sender = null; - if (sock != null) { - sender = sock.getInetAddress().getHostAddress(); + if (socket != null) { + sender = socket.getInetAddress().getHostAddress(); } log.info("Unable to process request from " + sender + " exception=" + ex.getMessage()); } catch (Exception ex) { String sender = null; - if (sock != null) { - sender = sock.getInetAddress().getHostAddress(); + if (socket != null) { + sender = socket.getInetAddress().getHostAddress(); } if (ex instanceof IOException) { // IOException could be caused by a client failure. Don't // log with severe. - if (!sock.isClosed()) { + if (!socket.isClosed()) { log.info("Exception in processing request from " + sender, ex); } } else { @@ -447,8 +459,8 @@ public class TcpServer { } catch (Throwable ex) { SystemFailure.checkFailure(); String sender = null; - if (sock != null) { - sender = sock.getInetAddress().getHostAddress(); + if (socket != null) { + sender = socket.getInetAddress().getHostAddress(); } try { log.fatal("Exception in processing request from " + sender, ex); @@ -461,7 +473,7 @@ public class TcpServer { } } finally { try { - sock.close(); + socket.close(); } catch (IOException ignore) { // ignore } @@ -469,6 +481,17 @@ public class TcpServer { }); } + private void rejectUnknownProtocolConnection(Socket socket, int gossipVersion) + throws IOException { + try { + socket.getOutputStream().write("unknown protocol version".getBytes()); + socket.getOutputStream().flush(); + } catch (IOException e) { + log.debug("exception in sending reply to process using unknown protocol " + gossipVersion, e); + } + socket.close(); + } + private int readGossipVersion(Socket sock, DataInputStream input) throws Exception { // read the first byte & check for an improperly configured client pool trying // to contact a cache server http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java index 84aa66e..4c7a6ef 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java @@ -75,7 +75,9 @@ import org.apache.geode.pdx.internal.TypeRegistry; */ public interface InternalCache extends Cache, Extensible<Cache>, CacheTime { - InternalDistributedMember getMyId(); + default InternalDistributedMember getMyId() { + return null; + } Collection<DiskStore> listDiskStores(); http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtoclMessageHandlerLoader.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtoclMessageHandlerLoader.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtoclMessageHandlerLoader.java new file mode 100644 index 0000000..6654757 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtoclMessageHandlerLoader.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import java.io.IOException; +import java.net.Socket; +import java.util.Iterator; +import java.util.ServiceLoader; + +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.Acceptor; +import org.apache.geode.internal.cache.tier.CachedRegionHelper; +import org.apache.geode.internal.security.SecurityService; + +/** + * Creates instances of ServerConnection based on the connection mode provided. + */ +public class ClientProtoclMessageHandlerLoader { + private static ClientProtocolMessageHandler protobufProtocolHandler; + private static final Object protocolLoadLock = new Object(); + + public static ClientProtocolMessageHandler load() { + if (protobufProtocolHandler != null) { + return protobufProtocolHandler; + } + + synchronized (protocolLoadLock) { + if (protobufProtocolHandler != null) { + return protobufProtocolHandler; + } + + ServiceLoader<ClientProtocolMessageHandler> loader = + ServiceLoader.load(ClientProtocolMessageHandler.class); + Iterator<ClientProtocolMessageHandler> iterator = loader.iterator(); + + if (!iterator.hasNext()) { + throw new ServiceLoadingFailureException( + "ClientProtocolMessageHandler implementation not found in JVM"); + } + + ClientProtocolMessageHandler returnValue = iterator.next(); + + if (iterator.hasNext()) { + throw new ServiceLoadingFailureException( + "Multiple service implementations found for ClientProtocolMessageHandler"); + } + + return returnValue; + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java index 32e9e4b..38ab73e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java @@ -15,12 +15,11 @@ package org.apache.geode.internal.cache.tier.sockets; -import org.apache.geode.internal.cache.InternalCache; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; + /** * This is an interface that other modules can implement to hook into * {@link GenericProtocolServerConnection} to handle messages sent to Geode. @@ -30,6 +29,6 @@ import java.io.OutputStream; * {@link GenericProtocolServerConnection}. */ public interface ClientProtocolMessageHandler { - void receiveMessage(InputStream inputStream, OutputStream outputStream, InternalCache cache) - throws IOException; + void receiveMessage(InputStream inputStream, OutputStream outputStream, + ExecutionContext executionContext) throws IOException; } http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java new file mode 100644 index 0000000..1dc6129 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import java.io.IOException; +import java.net.Socket; +import java.util.Iterator; +import java.util.ServiceLoader; + +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.Acceptor; +import org.apache.geode.internal.cache.tier.CachedRegionHelper; +import org.apache.geode.internal.security.SecurityService; + +/** + * Creates instances of ServerConnection based on the connection mode provided. + */ +public class ClientProtocolMessageHandlerLoader { + private static ClientProtocolMessageHandler protobufProtocolHandler; + private static final Object protocolLoadLock = new Object(); + + public static ClientProtocolMessageHandler load() { + if (protobufProtocolHandler != null) { + return protobufProtocolHandler; + } + + synchronized (protocolLoadLock) { + if (protobufProtocolHandler != null) { + return protobufProtocolHandler; + } + + ServiceLoader<ClientProtocolMessageHandler> loader = + ServiceLoader.load(ClientProtocolMessageHandler.class); + Iterator<ClientProtocolMessageHandler> iterator = loader.iterator(); + + if (!iterator.hasNext()) { + throw new ServiceLoadingFailureException( + "ClientProtocolMessageHandler implementation not found in JVM"); + } + + ClientProtocolMessageHandler returnValue = iterator.next(); + + if (iterator.hasNext()) { + throw new ServiceLoadingFailureException( + "Multiple service implementations found for ClientProtocolMessageHandler"); + } + + return returnValue; + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java new file mode 100644 index 0000000..27da205 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.cache.Cache; +import org.apache.geode.distributed.internal.InternalLocator; + +public class ExecutionContext { + private Cache cache; + private InternalLocator locator; + + public ExecutionContext(Cache cache) { + this.cache = cache; + } + + public ExecutionContext(InternalLocator locator) { + this.locator = locator; + } + + // This throws if the cache isn't present because we know that non of the callers can take any + // reasonable action if the cache is not present + public Cache getCache() throws InvalidExecutionContextException { + if (cache != null) { + return cache; + } else { + throw new InvalidExecutionContextException( + "Execution context's cache was accessed but isn't present. Did this happen on a locator? Operations on the locator should not try to operate on a cache"); + } + } + + // This throws if the locator isn't present because we know that non of the callers can take any + // reasonable action if the locator is not present + public InternalLocator getLocator() throws InvalidExecutionContextException { + if (locator != null) { + return locator; + } else { + throw new InvalidExecutionContextException( + "Execution context's locator was accessed but isn't present. Did this happen on a server? Operations on the locator should not try to operate on a cache"); + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java index 93a7f6f..8f6720e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java @@ -61,7 +61,8 @@ public class GenericProtocolServerConnection extends ServerConnection { if (!authenticator.isAuthenticated()) { authenticator.receiveMessage(inputStream, outputStream, securityManager); } else { - messageHandler.receiveMessage(inputStream, outputStream, this.getCache()); + messageHandler.receiveMessage(inputStream, outputStream, + new ExecutionContext(this.getCache())); } } catch (IOException e) { logger.warn(e); http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InvalidExecutionContextException.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InvalidExecutionContextException.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InvalidExecutionContextException.java new file mode 100644 index 0000000..919e301 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InvalidExecutionContextException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.annotations.Experimental; + +/* + * Indicates that OperationContext was missing required data. This will typically happen if a + * operation that is supposed to run on a server runs on a locator and receives a locator in its + * context instead of a cache. The reverse case applies as well. + */ +@Experimental +public class InvalidExecutionContextException extends Exception { + public InvalidExecutionContextException(String message) { + super(message); + } + + public InvalidExecutionContextException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java index 5c33468..802620c 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java @@ -18,11 +18,8 @@ import org.apache.geode.CancelCriterion; import org.apache.geode.cache.*; import org.apache.geode.cache.client.NoAvailableLocatorsException; import org.apache.geode.cache.client.SubscriptionNotEnabledException; -import org.apache.geode.cache.client.internal.AutoConnectionSourceImpl.UpdateLocatorListTask; -import org.apache.geode.cache.client.internal.PoolImpl.PoolTask; import org.apache.geode.cache.client.internal.locator.ClientConnectionRequest; import org.apache.geode.cache.client.internal.locator.ClientConnectionResponse; -import org.apache.geode.cache.client.internal.locator.LocatorListRequest; import org.apache.geode.cache.client.internal.locator.LocatorListResponse; import org.apache.geode.cache.query.QueryService; import org.apache.geode.distributed.DistributedSystem; @@ -63,7 +60,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -308,7 +304,7 @@ public class AutoConnectionSourceImplJUnitTest { startFakeLocator(); int secondPort = AvailablePortHelper.getRandomAvailableTCPPort(); TcpServer server2 = new TcpServer(secondPort, InetAddress.getLocalHost(), null, null, handler, - new FakeHelper(), Thread.currentThread().getThreadGroup(), "tcp server"); + new FakeHelper(), Thread.currentThread().getThreadGroup(), "tcp server", null); server2.start(); try { @@ -392,7 +388,7 @@ public class AutoConnectionSourceImplJUnitTest { private void startFakeLocator() throws UnknownHostException, IOException, InterruptedException { server = new TcpServer(port, InetAddress.getLocalHost(), null, null, handler, new FakeHelper(), - Thread.currentThread().getThreadGroup(), "Tcp Server"); + Thread.currentThread().getThreadGroup(), "Tcp Server", null); server.start(); Thread.sleep(500); } http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java index 8a25aaf..229fbb9 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java @@ -138,7 +138,7 @@ public class TCPServerSSLJUnitTest { public DummyTcpServer(int port, InetAddress bind_address, Properties sslConfig, DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper, ThreadGroup threadGroup, String threadName) { - super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadGroup, threadName); + super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadGroup, threadName, null); if (cfg == null) { cfg = new DistributionConfigImpl(sslConfig); } http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java index eda0641..9d20e8c 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java @@ -69,7 +69,7 @@ public class TcpServerJUnitTest { stats = new SimpleStats(); server = new TcpServer(port, localhost, new Properties(), null, handler, stats, - Thread.currentThread().getThreadGroup(), "server thread"); + Thread.currentThread().getThreadGroup(), "server thread", null); server.start(); } http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java index aa6d79e..5d9012f 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java @@ -15,7 +15,8 @@ package org.apache.geode.protocol.operations; import org.apache.geode.annotations.Experimental; -import org.apache.geode.cache.Cache; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.protobuf.ProtobufOpsProcessor; import org.apache.geode.protocol.protobuf.Result; import org.apache.geode.serialization.SerializationService; @@ -32,6 +33,7 @@ public interface OperationHandler<Req, Resp> { * Decode the message, deserialize contained values using the serialization service, do the work * indicated on the provided cache, and return a response. */ - Result<Resp> process(SerializationService serializationService, Req request, Cache cache); + Result<Resp> process(SerializationService serializationService, Req request, + ExecutionContext executionContext) throws InvalidExecutionContextException; } http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java index 7d75b4a..76f81e7 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java @@ -15,8 +15,10 @@ package org.apache.geode.protocol.protobuf; import org.apache.geode.annotations.Experimental; -import org.apache.geode.cache.Cache; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.protobuf.registry.OperationContextRegistry; +import org.apache.geode.protocol.protobuf.utilities.ProtobufResponseUtilities; import org.apache.geode.serialization.SerializationService; /** @@ -35,12 +37,19 @@ public class ProtobufOpsProcessor { this.operationContextRegistry = operationContextRegistry; } - public ClientProtocol.Response process(ClientProtocol.Request request, Cache cache) { + public ClientProtocol.Response process(ClientProtocol.Request request, ExecutionContext context) { ClientProtocol.Request.RequestAPICase requestType = request.getRequestAPICase(); OperationContext operationContext = operationContextRegistry.getOperationContext(requestType); ClientProtocol.Response.Builder builder; - Result result = operationContext.getOperationHandler().process(serializationService, - operationContext.getFromRequest().apply(request), cache); + Result result; + try { + result = operationContext.getOperationHandler().process(serializationService, + operationContext.getFromRequest().apply(request), context); + } catch (InvalidExecutionContextException e) { + result = Failure.of(ProtobufResponseUtilities.makeErrorResponse( + ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue, + "Invalid execution context found for operation.")); + } builder = (ClientProtocol.Response.Builder) result.map(operationContext.getToResponse(), operationContext.getToErrorResponse()); http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java index 648ab3c..d04e49e 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java @@ -20,9 +20,8 @@ import java.io.InputStream; import java.io.OutputStream; import org.apache.geode.annotations.Experimental; -import org.apache.geode.cache.Cache; -import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; import org.apache.geode.protocol.exception.InvalidProtocolMessageException; import org.apache.geode.protocol.protobuf.registry.OperationContextRegistry; import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer; @@ -45,29 +44,29 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler { new OperationContextRegistry()); } - public void processOneMessage(InputStream inputStream, OutputStream outputStream, Cache cache) - throws InvalidProtocolMessageException, IOException { + @Override + public void receiveMessage(InputStream inputStream, OutputStream outputStream, + ExecutionContext executionContext) throws IOException { + try { + processOneMessage(inputStream, outputStream, executionContext); + } catch (InvalidProtocolMessageException e) { + throw new IOException(e); + } + } + + private void processOneMessage(InputStream inputStream, OutputStream outputStream, + ExecutionContext executionContext) throws InvalidProtocolMessageException, IOException { ClientProtocol.Message message = protobufProtocolSerializer.deserialize(inputStream); if (message == null) { throw new EOFException("Tried to deserialize protobuf message at EOF"); } ClientProtocol.Request request = message.getRequest(); - ClientProtocol.Response response = protobufOpsProcessor.process(request, cache); + ClientProtocol.Response response = protobufOpsProcessor.process(request, executionContext); ClientProtocol.MessageHeader responseHeader = ProtobufUtilities.createMessageHeaderForRequest(message); ClientProtocol.Message responseMessage = ProtobufUtilities.createProtobufResponse(responseHeader, response); protobufProtocolSerializer.serialize(responseMessage, outputStream); } - - @Override - public void receiveMessage(InputStream inputStream, OutputStream outputStream, - InternalCache cache) throws IOException { - try { - processOneMessage(inputStream, outputStream, cache); - } catch (InvalidProtocolMessageException e) { - throw new IOException(e); - } - } } http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtocolErrorCode.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtocolErrorCode.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtocolErrorCode.java index e3b262d..6a6f605 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtocolErrorCode.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtocolErrorCode.java @@ -18,6 +18,7 @@ public enum ProtocolErrorCode { GENERIC_FAILURE(1000), VALUE_ENCODING_ERROR(1100), UNSUPPORTED_VERSION(1101), + UNSUPPORTED_OPERATION(1102), AUTHENTICATION_FAILED(1200), AUTHORIZATION_FAILED(1201), UNAUTHORIZED_REQUEST(1202), http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java index 607d1d2..75274c1 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java @@ -19,8 +19,9 @@ import java.util.Map; import java.util.Set; import org.apache.geode.annotations.Experimental; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.Failure; @@ -40,9 +41,10 @@ public class GetAllRequestOperationHandler @Override public Result<RegionAPI.GetAllResponse> process(SerializationService serializationService, - RegionAPI.GetAllRequest request, Cache cache) { + RegionAPI.GetAllRequest request, ExecutionContext executionContext) + throws InvalidExecutionContextException { String regionName = request.getRegionName(); - Region region = cache.getRegion(regionName); + Region region = executionContext.getCache().getRegion(regionName); if (region == null) { return Failure.of(ProtobufResponseUtilities .makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, "Region not found")); http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java index 239d9f7..e7c18cd 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java @@ -14,33 +14,20 @@ */ package org.apache.geode.protocol.protobuf.operations; -import java.io.IOException; -import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; -import java.util.Properties; -import java.util.StringTokenizer; import java.util.stream.Collectors; -import org.apache.commons.lang.StringUtils; - import org.apache.geode.annotations.Experimental; -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.client.internal.locator.GetAllServersRequest; -import org.apache.geode.cache.client.internal.locator.GetAllServersResponse; -import org.apache.geode.distributed.ConfigurationProperties; -import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.distributed.internal.ServerLocation; -import org.apache.geode.distributed.internal.tcpserver.TcpClient; -import org.apache.geode.internal.admin.remote.DistributionLocatorId; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; -import org.apache.geode.protocol.protobuf.Failure; -import org.apache.geode.protocol.protobuf.ProtocolErrorCode; import org.apache.geode.protocol.protobuf.Result; import org.apache.geode.protocol.protobuf.ServerAPI; import org.apache.geode.protocol.protobuf.Success; -import org.apache.geode.protocol.protobuf.utilities.ProtobufResponseUtilities; import org.apache.geode.serialization.SerializationService; @Experimental @@ -50,51 +37,19 @@ public class GetAvailableServersOperationHandler implements @Override public Result<ServerAPI.GetAvailableServersResponse> process( SerializationService serializationService, ServerAPI.GetAvailableServersRequest request, - Cache cache) { - - InternalDistributedSystem distributedSystem = - (InternalDistributedSystem) cache.getDistributedSystem(); - Properties properties = distributedSystem.getProperties(); - String locatorsString = properties.getProperty(ConfigurationProperties.LOCATORS); - - HashSet<DistributionLocatorId> locators = new HashSet(); - StringTokenizer stringTokenizer = new StringTokenizer(locatorsString, ","); - while (stringTokenizer.hasMoreTokens()) { - String locator = stringTokenizer.nextToken(); - if (StringUtils.isNotEmpty(locator)) { - locators.add(new DistributionLocatorId(locator)); - } - } + ExecutionContext executionContext) throws InvalidExecutionContextException { - TcpClient tcpClient = getTcpClient(); - for (DistributionLocatorId locator : locators) { - try { - return getGetAvailableServersFromLocator(tcpClient, locator.getHost()); - } catch (IOException | ClassNotFoundException e) { - // try the next locator - } - } - return Failure.of(ProtobufResponseUtilities.makeErrorResponse( - ProtocolErrorCode.DATA_UNREACHABLE.codeValue, "Unable to find a locator")); - } + InternalLocator locator = executionContext.getLocator(); + ArrayList servers2 = locator.getServerLocatorAdvisee().getLoadSnapshot().getServers(null); - private Result<ServerAPI.GetAvailableServersResponse> getGetAvailableServersFromLocator( - TcpClient tcpClient, InetSocketAddress address) throws IOException, ClassNotFoundException { - GetAllServersResponse getAllServersResponse = (GetAllServersResponse) tcpClient - .requestToServer(address, new GetAllServersRequest(), 1000, true); - Collection<BasicTypes.Server> servers = - (Collection<BasicTypes.Server>) getAllServersResponse.getServers().stream() - .map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation)) - .collect(Collectors.toList()); + Collection<BasicTypes.Server> servers = (Collection<BasicTypes.Server>) servers2.stream() + .map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation)) + .collect(Collectors.toList()); ServerAPI.GetAvailableServersResponse.Builder builder = ServerAPI.GetAvailableServersResponse.newBuilder().addAllServers(servers); return Success.of(builder.build()); } - protected TcpClient getTcpClient() { - return new TcpClient(); - } - private BasicTypes.Server getServerProtobufMessage(ServerLocation serverLocation) { BasicTypes.Server.Builder serverBuilder = BasicTypes.Server.newBuilder(); serverBuilder.setHostname(serverLocation.getHostName()).setPort(serverLocation.getPort()); http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java index e5d216a..53898ed 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java @@ -17,8 +17,9 @@ package org.apache.geode.protocol.protobuf.operations; import java.util.Set; import org.apache.geode.annotations.Experimental; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.RegionAPI; import org.apache.geode.protocol.protobuf.Result; @@ -32,8 +33,9 @@ public class GetRegionNamesRequestOperationHandler @Override public Result<RegionAPI.GetRegionNamesResponse> process(SerializationService serializationService, - RegionAPI.GetRegionNamesRequest request, Cache cache) { - Set<Region<?, ?>> regions = cache.rootRegions(); + RegionAPI.GetRegionNamesRequest request, ExecutionContext executionContext) + throws InvalidExecutionContextException { + Set<Region<?, ?>> regions = executionContext.getCache().rootRegions(); return Success.of(ProtobufResponseUtilities.createGetRegionNamesResponse(regions)); } } http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java index b563a5d..007f96b 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java @@ -15,8 +15,9 @@ package org.apache.geode.protocol.protobuf.operations; import org.apache.geode.annotations.Experimental; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.Failure; @@ -34,10 +35,11 @@ public class GetRegionRequestOperationHandler @Override public Result<RegionAPI.GetRegionResponse> process(SerializationService serializationService, - RegionAPI.GetRegionRequest request, Cache cache) { + RegionAPI.GetRegionRequest request, ExecutionContext executionContext) + throws InvalidExecutionContextException { String regionName = request.getRegionName(); - Region region = cache.getRegion(regionName); + Region region = executionContext.getCache().getRegion(regionName); if (region == null) { return Failure.of( ProtobufResponseUtilities.makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java index 96c0282..8f0fef7 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java @@ -15,8 +15,9 @@ package org.apache.geode.protocol.protobuf.operations; import org.apache.geode.annotations.Experimental; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.Failure; @@ -36,9 +37,10 @@ public class GetRequestOperationHandler @Override public Result<RegionAPI.GetResponse> process(SerializationService serializationService, - RegionAPI.GetRequest request, Cache cache) { + RegionAPI.GetRequest request, ExecutionContext executionContext) + throws InvalidExecutionContextException { String regionName = request.getRegionName(); - Region region = cache.getRegion(regionName); + Region region = executionContext.getCache().getRegion(regionName); if (region == null) { return Failure.of(ProtobufResponseUtilities .makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, "Region not found")); http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java index 253a95d..e0ebc41 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java @@ -21,8 +21,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.geode.annotations.Experimental; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.Failure; @@ -43,8 +44,9 @@ public class PutAllRequestOperationHandler @Override public Result<RegionAPI.PutAllResponse> process(SerializationService serializationService, - RegionAPI.PutAllRequest putAllRequest, Cache cache) { - Region region = cache.getRegion(putAllRequest.getRegionName()); + RegionAPI.PutAllRequest putAllRequest, ExecutionContext executionContext) + throws InvalidExecutionContextException { + Region region = executionContext.getCache().getRegion(putAllRequest.getRegionName()); if (region == null) { return Failure.of(ProtobufResponseUtilities.createAndLogErrorResponse( http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java index c24fb29..cf5afb4 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java @@ -15,8 +15,9 @@ package org.apache.geode.protocol.protobuf.operations; import org.apache.geode.annotations.Experimental; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.Failure; @@ -36,9 +37,10 @@ public class PutRequestOperationHandler @Override public Result<RegionAPI.PutResponse> process(SerializationService serializationService, - RegionAPI.PutRequest request, Cache cache) { + RegionAPI.PutRequest request, ExecutionContext executionContext) + throws InvalidExecutionContextException { String regionName = request.getRegionName(); - Region region = cache.getRegion(regionName); + Region region = executionContext.getCache().getRegion(regionName); if (region == null) { return Failure.of( ProtobufResponseUtilities.makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java index 59236be..052efcf 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java @@ -18,10 +18,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.geode.annotations.Experimental; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.operations.OperationHandler; -import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.Failure; import org.apache.geode.protocol.protobuf.ProtocolErrorCode; import org.apache.geode.protocol.protobuf.RegionAPI; @@ -40,10 +40,11 @@ public class RemoveRequestOperationHandler @Override public Result<RegionAPI.RemoveResponse> process(SerializationService serializationService, - RegionAPI.RemoveRequest request, Cache cache) { + RegionAPI.RemoveRequest request, ExecutionContext executionContext) + throws InvalidExecutionContextException { String regionName = request.getRegionName(); - Region region = cache.getRegion(regionName); + Region region = executionContext.getCache().getRegion(regionName); if (region == null) { return Failure.of(ProtobufResponseUtilities .makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, "Region not found")); http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/test/java/org/apache/geode/protocol/GetAvailableServersDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/GetAvailableServersDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/GetAvailableServersDUnitTest.java deleted file mode 100644 index 4d6390b..0000000 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/GetAvailableServersDUnitTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.protocol; - -import org.apache.geode.cache.server.CacheServer; -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.protocol.exception.InvalidProtocolMessageException; -import org.apache.geode.protocol.protobuf.ClientProtocol; -import org.apache.geode.protocol.protobuf.ServerAPI; -import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer; -import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities; -import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; -import org.apache.geode.test.dunit.DistributedTestUtils; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; -import org.apache.geode.test.junit.categories.DistributedTest; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.net.Socket; - -import static org.junit.Assert.assertEquals; - -@Category(DistributedTest.class) -public class GetAvailableServersDUnitTest extends JUnit4CacheTestCase { - - @Rule - public DistributedRestoreSystemProperties distributedRestoreSystemProperties = - new DistributedRestoreSystemProperties(); - - @Before - public void setup() { - - } - - @Test - public void testGetAllAvailableServersRequest() - throws IOException, InvalidProtocolMessageException { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); - VM vm1 = host.getVM(1); - VM vm2 = host.getVM(2); - - int locatorPort = DistributedTestUtils.getDUnitLocatorPort(); - - // int cacheServer1Port = vm0.invoke("Start Cache1", () -> startCacheWithCacheServer()); - int cacheServer1Port = startCacheWithCacheServer(); - int cacheServer2Port = vm1.invoke("Start Cache2", () -> startCacheWithCacheServer()); - int cacheServer3Port = vm2.invoke("Start Cache3", () -> startCacheWithCacheServer()); - - vm0.invoke(() -> { - Socket socket = new Socket(host.getHostName(), cacheServer1Port); - socket.getOutputStream().write(110); - - ClientProtocol.Request.Builder protobufRequestBuilder = - ProtobufUtilities.createProtobufRequestBuilder(); - ClientProtocol.Message getAvailableServersRequestMessage = - ProtobufUtilities.createProtobufMessage(ProtobufUtilities.createMessageHeader(1233445), - protobufRequestBuilder.setGetAvailableServersRequest( - ProtobufRequestUtilities.createGetAvailableServersRequest()).build()); - - ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); - protobufProtocolSerializer.serialize(getAvailableServersRequestMessage, - socket.getOutputStream()); - - ClientProtocol.Message getAvailableServersResponseMessage = - protobufProtocolSerializer.deserialize(socket.getInputStream()); - assertEquals(1233445, - getAvailableServersResponseMessage.getMessageHeader().getCorrelationId()); - assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, - getAvailableServersResponseMessage.getMessageTypeCase()); - ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse(); - assertEquals(ClientProtocol.Response.ResponseAPICase.GETAVAILABLESERVERSRESPONSE, - messageResponse.getResponseAPICase()); - ServerAPI.GetAvailableServersResponse getAvailableServersResponse = - messageResponse.getGetAvailableServersResponse(); - assertEquals(3, getAvailableServersResponse.getServersCount()); - }); - } - - private Integer startCacheWithCacheServer() throws IOException { - System.setProperty("geode.feature-protobuf-protocol", "true"); - - InternalCache cache = getCache(); - CacheServer cacheServer = cache.addCacheServer(); - cacheServer.setPort(0); - cacheServer.start(); - return cacheServer.getPort(); - } - -} http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java new file mode 100644 index 0000000..799c55c --- /dev/null +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.protocol; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.RestoreSystemProperties; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.protocol.exception.InvalidProtocolMessageException; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.protocol.protobuf.ProtocolErrorCode; +import org.apache.geode.protocol.protobuf.ServerAPI; +import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer; +import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities; +import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; +import org.apache.geode.test.dunit.DistributedTestUtils; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; +import org.apache.geode.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class RoundTripLocatorConnectionJUnitTest extends JUnit4CacheTestCase { + + private Socket socket; + private DataOutputStream dataOutputStream; + + @Rule + public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + + @Before + public void setup() throws IOException { + Host host = Host.getHost(0); + int locatorPort = DistributedTestUtils.getDUnitLocatorPort(); + int cacheServer1Port = startCacheWithCacheServer(); + + Host.getLocator().invoke(() -> System.setProperty("geode.feature-protobuf-protocol", "true")); + + socket = new Socket(host.getHostName(), locatorPort); + dataOutputStream = new DataOutputStream(socket.getOutputStream()); + dataOutputStream.writeInt(0); + dataOutputStream.writeByte(110); + } + + @Test + public void testEchoProtobufMessageFromLocator() + throws IOException, InvalidProtocolMessageException { + ClientProtocol.Request.Builder protobufRequestBuilder = + ProtobufUtilities.createProtobufRequestBuilder(); + ClientProtocol.Message getAvailableServersRequestMessage = + ProtobufUtilities.createProtobufMessage(ProtobufUtilities.createMessageHeader(1233445), + protobufRequestBuilder.setGetAvailableServersRequest( + ProtobufRequestUtilities.createGetAvailableServersRequest()).build()); + + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + protobufProtocolSerializer.serialize(getAvailableServersRequestMessage, + socket.getOutputStream()); + + ClientProtocol.Message getAvailableServersResponseMessage = + protobufProtocolSerializer.deserialize(socket.getInputStream()); + assertEquals(1233445, getAvailableServersResponseMessage.getMessageHeader().getCorrelationId()); + assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, + getAvailableServersResponseMessage.getMessageTypeCase()); + ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse(); + assertEquals(ClientProtocol.Response.ResponseAPICase.GETAVAILABLESERVERSRESPONSE, + messageResponse.getResponseAPICase()); + ServerAPI.GetAvailableServersResponse getAvailableServersResponse = + messageResponse.getGetAvailableServersResponse(); + assertEquals(1, getAvailableServersResponse.getServersCount()); + } + + @Test + public void testInvalidOperationReturnsFailure() + throws IOException, InvalidProtocolMessageException { + ClientProtocol.Request.Builder protobufRequestBuilder = + ProtobufUtilities.createProtobufRequestBuilder(); + ClientProtocol.Message getAvailableServersRequestMessage = + ProtobufUtilities.createProtobufMessage(ProtobufUtilities.createMessageHeader(1233445), + protobufRequestBuilder + .setGetRegionNamesRequest(ProtobufRequestUtilities.createGetRegionNamesRequest()) + .build()); + + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + protobufProtocolSerializer.serialize(getAvailableServersRequestMessage, + socket.getOutputStream()); + + ClientProtocol.Message getAvailableServersResponseMessage = + protobufProtocolSerializer.deserialize(socket.getInputStream()); + assertEquals(1233445, getAvailableServersResponseMessage.getMessageHeader().getCorrelationId()); + assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, + getAvailableServersResponseMessage.getMessageTypeCase()); + ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse(); + assertEquals(ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE, + messageResponse.getResponseAPICase()); + assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue, + messageResponse.getErrorResponse().getError().getErrorCode()); + } + + private Integer startCacheWithCacheServer() throws IOException { + System.setProperty("geode.feature-protobuf-protocol", "true"); + + InternalCache cache = getCache(); + CacheServer cacheServer = cache.addCacheServer(); + cacheServer.setPort(0); + cacheServer.start(); + return cacheServer.getPort(); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java index 87bfd52..2185b15 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java @@ -26,6 +26,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; import org.apache.geode.test.junit.categories.UnitTest; @Category(UnitTest.class) @@ -37,6 +38,7 @@ public class ProtobufStreamProcessorTest { ProtobufStreamProcessor protobufStreamProcessor = new ProtobufStreamProcessor(); InternalCache mockInternalCache = mock(InternalCache.class); - protobufStreamProcessor.receiveMessage(inputStream, outputStream, mockInternalCache); + protobufStreamProcessor.receiveMessage(inputStream, outputStream, + new ExecutionContext(mockInternalCache)); } } http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java index f2e3199..f4d098c 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java @@ -17,7 +17,6 @@ package org.apache.geode.protocol.protobuf.operations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.nio.charset.Charset; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -28,13 +27,14 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.cache.Region; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; +import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException; import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.RegionAPI; import org.apache.geode.protocol.protobuf.Result; import org.apache.geode.protocol.protobuf.Success; import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities; import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; -import org.apache.geode.serialization.SerializationService; import org.apache.geode.serialization.codec.StringCodec; import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException; import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException; @@ -80,9 +80,9 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni @Test public void processReturnsExpectedValuesForValidKeys() throws CodecAlreadyRegisteredForTypeException, UnsupportedEncodingTypeException, - CodecNotRegisteredForTypeException { - Result<RegionAPI.GetAllResponse> result = - operationHandler.process(serializationServiceStub, generateTestRequest(true), cacheStub); + CodecNotRegisteredForTypeException, InvalidExecutionContextException { + Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub, + generateTestRequest(true), new ExecutionContext(cacheStub)); Assert.assertTrue(result instanceof Success); @@ -99,10 +99,10 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni } @Test - public void processReturnsNoEntriesForNoKeysRequested() - throws UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { - Result<RegionAPI.GetAllResponse> result = - operationHandler.process(serializationServiceStub, generateTestRequest(false), cacheStub); + public void processReturnsNoEntriesForNoKeysRequested() throws UnsupportedEncodingTypeException, + CodecNotRegisteredForTypeException, InvalidExecutionContextException { + Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub, + generateTestRequest(false), new ExecutionContext(cacheStub)); Assert.assertTrue(result instanceof Success); http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java index 77b088d..cff6ddc 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java @@ -14,14 +14,12 @@ */ package org.apache.geode.protocol.protobuf.operations; -import org.apache.geode.cache.client.internal.locator.GetAllServersResponse; -import org.apache.geode.distributed.ConfigurationProperties; -import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.InternalLocator; +import org.apache.geode.distributed.internal.LocatorLoadSnapshot; import org.apache.geode.distributed.internal.ServerLocation; -import org.apache.geode.distributed.internal.tcpserver.TcpClient; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.distributed.internal.ServerLocator; +import org.apache.geode.internal.cache.tier.sockets.ExecutionContext; import org.apache.geode.protocol.protobuf.BasicTypes; -import org.apache.geode.protocol.protobuf.Failure; import org.apache.geode.protocol.protobuf.Result; import org.apache.geode.protocol.protobuf.ServerAPI; import org.apache.geode.protocol.protobuf.ServerAPI.GetAvailableServersResponse; @@ -32,75 +30,48 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; import java.util.ArrayList; -import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @Category(UnitTest.class) public class GetAvailableServersOperationHandlerJUnitTest extends OperationHandlerJUnitTest { - private TcpClient mockTCPClient; + public static final String HOSTNAME_1 = "hostname1"; + public static final int PORT_1 = 12345; + + public static final String HOSTNAME_2 = "hostname2"; + public static final int PORT_2 = 23456; + + private InternalLocator internalLocatorMock; @Before public void setUp() throws Exception { super.setUp(); - operationHandler = mock(GetAvailableServersOperationHandler.class); - cacheStub = mock(GemFireCacheImpl.class); - when(operationHandler.process(any(), any(), any())).thenCallRealMethod(); - InternalDistributedSystem mockDistributedSystem = mock(InternalDistributedSystem.class); - when(cacheStub.getDistributedSystem()).thenReturn(mockDistributedSystem); - Properties mockProperties = mock(Properties.class); - when(mockDistributedSystem.getProperties()).thenReturn(mockProperties); - String locatorString = "testLocator1Host[12345],testLocator2Host[23456]"; - when(mockProperties.getProperty(ConfigurationProperties.LOCATORS)).thenReturn(locatorString); - mockTCPClient = mock(TcpClient.class); - when(((GetAvailableServersOperationHandler) operationHandler).getTcpClient()) - .thenReturn(mockTCPClient); - } - - @Test - public void testServerReturnedFromHandler() throws Exception { - when(mockTCPClient.requestToServer(any(), any(), anyInt(), anyBoolean())) - .thenReturn(new GetAllServersResponse(new ArrayList<ServerLocation>() { - { - add(new ServerLocation("hostname1", 12345)); - add(new ServerLocation("hostname2", 23456)); - } - })); + operationHandler = new GetAvailableServersOperationHandler(); + internalLocatorMock = mock(InternalLocator.class); + ServerLocator serverLocatorAdviseeMock = mock(ServerLocator.class); + LocatorLoadSnapshot locatorLoadSnapshot = mock(LocatorLoadSnapshot.class); + ArrayList<Object> serverList = new ArrayList<>(); + serverList.add(new ServerLocation(HOSTNAME_1, PORT_1)); + serverList.add(new ServerLocation(HOSTNAME_2, PORT_2)); - ServerAPI.GetAvailableServersRequest getAvailableServersRequest = - ProtobufRequestUtilities.createGetAvailableServersRequest(); - Result operationHandlerResult = - operationHandler.process(serializationServiceStub, getAvailableServersRequest, cacheStub); - assertTrue(operationHandlerResult instanceof Success); - ValidateGetAvailableServersResponse( - (GetAvailableServersResponse) operationHandlerResult.getMessage()); + when(internalLocatorMock.getServerLocatorAdvisee()).thenReturn(serverLocatorAdviseeMock); + when(serverLocatorAdviseeMock.getLoadSnapshot()).thenReturn(locatorLoadSnapshot); + when(locatorLoadSnapshot.getServers(null)).thenReturn(serverList); } @Test - public void testServerReturnedFromSecondLocatorIfFirstDown() throws Exception { - when(mockTCPClient.requestToServer(any(), any(), anyInt(), anyBoolean())) - .thenThrow(new IOException("BOOM!!!")) - .thenReturn(new GetAllServersResponse(new ArrayList<ServerLocation>() { - { - add(new ServerLocation("hostname1", 12345)); - add(new ServerLocation("hostname2", 23456)); - } - })); - + public void testServerReturnedFromHandler() throws Exception { ServerAPI.GetAvailableServersRequest getAvailableServersRequest = ProtobufRequestUtilities.createGetAvailableServersRequest(); - Result operationHandlerResult = - operationHandler.process(serializationServiceStub, getAvailableServersRequest, cacheStub); + Result operationHandlerResult = operationHandler.process(serializationServiceStub, + getAvailableServersRequest, new ExecutionContext(internalLocatorMock)); assertTrue(operationHandlerResult instanceof Success); ValidateGetAvailableServersResponse( (GetAvailableServersResponse) operationHandlerResult.getMessage()); @@ -110,22 +81,10 @@ public class GetAvailableServersOperationHandlerJUnitTest extends OperationHandl GetAvailableServersResponse getAvailableServersResponse) { assertEquals(2, getAvailableServersResponse.getServersCount()); BasicTypes.Server server = getAvailableServersResponse.getServers(0); - assertEquals("hostname1", server.getHostname()); - assertEquals(12345, server.getPort()); + assertEquals(HOSTNAME_1, server.getHostname()); + assertEquals(PORT_1, server.getPort()); server = getAvailableServersResponse.getServers(1); - assertEquals("hostname2", server.getHostname()); - assertEquals(23456, server.getPort()); - } - - @Test - public void testProcessFailsIfNoLocatorsAvailable() throws Exception { - when(mockTCPClient.requestToServer(any(), any(), anyInt(), anyBoolean())) - .thenThrow(new IOException("BOOM!!!")); - - ServerAPI.GetAvailableServersRequest getAvailableServersRequest = - ProtobufRequestUtilities.createGetAvailableServersRequest(); - Result operationHandlerResult = - operationHandler.process(serializationServiceStub, getAvailableServersRequest, cacheStub); - assertTrue(operationHandlerResult instanceof Failure); + assertEquals(HOSTNAME_2, server.getHostname()); + assertEquals(PORT_2, server.getPort()); } }