GEODE-2582: Add get request; add serializer to new protocol handler.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/9973c493 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/9973c493 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/9973c493 Branch: refs/heads/feature/GEODE-2580 Commit: 9973c4938a0c27f0a53aedf3460995a47ce78427 Parents: 511e2a3 Author: Galen OSullivan <gosulli...@pivotal.io> Authored: Thu May 18 17:17:09 2017 -0700 Committer: Udo Kohlmeyer <ukohlme...@pivotal.io> Committed: Mon May 22 11:27:03 2017 -0700 ---------------------------------------------------------------------- .../client/NewClientProtocolTestClient.java | 6 +- .../client/ProtobufProtocolMessageHandler.java | 49 ++++++++++++---- .../geode/protocol/client/MessageUtils.java | 15 +++++ .../client/ProtobufProtocolIntegrationTest.java | 62 ++++++++++++++++++-- ...rotobufSerializationDeserializationTest.java | 4 +- .../sockets/ClientProtocolMessageHandler.java | 5 +- .../cache/tier/sockets/ServerConnection.java | 3 +- 7 files changed, 120 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java index a4476e1..834d2f6 100644 --- a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java +++ b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java @@ -43,6 +43,8 @@ public class NewClientProtocolTestClient implements AutoCloseable { socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); inputStream = socketChannel.socket().getInputStream(); outputStream = socketChannel.socket().getOutputStream(); + + sendHeader(outputStream); } @Override @@ -55,15 +57,13 @@ public class NewClientProtocolTestClient implements AutoCloseable { } public Message blockingSendMessage(Message message) throws IOException { - sendHeader(outputStream); - message.writeDelimitedTo(outputStream); outputStream.flush(); return ClientProtocol.Message.parseDelimitedFrom(inputStream); } - void parseResponse(Message response) { + void printResponse(Message response) { System.out.println("response = " + response.toString()); } http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java index a6993c4..10ad7e9 100644 --- a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java +++ b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java @@ -16,7 +16,6 @@ package org.apache.geode.protocol.client; import com.google.protobuf.ByteString; -import com.google.protobuf.Parser; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheWriterException; import org.apache.geode.cache.Region; @@ -24,10 +23,11 @@ import org.apache.geode.cache.TimeoutException; import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler; import org.apache.geode.internal.logging.LogService; import org.apache.geode.protocol.protobuf.BasicTypes; +import org.apache.geode.protocol.protobuf.RegionAPI; import org.apache.geode.protocol.protobuf.RegionAPI.GetRequest; -import org.apache.geode.protocol.protobuf.RegionAPI.GetResponse; import org.apache.geode.protocol.protobuf.RegionAPI.PutResponse; import org.apache.geode.serialization.Deserializer; +import org.apache.geode.serialization.Serializer; import org.apache.logging.log4j.Logger; import java.io.IOException; @@ -49,7 +49,7 @@ public class ProtobufProtocolMessageHandler implements ClientProtocolMessageHand @Override public void receiveMessage(InputStream inputStream, OutputStream outputStream, - Deserializer deserializer, Cache cache) throws IOException { + Deserializer deserializer, Serializer serializer, Cache cache) throws IOException { final Message message = Message.parseDelimitedFrom(inputStream); // can be null at EOF, see Parser.parseDelimitedFrom(java.io.InputStream) if (message == null) { @@ -61,14 +61,23 @@ public class ProtobufProtocolMessageHandler implements ClientProtocolMessageHand logger.error(() -> "Got message of type response: " + ErrorMessageFromMessage(message)); } - Request request = message.getRequest(); - Message putResponseMessage = doPutRequest(request.getPutRequest(), deserializer, cache); + Message responseMessage = null; - putResponseMessage.writeDelimitedTo(outputStream); + Request request = message.getRequest(); + Request.RequestAPICase requestAPICase = request.getRequestAPICase(); + if (requestAPICase == Request.RequestAPICase.GETREQUEST) { + responseMessage = doGetRequest(request.getGetRequest(), deserializer, serializer, cache); + } else if (requestAPICase == Request.RequestAPICase.PUTREQUEST) { + responseMessage = doPutRequest(request.getPutRequest(), deserializer, cache); + } else { + // TODO + } + if (responseMessage != null) { + responseMessage.writeDelimitedTo(outputStream); + } } private Message doPutRequest(PutRequest request, Deserializer dataDeserializer, Cache cache) { - logger.error("Doing put request."); final String regionName = request.getRegionName(); final BasicTypes.Entry entry = request.getEntry(); final ByteString key = entry.getKey().getKey(); @@ -80,7 +89,7 @@ public class ProtobufProtocolMessageHandler implements ClientProtocolMessageHand dataDeserializer.deserialize(value.toByteArray())); return putResponseWithStatus(true); } catch (TimeoutException | CacheWriterException ex) { - logger.error("Caught normal-ish exception doing region put", ex); + logger.warn("Caught normal-ish exception doing region put", ex); return putResponseWithStatus(false); } } @@ -91,9 +100,27 @@ public class ProtobufProtocolMessageHandler implements ClientProtocolMessageHand .build(); } - private GetResponse doGetRequest(GetRequest request, Deserializer deserializer, Cache cache) { - // TODO - return null; + private Message doGetRequest(GetRequest request, Deserializer deserializer, Serializer serializer, Cache cache) { + String regionName = request.getRegionName(); + BasicTypes.Key key = request.getKey(); + byte[] keyBytes = key.getKey().toByteArray(); + Region<Object, Object> region = cache.getRegion(regionName); + + Object returnValue = region.get(deserializer.deserialize(keyBytes)); + + if (returnValue == null) { + return getResponseWithValue(new byte[0]); + } else { + // TODO types in the region? + return getResponseWithValue(serializer.serialize(returnValue)); + } + } + + private Message getResponseWithValue(byte[] value) { + return Message.newBuilder() + .setResponse(Response.newBuilder().setGetResponse(RegionAPI.GetResponse.newBuilder() + .setResult(BasicTypes.Value.newBuilder().setValue(ByteString.copyFrom(value))))) + .build(); } public ProtobufProtocolMessageHandler() {} http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java index 02e2a58..4991467 100644 --- a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java +++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java @@ -82,6 +82,21 @@ public class MessageUtils { return message.build(); } + public static ClientProtocol.Message makeGetMessageFor(String region, String key) { + Random random = new Random(); + ClientProtocol.MessageHeader.Builder messageHeader = + ClientProtocol.MessageHeader.newBuilder().setCorrelationId(random.nextInt()); + + BasicTypes.Key.Builder keyBuilder = + BasicTypes.Key.newBuilder().setKey(ByteString.copyFromUtf8(key)); + + RegionAPI.GetRequest.Builder getRequest = RegionAPI.GetRequest.newBuilder().setRegionName(region).setKey(keyBuilder); + ClientProtocol.Request.Builder request = ClientProtocol.Request.newBuilder().setGetRequest(getRequest); + + return ClientProtocol.Message.newBuilder().setMessageHeader(messageHeader).setRequest(request) + .build(); + } + private static byte[] createByteArrayOfSize(int msgSize) { byte[] array = new byte[msgSize]; for (int i = 0; i < msgSize; i++) { http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java index 5d92cf0..b3864a2 100644 --- a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java +++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java @@ -15,11 +15,13 @@ package org.apache.geode.protocol.client; +import com.google.protobuf.ByteString; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.ConfigurationProperties; +import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.ClientProtocol; import org.apache.geode.test.junit.categories.IntegrationTest; import org.junit.Test; @@ -28,12 +30,15 @@ import org.junit.experimental.categories.Category; import java.io.IOException; import java.util.Properties; +import static org.apache.geode.protocol.protobuf.ClientProtocol.Message.MessageTypeCase.RESPONSE; +import static org.apache.geode.protocol.protobuf.ClientProtocol.Response.ResponseAPICase.GETRESPONSE; +import static org.apache.geode.protocol.protobuf.ClientProtocol.Response.ResponseAPICase.PUTRESPONSE; import static org.junit.Assert.*; @Category(IntegrationTest.class) public class ProtobufProtocolIntegrationTest { @Test - public void testRoundTripClientCommunicationWorks() throws IOException { + public void testRoundTripPutRequest() throws IOException { try (Cache cache = createCacheOnPort(40404); NewClientProtocolTestClient client = new NewClientProtocolTestClient("localhost", 40404)) { final String testRegion = "testRegion"; @@ -44,11 +49,11 @@ public class ProtobufProtocolIntegrationTest { ClientProtocol.Message message = MessageUtils.makePutMessageFor(testRegion, testKey, testValue); ClientProtocol.Message response = client.blockingSendMessage(message); - client.parseResponse(response); + client.printResponse(response); - assertEquals(response.getMessageTypeCase(), ClientProtocol.Message.MessageTypeCase.RESPONSE); - assertEquals(response.getResponse().getResponseAPICase(), - ClientProtocol.Response.ResponseAPICase.PUTRESPONSE); + assertEquals(RESPONSE, response.getMessageTypeCase()); + assertEquals(PUTRESPONSE, + response.getResponse().getResponseAPICase()); assertTrue(response.getResponse().getPutResponse().getSuccess()); assertEquals(1, region.size()); @@ -58,6 +63,53 @@ public class ProtobufProtocolIntegrationTest { } @Test + public void testRoundTripEmptyGetRequest() throws IOException { + try (Cache cache = createCacheOnPort(40404); + NewClientProtocolTestClient client = new NewClientProtocolTestClient("localhost", 40404)) { + final String testRegion = "testRegion"; + final String testKey = "testKey"; + Region<Object, Object> region = cache.createRegionFactory().create("testRegion"); + + ClientProtocol.Message message = MessageUtils.makeGetMessageFor(testRegion, testKey); + ClientProtocol.Message response = client.blockingSendMessage(message); + + assertEquals(RESPONSE, response.getMessageTypeCase()); + assertEquals(GETRESPONSE, + response.getResponse().getResponseAPICase()); + BasicTypes.Value value = response.getResponse().getGetResponse().getResult(); + + assertTrue(value.getValue().isEmpty()); + } + } + + @Test + public void testRoundTripNonEmptyGetRequest() throws IOException { + try (Cache cache = createCacheOnPort(40404); + NewClientProtocolTestClient client = new NewClientProtocolTestClient("localhost", 40404)) { + final String testRegion = "testRegion"; + final String testKey = "testKey"; + final String testValue = "testValue"; + Region<Object, Object> region = cache.createRegionFactory().create("testRegion"); + + + ClientProtocol.Message putMessage = + MessageUtils.makePutMessageFor(testRegion, testKey, testValue); + ClientProtocol.Message putResponse = client.blockingSendMessage(putMessage); + client.printResponse(putResponse); + + ClientProtocol.Message getMessage = MessageUtils.makeGetMessageFor(testRegion, testKey); + ClientProtocol.Message getResponse = client.blockingSendMessage(getMessage); + + assertEquals(RESPONSE, getResponse.getMessageTypeCase()); + assertEquals(GETRESPONSE, + getResponse.getResponse().getResponseAPICase()); + BasicTypes.Value value = getResponse.getResponse().getGetResponse().getResult(); + + assertEquals(value.getValue().toStringUtf8(), testValue); + } + } + + @Test public void startCache() throws IOException { try (Cache cache = createCacheOnPort(40404)) { while (true) { http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java ---------------------------------------------------------------------- diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java index 8cf36ca..05d2216 100644 --- a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java +++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java @@ -66,7 +66,7 @@ public class ProtobufSerializationDeserializationTest { ProtobufProtocolMessageHandler newClientProtocol = new ProtobufProtocolMessageHandler(); newClientProtocol.receiveMessage(MessageUtils.loadMessageIntoInputStream(message), - mockOutputStream, deserializer, mockCache); + mockOutputStream, deserializer, SerializationType.STRING.serializer, mockCache); verify(mockRegion).put(testKey.getBytes(), testValue.getBytes()); } @@ -86,7 +86,7 @@ public class ProtobufSerializationDeserializationTest { ProtobufProtocolMessageHandler newClientProtocol = new ProtobufProtocolMessageHandler(); newClientProtocol.receiveMessage(MessageUtils.loadMessageIntoInputStream(message), outputStream, - deserializer, mockCache); + deserializer, SerializationType.STRING.serializer, mockCache); ClientProtocol.Message responseMessage = ClientProtocol.Message .parseDelimitedFrom(new ByteArrayInputStream(outputStream.toByteArray())); http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java index aa6d4cb..e7e75bf 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java @@ -17,12 +17,13 @@ package org.apache.geode.internal.cache.tier.sockets; import org.apache.geode.cache.Cache; import org.apache.geode.serialization.Deserializer; +import org.apache.geode.serialization.Serializer; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public interface ClientProtocolMessageHandler { - void receiveMessage(InputStream inputStream, OutputStream outputStream, Deserializer serializer, - Cache cache) throws IOException; + void receiveMessage(InputStream inputStream, OutputStream outputStream, Deserializer deserializer, + Serializer serializer, Cache cache) throws IOException; } http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index e58e213..58f1709 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -957,7 +957,8 @@ public class ServerConnection implements Runnable { OutputStream outputStream = socket.getOutputStream(); // TODO serialization types? newClientProtocol.receiveMessage(inputStream, outputStream, - SerializationType.STRING.deserializer, this.getCache()); + SerializationType.STRING.deserializer, SerializationType.STRING.serializer, + this.getCache()); } catch (IOException e) { // TODO? }