Repository: incubator-geode Updated Branches: refs/heads/develop 4af9bb381 -> b387e3587
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala new file mode 100644 index 0000000..20f753e --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala @@ -0,0 +1,174 @@ +package unittest.io.pivotal.gemfire.spark.connector.rdd + +import com.gemstone.gemfire.distributed.internal.ServerLocation +import io.pivotal.gemfire.spark.connector.internal.RegionMetadata +import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartitioner._ +import io.pivotal.gemfire.spark.connector.GemFireConnection +import io.pivotal.gemfire.spark.connector.internal.rdd._ +import org.apache.spark.Partition +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, FunSuite} + +import java.util.{HashSet => JHashSet, HashMap => JHashMap} + +import scala.collection.mutable + +class GemFireRDDPartitionerTest extends FunSuite with Matchers with MockitoSugar { + + val emptyServerBucketMap: JHashMap[ServerLocation, JHashSet[Integer]] = new JHashMap() + + def toJavaServerBucketMap(map: Map[(String, Int), Set[Int]]): JHashMap[ServerLocation, JHashSet[Integer]] = { + import scala.collection.JavaConversions._ + val tmp = map.map {case ((host, port), set) => (new ServerLocation(host, port), set.map(Integer.valueOf))} + (new JHashMap[ServerLocation, JHashSet[Integer]]() /: tmp) { case (acc, (s, jset)) => acc.put(s, new JHashSet(jset)); acc } + } + + val map: mutable.Map[(String, Int), mutable.Set[Int]] = mutable.Map( + ("s0",1) -> mutable.Set.empty, ("s1",2) -> mutable.Set(0), ("s2",3) -> mutable.Set(1, 2), ("s3",4) -> mutable.Set(3, 4, 5)) + + + // update this test whenever change default setting + test("default partitioned region partitioner") { + assert(GemFireRDDPartitioner.defaultPartitionedRegionPartitioner === ServerSplitsPartitioner) + } + + // update this test whenever change default setting + test("default replicated region partitioner") { + assert(GemFireRDDPartitioner.defaultReplicatedRegionPartitioner === OnePartitionPartitioner) + } + + test("GemFireRDDPartitioner.apply method") { + import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartitioner._ + for ((name, partitioner) <- partitioners) assert(GemFireRDDPartitioner(name) == partitioner) + assert(GemFireRDDPartitioner("dummy") == GemFireRDDPartitioner.defaultPartitionedRegionPartitioner) + assert(GemFireRDDPartitioner() == GemFireRDDPartitioner.defaultPartitionedRegionPartitioner) + } + + test("OnePartitionPartitioner") { + val mockConnection = mock[GemFireConnection] + val partitions = OnePartitionPartitioner.partitions[String, String](mockConnection, null, Map.empty) + verifySinglePartition(partitions) + } + + def verifySinglePartition(partitions: Array[Partition]): Unit = { + assert(1 == partitions.size) + assert(partitions(0).index === 0) + assert(partitions(0).isInstanceOf[GemFireRDDPartition]) + 
assert(partitions(0).asInstanceOf[GemFireRDDPartition].bucketSet.isEmpty) + } + + test("ServerSplitsPartitioner.doPartitions(): n=1 & no empty bucket") { + val map: List[(String, mutable.Set[Int])] = List( + "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 1) + verifyPartitions(partitions, List( + (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5), Seq("server2")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=1 & 1 empty bucket") { + val map: List[(String, mutable.Set[Int])] = List( + "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 7, 1) + verifyPartitions(partitions, List( + (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5), Seq("server2")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=1 & 2 empty bucket") { + val map: List[(String, mutable.Set[Int])] = List( + "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1) + verifyPartitions(partitions, List( + (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5, 7), Seq("server2")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=1 & 5 empty bucket") { + val map: List[(String, mutable.Set[Int])] = List( + "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 11, 1) + verifyPartitions(partitions, List( + (Set(0, 1, 2, 3, 6, 7, 8), Seq("server1")), (Set(4, 5, 9, 10), Seq("server2")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=1, 4 empty-bucket, non-continuous IDs") { + val map: List[(String, mutable.Set[Int])] = List( + "server1" -> mutable.Set(1, 3), "server2" -> mutable.Set(5,7)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1) + verifyPartitions(partitions, List( + (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5, 6, 7), Seq("server2")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=2, no empty buckets, 3 servers have 1, 2, and 3 buckets") { + val map: List[(String, mutable.Set[Int])] = List( + "s1" -> mutable.Set(0), "s2" -> mutable.Set(1, 2), "s3" -> mutable.Set(3, 4, 5)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 2) + // partitions.foreach(println) + verifyPartitions(partitions, List( + (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=3, no empty buckets, 4 servers have 0, 2, 3, and 4 buckets") { + val map: List[(String, mutable.Set[Int])] = List( + "s0" -> mutable.Set.empty, "s1" -> mutable.Set(0, 1), "s2" -> mutable.Set(2, 3, 4), "s3" -> mutable.Set(5, 6, 7, 8)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 9, 3) + // partitions.foreach(println) + verifyPartitions(partitions, List( + (Set(0), Seq("s1")), (Set(1), Seq("s1")), (Set(2), Seq("s2")), (Set(3), Seq("s2")), (Set(4), Seq("s2")), + (Set(5, 6), Seq("s3")), (Set(7, 8), Seq("s3")) )) + } + + test("ServerSplitsPartitioner.partitions(): metadata = None ") { + val regionPath = "test" + val mockConnection = mock[GemFireConnection] + intercept[RuntimeException] { ServerSplitsPartitioner.partitions[String, String](mockConnection, null, Map.empty) } + } + + test("ServerSplitsPartitioner.partitions(): replicated region ") { + val regionPath = "test" + val mockConnection = mock[GemFireConnection] + val md = new RegionMetadata(regionPath, false, 11, null) + 
when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md)) + val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty) + verifySinglePartition(partitions) + } + + test("ServerSplitsPartitioner.partitions(): partitioned region w/o data ") { + val regionPath = "test" + val mockConnection = mock[GemFireConnection] + val md = new RegionMetadata(regionPath, true, 6, emptyServerBucketMap) + when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md)) + val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty) + verifySinglePartition(partitions) + } + + test("ServerSplitsPartitioner.partitions(): partitioned region w/ some data ") { + import io.pivotal.gemfire.spark.connector.NumberPartitionsPerServerPropKey + val regionPath = "test" + val mockConnection = mock[GemFireConnection] + val map: Map[(String, Int), Set[Int]] = Map( + ("s0",1) -> Set.empty, ("s1",2) -> Set(0), ("s2",3) -> Set(1, 2), ("s3",4) -> Set(3, 4, 5)) + val md = new RegionMetadata(regionPath, true, 6, toJavaServerBucketMap(map)) + when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md)) + val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map(NumberPartitionsPerServerPropKey->"2")) + // partitions.foreach(println) + verifyPartitions(partitions, List( + (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3")))) + } + + // Note: since the order of partitions is not pre-determined, we have to verify partition id + // and contents separately + def verifyPartitions(partitions: Array[Partition], expPartitions: List[(Set[Int], Seq[String])]): Unit = { + // 1. check size + assert(partitions.size == expPartitions.size) + // 2. check IDs are 0 to n-1 + (0 until partitions.size).toList.zip(partitions).foreach { case (id, p) => assert(id == p.index) } + + // 3. 
get all pairs of bucket set and its locations, and compare to the expected pairs + val list = partitions.map { e => + val p = e.asInstanceOf[GemFireRDDPartition] + (p.bucketSet, p.locations) + } + expPartitions.foreach(e => assert(list.contains(e))) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala new file mode 100644 index 0000000..ef46ec1 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRegionRDDTest.scala @@ -0,0 +1,101 @@ +package unittest.io.pivotal.gemfire.spark.connector.rdd + +import com.gemstone.gemfire.cache.Region +import io.pivotal.gemfire.spark.connector.internal.RegionMetadata +import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireRDDPartition, GemFireRegionRDD} +import io.pivotal.gemfire.spark.connector.{GemFireConnectionConf, GemFireConnection} +import org.apache.spark.{TaskContext, Partition, SparkContext} +import org.mockito.Mockito._ +import org.mockito.Matchers.{eq => mockEq, any => mockAny} +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, FunSuite} + +import scala.reflect.ClassTag + +class GemFireRegionRDDTest extends FunSuite with Matchers with MockitoSugar { + + /** create common mocks, not all mocks are used by all tests */ + def createMocks[K, V](regionPath: String)(implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]) + : (String, Region[K,V], GemFireConnectionConf, GemFireConnection) = { + val mockConnection = mock[GemFireConnection] + val mockRegion = mock[Region[K, V]] + val mockConnConf = mock[GemFireConnectionConf] + when(mockConnConf.getConnection).thenReturn(mockConnection) + when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion) + when(mockConnConf.locators).thenReturn(Seq.empty) + (regionPath, mockRegion, mockConnConf, mockConnection) + } + + test("create GemFireRDD with non-existing region") { + val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") + when(mockConnConf.getConnection).thenReturn(mockConnection) + when(mockConnection.validateRegion[String,String](regionPath)).thenThrow(new RuntimeException) + val mockSparkContext = mock[SparkContext] + intercept[RuntimeException] { GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf) } + verify(mockConnConf).getConnection + verify(mockConnection).validateRegion[String, String](regionPath) + } + + test("getPartitions with non-existing region") { + // region exists when RDD is created, but get removed before getPartitions() is invoked + val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") + when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(None) + val mockSparkContext = mock[SparkContext] + intercept[RuntimeException] { GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf).getPartitions } + } + + test("getPartitions with replicated region and not preferred env") { + val 
(regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") + implicit val mockConnConf2 = mockConnConf + val mockSparkContext = mock[SparkContext] + when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null))) + val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions + verifySinglePartition(partitions) + } + + def verifySinglePartition(partitions: Array[Partition]): Unit = { + assert(1 == partitions.size) + assert(partitions(0).index === 0) + assert(partitions(0).isInstanceOf[GemFireRDDPartition]) + assert(partitions(0).asInstanceOf[GemFireRDDPartition].bucketSet.isEmpty) + } + + test("getPartitions with replicated region and preferred OnePartitionPartitioner") { + // since it's replicated region, so OnePartitionPartitioner will be used, i.e., override preferred partitioner + import io.pivotal.gemfire.spark.connector.{PreferredPartitionerPropKey, OnePartitionPartitionerName} + val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") + when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null))) + implicit val mockConnConf2 = mockConnConf + val mockSparkContext = mock[SparkContext] + val env = Map(PreferredPartitionerPropKey -> OnePartitionPartitionerName) + val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf, env).partitions + verifySinglePartition(partitions) + } + + test("getPartitions with partitioned region and not preferred env") { + val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") + implicit val mockConnConf2 = mockConnConf + val mockSparkContext = mock[SparkContext] + when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null))) + val partitions = GemFireRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions + verifySinglePartition(partitions) + } + + test("GemFireRDD.compute() method") { + val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") + implicit val mockConnConf2 = mockConnConf + val mockIter = mock[Iterator[(String, String)]] + val partition = GemFireRDDPartition(0, Set.empty) + when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null))) + when(mockConnection.getRegionData[String, String](regionPath, None, partition)).thenReturn(mockIter) + val mockSparkContext = mock[SparkContext] + val rdd = GemFireRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf) + val partitions = rdd.partitions + assert(1 == partitions.size) + val mockTaskContext = mock[TaskContext] + rdd.compute(partitions(0), mockTaskContext) + verify(mockConnection).getRegionData[String, String](mockEq(regionPath), mockEq(None), mockEq(partition)) + // verify(mockConnection).getRegionData[String, String](regionPath, Set.empty.asInstanceOf[Set[Int]], "gemfireRDD 0.0") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java new file mode 
100644 index 0000000..6207eab --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/Emp.java @@ -0,0 +1,79 @@ +package demo; + +import java.io.Serializable; + +/** + * This is a demo class used in doc/?.md + */ +public class Emp implements Serializable { + + private int id; + + private String lname; + + private String fname; + + private int age; + + private String loc; + + public Emp(int id, String lname, String fname, int age, String loc) { + this.id = id; + this.lname = lname; + this.fname = fname; + this.age = age; + this.loc = loc; + } + + public int getId() { + return id; + } + + public String getLname() { + return lname; + } + + public String getFname() { + return fname; + } + + public int getAge() { + return age; + } + + public String getLoc() { + return loc; + } + + @Override + public String toString() { + return "Emp(" + id + ", " + lname + ", " + fname + ", " + age + ", " + loc + ")"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Emp emp = (Emp) o; + + if (age != emp.age) return false; + if (id != emp.id) return false; + if (fname != null ? !fname.equals(emp.fname) : emp.fname != null) return false; + if (lname != null ? !lname.equals(emp.lname) : emp.lname != null) return false; + if (loc != null ? !loc.equals(emp.loc) : emp.loc != null) return false; + + return true; + } + + @Override + public int hashCode() { + int result = id; + result = 31 * result + (lname != null ? lname.hashCode() : 0); + result = 31 * result + (fname != null ? fname.hashCode() : 0); + result = 31 * result + age; + result = 31 * result + (loc != null ? loc.hashCode() : 0); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java new file mode 100644 index 0000000..b4ec109 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java @@ -0,0 +1,43 @@ +package demo; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.SQLContext; +import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*; + + +/** + * This Spark application demonstrates how to get region data from GemFire using GemFire + * OQL Java API. The result is a Spark DataFrame. + * <p> + * In order to run it, you will need to start a GemFire cluster, and run demo PairRDDSaveJavaDemo + * first to create some data in the region. + * <p> + * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar + * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/. 
+ * Then run the following command to start a Spark job: + * <pre> + * <path to spark>/bin/spark-submit --master=local[2] --class demo.OQLJavaDemo \ + * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> + * </pre> + */ +public class OQLJavaDemo { + + public static void main(String[] argv) { + + if (argv.length != 1) { + System.err.printf("Usage: OQLJavaDemo <locators>\n"); + return; + } + + SparkConf conf = new SparkConf().setAppName("OQLJavaDemo"); + conf.set(GemFireLocatorPropKey, argv[0]); // "192.168.1.47[10335]" + JavaSparkContext sc = new JavaSparkContext(conf); + SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); + DataFrame df = javaFunctions(sqlContext).gemfireOQL("select * from /str_str_region"); + System.out.println("======= DataFrame =======\n"); + df.show(); + sc.stop(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java new file mode 100644 index 0000000..eaeb400 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java @@ -0,0 +1,70 @@ +package demo; + +import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import scala.Tuple2; +import java.util.*; + +import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*; + +/** + * This Spark application demonstrates how to save an RDD to GemFire using the GemFire Spark + * Connector with Java. + * <p/> + * In order to run it, you will need to start a GemFire cluster and create the following region + * with GFSH: + * <pre> + * gfsh> create region --name=str_str_region --type=REPLICATE \ + * --key-constraint=java.lang.String --value-constraint=java.lang.String + * </pre> + * + * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar + * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/. 
+ * Then run the following command to start a Spark job: + * <pre> + * <path to spark>/bin/spark-submit --master=local[2] --class demo.PairRDDSaveJavaDemo \ + * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> + * </pre> + * + * Verify the data was saved to GemFire with GFSH: + * <pre>gfsh> query --query="select * from /str_str_region.entrySet" </pre> + */ +public class PairRDDSaveJavaDemo { + + public static void main(String[] argv) { + + if (argv.length != 1) { + System.err.printf("Usage: PairRDDSaveJavaDemo <locators>\n"); + return; + } + + SparkConf conf = new SparkConf().setAppName("PairRDDSaveJavaDemo"); + conf.set(GemFireLocatorPropKey, argv[0]); + JavaSparkContext sc = new JavaSparkContext(conf); + GemFireConnectionConf connConf = GemFireConnectionConf.apply(conf); + + List<Tuple2<String, String>> data = new ArrayList<>(); + data.add(new Tuple2<>("7", "seven")); + data.add(new Tuple2<>("8", "eight")); + data.add(new Tuple2<>("9", "nine")); + + List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>(); + data2.add(new Tuple2<>("11", "eleven")); + data2.add(new Tuple2<>("12", "twelve")); + data2.add(new Tuple2<>("13", "thirteen")); + + // method 1: generate JavaPairRDD directly + JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(data); + javaFunctions(rdd1).saveToGemfire("str_str_region", connConf); + + // method 2: convert JavaRDD<Tuple2<K,V>> to JavaPairRDD<K, V> + JavaRDD<Tuple2<String, String>> rdd2 = sc.parallelize(data2); + javaFunctions(toJavaPairRDD(rdd2)).saveToGemfire("str_str_region", connConf); + + sc.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java new file mode 100644 index 0000000..e117abe --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java @@ -0,0 +1,69 @@ +package demo; + +import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.List; + +import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*; + +/** + * This Spark application demonstrates how to save a RDD to GemFire using GemFire Spark + * Connector with Java. + * <p/> + * In order to run it, you will need to start GemFire cluster, and create the following region + * with GFSH: + * <pre> + * gfsh> create region --name=str_int_region --type=REPLICATE \ + * --key-constraint=java.lang.String --value-constraint=java.lang.Integer + * </pre> + * + * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar + * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/. 
+ * Then run the following command to start a Spark job: + * <pre> + * <path to spark>/bin/spark-submit --master=local[2] --class demo.RDDSaveJavaDemo \ + * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> + * </pre> + * + * Verify the data was saved to GemFire with GFSH: + * <pre>gfsh> query --query="select * from /str_int_region.entrySet" </pre> + */ +public class RDDSaveJavaDemo { + + public static void main(String[] argv) { + + if (argv.length != 1) { + System.err.printf("Usage: RDDSaveJavaDemo <locators>\n"); + return; + } + + SparkConf conf = new SparkConf().setAppName("RDDSaveJavaDemo"); + conf.set(GemFireLocatorPropKey, argv[0]); + JavaSparkContext sc = new JavaSparkContext(conf); + + List<String> data = new ArrayList<String>(); + data.add("abcdefg"); + data.add("abcdefgh"); + data.add("abcdefghi"); + JavaRDD<String> rdd = sc.parallelize(data); + + GemFireConnectionConf connConf = GemFireConnectionConf.apply(conf); + + PairFunction<String, String, Integer> func = new PairFunction<String, String, Integer>() { + @Override public Tuple2<String, Integer> call(String s) throws Exception { + return new Tuple2<String, Integer>(s, s.length()); + } + }; + + javaFunctions(rdd).saveToGemfire("str_int_region", func, connConf); + + sc.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java new file mode 100644 index 0000000..325efc8 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java @@ -0,0 +1,41 @@ +package demo; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*; + +/** + * This Spark application demonstrates how to expose a region in GemFire as a RDD using GemFire + * Spark Connector with Java. + * <p> + * In order to run it, you will need to start GemFire cluster, and run demo PairRDDSaveJavaDemo + * first to create some data in the region. + * <p> + * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar + * should be generated under gemfire-spark-demos/basic-demos/target/scala-2.10/. 
+ * Then run the following command to start a Spark job: + * <pre> + * <path to spark>/bin/spark-submit --master=local[2] --class demo.RegionToRDDJavaDemo \ + * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> + * </pre> + */ +public class RegionToRDDJavaDemo { + + public static void main(String[] argv) { + + if (argv.length != 1) { + System.err.printf("Usage: RegionToRDDJavaDemo <locators>\n"); + return; + } + + SparkConf conf = new SparkConf().setAppName("RegionToRDDJavaDemo"); + conf.set(GemFireLocatorPropKey, argv[0]); + JavaSparkContext sc = new JavaSparkContext(conf); + + JavaPairRDD<String, String> rdd = javaFunctions(sc).gemfireRegion("str_str_region"); + System.out.println("=== gemfireRegion =======\n" + rdd.collect() + "\n========================="); + + sc.stop(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala new file mode 100644 index 0000000..01a0588 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala @@ -0,0 +1,59 @@ +package demo + +import org.apache.spark.SparkConf +import org.apache.spark.streaming.{Seconds, StreamingContext} +import io.pivotal.gemfire.spark.connector.GemFireLocatorPropKey +import io.pivotal.gemfire.spark.connector.streaming._ + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * <p><p> + * In order to run it, you will need to start a GemFire cluster and create the following region + * with GFSH: + * <pre> + * gfsh> create region --name=str_int_region --type=REPLICATE \ + * --key-constraint=java.lang.String --value-constraint=java.lang.Integer + * </pre> + * + * <p>To run this on your local machine, you first need to run a netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/spark-submit --master=local[2] --class demo.NetworkWordCount <path to>/basic-demos_2.10-0.5.0.jar localhost 9999 locatorHost:port` + * + * <p><p> Check the result that was saved to GemFire with GFSH: + * <pre>gfsh> query --query="select * from /str_int_region.entrySet" </pre> + */ +object NetworkWordCount { + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: NetworkWordCount <hostname> <port> <gemfire locator>") + System.exit(1) + } + + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + val currentCount = values.foldLeft(0)(_ + _) + val previousCount = state.getOrElse(0) + Some(currentCount + previousCount) + } + + // Create the context with a 1 second batch size + val sparkConf = new SparkConf().setAppName("NetworkWordCount").set(GemFireLocatorPropKey, args(2)) + val ssc = new StreamingContext(sparkConf, Seconds(1)) + ssc.checkpoint(".") + + // Create a socket stream on target ip:port and count the + // words in the input stream of \n delimited text (e.g. generated by 'nc'). + // No replication in the storage level is used only because this runs locally; + // replication is necessary in a distributed scenario for fault tolerance. 
+ val lines = ssc.socketTextStream(args(0), args(1).toInt) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + val runningCounts = wordCounts.updateStateByKey[Int](updateFunc) + // runningCounts.print() + runningCounts.saveToGemfire("str_int_region") + ssc.start() + ssc.awaitTermination() + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/project/Dependencies.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/project/Dependencies.scala b/gemfire-spark-connector/project/Dependencies.scala new file mode 100644 index 0000000..899e182 --- /dev/null +++ b/gemfire-spark-connector/project/Dependencies.scala @@ -0,0 +1,29 @@ +import sbt._ +import sbt.Keys._ + +object Dependencies { + + object Compile { + val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "1.3.0" + val sparkSql = "org.apache.spark" %% "spark-sql" % "1.3.0" + val gemfire = "com.gemstone.gemfire" % "gemfire" % "9.0.Beta-SNAPSHOT" + } + + object Test { + val scalaTest = "org.scalatest" % "scalatest_2.10" % "2.2.1" % "it, test" //scala test framework + val mockito = "org.mockito" % "mockito-all" % "1.10.19" % "test" //mockito mock test framework + val junit = "junit" % "junit" % "4.11" % "it, test" //4.11 because the junit-interface was complaining when using 4.12 + val novoCode = "com.novocode" % "junit-interface" % "0.11" % "it, test"//for junit to run with sbt + } + + import Test._ + import Compile._ + + val unitTests = Seq(scalaTest, mockito, junit, novoCode) + + val connector = unitTests ++ Seq(sparkStreaming, sparkSql, gemfire) + + val functions = Seq(gemfire, junit) + + val demos = Seq(sparkStreaming, sparkSql, gemfire) +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/project/GemFireSparkBuild.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/project/GemFireSparkBuild.scala b/gemfire-spark-connector/project/GemFireSparkBuild.scala new file mode 100644 index 0000000..32101b9 --- /dev/null +++ b/gemfire-spark-connector/project/GemFireSparkBuild.scala @@ -0,0 +1,60 @@ +import sbt._ +import sbt.Keys._ +import scoverage.ScoverageSbtPlugin._ +import scoverage.ScoverageSbtPlugin + +object GemFireSparkConnectorBuild extends Build { + import Settings._ + import Dependencies._ + + lazy val root = Project( + id = "root", + base =file("."), + aggregate = Seq(gemfireFunctions, gemfireSparkConnector,demos), + settings = commonSettings ++ Seq( + name := "GemFire Connector for Apache Spark", + publishArtifact := false, + publishLocal := { }, + publish := { } + ) + ) + + lazy val gemfireFunctions = Project( + id = "gemfire-functions", + base = file("gemfire-functions"), + settings = commonSettings ++ Seq(libraryDependencies ++= Dependencies.functions, + resolvers ++= gfcResolvers, + description := "Required GemFire Functions to be deployed onto the GemFire Cluster before using the GemFire Spark Connector" + ) + ).configs(IntegrationTest) + + lazy val gemfireSparkConnector = Project( + id = "gemfire-spark-connector", + base = file("gemfire-spark-connector"), + settings = gfcSettings ++ Seq(libraryDependencies ++= Dependencies.connector, + resolvers ++= gfcResolvers, + description := "A library that exposes GemFire regions as Spark RDDs, writes Spark RDDs to GemFire regions, and executes OQL queries from Spark Applications to GemFire" + ) + 
).dependsOn(gemfireFunctions).configs(IntegrationTest) + + + /******** Demo Project Definitions ********/ + lazy val demoPath = file("gemfire-spark-demos") + + lazy val demos = Project ( + id = "gemfire-spark-demos", + base = demoPath, + settings = demoSettings, + aggregate = Seq(basicDemos) + ) + + lazy val basicDemos = Project ( + id = "basic-demos", + base = demoPath / "basic-demos", + settings = demoSettings ++ Seq(libraryDependencies ++= Dependencies.demos, + resolvers ++= gfcResolvers, + description := "Sample applications that demonstrates functionality of the GemFire Spark Connector" + ) + ).dependsOn(gemfireSparkConnector) +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/project/Settings.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/project/Settings.scala b/gemfire-spark-connector/project/Settings.scala new file mode 100644 index 0000000..ec61884 --- /dev/null +++ b/gemfire-spark-connector/project/Settings.scala @@ -0,0 +1,42 @@ +import sbt._ +import sbt.Keys._ +import org.scalastyle.sbt.ScalastylePlugin + +object Settings extends Build { + + lazy val commonSettings = Seq( + organization := "io.pivotal", + version := "0.5.0", + scalaVersion := "2.10.4", + organization := "io.pivotal.gemfire.spark", + organizationHomepage := Some(url("http://www.pivotal.io/")) + ) + + lazy val gfcResolvers = Seq( + "GemStone Snapshots" at "http://nexus.gemstone.com:8081/nexus/content/repositories/snapshots/", + //"GemStone Official Release" at "http://dist.gemstone.com/maven/release", + "Repo for JLine" at "http://repo.springsource.org/libs-release", + "Local Maven" at Path.userHome.asFile.toURI.toURL + ".m2/repository" + //"Apache Repository" at "https://repository.apache.org/content/repositories/releases/", + // "Akka Repository" at "http://repo.akka.io/releases/", + // "Spray Repository" at "http://repo.spray.cc/" + //"Local Maven Repository" at "file://"+Path.userHome.absolutePath+"/.m2/repository" + ) + + + val gfcITSettings = inConfig(IntegrationTest)(Defaults.itSettings) ++ + Seq(parallelExecution in IntegrationTest := false, fork in IntegrationTest := true) + + val gfcCompileSettings = inConfig(Compile)(Defaults.compileSettings) ++ Seq(unmanagedSourceDirectories in Compile += baseDirectory.value /"../gemfire-functions/src") + + val gfcSettings = commonSettings ++ gfcITSettings ++ gfcCompileSettings + + val demoSettings = commonSettings ++ Seq( + scoverage.ScoverageSbtPlugin.ScoverageKeys.coverageExcludedPackages := ".*" + ) + + val scalastyleSettings = Seq( + ScalastylePlugin.scalastyleConfig := baseDirectory.value / "project/scalastyle-config.xml" + ) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/project/build.properties ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/project/build.properties b/gemfire-spark-connector/project/build.properties new file mode 100644 index 0000000..64abd37 --- /dev/null +++ b/gemfire-spark-connector/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.6 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/project/plugins.sbt ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/project/plugins.sbt b/gemfire-spark-connector/project/plugins.sbt new file mode 100644 index 0000000..313bbbf --- /dev/null +++ 
b/gemfire-spark-connector/project/plugins.sbt @@ -0,0 +1,8 @@ +addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.6.0") + +resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/" + +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.0.4") + +resolvers += Classpaths.sbtPluginReleases + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/scalastyle-config.xml ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/scalastyle-config.xml b/gemfire-spark-connector/scalastyle-config.xml new file mode 100644 index 0000000..fcbfc0e --- /dev/null +++ b/gemfire-spark-connector/scalastyle-config.xml @@ -0,0 +1,117 @@ +<scalastyle> + <name>Scalastyle standard configuration</name> + <check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true"> + <parameters> + <parameter name="maxFileLength"><![CDATA[800]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="false"> + <parameters> + <parameter name="header"><![CDATA[// Copyright (C) 2011-2012 the original author or authors. +// See the LICENCE.txt file distributed with this work for additional +// information regarding copyright ownership. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License.]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="false"></check> + <check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true"> + <parameters> + <parameter name="maxLineLength"><![CDATA[160]]></parameter> + <parameter name="tabSize"><![CDATA[4]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"> + <parameters> + <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true"> + <parameters> + <parameter name="maxParameters"><![CDATA[8]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> + <parameters> + <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="false"></check> + <check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.file.RegexChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[println]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true"> + <parameters> + <parameter name="maxTypes"><![CDATA[30]]></parameter> + </parameters> + </check> + <check level="error" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true"> + <parameters> + <parameter name="maximum"><![CDATA[10]]></parameter> + </parameters> + </check> + <check level="warning" 
class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="false"> + <parameters> + <parameter name="singleLineAllowed"><![CDATA[true]]></parameter> + <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true"> + <parameters> + <parameter name="maxLength"><![CDATA[50]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true"> + <parameters> + <parameter name="maxMethods"><![CDATA[30]]></parameter> + </parameters> + </check> + <check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> + <check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="false"></check> + <check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check> +</scalastyle>