This is an automated email from the ASF dual-hosted git repository. upthewaterspout pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit 8ce5ebf6962b28366e0eef2570ca625bf62c0aa6 Author: Dan Smith <[email protected]> AuthorDate: Fri Feb 23 11:16:58 2018 -0800 Refactoring request/response into a common class in protobuf driver Refactoring the logic to actually interact with a socket to a common class in the geode-experimental-driver. This reduces duplicate code and will also make it easier to plug in connection pooling and failover at a later time. --- .../{ProtobufDriver.java => ProtobufChannel.java} | 114 +++++++++------------ .../geode/experimental/driver/ProtobufDriver.java | 95 +++-------------- .../geode/experimental/driver/ProtobufRegion.java | 83 +++++++-------- 3 files changed, 93 insertions(+), 199 deletions(-) diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java similarity index 68% copy from geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java copy to geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java index f0c3cc9..4670d2d 100644 --- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java +++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java @@ -19,47 +19,52 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.HashSet; import java.util.Set; -import org.apache.geode.annotations.Experimental; import org.apache.geode.internal.protocol.protobuf.ProtocolVersion; import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes; import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; +import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.ErrorResponse; +import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message; +import 
org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase; import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI; -import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI; -/** - * Implements the behaviors of a driver for communicating with a GemFire server by way of the new - * protocol. - * - * <strong>This code is an experimental prototype and is presented "as is" with no warranty, - * suitability, or fitness of purpose implied.</strong> - */ -@Experimental -public class ProtobufDriver implements Driver { - /** - * Set of Internet-address-or-host-name/port pairs of the locators to use to find GemFire servers - * that have Protobuf enabled. - */ - private final Set<InetSocketAddress> locators; +class ProtobufChannel { + private final Set<InetSocketAddress> locators; /** - * Socket to a GemFire locator that has Protobuf enabled. + * Socket to a GemFire server that has Protobuf enabled. */ - private final Socket socket; + final Socket socket; - /** - * Creates a driver implementation that communicates via <code>socket</code> to a GemFire locator. - * - * @param locators Set of Internet-address-or-host-name/port pairs of the locators to use to find - * GemFire servers that have Protobuf enabled. 
- * @throws IOException - */ - ProtobufDriver(Set<InetSocketAddress> locators) throws IOException { + public ProtobufChannel(final Set<InetSocketAddress> locators) throws IOException { this.locators = locators; + this.socket = connectToAServer(); + } + + Message sendRequest(final Message request, MessageTypeCase expectedResult) throws IOException { + final OutputStream outputStream = socket.getOutputStream(); + request.writeDelimitedTo(outputStream); + Message response = readResponse(); + + if (!response.getMessageTypeCase().equals(expectedResult)) { + throw new RuntimeException( + "Got invalid response for request " + request + ", response " + response); + } + return response; + } + + public void close() throws IOException { + this.socket.close(); + } + + public boolean isClosed() { + return this.socket.isClosed(); + } + + private Socket connectToAServer() throws IOException { InetSocketAddress server = findAServer(); - socket = new Socket(server.getAddress(), server.getPort()); + Socket socket = new Socket(server.getAddress(), server.getPort()); socket.setTcpNoDelay(true); socket.setSendBufferSize(65535); socket.setReceiveBufferSize(65535); @@ -75,51 +80,13 @@ public class ProtobufDriver implements Driver { .getVersionAccepted()) { throw new IOException("Failed protocol version verification."); } - } - - @Override - public Set<String> getRegionNames() throws IOException { - Set<String> regionNames = new HashSet<>(); - - final OutputStream outputStream = socket.getOutputStream(); - ClientProtocol.Message.newBuilder() - .setGetRegionNamesRequest(RegionAPI.GetRegionNamesRequest.newBuilder()).build() - .writeDelimitedTo(outputStream); - - final InputStream inputStream = socket.getInputStream(); - final RegionAPI.GetRegionNamesResponse getRegionNamesResponse = - ClientProtocol.Message.parseDelimitedFrom(inputStream).getGetRegionNamesResponse(); - for (int i = 0; i < getRegionNamesResponse.getRegionsCount(); ++i) { - 
regionNames.add(getRegionNamesResponse.getRegions(i)); - } - - return regionNames; - } - - @Override - public <K, V> Region<K, V> getRegion(String regionName) { - return new ProtobufRegion(regionName, socket); - } - - @Override - public void close() { - try { - this.socket.close(); - } catch (IOException e) { - // ignore - } - } - - @Override - public boolean isConnected() { - return !this.socket.isClosed(); + return socket; } /** * Queries locators for a Geode server that has Protobuf enabled. - * + * * @return The server chosen by the Locator service for this client - * @throws IOException */ private InetSocketAddress findAServer() throws IOException { IOException lastException = null; @@ -169,4 +136,15 @@ public class ProtobufDriver implements Driver { throw new IllegalStateException("No locators"); } } + + private Message readResponse() throws IOException { + final InputStream inputStream = socket.getInputStream(); + Message response = ClientProtocol.Message.parseDelimitedFrom(inputStream); + final ErrorResponse errorResponse = response.getErrorResponse(); + if (errorResponse != null && errorResponse.hasError()) { + throw new IOException(errorResponse.getError().getMessage()); + } + return response; + } + } diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java index f0c3cc9..573f3ef 100644 --- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java +++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java @@ -26,8 +26,11 @@ import org.apache.geode.annotations.Experimental; import org.apache.geode.internal.protocol.protobuf.ProtocolVersion; import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes; import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; +import 
org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message; +import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase; import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI; import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI; +import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.GetRegionNamesRequest; /** * Implements the behaviors of a driver for communicating with a GemFire server by way of the new @@ -44,10 +47,7 @@ public class ProtobufDriver implements Driver { */ private final Set<InetSocketAddress> locators; - /** - * Socket to a GemFire locator that has Protobuf enabled. - */ - private final Socket socket; + private final ProtobufChannel channel; /** * Creates a driver implementation that communicates via <code>socket</code> to a GemFire locator. @@ -58,37 +58,20 @@ public class ProtobufDriver implements Driver { */ ProtobufDriver(Set<InetSocketAddress> locators) throws IOException { this.locators = locators; - InetSocketAddress server = findAServer(); - socket = new Socket(server.getAddress(), server.getPort()); - socket.setTcpNoDelay(true); - socket.setSendBufferSize(65535); - socket.setReceiveBufferSize(65535); - final OutputStream outputStream = socket.getOutputStream(); - ProtocolVersion.NewConnectionClientVersion.newBuilder() - .setMajorVersion(ProtocolVersion.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ProtocolVersion.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build() - .writeDelimitedTo(outputStream); - final InputStream inputStream = socket.getInputStream(); - if (!ProtocolVersion.VersionAcknowledgement.parseDelimitedFrom(inputStream) - .getVersionAccepted()) { - throw new IOException("Failed protocol version verification."); - } + this.channel = new ProtobufChannel(locators); } @Override public Set<String> getRegionNames() throws IOException { Set<String> regionNames = new HashSet<>(); - final OutputStream outputStream = socket.getOutputStream(); - 
ClientProtocol.Message.newBuilder() - .setGetRegionNamesRequest(RegionAPI.GetRegionNamesRequest.newBuilder()).build() - .writeDelimitedTo(outputStream); + final Message request = + Message.newBuilder().setGetRegionNamesRequest(GetRegionNamesRequest.newBuilder()).build(); - final InputStream inputStream = socket.getInputStream(); - final RegionAPI.GetRegionNamesResponse getRegionNamesResponse = - ClientProtocol.Message.parseDelimitedFrom(inputStream).getGetRegionNamesResponse(); + final RegionAPI.GetRegionNamesResponse getRegionNamesResponse = channel + .sendRequest(request, MessageTypeCase.GETREGIONNAMESRESPONSE).getGetRegionNamesResponse(); for (int i = 0; i < getRegionNamesResponse.getRegionsCount(); ++i) { regionNames.add(getRegionNamesResponse.getRegions(i)); } @@ -98,13 +81,13 @@ public class ProtobufDriver implements Driver { @Override public <K, V> Region<K, V> getRegion(String regionName) { - return new ProtobufRegion(regionName, socket); + return new ProtobufRegion(regionName, channel); } @Override public void close() { try { - this.socket.close(); + this.channel.close(); } catch (IOException e) { // ignore } @@ -112,61 +95,7 @@ public class ProtobufDriver implements Driver { @Override public boolean isConnected() { - return !this.socket.isClosed(); + return !this.channel.isClosed(); } - /** - * Queries locators for a Geode server that has Protobuf enabled. 
- * - * @return The server chosen by the Locator service for this client - * @throws IOException - */ - private InetSocketAddress findAServer() throws IOException { - IOException lastException = null; - - for (InetSocketAddress locator : locators) { - try { - final Socket locatorSocket = new Socket(locator.getAddress(), locator.getPort()); - - final OutputStream outputStream = locatorSocket.getOutputStream(); - final InputStream inputStream = locatorSocket.getInputStream(); - ProtocolVersion.NewConnectionClientVersion.newBuilder() - .setMajorVersion(ProtocolVersion.MajorVersions.CURRENT_MAJOR_VERSION_VALUE) - .setMinorVersion(ProtocolVersion.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build() - .writeDelimitedTo(outputStream); - - // The locator does not currently send a reply to the ProtocolVersion... - if (!ProtocolVersion.VersionAcknowledgement.parseDelimitedFrom(inputStream) - .getVersionAccepted()) { - throw new IOException("Failed ProtocolVersion."); - } - - ClientProtocol.Message.newBuilder() - .setGetServerRequest(LocatorAPI.GetServerRequest.newBuilder()).build() - .writeDelimitedTo(outputStream); - - ClientProtocol.Message response = ClientProtocol.Message.parseDelimitedFrom(inputStream); - ClientProtocol.ErrorResponse errorResponse = response.getErrorResponse(); - - if (errorResponse != null && errorResponse.hasError()) { - throw new IOException( - "Error finding server: error code= " + errorResponse.getError().getErrorCode() - + "; error message=" + errorResponse.getError().getMessage()); - } - - LocatorAPI.GetServerResponse getServerResponse = response.getGetServerResponse(); - - BasicTypes.Server server = getServerResponse.getServer(); - return new InetSocketAddress(server.getHostname(), server.getPort()); - } catch (IOException e) { - lastException = e; - } - } - - if (lastException != null) { - throw lastException; - } else { - throw new IllegalStateException("No locators"); - } - } } diff --git 
a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufRegion.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufRegion.java index 7021f40..9f99f25 100644 --- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufRegion.java +++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufRegion.java @@ -15,8 +15,6 @@ package org.apache.geode.experimental.driver; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.Socket; import java.util.Collection; import java.util.HashMap; @@ -24,8 +22,13 @@ import java.util.Map; import org.apache.geode.annotations.Experimental; import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes; -import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; +import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message; +import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase; import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI; +import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.GetRegionRequest; +import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.GetRequest; +import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.PutRequest; +import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.RemoveRequest; /** * Implements the behaviors of a GemFire region. Send and receives Protobuf messages on the provided @@ -44,53 +47,39 @@ public class ProtobufRegion<K, V> implements Region<K, V> { */ final String name; - /** - * Socket to a GemFire server that has Protobuf enabled. - */ - final Socket socket; + final ProtobufChannel protobufChannel; /** * Creates a region implementation for the region <code>name</code> that communicates via * <code>socket</code> to a GemFire server. * * @param name String that uniquely identifies the region. 
- * @param socket Socket to a GemFire server that has Protobuf enabled. */ - ProtobufRegion(String name, Socket socket) { + ProtobufRegion(String name, ProtobufChannel channel) { this.name = name; - this.socket = socket; - } - - private ClientProtocol.Message readResponse() throws IOException { - final InputStream inputStream = socket.getInputStream(); - ClientProtocol.Message response = ClientProtocol.Message.parseDelimitedFrom(inputStream); - final ClientProtocol.ErrorResponse errorResponse = response.getErrorResponse(); - if (errorResponse != null && errorResponse.hasError()) { - throw new IOException(errorResponse.getError().getMessage()); - } - return response; + this.protobufChannel = channel; } @Override public RegionAttributes getRegionAttributes() throws IOException { - final OutputStream outputStream = socket.getOutputStream(); - ClientProtocol.Message.newBuilder() - .setGetRegionRequest(RegionAPI.GetRegionRequest.newBuilder().setRegionName(name)).build() - .writeDelimitedTo(outputStream); + final Message request = Message.newBuilder() + .setGetRegionRequest(GetRegionRequest.newBuilder().setRegionName(name)).build(); - return new RegionAttributes(readResponse().getGetRegionResponse().getRegion()); + return new RegionAttributes( + protobufChannel.sendRequest(request, MessageTypeCase.GETREGIONRESPONSE) + .getGetRegionResponse().getRegion()); } @Override public V get(K key) throws IOException { - final OutputStream outputStream = socket.getOutputStream(); - ClientProtocol.Message.newBuilder().setGetRequest( - RegionAPI.GetRequest.newBuilder().setRegionName(name).setKey(ValueEncoder.encodeValue(key))) - .build().writeDelimitedTo(outputStream); + Message request = Message.newBuilder() + .setGetRequest( + GetRequest.newBuilder().setRegionName(name).setKey(ValueEncoder.encodeValue(key))) + .build(); + final Message response = protobufChannel.sendRequest(request, MessageTypeCase.GETRESPONSE); - final ClientProtocol.Message response = readResponse(); return (V) 
ValueEncoder.decodeValue(response.getGetResponse().getResult()); } @@ -98,16 +87,16 @@ public class ProtobufRegion<K, V> implements Region<K, V> { public Map<K, V> getAll(Collection<K> keys) throws IOException { Map<K, V> values = new HashMap<>(); - final OutputStream outputStream = socket.getOutputStream(); RegionAPI.GetAllRequest.Builder getAllRequest = RegionAPI.GetAllRequest.newBuilder(); getAllRequest.setRegionName(name); for (K key : keys) { getAllRequest.addKey(ValueEncoder.encodeValue(key)); } - ClientProtocol.Message.newBuilder().setGetAllRequest(getAllRequest).build() - .writeDelimitedTo(outputStream); + Message request = Message.newBuilder().setGetAllRequest(getAllRequest).build(); + + Message message = protobufChannel.sendRequest(request, MessageTypeCase.GETALLRESPONSE); - final RegionAPI.GetAllResponse getAllResponse = readResponse().getGetAllResponse(); + final RegionAPI.GetAllResponse getAllResponse = message.getGetAllResponse(); Map<Object, String> failures = new HashMap<>(); if (getAllResponse.getFailuresCount() > 0) { for (BasicTypes.KeyedError keyedError : getAllResponse.getFailuresList()) { @@ -126,26 +115,24 @@ public class ProtobufRegion<K, V> implements Region<K, V> { @Override public void put(K key, V value) throws IOException { - final OutputStream outputStream = socket.getOutputStream(); - ClientProtocol.Message.newBuilder().setPutRequest(RegionAPI.PutRequest.newBuilder() - .setRegionName(name).setEntry(ValueEncoder.encodeEntry(key, value))).build() - .writeDelimitedTo(outputStream); + final Message request = Message.newBuilder().setPutRequest( + PutRequest.newBuilder().setRegionName(name).setEntry(ValueEncoder.encodeEntry(key, value))) + .build(); - readResponse(); + protobufChannel.sendRequest(request, MessageTypeCase.PUTRESPONSE); } @Override public void putAll(Map<K, V> values) throws IOException { - final OutputStream outputStream = socket.getOutputStream(); RegionAPI.PutAllRequest.Builder putAllRequest = 
RegionAPI.PutAllRequest.newBuilder(); putAllRequest.setRegionName(name); for (K key : values.keySet()) { putAllRequest.addEntry(ValueEncoder.encodeEntry(key, values.get(key))); } - ClientProtocol.Message.newBuilder().setPutAllRequest(putAllRequest).build() - .writeDelimitedTo(outputStream); + final Message request = Message.newBuilder().setPutAllRequest(putAllRequest).build(); - final RegionAPI.PutAllResponse putAllResponse = readResponse().getPutAllResponse(); + final RegionAPI.PutAllResponse putAllResponse = + protobufChannel.sendRequest(request, MessageTypeCase.PUTALLRESPONSE).getPutAllResponse(); if (0 < putAllResponse.getFailedKeysCount()) { Map<Object, String> failures = new HashMap<>(); for (BasicTypes.KeyedError keyedError : putAllResponse.getFailedKeysList()) { @@ -159,11 +146,11 @@ public class ProtobufRegion<K, V> implements Region<K, V> { @Override public void remove(K key) throws IOException { - final OutputStream outputStream = socket.getOutputStream(); - ClientProtocol.Message.newBuilder().setRemoveRequest(RegionAPI.RemoveRequest.newBuilder() - .setRegionName(name).setKey(ValueEncoder.encodeValue(key))).build() - .writeDelimitedTo(outputStream); + final Message request = Message.newBuilder() + .setRemoveRequest( + RemoveRequest.newBuilder().setRegionName(name).setKey(ValueEncoder.encodeValue(key))) + .build(); - readResponse(); + protobufChannel.sendRequest(request, MessageTypeCase.REMOVERESPONSE); } } -- To stop receiving notification emails like this one, please contact [email protected].
