http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java new file mode 100644 index 0000000..bb75c7a --- /dev/null +++ b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.gemfire.spark.connector; + +import com.gemstone.gemfire.cache.Region; +import io.pivotal.gemfire.spark.connector.GemFireConnection; +import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; +import io.pivotal.gemfire.spark.connector.GemFireConnectionConf$; +import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager$; +import io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaRegionRDD; +import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster$; +import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import io.pivotal.gemfire.spark.connector.package$; +import scala.Tuple2; +import scala.Option; +import scala.Some; + +import java.util.*; + +import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.RDDSaveBatchSizePropKey; +import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.javaFunctions; +import static org.junit.Assert.*; + +public class JavaApiIntegrationTest extends JUnitSuite { + + static JavaSparkContext jsc = null; + static GemFireConnectionConf connConf = null; + + static int numServers = 2; + static int numObjects = 1000; + static String regionPath = "pr_str_int_region"; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // start gemfire cluster, and spark context + Properties settings = new Properties(); + settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml"); + settings.setProperty("num-of-servers", Integer.toString(numServers)); + int locatorPort = 
GemFireCluster$.MODULE$.start(settings); + + // start spark context in local mode + Properties props = new Properties(); + props.put("log4j.logger.org.apache.spark", "INFO"); + props.put("log4j.logger.io.pivotal.gemfire.spark.connector","DEBUG"); + IOUtils.configTestLog4j("ERROR", props); + SparkConf conf = new SparkConf() + .setAppName("RetrieveRegionIntegrationTest") + .setMaster("local[2]") + .set(package$.MODULE$.GemFireLocatorPropKey(), "localhost:"+ locatorPort); + // sc = new SparkContext(conf); + jsc = new JavaSparkContext(conf); + connConf = GemFireConnectionConf.apply(jsc.getConf()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + // stop connection, spark context, and gemfire cluster + DefaultGemFireConnectionManager$.MODULE$.closeConnection(GemFireConnectionConf$.MODULE$.apply(jsc.getConf())); + jsc.stop(); + GemFireCluster$.MODULE$.stop(); + } + + // -------------------------------------------------------------------------------------------- + // utility methods + // -------------------------------------------------------------------------------------------- + + private <K,V> void matchMapAndPairList(Map<K,V> map, List<Tuple2<K,V>> list) { + assertTrue("size mismatch \nmap: " + map.toString() + "\nlist: " + list.toString(), map.size() == list.size()); + for (Tuple2<K, V> p : list) { + assertTrue("value mismatch: k=" + p._1() + " v1=" + p._2() + " v2=" + map.get(p._1()), + p._2().equals(map.get(p._1()))); + } + } + + private Region<String, Integer> prepareStrIntRegion(String regionPath, int start, int stop) { + HashMap<String, Integer> entriesMap = new HashMap<>(); + for (int i = start; i < stop; i ++) { + entriesMap.put("k_" + i, i); + } + + GemFireConnection conn = connConf.getConnection(); + Region<String, Integer> region = conn.getRegionProxy(regionPath); + region.removeAll(region.keySetOnServer()); + region.putAll(entriesMap); + return region; + } + + private JavaPairRDD<String, Integer> prepareStrIntJavaPairRDD(int start, int stop) { + List<Tuple2<String, Integer>> data = new ArrayList<>(); + for (int i = start; i < stop; i ++) { + data.add(new Tuple2<>("k_" + i, i)); + } + return jsc.parallelizePairs(data); + } + + private JavaPairRDD<Integer, Integer> prepareIntIntJavaPairRDD(int start, int stop) { + List<Tuple2<Integer, Integer>> data = new ArrayList<>(); + for (int i = start; i < stop; i ++) { + data.add(new Tuple2<>(i, i * 2)); + } + return jsc.parallelizePairs(data); + } + + private JavaRDD<Integer> prepareIntJavaRDD(int start, int stop) { + List<Integer> data = new ArrayList<>(); + for (int i = start; i < stop; i ++) { + data.add(i); + } + return jsc.parallelize(data); + } + + // -------------------------------------------------------------------------------------------- + // JavaRDD.saveToGemfire + // -------------------------------------------------------------------------------------------- + + static class IntToStrIntPairFunction implements PairFunction<Integer, String, Integer> { + @Override public Tuple2<String, Integer> call(Integer x) throws Exception { + return new Tuple2<>("k_" + x, x); + } + } + + @Test + public void testRDDSaveToGemfireWithDefaultConnConfAndOpConf() throws Exception { + verifyRDDSaveToGemfire(true, true); + } + + @Test + public void testRDDSaveToGemfireWithDefaultConnConf() throws Exception { + verifyRDDSaveToGemfire(true, false); + } + + @Test + public void testRDDSaveToGemfireWithConnConfAndOpConf() throws Exception { + verifyRDDSaveToGemfire(false, true); + } + + @Test + public void 
testRDDSaveToGemfireWithConnConf() throws Exception {
+    verifyRDDSaveToGemfire(false, false);
+  }
+
+  public void verifyRDDSaveToGemfire(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
+    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0); // remove all entries
+    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(0, numObjects);
+
+    PairFunction<Integer, String, Integer> func = new IntToStrIntPairFunction();
+    Properties opConf = new Properties();
+    opConf.put(RDDSaveBatchSizePropKey, "200");
+
+    if (useDefaultConnConf) {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, func, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath, func);
+    } else {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf);
+    }
+
+    Set<String> keys = region.keySetOnServer();
+    Map<String, Integer> map = region.getAll(keys);
+
+    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
+
+    for (int i = 0; i < numObjects; i ++) {
+      expectedList.add(new Tuple2<>("k_" + i, i));
+    }
+    matchMapAndPairList(map, expectedList);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // JavaPairRDD.saveToGemfire
+  // --------------------------------------------------------------------------------------------
+
+  @Test
+  public void testPairRDDSaveToGemfireWithDefaultConnConfAndOpConf() throws Exception {
+    verifyPairRDDSaveToGemfire(true, true);
+  }
+
+  @Test
+  public void testPairRDDSaveToGemfireWithDefaultConnConf() throws Exception {
+    verifyPairRDDSaveToGemfire(true, false);
+  }
+
+  @Test
+  public void testPairRDDSaveToGemfireWithConnConfAndOpConf() throws Exception {
+    verifyPairRDDSaveToGemfire(false, true);
+  }
+
+  @Test
+  public void testPairRDDSaveToGemfireWithConnConf() throws Exception {
+    verifyPairRDDSaveToGemfire(false, false);
+  }
+
+  public void verifyPairRDDSaveToGemfire(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
+    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0); // remove all entries
+    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(0, numObjects);
+    Properties opConf = new Properties();
+    opConf.put(RDDSaveBatchSizePropKey, "200");
+
+    if (useDefaultConnConf) {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath);
+    } else {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, connConf, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath, connConf);
+    }
+
+    Set<String> keys = region.keySetOnServer();
+    Map<String, Integer> map = region.getAll(keys);
+
+    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
+    for (int i = 0; i < numObjects; i ++) {
+      expectedList.add(new Tuple2<>("k_" + i, i));
+    }
+    matchMapAndPairList(map, expectedList);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // JavaSparkContext.gemfireRegion and where clause
+  // --------------------------------------------------------------------------------------------
+
+  @Test
+  public void testJavaSparkContextGemfireRegion() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects); // reset the region and load numObjects entries
+    Properties emptyProps = new Properties();
+    GemFireJavaRegionRDD<String, Integer> rdd1 = javaFunctions(jsc).gemfireRegion(regionPath);
+    GemFireJavaRegionRDD<String, Integer> rdd2 =
javaFunctions(jsc).gemfireRegion(regionPath, emptyProps); + GemFireJavaRegionRDD<String, Integer> rdd3 = javaFunctions(jsc).gemfireRegion(regionPath, connConf); + GemFireJavaRegionRDD<String, Integer> rdd4 = javaFunctions(jsc).gemfireRegion(regionPath, connConf, emptyProps); + GemFireJavaRegionRDD<String, Integer> rdd5 = rdd1.where("value.intValue() < 50"); + + HashMap<String, Integer> expectedMap = new HashMap<>(); + for (int i = 0; i < numObjects; i ++) { + expectedMap.put("k_" + i, i); + } + + matchMapAndPairList(expectedMap, rdd1.collect()); + matchMapAndPairList(expectedMap, rdd2.collect()); + matchMapAndPairList(expectedMap, rdd3.collect()); + matchMapAndPairList(expectedMap, rdd4.collect()); + + HashMap<String, Integer> expectedMap2 = new HashMap<>(); + for (int i = 0; i < 50; i ++) { + expectedMap2.put("k_" + i, i); + } + + matchMapAndPairList(expectedMap2, rdd5.collect()); + } + + // -------------------------------------------------------------------------------------------- + // JavaPairRDD.joinGemfireRegion + // -------------------------------------------------------------------------------------------- + + @Test + public void testPairRDDJoinWithSameKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10); + + JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGemfireRegion(regionPath); + JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGemfireRegion(regionPath, connConf); + // System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Tuple2<String, Integer>, Integer> expectedMap = new HashMap<>(); + for (int i = 0; i < 10; i ++) { + expectedMap.put(new Tuple2<>("k_" + i, i), i); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + + static class IntIntPairToStrKeyFunction implements Function<Tuple2<Integer, Integer>, String> { + @Override public String call(Tuple2<Integer, Integer> pair) throws Exception { + return "k_" + pair._1(); + } + } + + @Test + public void testPairRDDJoinWithDiffKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10); + Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction(); + + JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGemfireRegion(regionPath, func); + JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGemfireRegion(regionPath, func, connConf); + //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Tuple2<Integer, Integer>, Integer> expectedMap = new HashMap<>(); + for (int i = 0; i < 10; i ++) { + expectedMap.put(new Tuple2<>(i, i * 2), i); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + + // -------------------------------------------------------------------------------------------- + // JavaPairRDD.outerJoinGemfireRegion + // -------------------------------------------------------------------------------------------- + + @Test + public void testPairRDDOuterJoinWithSameKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10); + + JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2a = 
javaFunctions(rdd1).outerJoinGemfireRegion(regionPath); + JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, connConf); + //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Tuple2<String, Integer>, Option<Integer>> expectedMap = new HashMap<>(); + for (int i = -5; i < 10; i ++) { + if (i < 0) + expectedMap.put(new Tuple2<>("k_" + i, i), Option.apply((Integer) null)); + else + expectedMap.put(new Tuple2<>("k_" + i, i), Some.apply(i)); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + + @Test + public void testPairRDDOuterJoinWithDiffKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10); + Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction(); + + JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func); + JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func, connConf); + //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Tuple2<Integer, Integer>, Option<Integer>> expectedMap = new HashMap<>(); + for (int i = -5; i < 10; i ++) { + if (i < 0) + expectedMap.put(new Tuple2<>(i, i * 2), Option.apply((Integer) null)); + else + expectedMap.put(new Tuple2<>(i, i * 2), Some.apply(i)); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + + // -------------------------------------------------------------------------------------------- + // JavaRDD.joinGemfireRegion + // -------------------------------------------------------------------------------------------- + + static class IntToStrKeyFunction implements Function<Integer, String> { + @Override public String call(Integer x) throws Exception { + return "k_" + x; + } + } + + @Test + public void testRDDJoinWithSameKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10); + + Function<Integer, String> func = new IntToStrKeyFunction(); + JavaPairRDD<Integer, Integer> rdd2a = javaFunctions(rdd1).joinGemfireRegion(regionPath, func); + JavaPairRDD<Integer, Integer> rdd2b = javaFunctions(rdd1).joinGemfireRegion(regionPath, func, connConf); + //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Integer, Integer> expectedMap = new HashMap<>(); + for (int i = 0; i < 10; i ++) { + expectedMap.put(i, i); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + + // -------------------------------------------------------------------------------------------- + // JavaRDD.outerJoinGemfireRegion + // -------------------------------------------------------------------------------------------- + + @Test + public void testRDDOuterJoinWithSameKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10); + + Function<Integer, String> func = new IntToStrKeyFunction(); + JavaPairRDD<Integer, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func); + JavaPairRDD<Integer, Option<Integer>> rdd2b = 
javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func, connConf); + //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Integer, Option<Integer>> expectedMap = new HashMap<>(); + for (int i = -5; i < 10; i ++) { + if (i < 0) + expectedMap.put(i, Option.apply((Integer) null)); + else + expectedMap.put(i, Some.apply(i)); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + +}
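For reference, the tests above reduce to a single round trip: save an RDD into a GemFire region, then read the region back as an RDD, optionally filtered with an OQL where clause. A minimal sketch of that flow against the same Java API; the locator address "localhost:10334" and the region name are illustrative placeholders, and a running cluster with that region is assumed:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import io.pivotal.gemfire.spark.connector.package$;
import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.javaFunctions;

public class RoundTripSketch {
  public static void main(String[] args) {
    // point Spark at the GemFire locator (placeholder address)
    SparkConf conf = new SparkConf().setAppName("RoundTripSketch").setMaster("local[2]")
        .set(package$.MODULE$.GemFireLocatorPropKey(), "localhost:10334");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // save a pair RDD into the region ...
    JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(
        Arrays.asList(new Tuple2<>("k_1", 1), new Tuple2<>("k_2", 2)));
    javaFunctions(pairs).saveToGemfire("str_int_region");

    // ... then read the region back, filtered server-side by an OQL where clause
    System.out.println(
        javaFunctions(jsc).gemfireRegion("str_int_region").where("value.intValue() > 1").collect());

    jsc.stop();
  }
}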
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java new file mode 100644 index 0000000..5fa03c6 --- /dev/null +++ b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.gemfire.spark.connector; + +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Properties; +import com.gemstone.gemfire.cache.Declarable; + +/** + * A stock portfolio that consists of multiple {@link Position} objects that + * represent shares of stock (a "security"). Instances of + * <code>Portfolio</code> can be stored in a GemFire <code>Region</code> and + * their contents can be queried using the GemFire query service. + * </p> + * This class is <code>Serializable</code> because we want it to be distributed + * to multiple members of a distributed system. Because this class is + * <code>Declarable</code>, we can describe instances of it in a GemFire + * <code>cache.xml</code> file. + * </p> + * + */ +public class Portfolio implements Declarable, Serializable { + + private static final long serialVersionUID = 9097335119586059309L; + + private int id; /* id is used as the entry key and is stored in the entry */ + private String type; + private Map<String,Position> positions = new LinkedHashMap<String,Position>(); + private String status; + + public Portfolio(Properties props) { + init(props); + } + + @Override + public void init(Properties props) { + this.id = Integer.parseInt(props.getProperty("id")); + this.type = props.getProperty("type", "type1"); + this.status = props.getProperty("status", "active"); + + // get the positions. These are stored in the properties object + // as Positions, not String, so use Hashtable protocol to get at them. + // the keys are named "positionN", where N is an integer. 
+ for (Map.Entry<Object, Object> entry: props.entrySet()) { + String key = (String)entry.getKey(); + if (key.startsWith("position")) { + Position pos = (Position)entry.getValue(); + this.positions.put(pos.getSecId(), pos); + } + } + } + + public void setType(String t) {this.type = t; } + + public String getStatus(){ + return status; + } + + public int getId(){ + return this.id; + } + + public Map<String,Position> getPositions(){ + return this.positions; + } + + public String getType() { + return this.type; + } + + public boolean isActive(){ + return status.equals("active"); + } + + @Override + public String toString(){ + StringBuilder buf = new StringBuilder(); + buf.append("\n\tPortfolio [id=" + this.id + " status=" + this.status); + buf.append(" type=" + this.type); + boolean firstTime = true; + for (Map.Entry<String, Position> entry: positions.entrySet()) { + if (!firstTime) { + buf.append(", "); + } + buf.append("\n\t\t"); + buf.append(entry.getKey() + ":" + entry.getValue()); + firstTime = false; + } + buf.append("]"); + return buf.toString(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java new file mode 100644 index 0000000..b8e8be9 --- /dev/null +++ b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.gemfire.spark.connector; + +import java.io.Serializable; +import java.util.Properties; +import com.gemstone.gemfire.cache.Declarable; + +/** + * Represents a number of shares of a stock ("security") held in a {@link + * Portfolio}. + * </p> + * This class is <code>Serializable</code> because we want it to be distributed + * to multiple members of a distributed system. Because this class is + * <code>Declarable</code>, we can describe instances of it in a GemFire + * <code>cache.xml</code> file. 
+ * </p> + * + */ +public class Position implements Declarable, Serializable { + + private static final long serialVersionUID = -8229531542107983344L; + + private String secId; + private double qty; + private double mktValue; + + public Position(Properties props) { + init(props); + } + + @Override + public void init(Properties props) { + this.secId = props.getProperty("secId"); + this.qty = Double.parseDouble(props.getProperty("qty")); + this.mktValue = Double.parseDouble(props.getProperty("mktValue")); + } + + public String getSecId(){ + return this.secId; + } + + public double getQty(){ + return this.qty; + } + + public double getMktValue() { + return this.mktValue; + } + + @Override + public String toString(){ + return new StringBuilder() + .append("Position [secId=").append(secId) + .append(" qty=").append(this.qty) + .append(" mktValue=").append(mktValue).append("]").toString(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml b/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml new file mode 100644 index 0000000..79893d6 --- /dev/null +++ b/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<!DOCTYPE cache PUBLIC + "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN" + "http://www.gemstone.com/dtd/cache6_5.dtd" > + +<cache> + <!-- test region for OQL test --> + <region name="obj_obj_region" refid="PARTITION_REDUNDANT" /> + + <region name="obj_obj_rep_region" refid="REPLICATE" /> + + <region name="str_int_region" refid="PARTITION_REDUNDANT"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.Integer</value-constraint> + </region-attributes> + </region> + + <region name="str_str_region" refid="PARTITION_REDUNDANT"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.String</value-constraint> + </region-attributes> + </region> + + <region name="str_str_rep_region" refid="REPLICATE"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.String</value-constraint> + </region-attributes> + </region> +</cache> http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml b/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml new file mode 100644 index 0000000..3023959 --- /dev/null +++ b/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml @@ -0,0 +1,57 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<!DOCTYPE cache PUBLIC + "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN" + "http://www.gemstone.com/dtd/cache6_5.dtd" > + +<cache> + <!-- combinations of key, value types with region types --> + <region name="pr_r_obj_obj_region" refid="PARTITION_REDUNDANT" /> + <region name="pr_obj_obj_region" refid="PARTITION" /> + <region name="rr_obj_obj_region" refid="REPLICATE" /> + <region name="rr_p_obj_obj_region" refid="REPLICATE_PERSISTENT" /> + + <region name="pr_r_str_int_region" refid="PARTITION_REDUNDANT"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.Integer</value-constraint> + </region-attributes> + </region> + + <region name="pr_str_int_region" refid="PARTITION"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.Integer</value-constraint> + </region-attributes> + </region> + + <region name="rr_str_int_region" refid="REPLICATE"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.Integer</value-constraint> + </region-attributes> + </region> + + <region name="rr_p_str_int_region" refid="REPLICATE_PERSISTENT"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.Integer</value-constraint> + </region-attributes> + </region> +</cache> http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala new file mode 100644 index 0000000..10c7eaf --- /dev/null +++ b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ittest.io.pivotal.gemfire.spark.connector + +import java.util.Properties +import com.gemstone.gemfire.cache.query.QueryService +import com.gemstone.gemfire.cache.query.internal.StructImpl +import io.pivotal.gemfire.spark.connector._ +import com.gemstone.gemfire.cache.Region +import io.pivotal.gemfire.spark.connector.internal.{RegionMetadata, DefaultGemFireConnectionManager} +import io.pivotal.gemfire.spark.connector.internal.oql.{RDDConverter, QueryRDD} +import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster +import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils +import org.apache.spark.streaming.{Seconds, StreamingContext, TestInputDStream} +import org.apache.spark.{SparkContext, SparkConf} +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +import scala.collection.JavaConversions +import scala.reflect.ClassTag + +case class Number(str: String, len: Int) + +class BasicIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster { + + var sc: SparkContext = null + + override def beforeAll() { + // start gemfire cluster, and spark context + val settings = new Properties() + settings.setProperty("cache-xml-file", "src/it/resources/test-regions.xml") + settings.setProperty("num-of-servers", "2") + val locatorPort = GemFireCluster.start(settings) + + // start spark context in local mode + IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO", + "log4j.logger.io.pivotal.gemfire.spark.connector" -> "DEBUG") + val conf = new SparkConf() + .setAppName("BasicIntegrationTest") + .setMaster("local[2]") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + .set(GemFireLocatorPropKey, s"localhost[$locatorPort]") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.kryo.registrator", "io.pivotal.gemfire.spark.connector.GemFireKryoRegistrator") + + sc = new SparkContext(conf) + } + + override def afterAll() { + // stop connection, spark context, and gemfire cluster + DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(sc.getConf)) + sc.stop() + GemFireCluster.stop() + } + + //Convert Map[Object, Object] to java.util.Properties + private def map2Props(map: Map[Object, Object]): java.util.Properties = + (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props} + + // =========================================================== + // DefaultGemFireConnection functional tests + // =========================================================== + + test("DefaultGemFireConnection.validateRegion()") { + val conn = GemFireConnectionConf(sc.getConf).getConnection + + // normal exist-region + var regionPath: String = "str_str_region" + conn.validateRegion[String, String](regionPath) + + // non-exist region + regionPath = "non_exist_region" + try { + conn.validateRegion[String, String](regionPath) + fail("validateRegion failed to catch non-exist region error") + } catch { + case e: RuntimeException => + if (! 
e.getMessage.contains(s"The region named $regionPath was not found"))
+          fail("validateRegion gives wrong exception on non-existent region", e)
+      case e: Throwable =>
+        fail("validateRegion gives wrong exception on non-existent region", e)
+    }
+
+    // Note: currently, a type mismatch error can't be caught
+    conn.validateRegion[String, Integer]("str_str_region")
+  }
+
+  test("DefaultGemFireConnection.getRegionMetadata()") {
+    val conn = GemFireConnectionConf(sc.getConf).getConnection
+
+    // existing regions
+    validateRegionMetadata(conn, "obj_obj_region", true, 113, null, null, false)
+    validateRegionMetadata(conn, "str_int_region", true, 113, "java.lang.String", "java.lang.Integer", false)
+    validateRegionMetadata(conn, "str_str_rep_region", false, 0, "java.lang.String", "java.lang.String", true)
+
+    // non-existent region
+    assert(! conn.getRegionMetadata("no_exist_region").isDefined)
+  }
+
+  def validateRegionMetadata(
+      conn: GemFireConnection, regionPath: String, partitioned: Boolean, buckets: Int,
+      keyType: String, valueType: String, emptyMap: Boolean): Unit = {
+
+    val mdOption = conn.getRegionMetadata(regionPath)
+    val md = mdOption.get
+
+    assert(md.getRegionPath == s"/$regionPath")
+    assert(md.isPartitioned == partitioned)
+    assert(md.getKeyTypeName == keyType)
+    assert(md.getValueTypeName == valueType)
+    assert(md.getTotalBuckets == buckets)
+    if (emptyMap) assert(md.getServerBucketMap == null)
+    else assert(md.getServerBucketMap != null)
+  }
+
+  test("DefaultGemFireConnection.getRegionProxy()") {
+    val conn = GemFireConnectionConf(sc.getConf).getConnection
+
+    val region1 = conn.getRegionProxy[String, String]("str_str_region")
+    region1.put("1", "One")
+    assert(region1.get("1") == "One")
+    region1.remove("1")
+    assert(region1.get("1") == null)
+
+    // getRegionProxy doesn't fail when the region doesn't exist; the error only surfaces on first use
+    val region2 = conn.getRegionProxy[String, String]("non_exist_region")
+    try {
+      region2.put("1", "One")
+      fail("getRegionProxy failed to catch non-existent region error")
+    } catch {
+      case e: Exception =>
+        if (e.getCause == null || ! e.getCause.getMessage.contains(s"Region named /non_exist_region was not found")) {
+          e.printStackTrace()
+          fail("getRegionProxy gives wrong exception on non-existent region", e)
+        }
+    }
+  }
+
+  // Note: DefaultGemFireConnection.getQuery() and getRegionData() are covered by
+  // RetrieveRegionIntegrationTest.scala and the following OQL tests.
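// The OQL tests below all follow one shape: run an OQL query to get a QueryRDD (or a
// DataFrame directly via sqlContext.gemfireOQL), register the result as a temp table,
// and verify it with Spark SQL. A minimal sketch of that shape using the same APIs as
// the tests; the region name and query are illustrative:
//
//   val sqlContext = new org.apache.spark.sql.SQLContext(sc)
//   val df = sqlContext.gemfireOQL("select * from /obj_obj_region")
//   df.registerTempTable("t")
//   sqlContext.sql("SELECT * FROM t").collect()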
+ + // =========================================================== + // OQL functional tests + // =========================================================== + + private def initRegion(regionName: String): Unit = { + + //Populate some data in the region + val conn = GemFireConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + + //This will call the implicit conversion map2Properties in connector package object, since it is Map[String, String] + var position1 = new Position(Map("secId" -> "SUN", "qty" -> "34000", "mktValue" -> "24.42")) + var position2 = new Position(Map("secId" -> "IBM", "qty" -> "8765", "mktValue" -> "34.29")) + val portfolio1 = new Portfolio(map2Props(Map("id" ->"1", "type" -> "type1", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + rgn.put("1", portfolio1) + + position1 = new Position(Map("secId" -> "YHOO", "qty" -> "9834", "mktValue" -> "12.925")) + position2 = new Position(Map("secId" -> "GOOG", "qty" -> "12176", "mktValue" -> "21.972")) + val portfolio2 = new Portfolio(map2Props(Map("id" -> "2", "type" -> "type2", "status" -> "inactive", + "position1" -> position1, "position2" -> position2))) + rgn.put("2", portfolio2) + + position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32")) + position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373")) + val portfolio3 = new Portfolio(map2Props(Map("id" -> "3", "type" -> "type3", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + rgn.put("3", portfolio3) + + position1 = new Position(Map("secId" -> "APPL", "qty" -> "67", "mktValue" -> "67.356572")) + position2 = new Position(Map("secId" -> "ORCL", "qty" -> "376", "mktValue" -> "101.34")) + val portfolio4 = new Portfolio(map2Props(Map("id" -> "4", "type" -> "type1", "status" -> "inactive", + "position1" -> position1, "position2" -> position2))) + rgn.put("4", portfolio4) + + position1 = new Position(Map("secId" -> "SAP", "qty" -> "90", "mktValue" -> "67.356572")) + position2 = new Position(Map("secId" -> "DELL", "qty" -> "376", "mktValue" -> "101.34")) + val portfolio5 = new Portfolio(map2Props(Map("id" -> "5", "type" -> "type2", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + rgn.put("5", portfolio5) + + position1 = new Position(Map("secId" -> "RHAT", "qty" -> "90", "mktValue" -> "67.356572")) + position2 = new Position(Map("secId" -> "NOVL", "qty" -> "376", "mktValue" -> "101.34")) + val portfolio6 = new Portfolio(map2Props(Map("id" -> "6", "type" -> "type3", "status" -> "inactive", + "position1" -> position1, "position2" -> position2))) + rgn.put("6", portfolio6) + + position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32")) + position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373")) + val portfolio7 = new Portfolio(map2Props(Map("id" -> "7", "type" -> "type4", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + //Not using null, due to intermittent query failure on column containing null, likely a Spark SQL bug + //portfolio7.setType(null) + rgn.put("7", portfolio7) + } + + private def getQueryRDD[T: ClassTag]( + query: String, connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)): QueryRDD[T] = + new QueryRDD[T](sc, query, connConf) + + test("Run GemFire OQL query and convert the returned QueryRDD to DataFrame: Partitioned 
Region") {
+    simpleQuery("obj_obj_region")
+  }
+
+  test("Run GemFire OQL query and convert the returned QueryRDD to DataFrame: Replicated Region") {
+    simpleQuery("obj_obj_rep_region")
+  }
+
+  private def simpleQuery(regionName: String) {
+    //Populate some data in the region
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, String] = conn.getRegionProxy(regionName)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))
+
+    //Create QueryRDD using OQL
+    val OQLResult: QueryRDD[String] = getQueryRDD[String](s"select * from /$regionName")
+
+    //Verify the QueryRDD
+    val oqlRS: Array[String] = OQLResult.collect()
+    oqlRS should have length 3
+    oqlRS should contain theSameElementsAs List("one", "two", "three")
+
+    //Convert QueryRDD to DataFrame
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    // this is used to implicitly convert an RDD to a DataFrame.
+    import sqlContext.implicits._
+    val dataFrame = OQLResult.map(x => Number(x, x.length)).toDF()
+    //Register dataFrame as a table of two columns of type String and Int respectively
+    dataFrame.registerTempTable("numberTable")
+
+    //Issue SQL query against the table
+    val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
+    //Verify the SQL query result; r(0) means column 0
+    val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
+    sqlRS should have length 3
+    sqlRS should contain theSameElementsAs List("one", "two", "three")
+
+    //Convert QueryRDD to DataFrame using RDDConverter
+    val dataFrame2 = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+    //Register dataFrame2 as a table of two columns of type String and Int respectively
+    dataFrame2.registerTempTable("numberTable2")
+
+    //Issue SQL query against the table
+    val SQLResult2 = sqlContext.sql("SELECT * FROM numberTable2")
+    //Verify the SQL query result; r(0) means column 0
+    val sqlRS2: Array[Any] = SQLResult2.map(r => r(0)).collect()
+    sqlRS2 should have length 3
+    sqlRS2 should contain theSameElementsAs List("one", "two", "three")
+
+    //Remove the region entries, because other tests might use the same region as well
+    List("1", "2", "3").foreach(rgn.remove)
+  }
+
+  test("Run GemFire OQL query and directly return DataFrame: Partitioned Region") {
+    simpleQueryDataFrame("obj_obj_region")
+  }
+
+  test("Run GemFire OQL query and directly return DataFrame: Replicated Region") {
+    simpleQueryDataFrame("obj_obj_rep_region")
+  }
+
+  private def simpleQueryDataFrame(regionName: String) {
+    //Populate some data in the region
+    val conn = GemFireConnectionConf(sc.getConf).getConnection
+    val rgn: Region[String, String] = conn.getRegionProxy(regionName)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))
+
+    //Create DataFrame using GemFire OQL
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    val dataFrame = sqlContext.gemfireOQL(s"select * from /$regionName")
+    dataFrame.registerTempTable("numberTable")
+
+    //Issue SQL query against the table
+    val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
+    //Verify the SQL query result; r(0) means column 0
+    val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
+    sqlRS should have length 3
+    sqlRS should contain theSameElementsAs List("one", "two", "three")
+
+    //Remove the region entries, because other tests might use the same region as well
+    List("1", "2",
"3").foreach(rgn.remove) + } + + test("GemFire OQL query with UDT: Partitioned Region") { + queryUDT("obj_obj_region") + } + + test("GemFire OQL query with UDT: Replicated Region") { + queryUDT("obj_obj_rep_region") + } + + private def queryUDT(regionName: String) { + + //Populate some data in the region + val conn = GemFireConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + val e1: Employee = new Employee("hello", 123) + val e2: Employee = new Employee("world", 456) + rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2))) + + //Create QueryRDD using OQL + val OQLResult: QueryRDD[Object] = getQueryRDD(s"select name, age from /$regionName") + + //verify the QueryRDD + val oqlRS: Array[Object] = OQLResult.collect() + oqlRS should have length 2 + oqlRS.map(e => e.asInstanceOf[StructImpl].getFieldValues.apply(1)) should contain theSameElementsAs List(123, 456) + + //Convert QueryRDD to DataFrame + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + + //Convert QueryRDD to DataFrame using RDDConverter + val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) + dataFrame.registerTempTable("employee") + val SQLResult = sqlContext.sql("SELECT * FROM employee") + + //Verify the SQL query result + val sqlRS = SQLResult.map(r => r(0)).collect() + sqlRS should have length 2 + sqlRS should contain theSameElementsAs List("hello", "world") + + List("1", "2").foreach(rgn.remove) + } + + test("GemFire OQL query with UDT and directly return DataFrame: Partitioned Region") { + queryUDTDataFrame("obj_obj_region") + } + + test("GemFire OQL query with UDT and directly return DataFrame: Replicated Region") { + queryUDTDataFrame("obj_obj_rep_region") + } + + private def queryUDTDataFrame(regionName: String) { + //Populate some data in the region + val conn = GemFireConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + val e1: Employee = new Employee("hello", 123) + val e2: Employee = new Employee("world", 456) + rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2))) + + //Create DataFrame using GemFire OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.gemfireOQL(s"select name, age from /$regionName") + + dataFrame.registerTempTable("employee") + val SQLResult = sqlContext.sql("SELECT * FROM employee") + + //Verify the SQL query result + val sqlRS = SQLResult.map(r => r(0)).collect() + sqlRS should have length 2 + sqlRS should contain theSameElementsAs List("hello", "world") + + List("1", "2").foreach(rgn.remove) + } + + test("GemFire OQL query with more complex UDT: Partitioned Region") { + complexUDT("obj_obj_region") + } + + test("GemFire OQL query with more complex UDT: Replicated Region") { + complexUDT("obj_obj_rep_region") + } + + private def complexUDT(regionName: String) { + + initRegion(regionName) + + //Create QueryRDD using OQL + val OQLResult: QueryRDD[Object] = getQueryRDD(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'") + + //verify the QueryRDD + val oqlRS: Array[Int] = OQLResult.collect().map(r => r.asInstanceOf[Portfolio].getId) + oqlRS should contain theSameElementsAs List(1, 3, 5, 7) + + //Convert QueryRDD to DataFrame + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + + //Convert QueryRDD to DataFrame using RDDConverter + val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, 
sqlContext) + + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT * FROM Portfolio") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType) + sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4") + } + + test("GemFire OQL query with more complex UDT and directly return DataFrame: Partitioned Region") { + complexUDTDataFrame("obj_obj_region") + } + + test("GemFire OQL query with more complex UDT and directly return DataFrame: Replicated Region") { + complexUDTDataFrame("obj_obj_rep_region") + } + + private def complexUDTDataFrame(regionName: String) { + + initRegion(regionName) + + //Create DataFrame using GemFire OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.gemfireOQL(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'") + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT * FROM Portfolio") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType) + sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4") + } + + test("GemFire OQL query with more complex UDT with Projection: Partitioned Region") { + queryComplexUDTProjection("obj_obj_region") + } + + test("GemFire OQL query with more complex UDT with Projection: Replicated Region") { + queryComplexUDTProjection("obj_obj_rep_region") + } + + private def queryComplexUDTProjection(regionName: String) { + + initRegion(regionName) + + //Create QueryRDD using OQL + val OQLResult: QueryRDD[Object] = getQueryRDD[Object](s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""") + + //verify the QueryRDD + val oqlRS: Array[Int] = OQLResult.collect().map(si => si.asInstanceOf[StructImpl].getFieldValues.apply(0).asInstanceOf[Int]) + oqlRS should contain theSameElementsAs List(1, 3, 5, 7) + + //Convert QueryRDD to DataFrame + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + + //Convert QueryRDD to DataFrame using RDDConverter + val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) + + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0)) + sqlRS should contain theSameElementsAs List(3) + } + + test("GemFire OQL query with more complex UDT with Projection and directly return DataFrame: Partitioned Region") { + queryComplexUDTProjectionDataFrame("obj_obj_region") + } + + test("GemFire OQL query with more complex UDT with Projection and directly return DataFrame: Replicated Region") { + queryComplexUDTProjectionDataFrame("obj_obj_rep_region") + } + + private def queryComplexUDTProjectionDataFrame(regionName: String) { + + initRegion(regionName) + + //Create DataFrame using GemFire OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.gemfireOQL(s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""") + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0)) + sqlRS should contain theSameElementsAs List(3) + } + + test("GemFire OQL query with more complex UDT with nested Projection and directly return DataFrame: Partitioned Region") { + 
queryComplexUDTNestProjectionDataFrame("obj_obj_region") + } + + test("GemFire OQL query with more complex UDT with nested Projection and directly return DataFrame: Replicated Region") { + queryComplexUDTNestProjectionDataFrame("obj_obj_rep_region") + } + + private def queryComplexUDTNestProjectionDataFrame(regionName: String) { + + initRegion(regionName) + + //Create DataFrame using GemFire OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.gemfireOQL(s"""SELECT r.id, r."type", r.positions, r.status FROM /$regionName r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'""") + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0)) + sqlRS should contain theSameElementsAs List(3) + } + + test("Undefined instance deserialization: Partitioned Region") { + undefinedInstanceDeserialization("obj_obj_region") + } + + test("Undefined instance deserialization: Replicated Region") { + undefinedInstanceDeserialization("obj_obj_rep_region") + } + + private def undefinedInstanceDeserialization(regionName: String) { + + val conn = GemFireConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + + //Put some new data + rgn.put("1", "one") + + //Query some non-existent columns, which should return UNDEFINED + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.gemfireOQL(s"SELECT col100, col200 FROM /$regionName") + val col1 = dataFrame.first().apply(0) + val col2 = dataFrame.first().apply(1) + assert(col1 == QueryService.UNDEFINED) + assert(col2 == QueryService.UNDEFINED) + //Verify that col1 and col2 refer to the same Undefined object + assert(col1.asInstanceOf[AnyRef] eq col2.asInstanceOf[AnyRef]) + } + + test("RDD.saveToGemFire") { + val regionName = "str_str_region" + // generate: Vector((1,11), (2,22), (3,33), (4,44), (5,55), (6,66)) + val data = (1 to 6).map(_.toString).map(e=> (e, e*2)) + val rdd = sc.parallelize(data) + rdd.saveToGemfire(regionName) + + // verify + val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) + val region: Region[String, String] = connConf.getConnection.getRegionProxy(regionName) + println("region key set on server: " + region.keySetOnServer()) + assert((1 to 6).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer())) + (1 to 6).map(_.toString).foreach(e => assert(e*2 == region.get(e))) + } + + // =========================================================== + // DStream.saveToGemfire() functional tests + // =========================================================== + + test("Basic DStream test") { + import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener} + import io.pivotal.gemfire.spark.connector.streaming._ + import org.apache.spark.streaming.ManualClockHelper + + class TestStreamListener extends StreamingListener { + var count = 0 + override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = count += 1 + } + + def batchDuration = Seconds(1) + val ssc = new StreamingContext(sc, batchDuration) + val input = Seq(1 to 4, 5 to 8, 9 to 12) + val dstream = new TestInputDStream(ssc, input, 2) + dstream.saveToGemfire[String, Int]("str_int_region", (e: Int) => (e.toString, e)) + try { + val listener = new TestStreamListener + 
ssc.addStreamingListener(listener) + ssc.start() + ManualClockHelper.addToTime(ssc, batchDuration.milliseconds * input.length) + while (listener.count < input.length) ssc.awaitTerminationOrTimeout(50) + } catch { + case e: Exception => e.printStackTrace(); throw e +// } finally { +// ssc.stop() + } + + val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf) + val conn = connConf.getConnection + val region: Region[String, Int] = conn.getRegionProxy("str_int_region") + + // verify gemfire region contents + println("region key set on server: " + region.keySetOnServer()) + assert((1 to 12).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer())) + (1 to 12).foreach(e => assert(e == region.get(e.toString))) + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala new file mode 100644 index 0000000..c286491 --- /dev/null +++ b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package ittest.io.pivotal.gemfire.spark.connector
+
+import java.util.Properties
+
+import io.pivotal.gemfire.spark.connector._
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager
+import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster
+import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils
+import org.apache.spark.{SparkContext, SparkConf}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import java.util.{HashMap => JHashMap}
+
+class RDDJoinRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster {
+
+  var sc: SparkContext = null
+  val numServers = 3
+  val numObjects = 1000
+
+  override def beforeAll() {
+    // start gemfire cluster, and spark context
+    val settings = new Properties()
+    settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
+    settings.setProperty("num-of-servers", numServers.toString)
+    val locatorPort = GemFireCluster.start(settings)
+
+    // start spark context in local mode
+    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
+      "log4j.logger.io.pivotal.gemfire.spark.connector" -> "DEBUG")
+    val conf = new SparkConf()
+      .setAppName("RDDJoinRegionIntegrationTest")
+      .setMaster("local[2]")
+      .set(GemFireLocatorPropKey, s"localhost[$locatorPort]")
+    sc = new SparkContext(conf)
+  }
+
+  override def afterAll() {
+    // stop connection, spark context, and gemfire cluster
+    DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(sc.getConf))
+    sc.stop()
+    GemFireCluster.stop()
+  }
+
+//  def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
+//    assert(map1.size == map2.size)
+//    map1.foreach(e => {
+//      assert(map2.contains(e._1))
+//      assert(e._2 == map2.get(e._1).get)
+//    })
+//  }
+
+  // --------------------------------------------------------------------------------------------
+  // PairRDD.joinGemfireRegion[K2 <: K, V2](regionPath, connConf): GemFireJoinRDD[(K, V), K, V2]
+  // --------------------------------------------------------------------------------------------
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], replicated region", JoinTest) {
+    verifyPairRDDJoinRegionWithSameKeyType("rr_str_int_region")
+  }
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned region", JoinTest) {
+    verifyPairRDDJoinRegionWithSameKeyType("pr_str_int_region")
+  }
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", JoinTest) {
+    verifyPairRDDJoinRegionWithSameKeyType("pr_r_str_int_region")
+  }
+
+  def verifyPairRDDJoinRegionWithSameKeyType(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => ("k_" + x, x*2))
+    val rdd = sc.parallelize(data)
+
+    val rdd2 = rdd.joinGemfireRegion[String, Int](regionPath, connConf)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (0 until 50).map(i => ((s"k_$i", i*2), i)).toMap
+    // matchMaps[(String, Int), Int](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+
+  // ------------------------------------------------------------------------------------------------------
+  // PairRDD.joinGemfireRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GemFireJoinRDD[(K, V), K2, V2]
+  // ------------------------------------------------------------------------------------------------------
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], replicated region", JoinTest) {
+    verifyPairRDDJoinRegionWithDiffKeyType("rr_str_int_region")
+  }
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned region", JoinTest) {
+    verifyPairRDDJoinRegionWithDiffKeyType("pr_str_int_region")
+  }
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", JoinTest) {
+    verifyPairRDDJoinRegionWithDiffKeyType("pr_r_str_int_region")
+  }
+
+  def verifyPairRDDJoinRegionWithDiffKeyType(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => (x, x*2))
+    val rdd = sc.parallelize(data)
+
+    val func: ((Int, Int)) => String = pair => s"k_${pair._1}"
+
+    val rdd2 = rdd.joinGemfireRegion[String, Int](regionPath, func /*, connConf*/)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (0 until 50).map(i => ((i, i*2), i)).toMap
+    // matchMaps[(Int, Int), Int](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+
+  // ------------------------------------------------------------------------------------------------
+  // PairRDD.outerJoinGemfireRegion[K2 <: K, V2](regionPath, connConf): GemFireJoinRDD[(K, V), K, V2]
+  // ------------------------------------------------------------------------------------------------
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], replicated region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithSameKeyType("rr_str_int_region")
+  }
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithSameKeyType("pr_str_int_region")
+  }
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithSameKeyType("pr_r_str_int_region")
+  }
+
+  def verifyPairRDDOuterJoinRegionWithSameKeyType(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => ("k_" + x, x*2))
+    val rdd = sc.parallelize(data)
+
+    val rdd2 = rdd.outerJoinGemfireRegion[String, Int](regionPath /*, connConf*/)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (-5 until 50).map {
+      i => if (i < 0) ((s"k_$i", i * 2), None)
+           else ((s"k_$i", i*2), Some(i))}.toMap
+    // matchMaps[(String, Int), Option[Int]](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+
+  // ------------------------------------------------------------------------------------------------------
+  // PairRDD.outerJoinGemfireRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GemFireJoinRDD[(K, V), K2, V2]
+  // ------------------------------------------------------------------------------------------------------
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], replicated region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithDiffKeyType("rr_str_int_region")
+  }
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_str_int_region")
+  }
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_r_str_int_region")
+  }
+
+  def verifyPairRDDOuterJoinRegionWithDiffKeyType(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => (x, x*2))
+    val rdd = sc.parallelize(data)
+
+    val func: ((Int, Int)) => String = pair => s"k_${pair._1}"
+
+    val rdd2 = rdd.outerJoinGemfireRegion[String, Int](regionPath, func, connConf)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (-5 until 50).map {
+      i => if (i < 0) ((i, i * 2), None)
+           else ((i, i*2), Some(i))}.toMap
+    // matchMaps[(Int, Int), Option[Int]](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // RDD.joinGemfireRegion[K, V](regionPath, T => K, connConf): GemFireJoinRDD[T, K, V]
+  // --------------------------------------------------------------------------------------------
+
+  test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], replicated region", JoinTest) {
+    verifyRDDJoinRegion("rr_str_int_region")
+  }
+
+  test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], partitioned region", JoinTest) {
+    verifyRDDJoinRegion("pr_str_int_region")
+  }
+
+  test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], partitioned redundant region", JoinTest) {
+    verifyRDDJoinRegion("pr_r_str_int_region")
+  }
+
+  def verifyRDDJoinRegion(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => s"k_$x")
+    val rdd = sc.parallelize(data)
+
+    val rdd2 = rdd.joinGemfireRegion[String, Int](regionPath, x => x, connConf)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (0 until 50).map(i => (s"k_$i", i)).toMap
+    // matchMaps[String, Int](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // RDD.outerJoinGemfireRegion[K, V](regionPath, T => K, connConf): GemFireJoinRDD[T, K, V]
+  // --------------------------------------------------------------------------------------------
+
+  test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], replicated region", OuterJoinTest) {
+    verifyRDDOuterJoinRegion("rr_str_int_region")
+  }
+
+  test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], partitioned region", OuterJoinTest) {
+    verifyRDDOuterJoinRegion("pr_str_int_region")
+  }
+
+  test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], partitioned redundant region", OuterJoinTest) {
+    verifyRDDOuterJoinRegion("pr_r_str_int_region")
+  }
+
+  def verifyRDDOuterJoinRegion(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => s"k_$x")
+    val rdd = sc.parallelize(data)
+
+    val rdd2 = rdd.outerJoinGemfireRegion[String, Int](regionPath, x => x /*, connConf */)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (-5 until 50).map {
+      i => if (i < 0) (s"k_$i", None)
+           else (s"k_$i", Some(i))}.toMap
+    // matchMaps[String, Option[Int]](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala
new file mode 100644
index 0000000..0ab8110
--- /dev/null
+++ b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark.connector
+
+import java.util.Properties
+
+import io.pivotal.gemfire.spark.connector._
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager
+import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster
+import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils
+import org.apache.spark.{SparkContext, SparkConf}
+import org.scalatest.{Tag, BeforeAndAfterAll, FunSuite, Matchers}
+import java.util.{HashMap => JHashMap}
+
+
+class RetrieveRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster {
+
+  var sc: SparkContext = null
+  val numServers = 4
+  val numObjects = 1000
+
+  override def beforeAll() {
+    // start gemfire cluster, and spark context
+    val settings = new Properties()
+    settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
+    settings.setProperty("num-of-servers", numServers.toString)
+    val locatorPort = GemFireCluster.start(settings)
+
+    // start spark context in local mode
+    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
+      "log4j.logger.io.pivotal.gemfire.spark.connector" -> "DEBUG")
+    val conf = new SparkConf()
+      .setAppName("RetrieveRegionIntegrationTest")
+      .setMaster("local[2]")
+      .set(GemFireLocatorPropKey, s"localhost[$locatorPort]")
+    sc = new SparkContext(conf)
+  }
+
+  override def afterAll() {
+    // stop connection, spark context, and gemfire cluster
+    DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(sc.getConf))
+    sc.stop()
+    GemFireCluster.stop()
+  }
+
+  def executeTest[K,V](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,V]) = {
+    //Populate some data in the region
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[K, V] = conn.getRegionProxy(regionName)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+    verifyRetrieveRegion[K,V](regionName, entriesMap)
+  }
+
+  def verifyRetrieveRegion[K,V](regionName:String, entriesMap:java.util.Map[K,V]) = {
+    val rdd = sc.gemfireRegion(regionName)
+    val collectedObjs = rdd.collect()
+    collectedObjs should have length entriesMap.size
+    import scala.collection.JavaConverters._
+    matchMaps[K,V](entriesMap.asScala.toMap, collectedObjs.toMap)
+  }
+
+  def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
+    assert(map1.size == map2.size)
+    map1.foreach(e => {
+      assert(map2.contains(e._1))
+      assert(e._2 == map2.get(e._1).get)
+    })
+  }
+
+  //Retrieve region for Partitioned Region where some nodes are empty (empty iterator)
+  //This test has to run first...the rest of the tests always use the same num objects
+  test("Retrieve Region for PR where some nodes are empty (Empty Iterator)") {
+    val numObjects = numServers - 1
+    val entriesMap:JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
+    executeTest[String, Int]("pr_str_int_region", numObjects, entriesMap)
+  }
+
+  //Test for retrieving from region containing string key and string value
+  def verifyRetrieveStringStringRegion(regionName:String) = {
+    val entriesMap:JHashMap[String, String] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, "value_" + i))
+    executeTest[String, String](regionName, numObjects, entriesMap)
+  }
+
+  test("Retrieve Region with replicate string string") {
+    verifyRetrieveStringStringRegion("rr_obj_obj_region")
+  }
+
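+  //Region naming convention (regions defined in src/it/resources/test-retrieve-regions.xml):
+  //rr_* = replicated, pr_* = partitioned, pr_r_* = partitioned redundant.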
+  test("Retrieve Region with partitioned string string") {
+    verifyRetrieveStringStringRegion("pr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partitioned redundant string string") {
+    verifyRetrieveStringStringRegion("pr_r_obj_obj_region")
+  }
+
+
+  //Test for retrieving from region containing string key and int value
+  def verifyRetrieveStringIntRegion(regionName:String) = {
+    val entriesMap:JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
+    executeTest[String, Int](regionName, numObjects, entriesMap)
+  }
+
+  test("Retrieve Region with replicate string int region") {
+    verifyRetrieveStringIntRegion("rr_str_int_region")
+  }
+
+  test("Retrieve Region with partitioned string int region") {
+    verifyRetrieveStringIntRegion("pr_str_int_region")
+  }
+
+  test("Retrieve Region with partitioned redundant string int region") {
+    verifyRetrieveStringIntRegion("pr_r_str_int_region")
+  }
+
+  //Test for retrieving from region containing string key and object value
+  def verifyRetrieveStringObjectRegion(regionName:String) = {
+    val entriesMap:JHashMap[String, Object] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, new Employee("ename" + i, i)))
+    executeTest[String, Object](regionName, numObjects, entriesMap)
+  }
+
+  test("Retrieve Region with replicate string obj") {
+    verifyRetrieveStringObjectRegion("rr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partitioned string obj") {
+    verifyRetrieveStringObjectRegion("pr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partitioned redundant string obj") {
+    verifyRetrieveStringObjectRegion("pr_r_obj_obj_region")
+  }
+
+  //Test for retrieving from region containing string key and map value
+  def verifyRetrieveStringMapRegion(regionName:String) = {
+    val entriesMap:JHashMap[String,JHashMap[String,String]] = new JHashMap()
+    (0 until numObjects).map(i => {
+      val hashMap:JHashMap[String, String] = new JHashMap()
+      hashMap.put("mapKey:" + i, "mapValue:" + i)
+      entriesMap.put("key_" + i, hashMap)
+    })
+    executeTest(regionName, numObjects, entriesMap)
+  }
+
+  test("Retrieve Region with replicate string map region") {
+    verifyRetrieveStringMapRegion("rr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partitioned string map region") {
+    verifyRetrieveStringMapRegion("pr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partitioned redundant string map region") {
+    verifyRetrieveStringMapRegion("pr_r_obj_obj_region")
+  }
+
+  //Test and helpers specific for retrieving from region containing string key and byte[] value
+  def executeTestWithByteArrayValues[K](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,Array[Byte]]) = {
+    //Populate some data in the region
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[K, Array[Byte]] = conn.getRegionProxy(regionName)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+    verifyRetrieveRegionWithByteArrayValues[K](regionName, entriesMap)
+  }
+
+  def verifyRetrieveRegionWithByteArrayValues[K](regionName:String, entriesMap:java.util.Map[K,Array[Byte]]) = {
+    val rdd = sc.gemfireRegion(regionName)
+    val collectedObjs = rdd.collect()
+    collectedObjs should have length entriesMap.size
+    import scala.collection.JavaConverters._
+    matchByteArrayMaps[K](entriesMap.asScala.toMap, collectedObjs.toMap)
+  }
+
+  def matchByteArrayMaps[K](map1:Map[K,Array[Byte]], map2:Map[K,Array[Byte]]) = {
+    assert(map1.size == map2.size)
+    map1.foreach(e => {
+      assert(map2.contains(e._1))
+      assert(java.util.Arrays.equals(e._2, map2.get(e._1).get))
+    })
+  }
+
+  def verifyRetrieveStringByteArrayRegion(regionName:String) = {
+    val entriesMap:JHashMap[String, Array[Byte]] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, Array[Byte](192.toByte, 168.toByte, 0, i.toByte)))
+    executeTestWithByteArrayValues[String](regionName, numObjects, entriesMap)
+  }
+
+  test("Retrieve Region with replicate string byte[] region") {
+    verifyRetrieveStringByteArrayRegion("rr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partition string byte[] region") {
+    verifyRetrieveStringByteArrayRegion("pr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partition redundant string byte[] region") {
+    verifyRetrieveStringByteArrayRegion("pr_r_obj_obj_region")
+  }
+
+  test("Retrieve Region with where clause on partitioned redundant region", FilterTest) {
+    verifyRetrieveRegionWithWhereClause("pr_r_str_int_region")
+  }
+
+  test("Retrieve Region with where clause on partitioned region", FilterTest) {
+    verifyRetrieveRegionWithWhereClause("pr_str_int_region")
+  }
+
+  test("Retrieve Region with where clause on replicated region", FilterTest) {
+    verifyRetrieveRegionWithWhereClause("rr_str_int_region")
+  }
+
+  def verifyRetrieveRegionWithWhereClause(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val rdd = sc.gemfireRegion(regionPath).where("value.intValue() < 50")
+    val expectedMap = (0 until 50).map(i => (s"key_$i", i)).toMap
+    val collectedObjs = rdd.collect()
+    // collectedObjs should have length expectedMap.size
+    matchMaps[String, Int](expectedMap, collectedObjs.toMap)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala
new file mode 100644
index 0000000..298dc4a
--- /dev/null
+++ b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark
+
+import org.scalatest.Tag
+
+package object connector {
+
+  object OnlyTest extends Tag("OnlyTest")
+  object FetchDataTest extends Tag("FetchDataTest")
+  object FilterTest extends Tag("FilterTest")
+  object JoinTest extends Tag("JoinTest")
+  object OuterJoinTest extends Tag("OuterJoinTest")
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala
new file mode 100644
index 0000000..d8e07f5
--- /dev/null
+++ b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark.connector.testkit
+
+import java.util.Properties
+
+trait GemFireCluster {
+  def startGemFireCluster(settings: Properties): Int = {
+    println("=== GemFireCluster start()")
+    GemFireCluster.start(settings)
+  }
+}
+
+object GemFireCluster {
+  private var gemfire: Option[GemFireRunner] = None
+
+  def start(settings: Properties): Int = {
+    gemfire.map(_.stopGemFireCluster()) // Clean up any old running GemFire instances
+    val runner = new GemFireRunner(settings)
+    gemfire = Some(runner)
+    runner.getLocatorPort
+  }
+
+  def stop(): Unit = {
+    println("=== GemFireCluster shutdown: " + gemfire.toString)
+    gemfire match {
+      case None => println("Nothing to shutdown.")
+      case Some(runner) => runner.stopGemFireCluster()
+    }
+    gemfire = None
+    println("=== GemFireCluster shutdown finished.")
+  }
+}
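The integration suites in this commit all share the same wiring: a suite mixes in GemFireCluster, starts the cluster and a local SparkContext in beforeAll, and tears both down in afterAll, with the connector's implicits and the ScalaTest tags from package.scala tying the pieces together. The following is a minimal sketch of a new suite assembled the same way; the class name ExampleIntegrationTest and its single test are hypothetical, and it assumes the pr_str_int_region definition from src/it/resources/test-retrieve-regions.xml used by the suites above.

package ittest.io.pivotal.gemfire.spark.connector

import java.util.Properties

import com.gemstone.gemfire.cache.Region
import io.pivotal.gemfire.spark.connector._
import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager
import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import java.util.{HashMap => JHashMap}

class ExampleIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster {

  var sc: SparkContext = null

  override def beforeAll() {
    // Start a two-server GemFire cluster, then a local Spark context pointed at its locator.
    val settings = new Properties()
    settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
    settings.setProperty("num-of-servers", "2")
    val locatorPort = GemFireCluster.start(settings)
    sc = new SparkContext(new SparkConf()
      .setAppName("ExampleIntegrationTest")
      .setMaster("local[2]")
      .set(GemFireLocatorPropKey, s"localhost[$locatorPort]"))
  }

  override def afterAll() {
    // Close the cached connection before tearing down Spark and GemFire.
    DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(sc.getConf))
    sc.stop()
    GemFireCluster.stop()
  }

  // Tagged FetchDataTest so it can be selected with ScalaTest's tag filter (-n FetchDataTest).
  test("expose a GemFire region as a Spark RDD", FetchDataTest) {
    // Populate the region through a region proxy, as the suites above do.
    val entriesMap: JHashMap[String, Int] = new JHashMap()
    (0 until 10).foreach(i => entriesMap.put("k_" + i, i))
    val rgn: Region[String, Int] =
      GemFireConnectionConf(sc.getConf).getConnection.getRegionProxy("pr_str_int_region")
    rgn.removeAll(rgn.keySetOnServer())
    rgn.putAll(entriesMap)

    // Read it back as an RDD and verify.
    val rdd = sc.gemfireRegion("pr_str_int_region")
    rdd.collect() should have length 10
  }
}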