GEODE-3075: Changes in response to review feedback, plus some refactoring. `AcceptorImpl.handleNewClientConnection` has been refactored and should now be a bit easier to understand.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/c71c28df Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/c71c28df Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/c71c28df Branch: refs/heads/feature/GEODE-3109 Commit: c71c28dff7fd6d20798fadcbd6a1a15f055ae52e Parents: cdcc4d9 Author: Galen OSullivan <gosulli...@pivotal.io> Authored: Mon Jun 19 17:48:53 2017 -0700 Committer: Hitesh Khamesra <hkhame...@pivotal.io> Committed: Mon Jun 26 09:26:22 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/tier/Acceptor.java | 2 +- .../cache/tier/sockets/AcceptorImpl.java | 316 ++++++++++--------- .../GenericProtocolServerConnection.java | 85 +++++ .../sockets/NewProtocolServerConnection.java | 87 ----- .../tier/sockets/ServerConnectionFactory.java | 9 +- .../ServerConnectionFactoryIntegrationTest.java | 45 ++- .../sockets/ServerConnectionFactoryTest.java | 15 +- 7 files changed, 291 insertions(+), 268 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java index a95195a..e12a409 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java @@ -74,7 +74,7 @@ public abstract class Acceptor { /** * For the new client-server protocol, which ignores the usual handshake mechanism. 
*/ - public static final byte CLIENT_TO_SERVER_NEW_PROTOCOL = (byte) 110; + public static final byte PROTOBUF_CLIENT_SERVER_PROTOCOL = (byte) 110; /** * The GFE version of the server. http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index 24efc93..50f7006 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -94,7 +94,7 @@ import javax.net.ssl.SSLException; /** * Implements the acceptor thread on the bridge server. Accepts connections from the edge and starts * up threads to process requests from these. 
- * + * * @since GemFire 2.0.2 */ @SuppressWarnings("deprecation") @@ -113,19 +113,29 @@ public class AcceptorImpl extends Acceptor implements Runnable { */ private final ThreadPoolExecutor hsPool; - /** The port on which this acceptor listens for client connections */ + /** + * The port on which this acceptor listens for client connections + */ private final int localPort; - /** The server socket that handles requests for connections */ + /** + * The server socket that handles requests for connections + */ private ServerSocket serverSock = null; - /** The GemFire cache served up by this acceptor */ + /** + * The GemFire cache served up by this acceptor + */ protected final InternalCache cache; - /** Caches region information */ + /** + * Caches region information + */ private final CachedRegionHelper crHelper; - /** A lock to prevent close from occurring while creating a ServerConnection */ + /** + * A lock to prevent close from occurring while creating a ServerConnection + */ private final Object syncLock = new Object(); /** @@ -165,7 +175,9 @@ public class AcceptorImpl extends Acceptor implements Runnable { */ public static final int DEFAULT_HANDSHAKE_TIMEOUT_MS = 59000; - /** Test value for handshake timeout */ + /** + * Test value for handshake timeout + */ protected static final int handShakeTimeout = Integer.getInteger(HANDSHAKE_TIMEOUT_PROPERTY_NAME, DEFAULT_HANDSHAKE_TIMEOUT_MS).intValue(); @@ -180,17 +192,25 @@ public class AcceptorImpl extends Acceptor implements Runnable { */ public static final int DEFAULT_ACCEPT_TIMEOUT_MS = 9900; - /** Test value for accept timeout */ + /** + * Test value for accept timeout + */ private final int acceptTimeout = Integer.getInteger(ACCEPT_TIMEOUT_PROPERTY_NAME, DEFAULT_ACCEPT_TIMEOUT_MS).intValue(); - /** The mininum value of max-connections */ + /** + * The mininum value of max-connections + */ public static final int MINIMUM_MAX_CONNECTIONS = 16; - /** The buffer size for server-side sockets. 
*/ + /** + * The buffer size for server-side sockets. + */ private final int socketBufferSize; - /** Notifies clients of updates */ + /** + * Notifies clients of updates + */ private CacheClientNotifier clientNotifier; /** @@ -198,7 +218,9 @@ public class AcceptorImpl extends Acceptor implements Runnable { */ private static final int DEFAULT_BACKLOG = 1000; - /** The system property name for setting the {@link ServerSocket}backlog */ + /** + * The system property name for setting the {@link ServerSocket}backlog + */ public static final String BACKLOG_PROPERTY_NAME = "BridgeServer.backlog"; /** @@ -206,13 +228,19 @@ public class AcceptorImpl extends Acceptor implements Runnable { */ public final AtomicInteger clientServerCnxCount = new AtomicInteger(); - /** Has this acceptor been shut down */ + /** + * Has this acceptor been shut down + */ private volatile boolean shutdownStarted = false; - /** The thread that runs the acceptor */ + /** + * The thread that runs the acceptor + */ private Thread thread = null; - /** The thread that runs the selector loop if any */ + /** + * The thread that runs the selector loop if any + */ private Thread selectorThread = null; /** @@ -222,16 +250,16 @@ public class AcceptorImpl extends Acceptor implements Runnable { /** * List of ServerConnection. - * + * * Instances added when constructed; removed when terminated. - * + * * guarded.By {@link #allSCsLock} */ private final HashSet allSCs = new HashSet(); /** * List of ServerConnections, for {@link #emergencyClose()} - * + * * guarded.By {@link #allSCsLock} */ private volatile ServerConnection allSCList[] = new ServerConnection[0]; @@ -239,7 +267,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { /** * The ip address or host name this acceptor is to bind to; <code>null</code> or "" indicates it * will listen on all local addresses. 
- * + * * @since GemFire 5.7 */ private final String bindHostName; @@ -249,10 +277,14 @@ public class AcceptorImpl extends Acceptor implements Runnable { */ private final ConnectionListener connectionListener; - /** The client health monitor tracking connections for this acceptor */ + /** + * The client health monitor tracking connections for this acceptor + */ private ClientHealthMonitor healthMonitor; - /** bridge's setting of notifyBySubscription */ + /** + * bridge's setting of notifyBySubscription + */ private final boolean notifyBySubscription; /** @@ -273,7 +305,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { /** * Initializes this acceptor thread to listen for connections on the given port. - * + * * @param port The port on which this acceptor listens for connections. If <code>0</code>, a * random port will be chosen. * @param bindHostName The ip address or host name this acceptor listens on for connections. If @@ -284,7 +316,6 @@ public class AcceptorImpl extends Acceptor implements Runnable { * @param internalCache The GemFire cache whose contents is served to clients * @param maxConnections the maximum number of connections allowed in the server pool * @param maxThreads the maximum number of threads allowed in the server pool - * * @see SocketCreator#createServerSocket(int, int, InetAddress) * @see ClientHealthMonitor * @since GemFire 5.7 @@ -615,7 +646,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { /** * This system property is only used if max-threads == 0. This is for 5.0.2 backwards * compatibility. - * + * * @deprecated since 5.1 use cache-server max-threads instead */ @Deprecated @@ -624,7 +655,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { /** * This system property is only used if max-threads == 0. This is for 5.0.2 backwards * compatibility. 
- * + * * @deprecated since 5.1 use cache-server max-threads instead */ @Deprecated @@ -693,7 +724,9 @@ public class AcceptorImpl extends Acceptor implements Runnable { wakeupSelector(); } - /** wake up the selector thread */ + /** + * wake up the selector thread + */ private void wakeupSelector() { Selector s = getSelector(); if (s != null && s.isOpen()) { @@ -760,7 +793,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { /** * Ensure that the CachedRegionHelper and ServerConnection classes get loaded. - * + * * @see SystemFailure#loadEmergencyClasses() */ public static void loadEmergencyClasses() { @@ -1146,7 +1179,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { /** * The work loop of this acceptor - * + * * @see #accept */ public void run() { @@ -1369,156 +1402,145 @@ public class AcceptorImpl extends Acceptor implements Runnable { return this.clientServerCnxCount.get(); } - protected void handleNewClientConnection(final Socket s) throws IOException { + protected void handleNewClientConnection(final Socket socket) throws IOException { // Read the first byte. If this socket is being used for 'client to server' // communication, create a ServerConnection. If this socket is being used // for 'server to client' communication, send it to the CacheClientNotifier // for processing. 
byte communicationMode; if (isSelector()) { - ByteBuffer bb = ByteBuffer.allocateDirect(1); - final SocketChannel sc = s.getChannel(); - sc.configureBlocking(false); + ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1); + final SocketChannel socketChannel = socket.getChannel(); + socketChannel.configureBlocking(false); // try to read the byte first in non-blocking mode - int res = sc.read(bb); - sc.configureBlocking(true); + int res = socketChannel.read(byteBuffer); + socketChannel.configureBlocking(true); if (res < 0) { throw new EOFException(); } else if (res == 0) { // now do a blocking read so setup a timer to close the socket if the // the read takes too long - SystemTimer.SystemTimerTask st = new SystemTimer.SystemTimerTask() { + SystemTimer.SystemTimerTask timerTask = new SystemTimer.SystemTimerTask() { @Override public void run2() { logger.warn(LocalizedMessage.create( LocalizedStrings.AcceptorImpl_CACHE_SERVER_TIMED_OUT_WAITING_FOR_HANDSHAKE_FROM__0, - s.getRemoteSocketAddress())); - closeSocket(s); + socket.getRemoteSocketAddress())); + closeSocket(socket); } }; - this.hsTimer.schedule(st, this.acceptTimeout); - res = sc.read(bb); - if ((!st.cancel()) || res <= 0) { + this.hsTimer.schedule(timerTask, this.acceptTimeout); + res = socketChannel.read(byteBuffer); + if ((!timerTask.cancel()) || res <= 0) { throw new EOFException(); } } - communicationMode = bb.get(0); - if (logger.isTraceEnabled()) { - logger.trace("read communications mode(1) ", communicationMode); - } + communicationMode = byteBuffer.get(0); } else { - s.setSoTimeout(this.acceptTimeout); - this.socketCreator.configureServerSSLSocket(s); - communicationMode = (byte) s.getInputStream().read(); - if (logger.isTraceEnabled()) { - logger.trace("read communications mode(2) ", communicationMode); - } + socket.setSoTimeout(this.acceptTimeout); + this.socketCreator.configureServerSSLSocket(socket); + communicationMode = (byte) socket.getInputStream().read(); if (communicationMode == -1) { throw 
new EOFException(); } - s.setSoTimeout(0); + socket.setSoTimeout(0); } - s.setTcpNoDelay(this.tcpNoDelay); + socket.setTcpNoDelay(this.tcpNoDelay); - if (communicationMode == CLIENT_TO_SERVER || communicationMode == GATEWAY_TO_GATEWAY - || communicationMode == MONITOR_TO_SERVER || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE - || communicationMode == CLIENT_TO_SERVER_NEW_PROTOCOL) { - String communicationModeStr = ""; - switch (communicationMode) { - case CLIENT_TO_SERVER: - communicationModeStr = "client"; - break; - case GATEWAY_TO_GATEWAY: - communicationModeStr = "gateway"; - break; - case MONITOR_TO_SERVER: - communicationModeStr = "monitor"; - break; - case CLIENT_TO_SERVER_FOR_QUEUE: - communicationModeStr = "clientToServerForQueue"; - break; - } - if (logger.isDebugEnabled()) { - logger.debug("Bridge server: Initializing {} communication socket: {}", - communicationModeStr, s); + String communicationModeStr; + switch (communicationMode) { + default: + throw new IOException("Acceptor received unknown communication mode: " + communicationMode); + + case PRIMARY_SERVER_TO_CLIENT: + logger.debug( + ":Bridge server: Initializing primary server-to-client communication socket: {}", + socket); + AcceptorImpl.this.clientNotifier.registerClient(socket, true, this.acceptorId, + this.notifyBySubscription); + return; + + case SECONDARY_SERVER_TO_CLIENT: + logger.debug( + ":Bridge server: Initializing secondary server-to-client communication socket: {}", + socket); + AcceptorImpl.this.clientNotifier.registerClient(socket, false, this.acceptorId, + this.notifyBySubscription); + return; + + case CLIENT_TO_SERVER: + communicationModeStr = "client"; + break; + case GATEWAY_TO_GATEWAY: + communicationModeStr = "gateway"; + break; + case MONITOR_TO_SERVER: + communicationModeStr = "monitor"; + break; + case CLIENT_TO_SERVER_FOR_QUEUE: + communicationModeStr = "clientToServerForQueue"; + break; + case PROTOBUF_CLIENT_SERVER_PROTOCOL: + communicationModeStr = "Protobuf 
client"; + break; + } + + logger.debug("Bridge server: Initializing {} communication socket: {}", communicationModeStr, + socket); + if (communicationMode != CLIENT_TO_SERVER_FOR_QUEUE) { + int curCnt = this.getClientServerCnxCount(); + if (curCnt >= this.maxConnections) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2, + new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt), + Integer.valueOf(this.maxConnections)})); + try { + ServerHandShakeProcessor.refuse(socket.getOutputStream(), + LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0 + .toLocalizedString(Integer.valueOf(this.maxConnections))); + } catch (Exception ex) { + logger.debug("rejection message failed", ex); + } + closeSocket(socket); + return; } - if (communicationMode != CLIENT_TO_SERVER_FOR_QUEUE) { - int curCnt = this.getClientServerCnxCount(); - if (curCnt >= this.maxConnections) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2, - new Object[] {s.getInetAddress(), Integer.valueOf(curCnt), - Integer.valueOf(this.maxConnections)})); - // if (s != null) (cannot be null) - { - try { - ServerHandShakeProcessor.refuse(s.getOutputStream(), - LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0 - .toLocalizedString(Integer.valueOf(this.maxConnections))); - } catch (Exception ex) { - if (logger.isDebugEnabled()) { - logger.debug("rejection message failed", ex); - } - } - closeSocket(s); - } + } + + ServerConnection serverConn = ServerConnectionFactory.makeServerConnection(socket, this.cache, + this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, this.socketBufferSize, + communicationModeStr, communicationMode, this, this.securityService); + + synchronized (this.allSCsLock) { + this.allSCs.add(serverConn); + 
ServerConnection snap[] = this.allSCList; // avoid volatile read + this.allSCList = (ServerConnection[]) ArrayUtils.insert(snap, snap.length, serverConn); + } + if (communicationMode != CLIENT_TO_SERVER_FOR_QUEUE) { + incClientServerCnxCount(); + } + if (isSelector()) { + serverConn.registerWithSelector(); + } else { + try { + pool.execute(serverConn); + } catch (RejectedExecutionException rejected) { + if (!isRunning()) { return; } - } - ServerConnection serverConn = ServerConnectionFactory.makeServerConnection(s, this.cache, - this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, this.socketBufferSize, - communicationModeStr, communicationMode, this, this.securityService); - synchronized (this.allSCsLock) { - this.allSCs.add(serverConn); - ServerConnection snap[] = this.allSCList; // avoid volatile read - this.allSCList = (ServerConnection[]) ArrayUtils.insert(snap, snap.length, serverConn); - } - if (communicationMode != CLIENT_TO_SERVER_FOR_QUEUE) { - incClientServerCnxCount(); - } - if (isSelector()) { - serverConn.registerWithSelector(); - } else { + logger.warn(LocalizedMessage.create( + LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_REQUEST_REJECTED_BY_POOL, + new Object[] {serverConn})); try { - pool.execute(serverConn); - } catch (RejectedExecutionException rejected) { - if (!isRunning()) { - return; - } - logger.warn(LocalizedMessage.create( - LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_REQUEST_REJECTED_BY_POOL, - new Object[] {serverConn})); - try { - ServerHandShakeProcessor.refuse(s.getOutputStream(), - LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0 - .toLocalizedString(Integer.valueOf(this.maxConnections))); + ServerHandShakeProcessor.refuse(socket.getOutputStream(), + LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0 + .toLocalizedString(Integer.valueOf(this.maxConnections))); - } catch (Exception ex) { - if (logger.isDebugEnabled()) { - logger.debug("rejection message failed", ex); - } - } 
- serverConn.cleanup(); + } catch (Exception ex) { + logger.debug("rejection message failed", ex); } + serverConn.cleanup(); } - } else if (communicationMode == PRIMARY_SERVER_TO_CLIENT) { - if (logger.isDebugEnabled()) { - logger.debug( - ":Bridge server: Initializing primary server-to-client communication socket: {}", s); - } - // try { - AcceptorImpl.this.clientNotifier.registerClient(s, true, this.acceptorId, - this.notifyBySubscription); - } else if (communicationMode == SECONDARY_SERVER_TO_CLIENT) { - if (logger.isDebugEnabled()) { - logger.debug( - ":Bridge server: Initializing secondary server-to-client communication socket: {}", s); - } - AcceptorImpl.this.clientNotifier.registerClient(s, false, this.acceptorId, - this.notifyBySubscription); - } else { - throw new IOException("Acceptor received unknown communication mode: " + communicationMode); } } @@ -1645,7 +1667,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { * then calculate it. * @return the ip address or host name this acceptor will listen on. An "" if all local addresses * will be listened to. - * + * * @since GemFire 5.7 */ private static String calcBindHostName(Cache cache, String bindName) { @@ -1682,7 +1704,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { /** * Gets the address that this bridge server can be contacted on from external processes. - * + * * @since GemFire 5.7 */ public String getExternalAddress() { @@ -1716,7 +1738,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { /** * This method finds a client notifier and returns it. 
It is used to propagate interest * registrations to other servers - * + * * @return the instance that provides client notification */ public CacheClientNotifier getCacheClientNotifier() { @@ -1773,7 +1795,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { * This method returns a thread safe structure which can be iterated over without worrying about * ConcurrentModificationException. JMX MBeans/Commands need to iterate over this list to get * client info. - * + * */ public ServerConnection[] getAllServerConnectionList() { return this.allSCList; http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java new file mode 100644 index 0000000..8edd83c --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. 
See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.Acceptor; +import org.apache.geode.internal.cache.tier.CachedRegionHelper; +import org.apache.geode.internal.security.SecurityService; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +/** + * Holds the socket and protocol handler for the new client protocol. TODO: Currently unimplemented + * due the the protocol not being there. + */ +public class GenericProtocolServerConnection extends ServerConnection { + // The new protocol lives in a separate module and gets loaded when this class is instantiated. + // TODO implement this. + private final ClientProtocolMessageHandler messageHandler; + + /** + * Creates a new <code>GenericProtocolServerConnection</code> that processes messages received + * from an edge client over a given <code>Socket</code>. 
+ * + * @param s + * @param c + * @param helper + * @param stats + * @param hsTimeout + * @param socketBufferSize + * @param communicationModeStr + * @param communicationMode + * @param acceptor + */ + public GenericProtocolServerConnection(Socket s, InternalCache c, CachedRegionHelper helper, + CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr, + byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler newClientProtocol, + SecurityService securityService) { + super(s, c, helper, stats, hsTimeout, socketBufferSize, communicationModeStr, communicationMode, + acceptor, securityService); + this.messageHandler = newClientProtocol; + } + + @Override + protected void doOneMessage() { + try { + Socket socket = this.getSocket(); + InputStream inputStream = socket.getInputStream(); + OutputStream outputStream = socket.getOutputStream(); + + // TODO serialization types? + messageHandler.receiveMessage(inputStream, outputStream, this.getCache()); + } catch (IOException e) { + // TODO? + } + return; + } + + @Override + protected boolean doHandShake(byte epType, int qSize) { + // no handshake for new client protocol. 
+ return true; + } + + @Override + public boolean isClientServerConnection() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java deleted file mode 100644 index 83b23b1..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.geode.internal.cache.tier.sockets; - -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.cache.tier.Acceptor; -import org.apache.geode.internal.cache.tier.CachedRegionHelper; -import org.apache.geode.internal.security.SecurityService; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; - -/** - * Holds the socket and protocol handler for the new client protocol. TODO: Currently unimplemented - * due the the protocol not being there. - */ -public class NewProtocolServerConnection extends ServerConnection { - // The new protocol lives in a separate module and gets loaded when this class is instantiated. - // TODO implement this. - private final ClientProtocolMessageHandler newClientProtocol; - - - /** - * Creates a new <code>NewProtocolServerConnection</code> that processes messages received from an - * edge client over a given <code>Socket</code>. - * - * @param s - * @param c - * @param helper - * @param stats - * @param hsTimeout - * @param socketBufferSize - * @param communicationModeStr - * @param communicationMode - * @param acceptor - */ - public NewProtocolServerConnection(Socket s, InternalCache c, CachedRegionHelper helper, - CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr, - byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler newClientProtocol, - SecurityService securityService) { - super(s, c, helper, stats, hsTimeout, socketBufferSize, communicationModeStr, communicationMode, - acceptor, securityService); - assert (communicationMode == AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL); - this.newClientProtocol = newClientProtocol; - } - - @Override - protected void doOneMessage() { - try { - Socket socket = this.getSocket(); - InputStream inputStream = socket.getInputStream(); - OutputStream outputStream = socket.getOutputStream(); - - // TODO serialization types? 
- newClientProtocol.receiveMessage(inputStream, outputStream, this.getCache()); - } catch (IOException e) { - // TODO? - } - return; - } - - @Override - protected boolean doHandShake(byte epType, int qSize) { - // no handshake for new client protocol. - return true; - } - - @Override - public boolean isClientServerConnection() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java index 4f2e304..e4746a7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java @@ -28,19 +28,20 @@ import java.net.Socket; */ public class ServerConnectionFactory { // TODO: implement ClientProtocolMessageHandler. 
- private static final ClientProtocolMessageHandler newClientProtocol = + private static final ClientProtocolMessageHandler protobufProtocolHandler = new ClientProtocolMessageHandler(); public static ServerConnection makeServerConnection(Socket s, InternalCache c, CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr, byte communicationMode, Acceptor acceptor, SecurityService securityService) throws IOException { - if (communicationMode == Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL) { + if (communicationMode == Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL) { if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) { throw new IOException("Acceptor received unknown communication mode: " + communicationMode); } else { - return new NewProtocolServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize, - communicationModeStr, communicationMode, acceptor, newClientProtocol, securityService); + return new GenericProtocolServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize, + communicationModeStr, communicationMode, acceptor, protobufProtocolHandler, + securityService); } } else { return new LegacyServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize, http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java index 007d1d6..82c4e54 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java +++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryIntegrationTest.java @@ -15,6 +15,8 @@ package org.apache.geode.internal.cache.tier.sockets; +import static org.junit.Assert.assertEquals; + import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.server.CacheServer; @@ -22,45 +24,42 @@ import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.internal.cache.tier.Acceptor; import org.apache.geode.test.junit.categories.IntegrationTest; import org.awaitility.Awaitility; +import org.junit.Rule; import org.junit.Test; +import org.junit.contrib.java.lang.system.RestoreSystemProperties; import org.junit.experimental.categories.Category; import java.io.IOException; import java.net.Socket; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; - /** - * Test that switching on the header byte makes instances of {@link NewProtocolServerConnection}. + * Test that switching on the header byte makes instances of + * {@link GenericProtocolServerConnection}. 
*/ @Category(IntegrationTest.class) public class ServerConnectionFactoryIntegrationTest { - /** - * - * @throws IOException - */ + + @Rule + public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + @Test public void testNewProtocolHeaderLeadsToNewProtocolServerConnection() throws IOException { System.setProperty("geode.feature-protobuf-protocol", "true"); - try { - CacheFactory cacheFactory = new CacheFactory(); - Cache cache = cacheFactory.create(); + CacheFactory cacheFactory = new CacheFactory(); + Cache cache = cacheFactory.create(); - CacheServer cacheServer = cache.addCacheServer(); - final int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); - cacheServer.setPort(cacheServerPort); - cacheServer.start(); + CacheServer cacheServer = cache.addCacheServer(); + final int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); + cacheServer.setPort(cacheServerPort); + cacheServer.start(); - Socket socket = new Socket("localhost", cacheServerPort); - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); - socket.getOutputStream().write(Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL); - socket.getOutputStream().write(222); - assertEquals(222, socket.getInputStream().read()); + Socket socket = new Socket("localhost", cacheServerPort); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); + socket.getOutputStream().write(Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL); + socket.getOutputStream().write(222); + assertEquals(222, socket.getInputStream().read()); - cache.close(); - } finally { - System.clearProperty("geode.feature-protobuf-protocol"); - } + cache.close(); } } http://git-wip-us.apache.org/repos/asf/geode/blob/c71c28df/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java index 8e241b2..11b5289 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java @@ -39,16 +39,19 @@ public class ServerConnectionFactoryTest { */ @Test(expected = IOException.class) public void newClientProtocolThrows() throws Exception { - serverConnectionMockedExceptForCommunicationMode(Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL); + serverConnectionMockedExceptForCommunicationMode(Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL); } @Test public void newClientProtocolSucceedsWithSystemPropertySet() throws Exception { - System.setProperty("geode.feature-protobuf-protocol", "true"); - ServerConnection serverConnection = - serverConnectionMockedExceptForCommunicationMode(Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL); - assertTrue(serverConnection instanceof NewProtocolServerConnection); - System.clearProperty("geode.feature-protobuf-protocol"); + try { + System.setProperty("geode.feature-protobuf-protocol", "true"); + ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode( + Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL); + assertTrue(serverConnection instanceof GenericProtocolServerConnection); + } finally { + System.clearProperty("geode.feature-protobuf-protocol"); + } } @Test