This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 8ce5ebf6962b28366e0eef2570ca625bf62c0aa6
Author: Dan Smith <[email protected]>
AuthorDate: Fri Feb 23 11:16:58 2018 -0800

    Refactoring request/response into a common class in protobuf driver
    
    Refactoring the logic to actually interact with a socket to a common
    class in the geode-experimental-driver. This reduces duplicate code and
    will also make it easier to plugin in connection pooling and failover at
    a later time.
---
 .../{ProtobufDriver.java => ProtobufChannel.java}  | 114 +++++++++------------
 .../geode/experimental/driver/ProtobufDriver.java  |  95 +++--------------
 .../geode/experimental/driver/ProtobufRegion.java  |  83 +++++++--------
 3 files changed, 93 insertions(+), 199 deletions(-)

diff --git 
a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
 
b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
similarity index 68%
copy from 
geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
copy to 
geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
index f0c3cc9..4670d2d 100644
--- 
a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
+++ 
b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
@@ -19,47 +19,52 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.geode.annotations.Experimental;
 import org.apache.geode.internal.protocol.protobuf.ProtocolVersion;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.ErrorResponse;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase;
 import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI;
-import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
 
-/**
- * Implements the behaviors of a driver for communicating with a GemFire 
server by way of the new
- * protocol.
- *
- * <strong>This code is an experimental prototype and is presented "as is" 
with no warranty,
- * suitability, or fitness of purpose implied.</strong>
- */
-@Experimental
-public class ProtobufDriver implements Driver {
-  /**
-   * Set of Internet-address-or-host-name/port pairs of the locators to use to 
find GemFire servers
-   * that have Protobuf enabled.
-   */
-  private final Set<InetSocketAddress> locators;
+class ProtobufChannel {
 
+  private final Set<InetSocketAddress> locators;
   /**
-   * Socket to a GemFire locator that has Protobuf enabled.
+   * Socket to a GemFire server that has Protobuf enabled.
    */
-  private final Socket socket;
+  final Socket socket;
 
-  /**
-   * Creates a driver implementation that communicates via <code>socket</code> 
to a GemFire locator.
-   *
-   * @param locators Set of Internet-address-or-host-name/port pairs of the 
locators to use to find
-   *        GemFire servers that have Protobuf enabled.
-   * @throws IOException
-   */
-  ProtobufDriver(Set<InetSocketAddress> locators) throws IOException {
+  public ProtobufChannel(final Set<InetSocketAddress> locators) throws 
IOException {
     this.locators = locators;
+    this.socket = connectToAServer();
+  }
+
+  Message sendRequest(final Message request, MessageTypeCase expectedResult) 
throws IOException {
+    final OutputStream outputStream = socket.getOutputStream();
+    request.writeDelimitedTo(outputStream);
+    Message response = readResponse();
+
+    if (!response.getMessageTypeCase().equals(expectedResult)) {
+      throw new RuntimeException(
+          "Got invalid response for request " + request + ", response " + 
response);
+    }
+    return response;
+  }
+
+  public void close() throws IOException {
+    this.socket.close();
+  }
+
+  public boolean isClosed() {
+    return this.socket.isClosed();
+  }
+
+  private Socket connectToAServer() throws IOException {
     InetSocketAddress server = findAServer();
-    socket = new Socket(server.getAddress(), server.getPort());
+    Socket socket = new Socket(server.getAddress(), server.getPort());
     socket.setTcpNoDelay(true);
     socket.setSendBufferSize(65535);
     socket.setReceiveBufferSize(65535);
@@ -75,51 +80,13 @@ public class ProtobufDriver implements Driver {
         .getVersionAccepted()) {
       throw new IOException("Failed protocol version verification.");
     }
-  }
-
-  @Override
-  public Set<String> getRegionNames() throws IOException {
-    Set<String> regionNames = new HashSet<>();
-
-    final OutputStream outputStream = socket.getOutputStream();
-    ClientProtocol.Message.newBuilder()
-        
.setGetRegionNamesRequest(RegionAPI.GetRegionNamesRequest.newBuilder()).build()
-        .writeDelimitedTo(outputStream);
-
-    final InputStream inputStream = socket.getInputStream();
-    final RegionAPI.GetRegionNamesResponse getRegionNamesResponse =
-        
ClientProtocol.Message.parseDelimitedFrom(inputStream).getGetRegionNamesResponse();
-    for (int i = 0; i < getRegionNamesResponse.getRegionsCount(); ++i) {
-      regionNames.add(getRegionNamesResponse.getRegions(i));
-    }
-
-    return regionNames;
-  }
-
-  @Override
-  public <K, V> Region<K, V> getRegion(String regionName) {
-    return new ProtobufRegion(regionName, socket);
-  }
-
-  @Override
-  public void close() {
-    try {
-      this.socket.close();
-    } catch (IOException e) {
-      // ignore
-    }
-  }
-
-  @Override
-  public boolean isConnected() {
-    return !this.socket.isClosed();
+    return socket;
   }
 
   /**
    * Queries locators for a Geode server that has Protobuf enabled.
-   *
+   * 
    * @return The server chosen by the Locator service for this client
-   * @throws IOException
    */
   private InetSocketAddress findAServer() throws IOException {
     IOException lastException = null;
@@ -169,4 +136,15 @@ public class ProtobufDriver implements Driver {
       throw new IllegalStateException("No locators");
     }
   }
+
+  private Message readResponse() throws IOException {
+    final InputStream inputStream = socket.getInputStream();
+    Message response = ClientProtocol.Message.parseDelimitedFrom(inputStream);
+    final ErrorResponse errorResponse = response.getErrorResponse();
+    if (errorResponse != null && errorResponse.hasError()) {
+      throw new IOException(errorResponse.getError().getMessage());
+    }
+    return response;
+  }
+
 }
diff --git 
a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
 
b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
index f0c3cc9..573f3ef 100644
--- 
a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
+++ 
b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
@@ -26,8 +26,11 @@ import org.apache.geode.annotations.Experimental;
 import org.apache.geode.internal.protocol.protobuf.ProtocolVersion;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase;
 import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
+import 
org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.GetRegionNamesRequest;
 
 /**
  * Implements the behaviors of a driver for communicating with a GemFire 
server by way of the new
@@ -44,10 +47,7 @@ public class ProtobufDriver implements Driver {
    */
   private final Set<InetSocketAddress> locators;
 
-  /**
-   * Socket to a GemFire locator that has Protobuf enabled.
-   */
-  private final Socket socket;
+  private final ProtobufChannel channel;
 
   /**
    * Creates a driver implementation that communicates via <code>socket</code> 
to a GemFire locator.
@@ -58,37 +58,20 @@ public class ProtobufDriver implements Driver {
    */
   ProtobufDriver(Set<InetSocketAddress> locators) throws IOException {
     this.locators = locators;
-    InetSocketAddress server = findAServer();
-    socket = new Socket(server.getAddress(), server.getPort());
-    socket.setTcpNoDelay(true);
-    socket.setSendBufferSize(65535);
-    socket.setReceiveBufferSize(65535);
 
-    final OutputStream outputStream = socket.getOutputStream();
-    ProtocolVersion.NewConnectionClientVersion.newBuilder()
-        
.setMajorVersion(ProtocolVersion.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-        
.setMinorVersion(ProtocolVersion.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build()
-        .writeDelimitedTo(outputStream);
 
-    final InputStream inputStream = socket.getInputStream();
-    if (!ProtocolVersion.VersionAcknowledgement.parseDelimitedFrom(inputStream)
-        .getVersionAccepted()) {
-      throw new IOException("Failed protocol version verification.");
-    }
+    this.channel = new ProtobufChannel(locators);
   }
 
   @Override
   public Set<String> getRegionNames() throws IOException {
     Set<String> regionNames = new HashSet<>();
 
-    final OutputStream outputStream = socket.getOutputStream();
-    ClientProtocol.Message.newBuilder()
-        
.setGetRegionNamesRequest(RegionAPI.GetRegionNamesRequest.newBuilder()).build()
-        .writeDelimitedTo(outputStream);
+    final Message request =
+        
Message.newBuilder().setGetRegionNamesRequest(GetRegionNamesRequest.newBuilder()).build();
 
-    final InputStream inputStream = socket.getInputStream();
-    final RegionAPI.GetRegionNamesResponse getRegionNamesResponse =
-        
ClientProtocol.Message.parseDelimitedFrom(inputStream).getGetRegionNamesResponse();
+    final RegionAPI.GetRegionNamesResponse getRegionNamesResponse = channel
+        .sendRequest(request, 
MessageTypeCase.GETREGIONNAMESRESPONSE).getGetRegionNamesResponse();
     for (int i = 0; i < getRegionNamesResponse.getRegionsCount(); ++i) {
       regionNames.add(getRegionNamesResponse.getRegions(i));
     }
@@ -98,13 +81,13 @@ public class ProtobufDriver implements Driver {
 
   @Override
   public <K, V> Region<K, V> getRegion(String regionName) {
-    return new ProtobufRegion(regionName, socket);
+    return new ProtobufRegion(regionName, channel);
   }
 
   @Override
   public void close() {
     try {
-      this.socket.close();
+      this.channel.close();
     } catch (IOException e) {
       // ignore
     }
@@ -112,61 +95,7 @@ public class ProtobufDriver implements Driver {
 
   @Override
   public boolean isConnected() {
-    return !this.socket.isClosed();
+    return !this.channel.isClosed();
   }
 
-  /**
-   * Queries locators for a Geode server that has Protobuf enabled.
-   *
-   * @return The server chosen by the Locator service for this client
-   * @throws IOException
-   */
-  private InetSocketAddress findAServer() throws IOException {
-    IOException lastException = null;
-
-    for (InetSocketAddress locator : locators) {
-      try {
-        final Socket locatorSocket = new Socket(locator.getAddress(), 
locator.getPort());
-
-        final OutputStream outputStream = locatorSocket.getOutputStream();
-        final InputStream inputStream = locatorSocket.getInputStream();
-        ProtocolVersion.NewConnectionClientVersion.newBuilder()
-            
.setMajorVersion(ProtocolVersion.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
-            
.setMinorVersion(ProtocolVersion.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build()
-            .writeDelimitedTo(outputStream);
-
-        // The locator does not currently send a reply to the 
ProtocolVersion...
-        if 
(!ProtocolVersion.VersionAcknowledgement.parseDelimitedFrom(inputStream)
-            .getVersionAccepted()) {
-          throw new IOException("Failed ProtocolVersion.");
-        }
-
-        ClientProtocol.Message.newBuilder()
-            
.setGetServerRequest(LocatorAPI.GetServerRequest.newBuilder()).build()
-            .writeDelimitedTo(outputStream);
-
-        ClientProtocol.Message response = 
ClientProtocol.Message.parseDelimitedFrom(inputStream);
-        ClientProtocol.ErrorResponse errorResponse = 
response.getErrorResponse();
-
-        if (errorResponse != null && errorResponse.hasError()) {
-          throw new IOException(
-              "Error finding server: error code= " + 
errorResponse.getError().getErrorCode()
-                  + "; error message=" + 
errorResponse.getError().getMessage());
-        }
-
-        LocatorAPI.GetServerResponse getServerResponse = 
response.getGetServerResponse();
-
-        BasicTypes.Server server = getServerResponse.getServer();
-        return new InetSocketAddress(server.getHostname(), server.getPort());
-      } catch (IOException e) {
-        lastException = e;
-      }
-    }
-
-    if (lastException != null) {
-      throw lastException;
-    } else {
-      throw new IllegalStateException("No locators");
-    }
-  }
 }
diff --git 
a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufRegion.java
 
b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufRegion.java
index 7021f40..9f99f25 100644
--- 
a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufRegion.java
+++ 
b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufRegion.java
@@ -15,8 +15,6 @@
 package org.apache.geode.experimental.driver;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.Socket;
 import java.util.Collection;
 import java.util.HashMap;
@@ -24,8 +22,13 @@ import java.util.Map;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase;
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
+import 
org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.GetRegionRequest;
+import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.GetRequest;
+import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.PutRequest;
+import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.RemoveRequest;
 
 /**
  * Implements the behaviors of a GemFire region. Send and receives Protobuf 
messages on the provided
@@ -44,53 +47,39 @@ public class ProtobufRegion<K, V> implements Region<K, V> {
    */
   final String name;
 
-  /**
-   * Socket to a GemFire server that has Protobuf enabled.
-   */
-  final Socket socket;
+  final ProtobufChannel protobufChannel;
 
   /**
    * Creates a region implementation for the region <code>name</code> that 
communicates via
    * <code>socket</code> to a GemFire server.
    *
    * @param name String that uniquely identifies the region.
-   * @param socket Socket to a GemFire server that has Protobuf enabled.
    */
-  ProtobufRegion(String name, Socket socket) {
+  ProtobufRegion(String name, ProtobufChannel channel) {
     this.name = name;
-    this.socket = socket;
-  }
-
-  private ClientProtocol.Message readResponse() throws IOException {
-    final InputStream inputStream = socket.getInputStream();
-    ClientProtocol.Message response = 
ClientProtocol.Message.parseDelimitedFrom(inputStream);
-    final ClientProtocol.ErrorResponse errorResponse = 
response.getErrorResponse();
-    if (errorResponse != null && errorResponse.hasError()) {
-      throw new IOException(errorResponse.getError().getMessage());
-    }
-    return response;
+    this.protobufChannel = channel;
   }
 
 
   @Override
   public RegionAttributes getRegionAttributes() throws IOException {
-    final OutputStream outputStream = socket.getOutputStream();
-    ClientProtocol.Message.newBuilder()
-        
.setGetRegionRequest(RegionAPI.GetRegionRequest.newBuilder().setRegionName(name)).build()
-        .writeDelimitedTo(outputStream);
+    final Message request = Message.newBuilder()
+        
.setGetRegionRequest(GetRegionRequest.newBuilder().setRegionName(name)).build();
 
-    return new 
RegionAttributes(readResponse().getGetRegionResponse().getRegion());
+    return new RegionAttributes(
+        protobufChannel.sendRequest(request, MessageTypeCase.GETREGIONRESPONSE)
+            .getGetRegionResponse().getRegion());
   }
 
 
   @Override
   public V get(K key) throws IOException {
-    final OutputStream outputStream = socket.getOutputStream();
-    ClientProtocol.Message.newBuilder().setGetRequest(
-        
RegionAPI.GetRequest.newBuilder().setRegionName(name).setKey(ValueEncoder.encodeValue(key)))
-        .build().writeDelimitedTo(outputStream);
+    Message request = Message.newBuilder()
+        .setGetRequest(
+            
GetRequest.newBuilder().setRegionName(name).setKey(ValueEncoder.encodeValue(key)))
+        .build();
+    final Message response = protobufChannel.sendRequest(request, 
MessageTypeCase.GETRESPONSE);
 
-    final ClientProtocol.Message response = readResponse();
     return (V) ValueEncoder.decodeValue(response.getGetResponse().getResult());
   }
 
@@ -98,16 +87,16 @@ public class ProtobufRegion<K, V> implements Region<K, V> {
   public Map<K, V> getAll(Collection<K> keys) throws IOException {
     Map<K, V> values = new HashMap<>();
 
-    final OutputStream outputStream = socket.getOutputStream();
     RegionAPI.GetAllRequest.Builder getAllRequest = 
RegionAPI.GetAllRequest.newBuilder();
     getAllRequest.setRegionName(name);
     for (K key : keys) {
       getAllRequest.addKey(ValueEncoder.encodeValue(key));
     }
-    ClientProtocol.Message.newBuilder().setGetAllRequest(getAllRequest).build()
-        .writeDelimitedTo(outputStream);
+    Message request = 
Message.newBuilder().setGetAllRequest(getAllRequest).build();
+
+    Message message = protobufChannel.sendRequest(request, 
MessageTypeCase.GETALLRESPONSE);
 
-    final RegionAPI.GetAllResponse getAllResponse = 
readResponse().getGetAllResponse();
+    final RegionAPI.GetAllResponse getAllResponse = 
message.getGetAllResponse();
     Map<Object, String> failures = new HashMap<>();
     if (getAllResponse.getFailuresCount() > 0) {
       for (BasicTypes.KeyedError keyedError : 
getAllResponse.getFailuresList()) {
@@ -126,26 +115,24 @@ public class ProtobufRegion<K, V> implements Region<K, V> 
{
 
   @Override
   public void put(K key, V value) throws IOException {
-    final OutputStream outputStream = socket.getOutputStream();
-    
ClientProtocol.Message.newBuilder().setPutRequest(RegionAPI.PutRequest.newBuilder()
-        .setRegionName(name).setEntry(ValueEncoder.encodeEntry(key, 
value))).build()
-        .writeDelimitedTo(outputStream);
+    final Message request = Message.newBuilder().setPutRequest(
+        
PutRequest.newBuilder().setRegionName(name).setEntry(ValueEncoder.encodeEntry(key,
 value)))
+        .build();
 
-    readResponse();
+    protobufChannel.sendRequest(request, MessageTypeCase.PUTRESPONSE);
   }
 
   @Override
   public void putAll(Map<K, V> values) throws IOException {
-    final OutputStream outputStream = socket.getOutputStream();
     RegionAPI.PutAllRequest.Builder putAllRequest = 
RegionAPI.PutAllRequest.newBuilder();
     putAllRequest.setRegionName(name);
     for (K key : values.keySet()) {
       putAllRequest.addEntry(ValueEncoder.encodeEntry(key, values.get(key)));
     }
-    ClientProtocol.Message.newBuilder().setPutAllRequest(putAllRequest).build()
-        .writeDelimitedTo(outputStream);
+    final Message request = 
Message.newBuilder().setPutAllRequest(putAllRequest).build();
 
-    final RegionAPI.PutAllResponse putAllResponse = 
readResponse().getPutAllResponse();
+    final RegionAPI.PutAllResponse putAllResponse =
+        protobufChannel.sendRequest(request, 
MessageTypeCase.PUTALLRESPONSE).getPutAllResponse();
     if (0 < putAllResponse.getFailedKeysCount()) {
       Map<Object, String> failures = new HashMap<>();
       for (BasicTypes.KeyedError keyedError : 
putAllResponse.getFailedKeysList()) {
@@ -159,11 +146,11 @@ public class ProtobufRegion<K, V> implements Region<K, V> 
{
 
   @Override
   public void remove(K key) throws IOException {
-    final OutputStream outputStream = socket.getOutputStream();
-    
ClientProtocol.Message.newBuilder().setRemoveRequest(RegionAPI.RemoveRequest.newBuilder()
-        .setRegionName(name).setKey(ValueEncoder.encodeValue(key))).build()
-        .writeDelimitedTo(outputStream);
+    final Message request = Message.newBuilder()
+        .setRemoveRequest(
+            
RemoveRequest.newBuilder().setRegionName(name).setKey(ValueEncoder.encodeValue(key)))
+        .build();
 
-    readResponse();
+    protobufChannel.sendRequest(request, MessageTypeCase.REMOVERESPONSE);
   }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to