This is an automated email from the ASF dual-hosted git repository. upthewaterspout pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit c373f371ed530d5ddea8ea432dea174dd71a23d2 Author: Dan Smith <[email protected]> AuthorDate: Fri Feb 23 12:16:29 2018 -0800 GEODE-3126: Adding a query command to the experimental driver Adding a QueryService and query operations to the experimental client. --- .../apache/geode/experimental/driver/Driver.java | 2 + .../geode/experimental/driver/ProtobufChannel.java | 5 +- .../geode/experimental/driver/ProtobufDriver.java | 13 +-- .../experimental/driver/ProtobufQueryService.java | 104 +++++++++++++++++++++ .../apache/geode/experimental/driver/Query.java | 23 +++++ .../geode/experimental/driver/QueryService.java | 20 ++++ .../experimental/driver/IntegrationTestBase.java | 74 +++++++++++++++ .../driver/QueryServiceIntegrationTest.java | 44 +++++++++ .../experimental/driver/RegionIntegrationTest.java | 45 +-------- 9 files changed, 277 insertions(+), 53 deletions(-) diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java index 5ad7c94..65d8e37 100644 --- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java +++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java @@ -48,6 +48,8 @@ public interface Driver { */ <K, V> Region<K, V> getRegion(String regionName); + QueryService getQueryService(); + /** * Close this Driver, rendering it useless */ diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java index 4670d2d..5d674f9 100644 --- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java +++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java @@ -85,7 +85,7 @@ class ProtobufChannel { /** * Queries locators for a Geode server that has Protobuf enabled. - * + * * @return The server chosen by the Locator service for this client */ private InetSocketAddress findAServer() throws IOException { @@ -140,6 +140,9 @@ class ProtobufChannel { private Message readResponse() throws IOException { final InputStream inputStream = socket.getInputStream(); Message response = ClientProtocol.Message.parseDelimitedFrom(inputStream); + if (response == null) { + throw new IOException("Unable to parse a response message due to EOF"); + } final ErrorResponse errorResponse = response.getErrorResponse(); if (errorResponse != null && errorResponse.hasError()) { throw new IOException(errorResponse.getError().getMessage()); diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java index 573f3ef..40e435c 100644 --- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java +++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java @@ -41,11 +41,6 @@ import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.GetRegionNamesRe */ @Experimental public class ProtobufDriver implements Driver { - /** - * Set of Internet-address-or-host-name/port pairs of the locators to use to find GemFire servers - * that have Protobuf enabled. - */ - private final Set<InetSocketAddress> locators; private final ProtobufChannel channel; @@ -57,9 +52,6 @@ public class ProtobufDriver implements Driver { * @throws IOException */ ProtobufDriver(Set<InetSocketAddress> locators) throws IOException { - this.locators = locators; - - this.channel = new ProtobufChannel(locators); } @@ -85,6 +77,11 @@ public class ProtobufDriver implements Driver { } @Override + public QueryService getQueryService() { + return new ProtobufQueryService(channel); + } + + @Override public void close() { try { this.channel.close(); diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufQueryService.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufQueryService.java new file mode 100644 index 0000000..b1f8a17 --- /dev/null +++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufQueryService.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.experimental.driver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.google.protobuf.ProtocolStringList; + +import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes; +import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.EncodedValue; +import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.Table; +import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message; +import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase; +import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.OQLQueryRequest; +import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.OQLQueryResponse; + +class ProtobufQueryService implements QueryService { + private final ProtobufChannel channel; + + public ProtobufQueryService(ProtobufChannel channel) { + this.channel = channel; + } + + @Override + public <T> Query newQuery(final String queryString) { + return new ProtobufQuery<T>(queryString); + } + + class ProtobufQuery<T> implements Query<T> { + + private final String queryString; + + public ProtobufQuery(final String queryString) { + this.queryString = queryString; + } + + @Override + public List<T> execute(final Object... bindParameters) throws IOException { + List<EncodedValue> encodedParameters = Arrays.asList(bindParameters).stream() + .map(ValueEncoder::encodeValue).collect(Collectors.toList());; + Message request = Message.newBuilder().setOqlQueryRequest( + OQLQueryRequest.newBuilder().addAllBindParameter(encodedParameters).setQuery(queryString)) + .build(); + final OQLQueryResponse response = + channel.sendRequest(request, MessageTypeCase.OQLQUERYRESPONSE).getOqlQueryResponse(); + switch (response.getResultCase()) { + case SINGLERESULT: + return (List<T>) parseSingleResult(response); + case LISTRESULT: + return parseListResult(response); + case TABLERESULT: + return (List<T>) parseTableResult(response); + default: + throw new RuntimeException("Unexpected response: " + response); + } + } + + private List<Map<String, Object>> parseTableResult(final OQLQueryResponse response) { + final Table table = response.getTableResult(); + final ProtocolStringList fieldNames = table.getFieldNameList(); + List<Map<String, Object>> results = new ArrayList<>(); + for (BasicTypes.EncodedValueList row : table.getRowList()) { + final List<Object> decodedRow = row.getElementList().stream().map(ValueEncoder::decodeValue) + .collect(Collectors.toList()); + + Map<String, Object> rowMap = new LinkedHashMap<>(decodedRow.size()); + for (int i = 0; i < decodedRow.size(); i++) { + rowMap.put(fieldNames.get(i), decodedRow.get(i)); + } + } + + return results; + } + + private List<T> parseListResult(final OQLQueryResponse response) { + return response.getListResult().getElementList().stream() + .map(value -> (T) ValueEncoder.decodeValue(value)).collect(Collectors.toList()); + } + + private List<Object> parseSingleResult(final OQLQueryResponse response) { + return Collections.singletonList(ValueEncoder.decodeValue(response.getSingleResult())); + } + } + +} diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java new file mode 100644 index 0000000..fd0c1f4 --- /dev/null +++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.experimental.driver; + +import java.io.IOException; +import java.util.List; + +public interface Query<T> { + + List<T> execute(Object... bindParameters) throws IOException; +} diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java new file mode 100644 index 0000000..5f1f89c --- /dev/null +++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.experimental.driver; + +public interface QueryService { + + <T> Query newQuery(String queryString); +} diff --git a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/IntegrationTestBase.java b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/IntegrationTestBase.java new file mode 100644 index 0000000..8464e8f --- /dev/null +++ b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/IntegrationTestBase.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.experimental.driver; + +import java.util.Properties; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.contrib.java.lang.system.RestoreSystemProperties; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.distributed.ConfigurationProperties; +import org.apache.geode.distributed.Locator; + +/** + * Created by dan on 2/23/18. + */ +public class IntegrationTestBase { + private static final String REGION = "region"; + @Rule + public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + protected Driver driver; + protected org.apache.geode.cache.Region<Object, Object> serverRegion; + private Locator locator; + private Cache cache; + + @Before + public void createServerAndDriver() throws Exception { + System.setProperty("geode.feature-protobuf-protocol", "true"); + + // Create a cache + CacheFactory cf = new CacheFactory(); + cf.set(ConfigurationProperties.MCAST_PORT, "0"); + cache = cf.create(); + + // Start a locator + locator = Locator.startLocatorAndDS(0, null, new Properties()); + int locatorPort = locator.getPort(); + + // Start a server + CacheServer server = cache.addCacheServer(); + server.setPort(0); + server.start(); + + // Create a region + serverRegion = cache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION); + + // Create a driver connected to the server + driver = new DriverFactory().addLocator("localhost", locatorPort).create(); + + } + + @After + public void cleanup() { + locator.stop(); + cache.close(); + } +} diff --git a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/QueryServiceIntegrationTest.java b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/QueryServiceIntegrationTest.java new file mode 100644 index 0000000..89ab58f --- /dev/null +++ b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/QueryServiceIntegrationTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.experimental.driver; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.test.junit.categories.IntegrationTest; + +@Category(IntegrationTest.class) +public class QueryServiceIntegrationTest extends IntegrationTestBase { + + @Test + public void testQuery() throws IOException { + serverRegion.put("key1", "value1"); + serverRegion.put("key2", "value2"); + + QueryService service = driver.getQueryService(); + + Query<String> query = service.newQuery("select value from /region value order by value"); + final List<String> results = query.execute(); + + assertEquals(Arrays.asList("value1", "value2"), results); + } +} diff --git a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/RegionIntegrationTest.java b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/RegionIntegrationTest.java index 72a5da3..42c4bc6 100644 --- a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/RegionIntegrationTest.java +++ b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/RegionIntegrationTest.java @@ -43,9 +43,7 @@ import org.apache.geode.pdx.PdxInstance; import org.apache.geode.test.junit.categories.IntegrationTest; @Category(IntegrationTest.class) -public class RegionIntegrationTest { - @Rule - public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); +public class RegionIntegrationTest extends IntegrationTestBase { /** a JSON document */ private static final String jsonDocument = @@ -54,47 +52,6 @@ public class RegionIntegrationTest { + System.lineSeparator() + " \"emailAddress\" : \"none\"" + System.lineSeparator() + "}"; - private static final String REGION = "region"; - private Locator locator; - private Cache cache; - private Driver driver; - - private org.apache.geode.cache.Region<Object, Object> serverRegion; - - @Before - public void createServerAndDriver() throws Exception { - System.setProperty("geode.feature-protobuf-protocol", "true"); - - // Create a cache - CacheFactory cf = new CacheFactory(); - cf.set(ConfigurationProperties.MCAST_PORT, "0"); - cache = cf.create(); - - // Start a locator - locator = Locator.startLocatorAndDS(0, null, new Properties()); - int locatorPort = locator.getPort(); - - // Start a server - CacheServer server = cache.addCacheServer(); - server.setPort(0); - server.start(); - - // Create a region - serverRegion = cache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION); - - // Create a driver connected to the server - driver = new DriverFactory().addLocator("localhost", locatorPort).create(); - - } - - @After - public void cleanup() { - locator.stop(); - cache.close(); - } - - - @Test public void getShouldReturnPutValue() throws Exception { Region<String, String> region = driver.getRegion("region"); -- To stop receiving notification emails like this one, please contact [email protected].
