GEODE-3075: Initial refactor adding NewProtocolServerConnection subclassing `ServerConnection`.
The new code is broken but it won't be called under normal operation, since it's gated on a system property, "geode.feature-protobuf-protocol" Further refactoring and feature work to come. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/dfdde4af Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/dfdde4af Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/dfdde4af Branch: refs/heads/develop Commit: dfdde4af352566d60f18b0db156fc5b7f48ebd8b Parents: 49123c4 Author: Galen OSullivan <gosulli...@pivotal.io> Authored: Mon Jun 12 09:33:35 2017 -0700 Committer: Hitesh Khamesra <hkhame...@pivotal.io> Committed: Mon Jun 26 09:26:22 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/tier/Acceptor.java | 4 + .../cache/tier/sockets/AcceptorImpl.java | 10 +-- .../cache/tier/sockets/ClientHealthMonitor.java | 7 +- .../sockets/ClientProtocolMessageHandler.java | 29 +++++++ .../tier/sockets/LegacyServerConnection.java | 88 ++++++++++++++++++++ .../sockets/NewProtocolServerConnection.java | 86 +++++++++++++++++++ .../cache/tier/sockets/ServerConnection.java | 75 +++++++---------- .../tier/sockets/ServerConnectionFactory.java | 50 +++++++++++ .../sockets/ServerConnectionFactoryTest.java | 77 +++++++++++++++++ .../tier/sockets/ServerConnectionTest.java | 7 +- 10 files changed, 375 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java index 9a3241b..a95195a 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java @@ -71,6 +71,10 @@ public abstract class Acceptor { */ public static final byte CLIENT_TO_SERVER_FOR_QUEUE = (byte) 107; + /** + * For the new client-server protocol, which ignores the usual handshake mechanism. + */ + public static final byte CLIENT_TO_SERVER_NEW_PROTOCOL = (byte) 110; /** * The GFE version of the server. http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index 2a8818c..24efc93 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -1422,8 +1422,8 @@ public class AcceptorImpl extends Acceptor implements Runnable { s.setTcpNoDelay(this.tcpNoDelay); if (communicationMode == CLIENT_TO_SERVER || communicationMode == GATEWAY_TO_GATEWAY - || communicationMode == MONITOR_TO_SERVER - || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE) { + || communicationMode == MONITOR_TO_SERVER || communicationMode == CLIENT_TO_SERVER_FOR_QUEUE + || communicationMode == CLIENT_TO_SERVER_NEW_PROTOCOL) { String communicationModeStr = ""; switch (communicationMode) { case CLIENT_TO_SERVER: @@ -1466,9 +1466,9 @@ public class AcceptorImpl extends Acceptor implements Runnable { return; } } - ServerConnection serverConn = new ServerConnection(s, this.cache, this.crHelper, this.stats, - AcceptorImpl.handShakeTimeout, this.socketBufferSize, communicationModeStr, - communicationMode, this, this.securityService); + ServerConnection serverConn = ServerConnectionFactory.makeServerConnection(s, this.cache, + this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, this.socketBufferSize, + communicationModeStr, communicationMode, this, this.securityService); synchronized (this.allSCsLock) { this.allSCs.add(serverConn); ServerConnection snap[] = this.allSCList; // avoid volatile read http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java index e0b5ab8..35cc33f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java @@ -478,12 +478,7 @@ public class ClientHealthMonitor { Iterator connectionsIterator = connections.iterator(); while (connectionsIterator.hasNext()) { ServerConnection sc = (ServerConnection) connectionsIterator.next(); - byte communicationMode = sc.getCommunicationMode(); - /* Check for all modes that could be used for Client-Server communication */ - if (communicationMode == Acceptor.CLIENT_TO_SERVER - || communicationMode == Acceptor.PRIMARY_SERVER_TO_CLIENT - || communicationMode == Acceptor.SECONDARY_SERVER_TO_CLIENT - || communicationMode == Acceptor.CLIENT_TO_SERVER_FOR_QUEUE) { + if (sc.isClientServerConnection()) { memberId = sc.getMembershipID(); // each ServerConnection has the same member id cci.setMemberId(memberId); cci.setNumberOfConnections(connections.size()); http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java new file mode 100644 index 0000000..db42330 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.internal.cache.InternalCache; + +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Stub, this will be hooked up to the new client protocol when it's implemented. + */ +public class ClientProtocolMessageHandler { + public void receiveMessage(InputStream inputStream, OutputStream outputStream, + InternalCache cache) {} +} http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/LegacyServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/LegacyServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/LegacyServerConnection.java new file mode 100644 index 0000000..2b46eb3 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/LegacyServerConnection.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.Acceptor; +import org.apache.geode.internal.cache.tier.CachedRegionHelper; +import org.apache.geode.internal.i18n.LocalizedStrings; +import org.apache.geode.internal.logging.log4j.LocalizedMessage; +import org.apache.geode.internal.security.SecurityService; + +import java.io.IOException; +import java.net.Socket; + +/** + * Handles everything but the new client protocol. + * + * Legacy is therefore a bit of a misnomer; do you have a better name? + */ +public class LegacyServerConnection extends ServerConnection { + /** + * Set to false once handshake has been done + */ + private boolean doHandshake = true; + + /** + * Creates a new <code>ServerConnection</code> that processes messages received from an edge + * client over a given <code>Socket</code>. + * + * @param socket + * @param internalCache + * @param helper + * @param stats + * @param hsTimeout + * @param socketBufferSize + * @param communicationModeStr + * @param communicationMode + * @param acceptor + * @param securityService + */ + public LegacyServerConnection(Socket socket, InternalCache internalCache, + CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize, + String communicationModeStr, byte communicationMode, Acceptor acceptor, + SecurityService securityService) { + super(socket, internalCache, helper, stats, hsTimeout, socketBufferSize, communicationModeStr, + communicationMode, acceptor, securityService); + } + + @Override + protected boolean doHandShake(byte epType, int qSize) { + try { + this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream(), epType, qSize, + this.communicationMode, this.principal); + } catch (IOException ioe) { + if (!crHelper.isShutdown() && !isTerminated()) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.ServerConnection_0_HANDSHAKE_ACCEPT_FAILED_ON_SOCKET_1_2, + new Object[] {this.name, this.theSocket, ioe})); + } + cleanup(); + return false; + } + return true; + } + + protected void doOneMessage() { + if (this.doHandshake) { + doHandshake(); + this.doHandshake = false; + } else { + this.resetTransientData(); + doNormalMsg(); + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java new file mode 100644 index 0000000..a78cd1c --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NewProtocolServerConnection.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.Acceptor; +import org.apache.geode.internal.cache.tier.CachedRegionHelper; +import org.apache.geode.internal.security.SecurityService; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +/** + * Holds the socket and protocol handler for the new client protocol. TODO: Currently unimplemented + * due the the protocol not being there. + */ +public class NewProtocolServerConnection extends ServerConnection { + // The new protocol lives in a separate module and gets loaded when this class is instantiated. + // TODO implement this. + private final ClientProtocolMessageHandler newClientProtocol; + + + /** + * Creates a new <code>NewProtocolServerConnection</code> that processes messages received from an + * edge client over a given <code>Socket</code>. + * + * @param s + * @param c + * @param helper + * @param stats + * @param hsTimeout + * @param socketBufferSize + * @param communicationModeStr + * @param communicationMode + * @param acceptor + */ + public NewProtocolServerConnection(Socket s, InternalCache c, CachedRegionHelper helper, + CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr, + byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler newClientProtocol, + SecurityService securityService) { + super(s, c, helper, stats, hsTimeout, socketBufferSize, communicationModeStr, communicationMode, + acceptor, securityService); + assert (communicationMode == AcceptorImpl.CLIENT_TO_SERVER_NEW_PROTOCOL); + this.newClientProtocol = newClientProtocol; + } + + @Override + protected void doOneMessage() { + try { + Socket socket = this.getSocket(); + InputStream inputStream = socket.getInputStream(); + OutputStream outputStream = socket.getOutputStream(); + // TODO serialization types? + newClientProtocol.receiveMessage(inputStream, outputStream, this.getCache()); + } catch (IOException e) { + // TODO? + } + return; + } + + @Override + protected boolean doHandShake(byte epType, int qSize) { + // no handshake for new client protocol. + return true; + } + + @Override + public boolean isClientServerConnection() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index 947b836..ebc9dab 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -74,9 +74,9 @@ import org.apache.geode.security.GemFireSecurityException; * * @since GemFire 2.0.2 */ -public class ServerConnection implements Runnable { +public abstract class ServerConnection implements Runnable { - private static final Logger logger = LogService.getLogger(); + protected static final Logger logger = LogService.getLogger(); /** * This is a buffer that we add to client readTimeout value before we cleanup the connection. This @@ -138,12 +138,12 @@ public class ServerConnection implements Runnable { } } - private Socket theSocket; + protected Socket theSocket; // private InputStream in = null; // private OutputStream out = null; private ByteBuffer commBuffer; - private final CachedRegionHelper crHelper; - private String name = null; + protected final CachedRegionHelper crHelper; + protected String name = null; // IMPORTANT: if new messages are added change setHandshake to initialize them // to the correct Version for serializing to the client @@ -168,7 +168,7 @@ public class ServerConnection implements Runnable { /** * Handshake reference uniquely identifying a client */ - private ClientHandShake handshake; + protected ClientHandShake handshake; private int handShakeTimeout; private final Object handShakeMonitor = new Object(); @@ -213,7 +213,7 @@ public class ServerConnection implements Runnable { * The communication mode for this <code>ServerConnection</code>. Valid types include * 'client-server', 'gateway-gateway' and 'monitor-server'. */ - private final byte communicationMode; + protected final byte communicationMode; private final String communicationModeStr; private long processingMessageStartTime = -1; @@ -233,7 +233,7 @@ public class ServerConnection implements Runnable { private Part securePart = null; - private Principal principal; + protected Principal principal; private MessageIdExtractor messageIdExtractor = new MessageIdExtractor(); @@ -592,19 +592,14 @@ public class ServerConnection implements Runnable { } } - private boolean acceptHandShake(byte epType, int qSize) { - try { - this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream(), epType, qSize, - this.communicationMode, this.principal); - } catch (IOException ioe) { - if (!crHelper.isShutdown() && !isTerminated()) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.ServerConnection_0_HANDSHAKE_ACCEPT_FAILED_ON_SOCKET_1_2, - new Object[] {this.name, this.theSocket, ioe})); - } - cleanup(); - return false; - } + protected boolean acceptHandShake(byte epType, int qSize) { + return doHandShake(epType, qSize) && handshakeAccepted(); + } + + protected abstract boolean doHandShake(byte epType, int qSize); + + + protected boolean handshakeAccepted() { if (logger.isDebugEnabled()) { logger.debug("{}: Accepted handshake", this.name); } @@ -670,6 +665,16 @@ public class ServerConnection implements Runnable { } } + /** + * @return whether this is a connection to a client, regardless of protocol. + */ + public boolean isClientServerConnection() { + return communicationMode == Acceptor.CLIENT_TO_SERVER + || communicationMode == Acceptor.PRIMARY_SERVER_TO_CLIENT + || communicationMode == Acceptor.SECONDARY_SERVER_TO_CLIENT + || communicationMode == Acceptor.CLIENT_TO_SERVER_FOR_QUEUE; + } + static class Counter { int cnt; @@ -686,22 +691,12 @@ public class ServerConnection implements Runnable { } } - // public void setUserAuthAttributes(ClientProxyMembershipID proxyId, AuthorizeRequest - // authzRequest, AuthorizeRequestPP postAuthzRequest) { - // UserAuthAttributes uaa = new UserAuthAttributes(authzRequest, postAuthzRequest); - // } - - /** - * Set to false once handshake has been done - */ - private boolean doHandshake = true; - private boolean clientDisconnectedCleanly = false; private Throwable clientDisconnectedException; private int failureCount = 0; private boolean processMessages = true; - private void doHandshake() { + protected void doHandshake() { // hitesh:to create new connection handshake if (verifyClientConnection()) { // Initialize the commands after the handshake so that the version @@ -718,7 +713,7 @@ public class ServerConnection implements Runnable { } } - private void doNormalMsg() { + protected void doNormalMsg() { Message msg = null; msg = BaseCommand.readRequest(this); ThreadState threadState = null; @@ -903,15 +898,7 @@ public class ServerConnection implements Runnable { } } - private void doOneMessage() { - if (this.doHandshake) { - doHandshake(); - this.doHandshake = false; - } else { - this.resetTransientData(); - doNormalMsg(); - } - } + protected abstract void doOneMessage(); private void initializeClientUserAuths() { this.clientUserAuths = getClientUserAuths(this.proxyId); @@ -1070,7 +1057,7 @@ public class ServerConnection implements Runnable { /** * MessageType of the messages (typically internal commands) which do not need to participate in * security should be added in the following if block. - * + * * @return Part * @see AbstractOp#processSecureBytes(Connection, Message) * @see AbstractOp#needsUserId() @@ -1495,7 +1482,7 @@ public class ServerConnection implements Runnable { /** * Just ensure that this class gets loaded. - * + * * @see SystemFailure#loadEmergencyClasses() */ public static void loadEmergencyClasses() { http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java new file mode 100644 index 0000000..4f2e304 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.Acceptor; +import org.apache.geode.internal.cache.tier.CachedRegionHelper; +import org.apache.geode.internal.security.SecurityService; + +import java.io.IOException; +import java.net.Socket; + +/** + * Creates instances of ServerConnection based on the connection mode provided. + */ +public class ServerConnectionFactory { + // TODO: implement ClientProtocolMessageHandler. + private static final ClientProtocolMessageHandler newClientProtocol = + new ClientProtocolMessageHandler(); + + public static ServerConnection makeServerConnection(Socket s, InternalCache c, + CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize, + String communicationModeStr, byte communicationMode, Acceptor acceptor, + SecurityService securityService) throws IOException { + if (communicationMode == Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL) { + if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) { + throw new IOException("Acceptor received unknown communication mode: " + communicationMode); + } else { + return new NewProtocolServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize, + communicationModeStr, communicationMode, acceptor, newClientProtocol, securityService); + } + } else { + return new LegacyServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize, + communicationModeStr, communicationMode, acceptor, securityService); + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java new file mode 100644 index 0000000..c314944 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.Acceptor; +import org.apache.geode.internal.cache.tier.CachedRegionHelper; +import org.apache.geode.internal.security.SecurityService; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class ServerConnectionFactoryTest { + /** + * Safeguard that we won't create the new client protocol object unless the feature flag is enabled. + */ + @Test(expected = IOException.class) + public void newClientProtocolThrows() throws Exception { + serverConnectionMockedExceptForCommunicationMode(Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL); + } + + @Test + public void newClientProtocolSucceedsWithSystemPropertySet() throws Exception { + System.setProperty("geode.feature-protobuf-protocol", "true"); + ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode(Acceptor.CLIENT_TO_SERVER_NEW_PROTOCOL); + assertTrue(serverConnection instanceof NewProtocolServerConnection); + System.clearProperty("geode.feature-protobuf-protocol"); + } + + @Test + public void makeServerConnection() throws Exception { + byte[] communicationModes = new byte[]{ + Acceptor.CLIENT_TO_SERVER, + Acceptor.PRIMARY_SERVER_TO_CLIENT, + Acceptor.SECONDARY_SERVER_TO_CLIENT, + Acceptor.GATEWAY_TO_GATEWAY, + Acceptor.MONITOR_TO_SERVER, + Acceptor.SUCCESSFUL_SERVER_TO_CLIENT, + Acceptor.UNSUCCESSFUL_SERVER_TO_CLIENT, + Acceptor.CLIENT_TO_SERVER_FOR_QUEUE, + }; + + for (byte communicationMode : communicationModes) { + ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode(communicationMode); + assertTrue(serverConnection instanceof LegacyServerConnection); + } + } + + private static ServerConnection serverConnectionMockedExceptForCommunicationMode(byte communicationMode) throws IOException { + Socket socketMock = mock(Socket.class); + when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost")); + + return ServerConnectionFactory.makeServerConnection( + socketMock, mock(InternalCache.class), mock(CachedRegionHelper.class), + mock(CacheServerStats.class), 0, 0, "", + communicationMode, mock(AcceptorImpl.class), mock(SecurityService.class)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/geode/blob/dfdde4af/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java index 794c610..d3ef21f 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java @@ -39,6 +39,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import java.io.IOException; import java.net.InetAddress; import java.net.Socket; @@ -57,7 +58,7 @@ public class ServerConnectionTest { private ServerConnection serverConnection; @Before - public void setUp() { + public void setUp() throws IOException { AcceptorImpl acceptor = mock(AcceptorImpl.class); InetAddress inetAddress = mock(InetAddress.class); @@ -69,8 +70,8 @@ public class ServerConnectionTest { InternalCache cache = mock(InternalCache.class); SecurityService securityService = mock(SecurityService.class); - serverConnection = new ServerConnection(socket, cache, null, null, 0, 0, null, - Acceptor.PRIMARY_SERVER_TO_CLIENT, acceptor, securityService); + serverConnection = ServerConnectionFactory.makeServerConnection(socket, cache, null, null, 0, 0, + null, Acceptor.PRIMARY_SERVER_TO_CLIENT, acceptor, securityService); MockitoAnnotations.initMocks(this); }