http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala deleted file mode 100644 index 48f83c9..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package unittest.io.pivotal.gemfire.spark.connector.rdd - -import com.gemstone.gemfire.cache.Region -import io.pivotal.gemfire.spark.connector.internal.RegionMetadata -import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireRDDPartition, GemFireRegionRDD} -import io.pivotal.gemfire.spark.connector.{GemFireConnectionConf, GemFireConnection} -import org.apache.spark.{TaskContext, Partition, SparkContext} -import org.mockito.Mockito._ -import org.mockito.Matchers.{eq => mockEq, any => mockAny} -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, FunSuite} - -import scala.reflect.ClassTag - -class GemFireRegionRDDTest extends FunSuite with Matchers with MockitoSugar { - - /** create common mocks, not all mocks are used by all tests */ - def createMocks[K, V](regionPath: String)(implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]) - : (String, Region[K,V], GemFireConnectionConf, GemFireConnection) = { - val mockConnection = mock[GemFireConnection] - val mockRegion = mock[Region[K, V]] - val mockConnConf = mock[GemFireConnectionConf] - when(mockConnConf.getConnection).thenReturn(mockConnection) - when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion) - when(mockConnConf.locators).thenReturn(Seq.empty) - (regionPath, mockRegion, mockConnConf, mockConnection) - } - - test("create GemFireRDD with non-existing region") { - val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") - when(mockConnConf.getConnection).thenReturn(mockConnection) - when(mockConnection.validateRegion[String,String](regionPath)).thenThrow(new RuntimeException) - val mockSparkContext = mock[SparkContext] - intercept[RuntimeException] { GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf) } - verify(mockConnConf).getConnection - verify(mockConnection).validateRegion[String, String](regionPath) - } - - test("getPartitions with non-existing region") { - // region exists when RDD is created, but get removed before getPartitions() is invoked - val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") - when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(None) - val mockSparkContext = mock[SparkContext] - intercept[RuntimeException] { GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf).getPartitions } - } - - test("getPartitions with replicated region and not preferred env") { - val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") - implicit val mockConnConf2 = mockConnConf - val mockSparkContext = mock[SparkContext] - when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null))) - val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions - verifySinglePartition(partitions) - } - - def verifySinglePartition(partitions: Array[Partition]): Unit = { - assert(1 == partitions.size) - assert(partitions(0).index === 0) - assert(partitions(0).isInstanceOf[GemFireRDDPartition]) - assert(partitions(0).asInstanceOf[GemFireRDDPartition].bucketSet.isEmpty) - } - - test("getPartitions with replicated region and preferred OnePartitionPartitioner") { - // since it's replicated region, so OnePartitionPartitioner will be used, i.e., override preferred partitioner - import io.pivotal.gemfire.spark.connector.{PreferredPartitionerPropKey, OnePartitionPartitionerName} - val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") - when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null))) - implicit val mockConnConf2 = mockConnConf - val mockSparkContext = mock[SparkContext] - val env = Map(PreferredPartitionerPropKey -> OnePartitionPartitionerName) - val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf, env).partitions - verifySinglePartition(partitions) - } - - test("getPartitions with partitioned region and not preferred env") { - val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") - implicit val mockConnConf2 = mockConnConf - val mockSparkContext = mock[SparkContext] - when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null))) - val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions - verifySinglePartition(partitions) - } - - test("GemFireRDD.compute() method") { - val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") - implicit val mockConnConf2 = mockConnConf - val mockIter = mock[Iterator[(String, String)]] - val partition = GemFireRDDPartition(0, Set.empty) - when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null))) - when(mockConnection.getRegionData[String, String](regionPath, None, partition)).thenReturn(mockIter) - val mockSparkContext = mock[SparkContext] - val rdd = GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf) - val partitions = rdd.partitions - assert(1 == partitions.size) - val mockTaskContext = mock[TaskContext] - rdd.compute(partitions(0), mockTaskContext) - verify(mockConnection).getRegionData[String, String](mockEq(regionPath), mockEq(None), mockEq(partition)) - // verify(mockConnection).getRegionData[String, String](regionPath, Set.empty.asInstanceOf[Set[Int]], "gemfireRDD 0.0") - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java deleted file mode 100644 index 03e15a0..0000000 --- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package demo; - -import java.io.Serializable; - -/** - * This is a demo class used in doc/?.md - */ -public class Emp implements Serializable { - - private int id; - - private String lname; - - private String fname; - - private int age; - - private String loc; - - public Emp(int id, String lname, String fname, int age, String loc) { - this.id = id; - this.lname = lname; - this.fname = fname; - this.age = age; - this.loc = loc; - } - - public int getId() { - return id; - } - - public String getLname() { - return lname; - } - - public String getFname() { - return fname; - } - - public int getAge() { - return age; - } - - public String getLoc() { - return loc; - } - - @Override - public String toString() { - return "Emp(" + id + ", " + lname + ", " + fname + ", " + age + ", " + loc + ")"; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Emp emp = (Emp) o; - - if (age != emp.age) return false; - if (id != emp.id) return false; - if (fname != null ? !fname.equals(emp.fname) : emp.fname != null) return false; - if (lname != null ? !lname.equals(emp.lname) : emp.lname != null) return false; - if (loc != null ? !loc.equals(emp.loc) : emp.loc != null) return false; - - return true; - } - - @Override - public int hashCode() { - int result = id; - result = 31 * result + (lname != null ? lname.hashCode() : 0); - result = 31 * result + (fname != null ? fname.hashCode() : 0); - result = 31 * result + age; - result = 31 * result + (loc != null ? loc.hashCode() : 0); - return result; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java deleted file mode 100644 index 41654a5..0000000 --- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package demo; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.SQLContext; -import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*; - - -/** - * This Spark application demonstrates how to get region data from GemFire using GemFire - * OQL Java API. The result is a Spark DataFrame. - * <p> - * In order to run it, you will need to start a GemFire cluster, and run demo PairRDDSaveJavaDemo - * first to create some data in the region. - * <p> - * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar - * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/. - * Then run the following command to start a Spark job: - * <pre> - * <path to spark>/bin/spark-submit --master=local[2] --class demo.OQLJavaDemo \ - * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> - * </pre> - */ -public class OQLJavaDemo { - - public static void main(String[] argv) { - - if (argv.length != 1) { - System.err.printf("Usage: OQLJavaDemo <locators>\n"); - return; - } - - SparkConf conf = new SparkConf().setAppName("OQLJavaDemo"); - conf.set(GemFireLocatorPropKey, argv[0]); // "192.168.1.47[10335]" - JavaSparkContext sc = new JavaSparkContext(conf); - SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); - DataFrame df = javaFunctions(sqlContext).gemfireOQL("select * from /str_str_region"); - System.out.println("======= DataFrame =======\n"); - df.show(); - sc.stop(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java deleted file mode 100644 index 84f87af..0000000 --- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package demo; - -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import scala.Tuple2; -import java.util.*; - -import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*; - -/** - * This Spark application demonstrates how to save a RDD to GemFire using GemFire Spark - * Connector with Java. - * <p/> - * In order to run it, you will need to start GemFire cluster, and create the following region - * with GFSH: - * <pre> - * gfsh> create region --name=str_str_region --type=REPLICATE \ - * --key-constraint=java.lang.String --value-constraint=java.lang.String - * </pre> - * - * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar - * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/. - * Then run the following command to start a Spark job: - * <pre> - * <path to spark>/bin/spark-submit --master=local[2] --class demo.PairRDDSaveJavaDemo \ - * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> - * </pre> - * - * Verify the data was saved to GemFire with GFSH: - * <pre>gfsh> query --query="select * from /str_str_region.entrySet" </pre> - */ -public class PairRDDSaveJavaDemo { - - public static void main(String[] argv) { - - if (argv.length != 1) { - System.err.printf("Usage: PairRDDSaveJavaDemo <locators>\n"); - return; - } - - SparkConf conf = new SparkConf().setAppName("PairRDDSaveJavaDemo"); - conf.set(GemFireLocatorPropKey, argv[0]); - JavaSparkContext sc = new JavaSparkContext(conf); - GemFireConnectionConf connConf = GemFireConnectionConf.apply(conf); - - List<Tuple2<String, String>> data = new ArrayList<>(); - data.add(new Tuple2<>("7", "seven")); - data.add(new Tuple2<>("8", "eight")); - data.add(new Tuple2<>("9", "nine")); - - List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>(); - data2.add(new Tuple2<>("11", "eleven")); - data2.add(new Tuple2<>("12", "twelve")); - data2.add(new Tuple2<>("13", "thirteen")); - - // method 1: generate JavaPairRDD directly - JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(data); - javaFunctions(rdd1).saveToGemfire("str_str_region", connConf); - - // method 2: convert JavaRDD<Tuple2<K,V>> to JavaPairRDD<K, V> - JavaRDD<Tuple2<String, String>> rdd2 = sc.parallelize(data2); - javaFunctions(toJavaPairRDD(rdd2)).saveToGemfire("str_str_region", connConf); - - sc.stop(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java deleted file mode 100644 index 5fc5aeb..0000000 --- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package demo; - -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.PairFunction; -import scala.Tuple2; - -import java.util.ArrayList; -import java.util.List; - -import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*; - -/** - * This Spark application demonstrates how to save a RDD to GemFire using GemFire Spark - * Connector with Java. - * <p/> - * In order to run it, you will need to start GemFire cluster, and create the following region - * with GFSH: - * <pre> - * gfsh> create region --name=str_int_region --type=REPLICATE \ - * --key-constraint=java.lang.String --value-constraint=java.lang.Integer - * </pre> - * - * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar - * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/. - * Then run the following command to start a Spark job: - * <pre> - * <path to spark>/bin/spark-submit --master=local[2] --class demo.RDDSaveJavaDemo \ - * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> - * </pre> - * - * Verify the data was saved to GemFire with GFSH: - * <pre>gfsh> query --query="select * from /str_int_region.entrySet" </pre> - */ -public class RDDSaveJavaDemo { - - public static void main(String[] argv) { - - if (argv.length != 1) { - System.err.printf("Usage: RDDSaveJavaDemo <locators>\n"); - return; - } - - SparkConf conf = new SparkConf().setAppName("RDDSaveJavaDemo"); - conf.set(GemFireLocatorPropKey, argv[0]); - JavaSparkContext sc = new JavaSparkContext(conf); - - List<String> data = new ArrayList<String>(); - data.add("abcdefg"); - data.add("abcdefgh"); - data.add("abcdefghi"); - JavaRDD<String> rdd = sc.parallelize(data); - - GemFireConnectionConf connConf = GemFireConnectionConf.apply(conf); - - PairFunction<String, String, Integer> func = new PairFunction<String, String, Integer>() { - @Override public Tuple2<String, Integer> call(String s) throws Exception { - return new Tuple2<String, Integer>(s, s.length()); - } - }; - - javaFunctions(rdd).saveToGemfire("str_int_region", func, connConf); - - sc.stop(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java deleted file mode 100644 index 7c1d7bb..0000000 --- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package demo; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*; - -/** - * This Spark application demonstrates how to expose a region in GemFire as a RDD using GemFire - * Spark Connector with Java. - * <p> - * In order to run it, you will need to start GemFire cluster, and run demo PairRDDSaveJavaDemo - * first to create some data in the region. - * <p> - * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar - * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/. - * Then run the following command to start a Spark job: - * <pre> - * <path to spark>/bin/spark-submit --master=local[2] --class demo.RegionToRDDJavaDemo \ - * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> - * </pre> - */ -public class RegionToRDDJavaDemo { - - public static void main(String[] argv) { - - if (argv.length != 1) { - System.err.printf("Usage: RegionToRDDJavaDemo <locators>\n"); - return; - } - - SparkConf conf = new SparkConf().setAppName("RegionToRDDJavaDemo"); - conf.set(GemFireLocatorPropKey, argv[0]); - JavaSparkContext sc = new JavaSparkContext(conf); - - JavaPairRDD<String, String> rdd = javaFunctions(sc).gemfireRegion("str_str_region"); - System.out.println("=== gemfireRegion =======\n" + rdd.collect() + "\n========================="); - - sc.stop(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala b/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala deleted file mode 100644 index f67c32e..0000000 --- a/geode-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package demo - -import org.apache.spark.SparkConf -import org.apache.spark.streaming.{Seconds, StreamingContext} -import io.pivotal.gemfire.spark.connector.GemFireLocatorPropKey -import io.pivotal.gemfire.spark.connector.streaming._ - -/** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. - * <p><p> - * In order to run it, you will need to start GemFire cluster, and create the following region - * with GFSH: - * <pre> - * gfsh> create region --name=str_int_region --type=REPLICATE \ - * --key-constraint=java.lang.String --value-constraint=java.lang.Integer - * </pre> - * - * <p>To run this on your local machine, you need to first run a net cat server - * `$ nc -lk 9999` - * and then run the example - * `$ bin/spark-submit --master=local[2] --class demo.NetworkWordCount <path to>/basic-demos_2.10-0.5.0.jar localhost 9999 locatorHost:port` - * - * <p><p> check result that was saved to GemFire with GFSH: - * <pre>gfsh> query --query="select * from /str_int_region.entrySet" </pre> - */ -object NetworkWordCount { - - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: NetworkWordCount <hostname> <port> <gemfire locator>") - System.exit(1) - } - - val updateFunc = (values: Seq[Int], state: Option[Int]) => { - val currentCount = values.foldLeft(0)(_ + _) - val previousCount = state.getOrElse(0) - Some(currentCount + previousCount) - } - - // Create the context with a 1 second batch size - val sparkConf = new SparkConf().setAppName("NetworkWordCount").set(GemFireLocatorPropKey, args(2)) - val ssc = new StreamingContext(sparkConf, Seconds(1)) - ssc.checkpoint(".") - - // Create a socket stream on target ip:port and count the - // words in input stream of \n delimited text (eg. generated by 'nc') - // Note that no duplication in storage level only for running locally. - // Replication necessary in distributed scenario for fault tolerance. - val lines = ssc.socketTextStream(args(0), args(1).toInt) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - val runningCounts = wordCounts.updateStateByKey[Int](updateFunc) - // runningCounts.print() - runningCounts.saveToGemfire("str_int_region") - ssc.start() - ssc.awaitTermination() - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java new file mode 100644 index 0000000..9fba9e1 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Employee.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.geode.spark.connector; + +import java.io.Serializable; + +public class Employee implements Serializable { + + private String name; + + private int age; + + public Employee(String n, int a) { + name = n; + age = a; + } + + public String getName() { + return name; + } + + public int getAge() { + return age; + } + + public String toString() { + return new StringBuilder().append("Employee[name=").append(name). + append(", age=").append(age). + append("]").toString(); + } + + public boolean equals(Object o) { + if (o instanceof Employee) { + return ((Employee) o).name.equals(name) && ((Employee) o).age == age; + } + return false; + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java new file mode 100644 index 0000000..8f5a045 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/JavaApiIntegrationTest.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.geode.spark.connector; + +import com.gemstone.gemfire.cache.Region; +import io.pivotal.geode.spark.connector.GeodeConnection; +import io.pivotal.geode.spark.connector.GeodeConnectionConf; +import io.pivotal.geode.spark.connector.GeodeConnectionConf$; +import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager$; +import io.pivotal.geode.spark.connector.javaapi.GeodeJavaRegionRDD; +import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster$; +import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import io.pivotal.geode.spark.connector.package$; +import scala.Tuple2; +import scala.Option; +import scala.Some; + +import java.util.*; + +import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.RDDSaveBatchSizePropKey; +import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.javaFunctions; +import static org.junit.Assert.*; + +public class JavaApiIntegrationTest extends JUnitSuite { + + static JavaSparkContext jsc = null; + static GeodeConnectionConf connConf = null; + + static int numServers = 2; + static int numObjects = 1000; + static String regionPath = "pr_str_int_region"; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // start geode cluster, and spark context + Properties settings = new Properties(); + settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml"); + settings.setProperty("num-of-servers", Integer.toString(numServers)); + int locatorPort = GeodeCluster$.MODULE$.start(settings); + + // start spark context in local mode + Properties props = new Properties(); + props.put("log4j.logger.org.apache.spark", "INFO"); + props.put("log4j.logger.io.pivotal.geode.spark.connector","DEBUG"); + IOUtils.configTestLog4j("ERROR", props); + SparkConf conf = new SparkConf() + .setAppName("RetrieveRegionIntegrationTest") + .setMaster("local[2]") + .set(package$.MODULE$.GeodeLocatorPropKey(), "localhost:"+ locatorPort); + // sc = new SparkContext(conf); + jsc = new JavaSparkContext(conf); + connConf = GeodeConnectionConf.apply(jsc.getConf()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + // stop connection, spark context, and geode cluster + DefaultGeodeConnectionManager$.MODULE$.closeConnection(GeodeConnectionConf$.MODULE$.apply(jsc.getConf())); + jsc.stop(); + GeodeCluster$.MODULE$.stop(); + } + + // -------------------------------------------------------------------------------------------- + // utility methods + // -------------------------------------------------------------------------------------------- + + private <K,V> void matchMapAndPairList(Map<K,V> map, List<Tuple2<K,V>> list) { + assertTrue("size mismatch \nmap: " + map.toString() + "\nlist: " + list.toString(), map.size() == list.size()); + for (Tuple2<K, V> p : list) { + assertTrue("value mismatch: k=" + p._1() + " v1=" + p._2() + " v2=" + map.get(p._1()), + p._2().equals(map.get(p._1()))); + } + } + + private Region<String, Integer> prepareStrIntRegion(String regionPath, int start, int stop) { + HashMap<String, Integer> entriesMap = new HashMap<>(); + for (int i = start; i < stop; i ++) { + entriesMap.put("k_" + i, i); + } + + GeodeConnection conn = connConf.getConnection(); + Region<String, Integer> region = conn.getRegionProxy(regionPath); + region.removeAll(region.keySetOnServer()); + region.putAll(entriesMap); + return region; + } + + private JavaPairRDD<String, Integer> prepareStrIntJavaPairRDD(int start, int stop) { + List<Tuple2<String, Integer>> data = new ArrayList<>(); + for (int i = start; i < stop; i ++) { + data.add(new Tuple2<>("k_" + i, i)); + } + return jsc.parallelizePairs(data); + } + + private JavaPairRDD<Integer, Integer> prepareIntIntJavaPairRDD(int start, int stop) { + List<Tuple2<Integer, Integer>> data = new ArrayList<>(); + for (int i = start; i < stop; i ++) { + data.add(new Tuple2<>(i, i * 2)); + } + return jsc.parallelizePairs(data); + } + + private JavaRDD<Integer> prepareIntJavaRDD(int start, int stop) { + List<Integer> data = new ArrayList<>(); + for (int i = start; i < stop; i ++) { + data.add(i); + } + return jsc.parallelize(data); + } + + // -------------------------------------------------------------------------------------------- + // JavaRDD.saveToGeode + // -------------------------------------------------------------------------------------------- + + static class IntToStrIntPairFunction implements PairFunction<Integer, String, Integer> { + @Override public Tuple2<String, Integer> call(Integer x) throws Exception { + return new Tuple2<>("k_" + x, x); + } + } + + @Test + public void testRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws Exception { + verifyRDDSaveToGeode(true, true); + } + + @Test + public void testRDDSaveToGeodeWithDefaultConnConf() throws Exception { + verifyRDDSaveToGeode(true, false); + } + + @Test + public void testRDDSaveToGeodeWithConnConfAndOpConf() throws Exception { + verifyRDDSaveToGeode(false, true); + } + + @Test + public void testRDDSaveToGeodeWithConnConf() throws Exception { + verifyRDDSaveToGeode(false, false); + } + + public void verifyRDDSaveToGeode(boolean useDefaultConnConf, boolean useOpConf) throws Exception { + Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0); // remove all entries + JavaRDD<Integer> rdd1 = prepareIntJavaRDD(0, numObjects); + + PairFunction<Integer, String, Integer> func = new IntToStrIntPairFunction(); + Properties opConf = new Properties(); + opConf.put(RDDSaveBatchSizePropKey, "200"); + + if (useDefaultConnConf) { + if (useOpConf) + javaFunctions(rdd1).saveToGeode(regionPath, func, opConf); + else + javaFunctions(rdd1).saveToGeode(regionPath, func); + } else { + if (useOpConf) + javaFunctions(rdd1).saveToGeode(regionPath, func, connConf, opConf); + else + javaFunctions(rdd1).saveToGeode(regionPath, func, connConf); + } + + Set<String> keys = region.keySetOnServer(); + Map<String, Integer> map = region.getAll(keys); + + List<Tuple2<String, Integer>> expectedList = new ArrayList<>(); + + for (int i = 0; i < numObjects; i ++) { + expectedList.add(new Tuple2<>("k_" + i, i)); + } + matchMapAndPairList(map, expectedList); + } + + // -------------------------------------------------------------------------------------------- + // JavaPairRDD.saveToGeode + // -------------------------------------------------------------------------------------------- + + @Test + public void testPairRDDSaveToGeodeWithDefaultConnConfAndOpConf() throws Exception { + verifyPairRDDSaveToGeode(true, true); + } + + @Test + public void testPairRDDSaveToGeodeWithDefaultConnConf() throws Exception { + verifyPairRDDSaveToGeode(true, false); + } + + @Test + public void testPairRDDSaveToGeodeWithConnConfAndOpConf() throws Exception { + verifyPairRDDSaveToGeode(false, true); + } + + @Test + public void testPairRDDSaveToGeodeWithConnConf() throws Exception { + verifyPairRDDSaveToGeode(false, false); + } + + public void verifyPairRDDSaveToGeode(boolean useDefaultConnConf, boolean useOpConf) throws Exception { + Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0); // remove all entries + JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(0, numObjects); + Properties opConf = new Properties(); + opConf.put(RDDSaveBatchSizePropKey, "200"); + + if (useDefaultConnConf) { + if (useOpConf) + javaFunctions(rdd1).saveToGeode(regionPath, opConf); + else + javaFunctions(rdd1).saveToGeode(regionPath); + } else { + if (useOpConf) + javaFunctions(rdd1).saveToGeode(regionPath, connConf, opConf); + else + javaFunctions(rdd1).saveToGeode(regionPath, connConf); + } + + Set<String> keys = region.keySetOnServer(); + Map<String, Integer> map = region.getAll(keys); + + List<Tuple2<String, Integer>> expectedList = new ArrayList<>(); + for (int i = 0; i < numObjects; i ++) { + expectedList.add(new Tuple2<>("k_" + i, i)); + } + matchMapAndPairList(map, expectedList); + } + + // -------------------------------------------------------------------------------------------- + // JavaSparkContext.geodeRegion and where clause + // -------------------------------------------------------------------------------------------- + + @Test + public void testJavaSparkContextGeodeRegion() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); // remove all entries + Properties emptyProps = new Properties(); + GeodeJavaRegionRDD<String, Integer> rdd1 = javaFunctions(jsc).geodeRegion(regionPath); + GeodeJavaRegionRDD<String, Integer> rdd2 = javaFunctions(jsc).geodeRegion(regionPath, emptyProps); + GeodeJavaRegionRDD<String, Integer> rdd3 = javaFunctions(jsc).geodeRegion(regionPath, connConf); + GeodeJavaRegionRDD<String, Integer> rdd4 = javaFunctions(jsc).geodeRegion(regionPath, connConf, emptyProps); + GeodeJavaRegionRDD<String, Integer> rdd5 = rdd1.where("value.intValue() < 50"); + + HashMap<String, Integer> expectedMap = new HashMap<>(); + for (int i = 0; i < numObjects; i ++) { + expectedMap.put("k_" + i, i); + } + + matchMapAndPairList(expectedMap, rdd1.collect()); + matchMapAndPairList(expectedMap, rdd2.collect()); + matchMapAndPairList(expectedMap, rdd3.collect()); + matchMapAndPairList(expectedMap, rdd4.collect()); + + HashMap<String, Integer> expectedMap2 = new HashMap<>(); + for (int i = 0; i < 50; i ++) { + expectedMap2.put("k_" + i, i); + } + + matchMapAndPairList(expectedMap2, rdd5.collect()); + } + + // -------------------------------------------------------------------------------------------- + // JavaPairRDD.joinGeodeRegion + // -------------------------------------------------------------------------------------------- + + @Test + public void testPairRDDJoinWithSameKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10); + + JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath); + JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, connConf); + // System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Tuple2<String, Integer>, Integer> expectedMap = new HashMap<>(); + for (int i = 0; i < 10; i ++) { + expectedMap.put(new Tuple2<>("k_" + i, i), i); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + + static class IntIntPairToStrKeyFunction implements Function<Tuple2<Integer, Integer>, String> { + @Override public String call(Tuple2<Integer, Integer> pair) throws Exception { + return "k_" + pair._1(); + } + } + + @Test + public void testPairRDDJoinWithDiffKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10); + Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction(); + + JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath, func); + JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf); + //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Tuple2<Integer, Integer>, Integer> expectedMap = new HashMap<>(); + for (int i = 0; i < 10; i ++) { + expectedMap.put(new Tuple2<>(i, i * 2), i); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + + // -------------------------------------------------------------------------------------------- + // JavaPairRDD.outerJoinGeodeRegion + // -------------------------------------------------------------------------------------------- + + @Test + public void testPairRDDOuterJoinWithSameKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10); + + JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath); + JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, connConf); + //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Tuple2<String, Integer>, Option<Integer>> expectedMap = new HashMap<>(); + for (int i = -5; i < 10; i ++) { + if (i < 0) + expectedMap.put(new Tuple2<>("k_" + i, i), Option.apply((Integer) null)); + else + expectedMap.put(new Tuple2<>("k_" + i, i), Some.apply(i)); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + + @Test + public void testPairRDDOuterJoinWithDiffKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10); + Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction(); + + JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func); + JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf); + //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Tuple2<Integer, Integer>, Option<Integer>> expectedMap = new HashMap<>(); + for (int i = -5; i < 10; i ++) { + if (i < 0) + expectedMap.put(new Tuple2<>(i, i * 2), Option.apply((Integer) null)); + else + expectedMap.put(new Tuple2<>(i, i * 2), Some.apply(i)); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + + // -------------------------------------------------------------------------------------------- + // JavaRDD.joinGeodeRegion + // -------------------------------------------------------------------------------------------- + + static class IntToStrKeyFunction implements Function<Integer, String> { + @Override public String call(Integer x) throws Exception { + return "k_" + x; + } + } + + @Test + public void testRDDJoinWithSameKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10); + + Function<Integer, String> func = new IntToStrKeyFunction(); + JavaPairRDD<Integer, Integer> rdd2a = javaFunctions(rdd1).joinGeodeRegion(regionPath, func); + JavaPairRDD<Integer, Integer> rdd2b = javaFunctions(rdd1).joinGeodeRegion(regionPath, func, connConf); + //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Integer, Integer> expectedMap = new HashMap<>(); + for (int i = 0; i < 10; i ++) { + expectedMap.put(i, i); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + + // -------------------------------------------------------------------------------------------- + // JavaRDD.outerJoinGeodeRegion + // -------------------------------------------------------------------------------------------- + + @Test + public void testRDDOuterJoinWithSameKeyType() throws Exception { + prepareStrIntRegion(regionPath, 0, numObjects); + JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10); + + Function<Integer, String> func = new IntToStrKeyFunction(); + JavaPairRDD<Integer, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func); + JavaPairRDD<Integer, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGeodeRegion(regionPath, func, connConf); + //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n========================="); + + HashMap<Integer, Option<Integer>> expectedMap = new HashMap<>(); + for (int i = -5; i < 10; i ++) { + if (i < 0) + expectedMap.put(i, Option.apply((Integer) null)); + else + expectedMap.put(i, Some.apply(i)); + } + matchMapAndPairList(expectedMap, rdd2a.collect()); + matchMapAndPairList(expectedMap, rdd2b.collect()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java new file mode 100644 index 0000000..1457db9 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Portfolio.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.geode.spark.connector; + +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Properties; +import com.gemstone.gemfire.cache.Declarable; + +/** + * A stock portfolio that consists of multiple {@link Position} objects that + * represent shares of stock (a "security"). Instances of + * <code>Portfolio</code> can be stored in a Geode <code>Region</code> and + * their contents can be queried using the Geode query service. + * </p> + * This class is <code>Serializable</code> because we want it to be distributed + * to multiple members of a distributed system. Because this class is + * <code>Declarable</code>, we can describe instances of it in a Geode + * <code>cache.xml</code> file. + * </p> + * + */ +public class Portfolio implements Declarable, Serializable { + + private static final long serialVersionUID = 9097335119586059309L; + + private int id; /* id is used as the entry key and is stored in the entry */ + private String type; + private Map<String,Position> positions = new LinkedHashMap<String,Position>(); + private String status; + + public Portfolio(Properties props) { + init(props); + } + + @Override + public void init(Properties props) { + this.id = Integer.parseInt(props.getProperty("id")); + this.type = props.getProperty("type", "type1"); + this.status = props.getProperty("status", "active"); + + // get the positions. These are stored in the properties object + // as Positions, not String, so use Hashtable protocol to get at them. + // the keys are named "positionN", where N is an integer. + for (Map.Entry<Object, Object> entry: props.entrySet()) { + String key = (String)entry.getKey(); + if (key.startsWith("position")) { + Position pos = (Position)entry.getValue(); + this.positions.put(pos.getSecId(), pos); + } + } + } + + public void setType(String t) {this.type = t; } + + public String getStatus(){ + return status; + } + + public int getId(){ + return this.id; + } + + public Map<String,Position> getPositions(){ + return this.positions; + } + + public String getType() { + return this.type; + } + + public boolean isActive(){ + return status.equals("active"); + } + + @Override + public String toString(){ + StringBuilder buf = new StringBuilder(); + buf.append("\n\tPortfolio [id=" + this.id + " status=" + this.status); + buf.append(" type=" + this.type); + boolean firstTime = true; + for (Map.Entry<String, Position> entry: positions.entrySet()) { + if (!firstTime) { + buf.append(", "); + } + buf.append("\n\t\t"); + buf.append(entry.getKey() + ":" + entry.getValue()); + firstTime = false; + } + buf.append("]"); + return buf.toString(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java new file mode 100644 index 0000000..d6f8d1f --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/java/ittest/io/pivotal/geode/spark/connector/Position.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.geode.spark.connector; + +import java.io.Serializable; +import java.util.Properties; +import com.gemstone.gemfire.cache.Declarable; + +/** + * Represents a number of shares of a stock ("security") held in a {@link + * Portfolio}. + * </p> + * This class is <code>Serializable</code> because we want it to be distributed + * to multiple members of a distributed system. Because this class is + * <code>Declarable</code>, we can describe instances of it in a Geode + * <code>cache.xml</code> file. + * </p> + * + */ +public class Position implements Declarable, Serializable { + + private static final long serialVersionUID = -8229531542107983344L; + + private String secId; + private double qty; + private double mktValue; + + public Position(Properties props) { + init(props); + } + + @Override + public void init(Properties props) { + this.secId = props.getProperty("secId"); + this.qty = Double.parseDouble(props.getProperty("qty")); + this.mktValue = Double.parseDouble(props.getProperty("mktValue")); + } + + public String getSecId(){ + return this.secId; + } + + public double getQty(){ + return this.qty; + } + + public double getMktValue() { + return this.mktValue; + } + + @Override + public String toString(){ + return new StringBuilder() + .append("Position [secId=").append(secId) + .append(" qty=").append(this.qty) + .append(" mktValue=").append(mktValue).append("]").toString(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml b/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml new file mode 100644 index 0000000..79893d6 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/resources/test-regions.xml @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!DOCTYPE cache PUBLIC + "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN" + "http://www.gemstone.com/dtd/cache6_5.dtd" > + +<cache> + <!-- test region for OQL test --> + <region name="obj_obj_region" refid="PARTITION_REDUNDANT" /> + + <region name="obj_obj_rep_region" refid="REPLICATE" /> + + <region name="str_int_region" refid="PARTITION_REDUNDANT"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.Integer</value-constraint> + </region-attributes> + </region> + + <region name="str_str_region" refid="PARTITION_REDUNDANT"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.String</value-constraint> + </region-attributes> + </region> + + <region name="str_str_rep_region" refid="REPLICATE"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.String</value-constraint> + </region-attributes> + </region> +</cache> http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml b/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml new file mode 100644 index 0000000..3023959 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/resources/test-retrieve-regions.xml @@ -0,0 +1,57 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!DOCTYPE cache PUBLIC + "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN" + "http://www.gemstone.com/dtd/cache6_5.dtd" > + +<cache> + <!-- combinations of key, value types with region types --> + <region name="pr_r_obj_obj_region" refid="PARTITION_REDUNDANT" /> + <region name="pr_obj_obj_region" refid="PARTITION" /> + <region name="rr_obj_obj_region" refid="REPLICATE" /> + <region name="rr_p_obj_obj_region" refid="REPLICATE_PERSISTENT" /> + + <region name="pr_r_str_int_region" refid="PARTITION_REDUNDANT"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.Integer</value-constraint> + </region-attributes> + </region> + + <region name="pr_str_int_region" refid="PARTITION"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.Integer</value-constraint> + </region-attributes> + </region> + + <region name="rr_str_int_region" refid="REPLICATE"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.Integer</value-constraint> + </region-attributes> + </region> + + <region name="rr_p_str_int_region" refid="REPLICATE_PERSISTENT"> + <region-attributes> + <key-constraint>java.lang.String</key-constraint> + <value-constraint>java.lang.Integer</value-constraint> + </region-attributes> + </region> +</cache> http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala new file mode 100644 index 0000000..a26bcbd --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/io/pivotal/geode/spark/connector/BasicIntegrationTest.scala @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.geode.spark.connector + +import java.util.Properties +import com.gemstone.gemfire.cache.query.QueryService +import com.gemstone.gemfire.cache.query.internal.StructImpl +import io.pivotal.geode.spark.connector._ +import com.gemstone.gemfire.cache.Region +import io.pivotal.geode.spark.connector.internal.{RegionMetadata, DefaultGeodeConnectionManager} +import io.pivotal.geode.spark.connector.internal.oql.{RDDConverter, QueryRDD} +import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster +import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils +import org.apache.spark.streaming.{Seconds, StreamingContext, TestInputDStream} +import org.apache.spark.{SparkContext, SparkConf} +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +import scala.collection.JavaConversions +import scala.reflect.ClassTag + +case class Number(str: String, len: Int) + +class BasicIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster { + + var sc: SparkContext = null + + override def beforeAll() { + // start geode cluster, and spark context + val settings = new Properties() + settings.setProperty("cache-xml-file", "src/it/resources/test-regions.xml") + settings.setProperty("num-of-servers", "2") + val locatorPort = GeodeCluster.start(settings) + + // start spark context in local mode + IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO", + "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG") + val conf = new SparkConf() + .setAppName("BasicIntegrationTest") + .setMaster("local[2]") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + .set(GeodeLocatorPropKey, s"localhost[$locatorPort]") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.kryo.registrator", "io.pivotal.geode.spark.connector.GeodeKryoRegistrator") + + sc = new SparkContext(conf) + } + + override def afterAll() { + // stop connection, spark context, and geode cluster + DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf)) + sc.stop() + GeodeCluster.stop() + } + + //Convert Map[Object, Object] to java.util.Properties + private def map2Props(map: Map[Object, Object]): java.util.Properties = + (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props} + + // =========================================================== + // DefaultGeodeConnection functional tests + // =========================================================== + + test("DefaultGeodeConnection.validateRegion()") { + val conn = GeodeConnectionConf(sc.getConf).getConnection + + // normal exist-region + var regionPath: String = "str_str_region" + conn.validateRegion[String, String](regionPath) + + // non-exist region + regionPath = "non_exist_region" + try { + conn.validateRegion[String, String](regionPath) + fail("validateRegion failed to catch non-exist region error") + } catch { + case e: RuntimeException => + if (! e.getMessage.contains(s"The region named $regionPath was not found")) + fail("validateRegion gives wrong exception on non-exist region", e) + case e: Throwable => + fail("validateRegion gives wrong exception on non-exist region", e) + } + + // Note: currently, can't catch type mismatch error + conn.validateRegion[String, Integer]("str_str_region") + } + + test("DefaultGeodeConnection.getRegionMetadata()") { + val conn = GeodeConnectionConf(sc.getConf).getConnection + + // exist region + validateRegionMetadata(conn, "obj_obj_region", true, 113, null, null, false) + validateRegionMetadata(conn, "str_int_region", true, 113, "java.lang.String", "java.lang.Integer", false) + validateRegionMetadata(conn, "str_str_rep_region", false, 0, "java.lang.String", "java.lang.String", true) + + // non-exist region + assert(! conn.getRegionMetadata("no_exist_region").isDefined) + } + + def validateRegionMetadata( + conn: GeodeConnection, regionPath: String, partitioned: Boolean, buckets: Int, + keyType: String, valueType: String, emptyMap: Boolean): Unit = { + + val mdOption = conn.getRegionMetadata(regionPath) + val md = mdOption.get + + assert(md.getRegionPath == s"/$regionPath") + assert(md.isPartitioned == partitioned) + assert(md.getKeyTypeName == keyType) + assert(md.getValueTypeName == valueType) + assert(md.getTotalBuckets == buckets) + if (emptyMap) assert(md.getServerBucketMap == null) + else assert(md.getServerBucketMap != null) + } + + test("DefaultGeodeConnection.getRegionProxy()") { + val conn = GeodeConnectionConf(sc.getConf).getConnection + + val region1 = conn.getRegionProxy[String, String]("str_str_region") + region1.put("1", "One") + assert(region1.get("1") == "One") + region1.remove("1") + assert(region1.get("1") == null) + + // getRegionProxy doesn't fail when region doesn't exist + val region2 = conn.getRegionProxy[String, String]("non_exist_region") + try { + region2.put("1", "One") + fail("getRegionProxy failed to catch non-exist region error") + } catch { + case e: Exception => + if (e.getCause == null || ! e.getCause.getMessage.contains(s"Region named /non_exist_region was not found")) { + e.printStackTrace() + fail("validateRegion gives wrong exception on non-exist region", e) + } + } + } + + // Note: DefaultGeodeConnecton.getQuery() and getRegionData() are covered by + // RetrieveRegionIntegrationTest.scala and following OQL tests. + + // =========================================================== + // OQL functional tests + // =========================================================== + + private def initRegion(regionName: String): Unit = { + + //Populate some data in the region + val conn = GeodeConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + + //This will call the implicit conversion map2Properties in connector package object, since it is Map[String, String] + var position1 = new Position(Map("secId" -> "SUN", "qty" -> "34000", "mktValue" -> "24.42")) + var position2 = new Position(Map("secId" -> "IBM", "qty" -> "8765", "mktValue" -> "34.29")) + val portfolio1 = new Portfolio(map2Props(Map("id" ->"1", "type" -> "type1", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + rgn.put("1", portfolio1) + + position1 = new Position(Map("secId" -> "YHOO", "qty" -> "9834", "mktValue" -> "12.925")) + position2 = new Position(Map("secId" -> "GOOG", "qty" -> "12176", "mktValue" -> "21.972")) + val portfolio2 = new Portfolio(map2Props(Map("id" -> "2", "type" -> "type2", "status" -> "inactive", + "position1" -> position1, "position2" -> position2))) + rgn.put("2", portfolio2) + + position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32")) + position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373")) + val portfolio3 = new Portfolio(map2Props(Map("id" -> "3", "type" -> "type3", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + rgn.put("3", portfolio3) + + position1 = new Position(Map("secId" -> "APPL", "qty" -> "67", "mktValue" -> "67.356572")) + position2 = new Position(Map("secId" -> "ORCL", "qty" -> "376", "mktValue" -> "101.34")) + val portfolio4 = new Portfolio(map2Props(Map("id" -> "4", "type" -> "type1", "status" -> "inactive", + "position1" -> position1, "position2" -> position2))) + rgn.put("4", portfolio4) + + position1 = new Position(Map("secId" -> "SAP", "qty" -> "90", "mktValue" -> "67.356572")) + position2 = new Position(Map("secId" -> "DELL", "qty" -> "376", "mktValue" -> "101.34")) + val portfolio5 = new Portfolio(map2Props(Map("id" -> "5", "type" -> "type2", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + rgn.put("5", portfolio5) + + position1 = new Position(Map("secId" -> "RHAT", "qty" -> "90", "mktValue" -> "67.356572")) + position2 = new Position(Map("secId" -> "NOVL", "qty" -> "376", "mktValue" -> "101.34")) + val portfolio6 = new Portfolio(map2Props(Map("id" -> "6", "type" -> "type3", "status" -> "inactive", + "position1" -> position1, "position2" -> position2))) + rgn.put("6", portfolio6) + + position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32")) + position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373")) + val portfolio7 = new Portfolio(map2Props(Map("id" -> "7", "type" -> "type4", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + //Not using null, due to intermittent query failure on column containing null, likely a Spark SQL bug + //portfolio7.setType(null) + rgn.put("7", portfolio7) + } + + private def getQueryRDD[T: ClassTag]( + query: String, connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)): QueryRDD[T] = + new QueryRDD[T](sc, query, connConf) + + test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Partitioned Region") { + simpleQuery("obj_obj_region") + } + + test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Replicated Region") { + simpleQuery("obj_obj_rep_region") + } + + private def simpleQuery(regionName: String) { + //Populate some data in the region + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val rgn: Region[String, String] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three"))) + + //Create QueryRDD using OQL + val OQLResult: QueryRDD[String] = getQueryRDD[String](s"select * from /$regionName") + + //verify the QueryRDD + val oqlRS: Array[String] = OQLResult.collect() + oqlRS should have length 3 + oqlRS should contain theSameElementsAs List("one", "two", "three") + + //Convert QueryRDD to DataFrame + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + // this is used to implicitly convert an RDD to a DataFrame. + import sqlContext.implicits._ + val dataFrame = OQLResult.map(x => Number(x, x.length)).toDF() + //Register dataFrame as a table of two columns of type String and Int respectively + dataFrame.registerTempTable("numberTable") + + //Issue SQL query against the table + val SQLResult = sqlContext.sql("SELECT * FROM numberTable") + //Verify the SQL query result, r(0) mean column 0 + val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect() + sqlRS should have length 3 + sqlRS should contain theSameElementsAs List("one", "two", "three") + + //Convert QueryRDD to DataFrame using RDDConverter + val dataFrame2 = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) + //Register dataFrame2 as a table of two columns of type String and Int respectively + dataFrame2.registerTempTable("numberTable2") + + //Issue SQL query against the table + val SQLResult2 = sqlContext.sql("SELECT * FROM numberTable2") + //Verify the SQL query result, r(0) mean column 0 + val sqlRS2: Array[Any] = SQLResult2.map(r => r(0)).collect() + sqlRS2 should have length 3 + sqlRS2 should contain theSameElementsAs List("one", "two", "three") + + //Remove the region entries, because other tests might use the same region as well + List("1", "2", "3").foreach(rgn.remove) + } + + test("Run Geode OQL query and directly return DataFrame: Partitioned Region") { + simpleQueryDataFrame("obj_obj_region") + } + + test("Run Geode OQL query and directly return DataFrame: Replicated Region") { + simpleQueryDataFrame("obj_obj_rep_region") + } + + private def simpleQueryDataFrame(regionName: String) { + //Populate some data in the region + val conn = GeodeConnectionConf(sc.getConf).getConnection + val rgn: Region[String, String] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three"))) + + //Create DataFrame using Geode OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"select * from /$regionName") + dataFrame.registerTempTable("numberTable") + + //Issue SQL query against the table + val SQLResult = sqlContext.sql("SELECT * FROM numberTable") + //Verify the SQL query result, r(0) mean column 0 + val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect() + sqlRS should have length 3 + sqlRS should contain theSameElementsAs List("one", "two", "three") + + //Remove the region entries, because other tests might use the same region as well + List("1", "2", "3").foreach(rgn.remove) + } + + test("Geode OQL query with UDT: Partitioned Region") { + queryUDT("obj_obj_region") + } + + test("Geode OQL query with UDT: Replicated Region") { + queryUDT("obj_obj_rep_region") + } + + private def queryUDT(regionName: String) { + + //Populate some data in the region + val conn = GeodeConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + val e1: Employee = new Employee("hello", 123) + val e2: Employee = new Employee("world", 456) + rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2))) + + //Create QueryRDD using OQL + val OQLResult: QueryRDD[Object] = getQueryRDD(s"select name, age from /$regionName") + + //verify the QueryRDD + val oqlRS: Array[Object] = OQLResult.collect() + oqlRS should have length 2 + oqlRS.map(e => e.asInstanceOf[StructImpl].getFieldValues.apply(1)) should contain theSameElementsAs List(123, 456) + + //Convert QueryRDD to DataFrame + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + + //Convert QueryRDD to DataFrame using RDDConverter + val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) + dataFrame.registerTempTable("employee") + val SQLResult = sqlContext.sql("SELECT * FROM employee") + + //Verify the SQL query result + val sqlRS = SQLResult.map(r => r(0)).collect() + sqlRS should have length 2 + sqlRS should contain theSameElementsAs List("hello", "world") + + List("1", "2").foreach(rgn.remove) + } + + test("Geode OQL query with UDT and directly return DataFrame: Partitioned Region") { + queryUDTDataFrame("obj_obj_region") + } + + test("Geode OQL query with UDT and directly return DataFrame: Replicated Region") { + queryUDTDataFrame("obj_obj_rep_region") + } + + private def queryUDTDataFrame(regionName: String) { + //Populate some data in the region + val conn = GeodeConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + val e1: Employee = new Employee("hello", 123) + val e2: Employee = new Employee("world", 456) + rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2))) + + //Create DataFrame using Geode OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"select name, age from /$regionName") + + dataFrame.registerTempTable("employee") + val SQLResult = sqlContext.sql("SELECT * FROM employee") + + //Verify the SQL query result + val sqlRS = SQLResult.map(r => r(0)).collect() + sqlRS should have length 2 + sqlRS should contain theSameElementsAs List("hello", "world") + + List("1", "2").foreach(rgn.remove) + } + + test("Geode OQL query with more complex UDT: Partitioned Region") { + complexUDT("obj_obj_region") + } + + test("Geode OQL query with more complex UDT: Replicated Region") { + complexUDT("obj_obj_rep_region") + } + + private def complexUDT(regionName: String) { + + initRegion(regionName) + + //Create QueryRDD using OQL + val OQLResult: QueryRDD[Object] = getQueryRDD(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'") + + //verify the QueryRDD + val oqlRS: Array[Int] = OQLResult.collect().map(r => r.asInstanceOf[Portfolio].getId) + oqlRS should contain theSameElementsAs List(1, 3, 5, 7) + + //Convert QueryRDD to DataFrame + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + + //Convert QueryRDD to DataFrame using RDDConverter + val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) + + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT * FROM Portfolio") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType) + sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4") + } + + test("Geode OQL query with more complex UDT and directly return DataFrame: Partitioned Region") { + complexUDTDataFrame("obj_obj_region") + } + + test("Geode OQL query with more complex UDT and directly return DataFrame: Replicated Region") { + complexUDTDataFrame("obj_obj_rep_region") + } + + private def complexUDTDataFrame(regionName: String) { + + initRegion(regionName) + + //Create DataFrame using Geode OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'") + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT * FROM Portfolio") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType) + sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4") + } + + test("Geode OQL query with more complex UDT with Projection: Partitioned Region") { + queryComplexUDTProjection("obj_obj_region") + } + + test("Geode OQL query with more complex UDT with Projection: Replicated Region") { + queryComplexUDTProjection("obj_obj_rep_region") + } + + private def queryComplexUDTProjection(regionName: String) { + + initRegion(regionName) + + //Create QueryRDD using OQL + val OQLResult: QueryRDD[Object] = getQueryRDD[Object](s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""") + + //verify the QueryRDD + val oqlRS: Array[Int] = OQLResult.collect().map(si => si.asInstanceOf[StructImpl].getFieldValues.apply(0).asInstanceOf[Int]) + oqlRS should contain theSameElementsAs List(1, 3, 5, 7) + + //Convert QueryRDD to DataFrame + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + + //Convert QueryRDD to DataFrame using RDDConverter + val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) + + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0)) + sqlRS should contain theSameElementsAs List(3) + } + + test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Partitioned Region") { + queryComplexUDTProjectionDataFrame("obj_obj_region") + } + + test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Replicated Region") { + queryComplexUDTProjectionDataFrame("obj_obj_rep_region") + } + + private def queryComplexUDTProjectionDataFrame(regionName: String) { + + initRegion(regionName) + + //Create DataFrame using Geode OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""") + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0)) + sqlRS should contain theSameElementsAs List(3) + } + + test("Geode OQL query with more complex UDT with nested Projection and directly return DataFrame: Partitioned Region") { + queryComplexUDTNestProjectionDataFrame("obj_obj_region") + } + + test("Geode OQL query with more complex UDT with nested Projection and directly return DataFrame: Replicated Region") { + queryComplexUDTNestProjectionDataFrame("obj_obj_rep_region") + } + + private def queryComplexUDTNestProjectionDataFrame(regionName: String) { + + initRegion(regionName) + + //Create DataFrame using Geode OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"""SELECT r.id, r."type", r.positions, r.status FROM /$regionName r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'""") + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0)) + sqlRS should contain theSameElementsAs List(3) + } + + test("Undefined instance deserialization: Partitioned Region") { + undefinedInstanceDeserialization("obj_obj_region") + } + + test("Undefined instance deserialization: Replicated Region") { + undefinedInstanceDeserialization("obj_obj_rep_region") + } + + private def undefinedInstanceDeserialization(regionName: String) { + + val conn = GeodeConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + + //Put some new data + rgn.put("1", "one") + + //Query some non-existent columns, which should return UNDEFINED + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"SELECT col100, col200 FROM /$regionName") + val col1 = dataFrame.first().apply(0) + val col2 = dataFrame.first().apply(1) + assert(col1 == QueryService.UNDEFINED) + assert(col2 == QueryService.UNDEFINED) + //Verify that col1 and col2 refer to the same Undefined object + assert(col1.asInstanceOf[AnyRef] eq col2.asInstanceOf[AnyRef]) + } + + test("RDD.saveToGeode") { + val regionName = "str_str_region" + // generate: Vector((1,11), (2,22), (3,33), (4,44), (5,55), (6,66)) + val data = (1 to 6).map(_.toString).map(e=> (e, e*2)) + val rdd = sc.parallelize(data) + rdd.saveToGeode(regionName) + + // verify + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val region: Region[String, String] = connConf.getConnection.getRegionProxy(regionName) + println("region key set on server: " + region.keySetOnServer()) + assert((1 to 6).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer())) + (1 to 6).map(_.toString).foreach(e => assert(e*2 == region.get(e))) + } + + // =========================================================== + // DStream.saveToGeode() functional tests + // =========================================================== + + test("Basic DStream test") { + import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener} + import io.pivotal.geode.spark.connector.streaming._ + import org.apache.spark.streaming.ManualClockHelper + + class TestStreamListener extends StreamingListener { + var count = 0 + override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = count += 1 + } + + def batchDuration = Seconds(1) + val ssc = new StreamingContext(sc, batchDuration) + val input = Seq(1 to 4, 5 to 8, 9 to 12) + val dstream = new TestInputDStream(ssc, input, 2) + dstream.saveToGeode[String, Int]("str_int_region", (e: Int) => (e.toString, e)) + try { + val listener = new TestStreamListener + ssc.addStreamingListener(listener) + ssc.start() + ManualClockHelper.addToTime(ssc, batchDuration.milliseconds * input.length) + while (listener.count < input.length) ssc.awaitTerminationOrTimeout(50) + } catch { + case e: Exception => e.printStackTrace(); throw e +// } finally { +// ssc.stop() + } + + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val region: Region[String, Int] = conn.getRegionProxy("str_int_region") + + // verify geode region contents + println("region key set on server: " + region.keySetOnServer()) + assert((1 to 12).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer())) + (1 to 12).foreach(e => assert(e == region.get(e.toString))) + } +}