// Source: http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala
// (new file in commit 85d44f04, mode 100644, index 0000000..efe229c)
package org.apache.spark.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream

import scala.reflect.ClassTag

/**
 * An [[InputDStream]] for integration tests that replays a fixed, in-memory sequence of batches.
 *
 * The batch computed for `validTime` is `input(i)` with
 * `i = ((validTime - zeroTime) / slideDuration - 1).toInt`, so the first interval after
 * `zeroTime` yields `input(0)`.
 *
 * @param ssc_          the streaming context this stream belongs to
 * @param input         one entry per batch; a `null` entry produces no RDD for that batch, and a
 *                      batch index outside `input` (past its end or negative) produces an empty RDD
 * @param numPartitions number of partitions of every generated RDD
 * @tparam T element type of the stream
 */
class TestInputDStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
  extends InputDStream[T](ssc_) {

  /** Nothing to start: all data is already held in `input`. */
  def start(): Unit = {}

  /** Nothing to stop. */
  def stop(): Unit = {}

  /**
   * Builds the RDD for the batch ending at `validTime`.
   *
   * @param validTime the batch time being computed
   * @return `None` when the selected `input` entry is `null`; otherwise `Some` RDD holding that
   *         batch, which is empty when `validTime` maps outside `input`
   */
  def compute(validTime: Time): Option[RDD[T]] = {
    logInfo(s"Computing RDD for time $validTime")
    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
    // `lift` bounds-checks both ends. A negative index (validTime <= zeroTime) previously reached
    // `input(index)` and threw IndexOutOfBoundsException.
    val selectedInput = input.lift(index).getOrElse(Seq.empty[T])

    // A null entry lets tests cover the case where no RDD is created; Option(null) is None.
    Option(selectedInput).map { batch =>
      val rdd = ssc.sc.makeRDD(batch, numPartitions)
      logInfo(s"Created RDD ${rdd.id} with $batch")
      rdd
    }
  }
}
// Source: http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java
// (new file in commit 85d44f04, mode 100644, index 0000000..951d6c9)
package io.pivotal.gemfire.spark.connector.javaapi;

import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
import io.pivotal.gemfire.spark.connector.streaming.GemFireDStreamFunctions;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;

/**
 * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream}
 * to provide GemFire Spark Connector functionality.
 *
 * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
 * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
 *
 * @param <T> the element type of the wrapped stream
 */
public class GemFireJavaDStreamFunctions<T> {

  /** The Scala-side functions object that every call is delegated to. */
  public final GemFireDStreamFunctions<T> dsf;

  /**
   * @param ds the JavaDStream to wrap; its underlying {@code DStream} is passed to
   *           {@link GemFireDStreamFunctions}
   */
  public GemFireJavaDStreamFunctions(JavaDStream<T> ds) {
    this.dsf = new GemFireDStreamFunctions<T>(ds.dstream());
  }

  /**
   * Save the JavaDStream to GemFire key-value store.
   *
   * @param regionPath the full path of the region where the DStream is stored
   * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   * @param <K> the key type produced by {@code func}
   * @param <V> the value type produced by {@code func}
   */
  public <K, V> void saveToGemfire(
    String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) {
    dsf.saveToGemfire(regionPath, func, connConf);
  }

  /**
   * Save the JavaDStream to GemFire key-value store, connecting with
   * {@code dsf.defaultConnectionConf()}.
   *
   * @param regionPath the full path of the region where the DStream is stored
   * @param func the PairFunction that converts elements of JavaDStream to key/value pairs
   * @param <K> the key type produced by {@code func}
   * @param <V> the value type produced by {@code func}
   */
  public <K, V> void saveToGemfire(
    String regionPath, PairFunction<T, K, V> func) {
    dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf());
  }

}

// Source: http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java
// (new file in commit 85d44f04, mode 100644, index 0000000..3b43a65)
package io.pivotal.gemfire.spark.connector.javaapi;

import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
import io.pivotal.gemfire.spark.connector.streaming.GemFirePairDStreamFunctions;
import org.apache.spark.streaming.api.java.JavaPairDStream;

/**
 * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream}
 * to provide GemFire Spark Connector functionality.
 *
 * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
 * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
 *
 * @param <K> the key type of the wrapped pair stream
 * @param <V> the value type of the wrapped pair stream
 */
public class GemFireJavaPairDStreamFunctions<K, V> {

  /** The Scala-side functions object that every call is delegated to. */
  public final GemFirePairDStreamFunctions<K, V> dsf;

  /**
   * @param ds the JavaPairDStream to wrap; its underlying {@code DStream} is passed to
   *           {@link GemFirePairDStreamFunctions}
   */
  public GemFireJavaPairDStreamFunctions(JavaPairDStream<K, V> ds) {
    this.dsf = new GemFirePairDStreamFunctions<K, V>(ds.dstream());
  }

  /**
   * Save the JavaPairDStream to GemFire key-value store.
   *
   * @param regionPath the full path of the region where the DStream is stored
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   */
  public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) {
    dsf.saveToGemfire(regionPath, connConf);
  }

  /**
   * Save the JavaPairDStream to GemFire key-value store, connecting with
   * {@code dsf.defaultConnectionConf()}.
   *
   * @param regionPath the full path of the region where the DStream is stored
   */
  public void saveToGemfire(String regionPath) {
    dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf());
  }

}

// Source: http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java
// (new file in commit 85d44f04, mode 100644, index 0000000..609cdbf)
package io.pivotal.gemfire.spark.connector.javaapi;

import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
import io.pivotal.gemfire.spark.connector.GemFirePairRDDFunctions;
import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireJoinRDD;
import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireOuterJoinRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import scala.Option;
import scala.Tuple2;
import scala.reflect.ClassTag;

import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;

/**
 * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide GemFire Spark
 * Connector functionality.
 *
 * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
 * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
 *
 * @param <K> the key type of the wrapped pair RDD
 * @param <V> the value type of the wrapped pair RDD
 */
public class GemFireJavaPairRDDFunctions<K, V> {

  /** The Scala-side functions object that every call is delegated to. */
  public final GemFirePairRDDFunctions<K, V> rddf;

  /**
   * @param rdd the JavaPairRDD to wrap; its underlying {@code RDD} is passed to
   *            {@link GemFirePairRDDFunctions}
   */
  public GemFireJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) {
    this.rddf = new GemFirePairRDDFunctions<K, V>(rdd.rdd());
  }

  /**
   * Save the pair RDD to GemFire key-value store.
   *
   * @param regionPath the full path of the region where the RDD is stored
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   */
  public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) {
    rddf.saveToGemfire(regionPath, connConf);
  }

  /**
   * Save the pair RDD to GemFire key-value store, connecting with
   * {@code rddf.defaultConnectionConf()}.
   *
   * @param regionPath the full path of the region where the RDD is stored
   */
  public void saveToGemfire(String regionPath) {
    rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf());
  }

  /**
   * Return a JavaPairRDD containing all pairs of elements with matching keys in
   * this {@code RDD<K, V>} and the GemFire {@code Region<K, V2>}. Each pair of elements
   * is returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
   * (k, v2) is in the GemFire region. Connects with {@code rddf.defaultConnectionConf()}.
   *
   * @param regionPath the region path of the GemFire region
   * @param <V2> the value type of the GemFire region
   * @return {@code JavaPairRDD<Tuple2<K, V>, V2>}
   */
  public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(String regionPath) {
    return joinGemfireRegion(regionPath, rddf.defaultConnectionConf());
  }

  /**
   * Return a JavaPairRDD containing all pairs of elements with matching keys in
   * this {@code RDD<K, V>} and the GemFire {@code Region<K, V2>}. Each pair of elements
   * is returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
   * (k, v2) is in the GemFire region.
   *
   * @param regionPath the region path of the GemFire region
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   * @param <V2> the value type of the GemFire region
   * @return {@code JavaPairRDD<Tuple2<K, V>, V2>}
   */
  public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
    String regionPath, GemFireConnectionConf connConf) {
    GemFireJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGemfireRegion(regionPath, connConf);
    // Generic type arguments are erased on the Java side, so placeholder ClassTags are supplied.
    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
    ClassTag<V2> vt = fakeClassTag();
    return new JavaPairRDD<>(rdd, kt, vt);
  }

  /**
   * Return a JavaPairRDD containing all pairs of elements with matching keys in this
   * {@code RDD<K, V>} and the GemFire {@code Region<K2, V2>}. The join key of an RDD
   * element is generated by {@code func((k, v)) -> k2}, and the key from the GemFire
   * region is just the key of the key/value pair.
   *
   * <p>Each pair of elements of the result RDD is returned as a ((k, v), v2) tuple,
   * where (k, v) is in this RDD and (k2, v2) is in the GemFire region.
   * Connects with {@code rddf.defaultConnectionConf()}.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element (K, V)
   * @param <K2> the key type of the GemFire region
   * @param <V2> the value type of the GemFire region
   * @return {@code JavaPairRDD<Tuple2<K, V>, V2>}
   */
  public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
    String regionPath, Function<Tuple2<K, V>, K2> func) {
    return joinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
  }

  /**
   * Return a JavaPairRDD containing all pairs of elements with matching keys in this
   * {@code RDD<K, V>} and the GemFire {@code Region<K2, V2>}. The join key of an RDD
   * element is generated by {@code func((k, v)) -> k2}, and the key from the GemFire
   * region is just the key of the key/value pair.
   *
   * <p>Each pair of elements of the result RDD is returned as a ((k, v), v2) tuple,
   * where (k, v) is in this RDD and (k2, v2) is in the GemFire region.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element (K, V)
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   * @param <K2> the key type of the GemFire region
   * @param <V2> the value type of the GemFire region
   * @return {@code JavaPairRDD<Tuple2<K, V>, V2>}
   */
  public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(
    String regionPath, Function<Tuple2<K, V>, K2> func, GemFireConnectionConf connConf) {
    GemFireJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGemfireRegion(regionPath, func, connConf);
    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
    ClassTag<V2> vt = fakeClassTag();
    return new JavaPairRDD<>(rdd, kt, vt);
  }

  /**
   * Perform a left outer join of this {@code RDD<K, V>} and the GemFire {@code Region<K, V2>}.
   * For each element (k, v) in this RDD, the resulting RDD will either contain
   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
   * ((k, v), None) if no element in the GemFire region has key k.
   * Connects with {@code rddf.defaultConnectionConf()}.
   *
   * @param regionPath the region path of the GemFire region
   * @param <V2> the value type of the GemFire region
   * @return {@code JavaPairRDD<Tuple2<K, V>, Option<V2>>}
   */
  public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(String regionPath) {
    return outerJoinGemfireRegion(regionPath, rddf.defaultConnectionConf());
  }

  /**
   * Perform a left outer join of this {@code RDD<K, V>} and the GemFire {@code Region<K, V2>}.
   * For each element (k, v) in this RDD, the resulting RDD will either contain
   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
   * ((k, v), None) if no element in the GemFire region has key k.
   *
   * @param regionPath the region path of the GemFire region
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   * @param <V2> the value type of the GemFire region
   * @return {@code JavaPairRDD<Tuple2<K, V>, Option<V2>>}
   */
  public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(
    String regionPath, GemFireConnectionConf connConf) {
    GemFireOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGemfireRegion(regionPath, connConf);
    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
    ClassTag<Option<V2>> vt = fakeClassTag();
    return new JavaPairRDD<>(rdd, kt, vt);
  }

  /**
   * Perform a left outer join of this {@code RDD<K, V>} and the GemFire {@code Region<K2, V2>}.
   * The join key of an RDD element is generated by {@code func((k, v)) -> k2}, and the
   * key from the region is just the key of the key/value pair.
   *
   * <p>For each element (k, v) in this RDD, the resulting RDD will either contain
   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
   * ((k, v), None) if no element in the GemFire region has key {@code func((k, v))}.
   * Connects with {@code rddf.defaultConnectionConf()}.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element (K, V)
   * @param <K2> the key type of the GemFire region
   * @param <V2> the value type of the GemFire region
   * @return {@code JavaPairRDD<Tuple2<K, V>, Option<V2>>}
   */
  public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(
    String regionPath, Function<Tuple2<K, V>, K2> func) {
    return outerJoinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
  }

  /**
   * Perform a left outer join of this {@code RDD<K, V>} and the GemFire {@code Region<K2, V2>}.
   * The join key of an RDD element is generated by {@code func((k, v)) -> k2}, and the
   * key from the region is just the key of the key/value pair.
   *
   * <p>For each element (k, v) in this RDD, the resulting RDD will either contain
   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
   * ((k, v), None) if no element in the GemFire region has key {@code func((k, v))}.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element (K, V)
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   * @param <K2> the key type of the GemFire region
   * @param <V2> the value type of the GemFire region
   * @return {@code JavaPairRDD<Tuple2<K, V>, Option<V2>>}
   */
  public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(
    String regionPath, Function<Tuple2<K, V>, K2> func, GemFireConnectionConf connConf) {
    GemFireOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGemfireRegion(regionPath, func, connConf);
    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
    ClassTag<Option<V2>> vt = fakeClassTag();
    return new JavaPairRDD<>(rdd, kt, vt);
  }

}

// Source: http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java
// (new file in commit 85d44f04, mode 100644, index 0000000..ccdabb7)
package io.pivotal.gemfire.spark.connector.javaapi;

import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
import io.pivotal.gemfire.spark.connector.GemFireRDDFunctions;
import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireJoinRDD;
import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireOuterJoinRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Option;
import scala.reflect.ClassTag;

import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;

/**
 * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide GemFire Spark
 * Connector functionality.
 *
 * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
 * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
 *
 * @param <T> the element type of the wrapped RDD
 */
public class GemFireJavaRDDFunctions<T> {

  /** The Scala-side functions object that every call is delegated to. */
  public final GemFireRDDFunctions<T> rddf;

  /**
   * @param rdd the JavaRDD to wrap; its underlying {@code RDD} is passed to
   *            {@link GemFireRDDFunctions}
   */
  public GemFireJavaRDDFunctions(JavaRDD<T> rdd) {
    this.rddf = new GemFireRDDFunctions<T>(rdd.rdd());
  }

  /**
   * Save the non-pair RDD to GemFire key-value store.
   *
   * @param regionPath the full path of the region where the RDD is stored
   * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   * @param <K> the key type produced by {@code func}
   * @param <V> the value type produced by {@code func}
   */
  public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) {
    rddf.saveToGemfire(regionPath, func, connConf);
  }

  /**
   * Save the non-pair RDD to GemFire key-value store, connecting with
   * {@code rddf.defaultConnectionConf()}.
   *
   * @param regionPath the full path of the region where the RDD is stored
   * @param func the PairFunction that converts elements of JavaRDD to key/value pairs
   * @param <K> the key type produced by {@code func}
   * @param <V> the value type produced by {@code func}
   */
  public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func) {
    rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf());
  }

  /**
   * Return a JavaPairRDD containing all pairs of elements with matching keys in this
   * {@code RDD<T>} and the GemFire {@code Region<K, V>}. The join key of an RDD
   * element is generated by {@code func(t) -> k}, and the key from the GemFire
   * region is just the key of the key/value pair.
   *
   * <p>Each pair of elements of the result RDD is returned as a (t, v) tuple,
   * where t is from this RDD and v is from the GemFire region.
   * Connects with {@code rddf.defaultConnectionConf()}.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element T
   * @param <K> the key type of the GemFire region
   * @param <V> the value type of the GemFire region
   * @return {@code JavaPairRDD<T, V>}
   */
  public <K, V> JavaPairRDD<T, V> joinGemfireRegion(String regionPath, Function<T, K> func) {
    return joinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
  }

  /**
   * Return a JavaPairRDD containing all pairs of elements with matching keys in this
   * {@code RDD<T>} and the GemFire {@code Region<K, V>}. The join key of an RDD
   * element is generated by {@code func(t) -> k}, and the key from the GemFire
   * region is just the key of the key/value pair.
   *
   * <p>Each pair of elements of the result RDD is returned as a (t, v) tuple,
   * where t is from this RDD and v is from the GemFire region.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element T
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   * @param <K> the key type of the GemFire region
   * @param <V> the value type of the GemFire region
   * @return {@code JavaPairRDD<T, V>}
   */
  public <K, V> JavaPairRDD<T, V> joinGemfireRegion(
    String regionPath, Function<T, K> func, GemFireConnectionConf connConf) {
    GemFireJoinRDD<T, K, V> rdd = rddf.joinGemfireRegion(regionPath, func, connConf);
    // Generic type arguments are erased on the Java side, so placeholder ClassTags are supplied.
    ClassTag<T> kt = fakeClassTag();
    ClassTag<V> vt = fakeClassTag();
    return new JavaPairRDD<>(rdd, kt, vt);
  }

  /**
   * Perform a left outer join of this {@code RDD<T>} and the GemFire {@code Region<K, V>}.
   * The join key of an RDD element is generated by {@code func(t) -> k}, and the
   * key from the region is just the key of the key/value pair.
   *
   * <p>For each element t in this RDD, the resulting RDD will either contain
   * all pairs (t, Some(v)) for v in the GemFire region, or the pair
   * (t, None) if no element in the GemFire region has key {@code func(t)}.
   * Connects with {@code rddf.defaultConnectionConf()}.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element T
   * @param <K> the key type of the GemFire region
   * @param <V> the value type of the GemFire region
   * @return {@code JavaPairRDD<T, Option<V>>}
   */
  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(String regionPath, Function<T, K> func) {
    return outerJoinGemfireRegion(regionPath, func, rddf.defaultConnectionConf());
  }

  /**
   * Perform a left outer join of this {@code RDD<T>} and the GemFire {@code Region<K, V>}.
   * The join key of an RDD element is generated by {@code func(t) -> k}, and the
   * key from the region is just the key of the key/value pair.
   *
   * <p>For each element t in this RDD, the resulting RDD will either contain
   * all pairs (t, Some(v)) for v in the GemFire region, or the pair
   * (t, None) if no element in the GemFire region has key {@code func(t)}.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element T
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   * @param <K> the key type of the GemFire region
   * @param <V> the value type of the GemFire region
   * @return {@code JavaPairRDD<T, Option<V>>}
   */
  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(
    String regionPath, Function<T, K> func, GemFireConnectionConf connConf) {
    GemFireOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGemfireRegion(regionPath, func, connConf);
    ClassTag<T> kt = fakeClassTag();
    ClassTag<Option<V>> vt = fakeClassTag();
    return new JavaPairRDD<>(rdd, kt, vt);
  }

}

// Source: http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java
// (new file in commit 85d44f04, mode 100644, index 0000000..eba1ad9)
package io.pivotal.gemfire.spark.connector.javaapi;

import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
import io.pivotal.gemfire.spark.connector.GemFireSQLContextFunctions;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide GemFire
 * OQL functionality.
 *
 * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
 * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
 */
public class GemFireJavaSQLContextFunctions {

  /** The Scala-side functions object that every call is delegated to. */
  public final GemFireSQLContextFunctions scf;

  /**
   * @param sqlContext the SQLContext to wrap
   */
  public GemFireJavaSQLContextFunctions(SQLContext sqlContext) {
    scf = new GemFireSQLContextFunctions(sqlContext);
  }

  /**
   * Run a GemFire OQL query, connecting with {@code scf.defaultConnectionConf()}.
   *
   * @param query the OQL query string
   * @param <T> not used by this method; retained for source compatibility
   * @return the DataFrame produced by {@code scf.gemfireOQL}
   */
  public <T> DataFrame gemfireOQL(String query) {
    return scf.gemfireOQL(query, scf.defaultConnectionConf());
  }

  /**
   * Run a GemFire OQL query using the given connection configuration.
   *
   * @param query the OQL query string
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   * @param <T> not used by this method; retained for source compatibility
   * @return the DataFrame produced by {@code scf.gemfireOQL}
   */
  public <T> DataFrame gemfireOQL(String query, GemFireConnectionConf connConf) {
    return scf.gemfireOQL(query, connConf);
  }

}

// Source: http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java
// (new file in commit 85d44f04, mode 100644, index 0000000..f822a1f)
package io.pivotal.gemfire.spark.connector.javaapi;

import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD;
import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD$;
import org.apache.spark.SparkContext;
import scala.reflect.ClassTag;

import java.util.Properties;

import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;

/**
 * Java API wrapper over {@link org.apache.spark.SparkContext} to provide GemFire
 * Connector functionality.
 *
 * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link
 * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p>
 */
public class GemFireJavaSparkContextFunctions {

  /** The wrapped SparkContext; its {@code SparkConf} supplies the default connection settings. */
  public final SparkContext sc;

  /**
   * @param sc the SparkContext to wrap
   */
  public GemFireJavaSparkContextFunctions(SparkContext sc) {
    this.sc = sc;
  }

  /**
   * Expose a GemFire region as a JavaPairRDD.
   *
   * @param regionPath the full path of the region
   * @param connConf the GemFireConnectionConf that can be used to access the region
   * @param opConf the parameters for this operation, such as preferred partitioner
   * @param <K> the key type of the region
   * @param <V> the value type of the region
   * @return a {@link GemFireJavaRegionRDD} over the region
   */
  public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(
    String regionPath, GemFireConnectionConf connConf, Properties opConf) {
    // Generic type arguments are erased on the Java side, so placeholder ClassTags are supplied.
    ClassTag<K> kt = fakeClassTag();
    ClassTag<V> vt = fakeClassTag();
    GemFireRegionRDD<K, V> rdd = GemFireRegionRDD$.MODULE$.apply(
      sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt);
    return new GemFireJavaRegionRDD<>(rdd);
  }

  /**
   * Expose a GemFire region as a JavaPairRDD, using the connection configuration built from
   * this SparkContext's conf and no operation parameters (no preferred partitioner).
   *
   * @param regionPath the full path of the region
   * @param <K> the key type of the region
   * @param <V> the value type of the region
   * @return a {@link GemFireJavaRegionRDD} over the region
   */
  public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath) {
    return gemfireRegion(regionPath, defaultConnectionConf(), new Properties());
  }

  /**
   * Expose a GemFire region as a JavaPairRDD with no operation parameters
   * (no preferred partitioner).
   *
   * @param regionPath the full path of the region
   * @param connConf the GemFireConnectionConf that can be used to access the region
   * @param <K> the key type of the region
   * @param <V> the value type of the region
   * @return a {@link GemFireJavaRegionRDD} over the region
   */
  public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath, GemFireConnectionConf connConf) {
    return gemfireRegion(regionPath, connConf, new Properties());
  }

  /**
   * Expose a GemFire region as a JavaPairRDD, using the connection configuration built from
   * this SparkContext's conf.
   *
   * @param regionPath the full path of the region
   * @param opConf the parameters for this operation, such as preferred partitioner
   * @param <K> the key type of the region
   * @param <V> the value type of the region
   * @return a {@link GemFireJavaRegionRDD} over the region
   */
  public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath, Properties opConf) {
    return gemfireRegion(regionPath, defaultConnectionConf(), opConf);
  }

  /** Builds the connection configuration from this SparkContext's {@code SparkConf}. */
  private GemFireConnectionConf defaultConnectionConf() {
    return GemFireConnectionConf.apply(sc.getConf());
  }

}

// Source: http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java
// (new file in commit 85d44f04, mode 100644, index 0000000..ff11588)
package io.pivotal.gemfire.spark.connector.javaapi;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

import io.pivotal.gemfire.spark.connector.package$;

/**
 * The main entry point to the Spark GemFire Connector Java API.
 *
 * <p>It provides static factory methods that build wrappers around SparkContext,
 * JavaSparkContext, SQLContext, JavaRDD, JavaPairRDD, JavaDStream and JavaPairDStream.
 * It also provides helpers that convert {@code JavaRDD<Tuple2<K, V>>} to
 * {@code JavaPairRDD<K, V>} and {@code JavaDStream<Tuple2<K, V>>} to
 * {@code JavaPairDStream<K, V>}.</p>
 */
public final class GemFireJavaUtil {

  // Constants mirrored from the Scala package object io.pivotal.gemfire.spark.connector.
  // They are final so that no caller can reassign these shared keys for the whole JVM.
  public static final String GemFireLocatorPropKey = package$.MODULE$.GemFireLocatorPropKey();
  // partitioner related keys and values
  public static final String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey();
  public static final String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey();
  public static final String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName();
  public static final String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName();

  /** Private constructor: this class only holds static members and must not be instantiated. */
  private GemFireJavaUtil() { }

  /**
   * A static factory method to create a {@link GemFireJavaSparkContextFunctions} based
   * on an existing {@link SparkContext} instance.
   */
  public static GemFireJavaSparkContextFunctions javaFunctions(SparkContext sc) {
    return new GemFireJavaSparkContextFunctions(sc);
  }

  /**
   * A static factory method to create a {@link GemFireJavaSparkContextFunctions} based
   * on an existing {@link JavaSparkContext} instance.
   */
  public static GemFireJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) {
    return new GemFireJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc));
  }

  /**
   * A static factory method to create a {@link GemFireJavaPairRDDFunctions} based on an
   * existing {@link org.apache.spark.api.java.JavaPairRDD} instance.
   */
  public static <K, V> GemFireJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) {
    return new GemFireJavaPairRDDFunctions<K, V>(rdd);
  }

  /**
   * A static factory method to create a {@link GemFireJavaRDDFunctions} based on an
   * existing {@link org.apache.spark.api.java.JavaRDD} instance.
   */
  public static <T> GemFireJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) {
    return new GemFireJavaRDDFunctions<T>(rdd);
  }

  /**
   * A static factory method to create a {@link GemFireJavaPairDStreamFunctions} based on an
   * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance.
   */
  public static <K, V> GemFireJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) {
    return new GemFireJavaPairDStreamFunctions<>(ds);
  }

  /**
   * A static factory method to create a {@link GemFireJavaDStreamFunctions} based on an
   * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance.
   */
  public static <T> GemFireJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) {
    return new GemFireJavaDStreamFunctions<>(ds);
  }

  /**
   * Convert an instance of {@code JavaRDD<Tuple2<K, V>>} to a {@code JavaPairRDD<K, V>}.
   *
   * @see org.apache.spark.api.java.JavaRDD
   * @see org.apache.spark.api.java.JavaPairRDD
   */
  public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) {
    return JavaAPIHelper.toJavaPairRDD(rdd);
  }

  /**
   * Convert an instance of {@code JavaDStream<Tuple2<K, V>>} to a {@code JavaPairDStream<K, V>}.
   *
   * @see org.apache.spark.streaming.api.java.JavaDStream
   * @see org.apache.spark.streaming.api.java.JavaPairDStream
   */
  public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) {
    return JavaAPIHelper.toJavaPairDStream(ds);
  }

  /**
   * A static factory method to create a {@link GemFireJavaSQLContextFunctions} based
   * on an existing {@link SQLContext} instance.
   */
  public static GemFireJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) {
    return new GemFireJavaSQLContextFunctions(sqlContext);
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala ---------------------------------------------------------------------- diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala new file mode 100644 index 0000000..9c58b78 --- /dev/null +++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala @@ -0,0 +1,51 @@ +package io.pivotal.gemfire.spark.connector + +import com.gemstone.gemfire.cache.execute.ResultCollector +import com.gemstone.gemfire.cache.query.Query +import com.gemstone.gemfire.cache.Region +import io.pivotal.gemfire.spark.connector.internal.RegionMetadata +import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartition + + +trait GemFireConnection { + + /** + * Validate region existence and key/value type constraints, throw RuntimeException + * if region does not exist or key and/or value type do(es) not match. + * @param regionPath the full path of region + */ + def validateRegion[K, V](regionPath: String): Unit + + /** + * Get Region proxy for the given region + * @param regionPath the full path of region + */ + def getRegionProxy[K, V](regionPath: String): Region[K, V] + + /** + * Retrieve region meta data for the given region.
+   * @param regionPath the full path of the region
+   * @return Some[RegionMetadata] if region exists, None otherwise
+   */
+  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata]
+
+  /**
+   * Retrieve region data for the buckets of the given partition
+   * @param regionPath the full path of the region
+   * @param whereClause optional where-clause string passed to the server-side retrieval (presumably an OQL filter)
+   * @param split the GemFire RDD partition whose buckets are read
+   */
+  def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GemFireRDDPartition): Iterator[(K, V)]
+
+  /** Run the OQL `queryString` on the buckets `bucketSet` of region `regionPath`; the result type is implementation specific. */
+  def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object
+
+  /**
+   * Create a GemFire OQL query
+   * @param queryString GemFire OQL query string
+   */
+  def getQuery(queryString: String): Query
+
+  /** Close the connection */
+  def close(): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala
new file mode 100644
index 0000000..2b8a0a8
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala
@@ -0,0 +1,57 @@
+package io.pivotal.gemfire.spark.connector
+
+import org.apache.spark.SparkConf
+import io.pivotal.gemfire.spark.connector.internal.{DefaultGemFireConnectionManager, LocatorHelper}
+
+/**
+ * Stores configuration of a connection to GemFire cluster. It is serializable and can
+ * be safely sent over network.
+ *
+ * @param locators GemFire locator host:port pairs; at least one pair is required
+ * @param gemfireProps The initial gemfire properties to be used.
+ * @param connectionManager the GemFireConnectionManager used to obtain connections
+ */
+class GemFireConnectionConf(
+    val locators: Seq[(String, Int)],
+    val gemfireProps: Map[String, String] = Map.empty,
+    connectionManager: GemFireConnectionManager = new DefaultGemFireConnectionManager
+  ) extends Serializable {
+
+  require(locators.nonEmpty, "GemFireConnectionConf requires at least one locator (host, port) pair")
+
+  /** Get a connection for this configuration from `connectionManager` */
+  def getConnection: GemFireConnection = connectionManager.getConnection(this)
+
+}
+
+object GemFireConnectionConf {
+
+  /**
+   * create GemFireConnectionConf object based on locator string and optional GemFireConnectionManager
+   * @param locatorStr GemFire cluster locator string
+   * @param gemfireProps the initial gemfire properties to be used
+   * @param connectionManager implicit GemFireConnectionManager, DefaultGemFireConnectionManager if none is in scope
+   */
+  def apply(locatorStr: String, gemfireProps: Map[String, String] = Map.empty)
+    (implicit connectionManager: GemFireConnectionManager = new DefaultGemFireConnectionManager): GemFireConnectionConf = {
+    new GemFireConnectionConf(LocatorHelper.parseLocatorsString(locatorStr), gemfireProps, connectionManager)
+  }
+
+  /**
+   * create GemFireConnectionConf object based on SparkConf, which must contain `GemFireLocatorPropKey`
+   * (a RuntimeException is thrown otherwise). The default DefaultGemFireConnectionManager is always used.
+   * @param conf a SparkConf instance
+   */
+  def apply(conf: SparkConf): GemFireConnectionConf = {
+    val locatorStr = conf.getOption(GemFireLocatorPropKey).getOrElse(
+      throw new RuntimeException(s"SparkConf does not contain property $GemFireLocatorPropKey"))
+    // SparkConf only holds properties whose key starts with "spark.", so gemfire properties are stored in it
+    // with the "spark.gemfire." prefix; the prefix is stripped (and the locator key skipped) to build `gemfireProps`.
+    val prefix = "spark.gemfire."
+    val gemfireProps = conf.getAll.filter {
+      case (k, _) => k.startsWith(prefix) && k != GemFireLocatorPropKey
+    }.map { case (k, v) => (k.substring(prefix.length), v) }.toMap
+    apply(locatorStr, gemfireProps)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala
new file mode 100644
index 0000000..3b42737
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala
@@ -0,0 +1,15 @@
+package io.pivotal.gemfire.spark.connector
+
+/**
+ * GemFireConnectionManager provides a common interface that manages GemFire
+ * connections, and it is serializable. Each manager instance handles
+ * connection instance creation and connection pool management.
+ */
+trait GemFireConnectionManager extends Serializable {
+
+  /** get the connection for the given connection configuration */
+  def getConnection(connConf: GemFireConnectionConf): GemFireConnection
+
+  /** close the connection for the given connection configuration */
+  def closeConnection(connConf: GemFireConnectionConf): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala
new file mode 100644
index 0000000..726b785
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala
@@ -0,0 +1,74 @@
+package io.pivotal.gemfire.spark.connector
+
+import java.io.File
+import java.net.URL
+import org.apache.commons.httpclient.methods.PostMethod
+import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity}
+import org.apache.commons.httpclient.HttpClient
+import org.apache.spark.Logging
+
+object GemFireFunctionDeployer {
+  def main(args: Array[String]) {
+    new GemFireFunctionDeployer(new HttpClient()).commandLineRun(args)
+  }
+}
+
+class GemFireFunctionDeployer(val httpClient: HttpClient) extends Logging {
+
+  def deploy(host: String, port: Int, jarLocation: String): String =
+    deploy(host + ":" + port, jarLocation)
+
+  def deploy(host: String, port: Int, jar: File): String =
+    deploy(host + ":" + port, jar)
+
+  def deploy(jmxHostAndPort: String, jarLocation: String): String =
+    deploy(jmxHostAndPort, jarFileHandle(jarLocation))
+
+  /**
+   * Upload `jar` as the multipart part "resources" in an HTTP POST to
+   * `http://<jmxHostAndPort>/gemfire/v1/deployed`.
+   * @return a message that contains the HTTP status code of the POST
+   */
+  def deploy(jmxHostAndPort: String, jar: File): String = {
+    val urlString = constructURLString(jmxHostAndPort)
+    val filePost: PostMethod = new PostMethod(urlString)
+    try {
+      val parts: Array[Part] = Array(new FilePart("resources", jar))
+      filePost.setRequestEntity(new MultipartRequestEntity(parts, filePost.getParams))
+      val status: Int = httpClient.executeMethod(filePost)
+      "Deployed Jar with status:" + status
+    } finally {
+      // hand the HTTP connection back to the client's connection manager, also when the POST fails
+      filePost.releaseConnection()
+    }
+  }
+
+  private[connector] def constructURLString(jmxHostAndPort: String) =
+    "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
+
+  private[connector] def jarFileHandle(jarLocation: String) = {
+    val f: File = new File(jarLocation)
+    if (!f.exists()) {
+      val errorMessage: String = "Invalid jar file:" + f.getAbsolutePath
+      logInfo(errorMessage)
+      throw new RuntimeException(errorMessage)
+    }
+    f
+  }
+
+  def commandLineRun(args: Array[String]): Unit = {
+    val (hostPort: String, jarFile: String) =
+      if (args.length < 2) {
+        logInfo("JMX Manager Host and Port (example: localhost:7070):")
+        val bufferedReader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in))
+        val jmxHostAndPort = bufferedReader.readLine()
+        logInfo("Location of gemfire-functions.jar:")
+        val functionJarLocation = bufferedReader.readLine()
+        (jmxHostAndPort, functionJarLocation)
+      } else {
+        (args(0), args(1))
+      }
+    val status = deploy(hostPort, jarFile)
+    logInfo(status)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala
new file mode 100644
index 0000000..f1213a1
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala
@@ -0,0 +1,14 @@
+package io.pivotal.gemfire.spark.connector
+
+import com.esotericsoftware.kryo.Kryo
+import io.pivotal.gemfire.spark.connector.internal.oql.UndefinedSerializer
+import org.apache.spark.serializer.KryoRegistrator
+import com.gemstone.gemfire.cache.query.internal.Undefined
+
+/** Registers UndefinedSerializer as Kryo's default serializer for GemFire's query `Undefined` value. */
+class GemFireKryoRegistrator extends KryoRegistrator {
+
+  override def registerClasses(kryo: Kryo): Unit = {
+    kryo.addDefaultSerializer(classOf[Undefined], classOf[UndefinedSerializer])
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
new file mode 100644
index 0000000..9fb1c04
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala
@@ -0,0 +1,117 @@
+package io.pivotal.gemfire.spark.connector
+
+import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireOuterJoinRDD, GemFireJoinRDD, GemFirePairRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.Function
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra GemFire functions on RDDs of (key, value) pairs through an implicit conversion.
+ * Import `io.pivotal.gemfire.spark.connector._` at the top of your program to
+ * use these functions.
+ */
+class GemFirePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging {
+
+  /**
+   * Save the RDD of pairs to GemFire key-value store without any conversion; the region is validated first
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   */
+  def saveToGemfire(regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
+    val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf)
+    rdd.sparkContext.runJob(rdd, writer.write _)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this`
+   * RDD and the GemFire `Region[K, V2]`. Each pair of elements will be returned
+   * as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in the
+   * GemFire region.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K2 the key type of the GemFire region (must be a subtype of K)
+   * @tparam V2 the value type of the GemFire region
+   * @return RDD[((K, V), V2)]
+   */
+  def joinGemfireRegion[K2 <: K, V2](
+    regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[(K, V), K, V2] = {
+    new GemFireJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` RDD
+   * and the GemFire `Region[K2, V2]`. The join key from RDD element is generated by
+   * `func(K, V) => K2`, and the key from the GemFire region is just the key of the
+   * key/value pair.
+   *
+   * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
+   * where (k, v) is in `this` RDD and (k2, v2) is in the GemFire region.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element (K, V)
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return RDD[((K, V), V2)]
+   */
+  def joinGemfireRegion[K2, V2](
+    regionPath: String, func: ((K, V)) => K2, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[(K, V), K2, V2] =
+    new GemFireJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+
+  /** This version of joinGemfireRegion(...) is just for Java API: it adapts the Spark Java `Function` via `func.call`. */
+  private[connector] def joinGemfireRegion[K2, V2](
+    regionPath: String, func: Function[(K, V), K2], connConf: GemFireConnectionConf): GemFireJoinRDD[(K, V), K2, V2] = {
+    new GemFireJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the GemFire `Region[K, V2]`.
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
+   * ((k, v), None) if no element in the GemFire region has key k.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K2 the key type of the GemFire region (must be a subtype of K)
+   * @tparam V2 the value type of the GemFire region
+   * @return RDD[((K, V), Option[V2])]
+   */
+  def outerJoinGemfireRegion[K2 <: K, V2](
+    regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[(K, V), K, V2] = {
+    new GemFireOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the GemFire `Region[K2, V2]`.
+   * The join key from RDD element is generated by `func(K, V) => K2`, and the
+   * key from region is just the key of the key/value pair.
+   *
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
+   * ((k, v), None) if no element in the GemFire region has key `func(k, v)`.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element (K, V)
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K2 the key type of the GemFire region
+   * @tparam V2 the value type of the GemFire region
+   * @return RDD[((K, V), Option[V2])]
+   */
+  def outerJoinGemfireRegion[K2, V2](
+    regionPath: String, func: ((K, V)) => K2, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[(K, V), K2, V2] = {
+    new GemFireOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+  }
+
+  /** This version of outerJoinGemfireRegion(...) is just for Java API: it adapts the Spark Java `Function` via `func.call`. */
+  private[connector] def outerJoinGemfireRegion[K2, V2](
+    regionPath: String, func: Function[(K, V), K2], connConf: GemFireConnectionConf): GemFireOuterJoinRDD[(K, V), K2, V2] = {
+    new GemFireOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+  }
+
+  private[connector] def defaultConnectionConf: GemFireConnectionConf =
+    GemFireConnectionConf(rdd.sparkContext.getConf)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
new file mode 100644
index 0000000..4ffacc5
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala
@@ -0,0 +1,93 @@
+package io.pivotal.gemfire.spark.connector
+
+import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireOuterJoinRDD, GemFireJoinRDD, GemFireRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.{PairFunction, Function}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra GemFire functions on non-Pair RDDs through an implicit conversion.
+ * Import `io.pivotal.gemfire.spark.connector._` at the top of your program to
+ * use these functions.
+ */
+class GemFireRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {
+
+  /**
+   * Save the non-pair RDD to GemFire key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param func the function that converts elements of RDD to key/value pairs
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   */
+  def saveToGemfire[K, V](regionPath: String, func: T => (K, V), connConf: GemFireConnectionConf = defaultConnectionConf): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
+    val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf)
+    rdd.sparkContext.runJob(rdd, writer.write(func) _)
+  }
+
+  /** This version of saveToGemfire(...) is just for Java API: it adapts the Spark Java `PairFunction` via `func.call`. */
+  private[connector] def saveToGemfire[K, V](
+    regionPath: String, func: PairFunction[T, K, V], connConf: GemFireConnectionConf): Unit = {
+    saveToGemfire[K, V](regionPath, func.call _, connConf)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` RDD
+   * and the GemFire `Region[K, V]`. The join key from RDD element is generated by
+   * `func(T) => K`, and the key from the GemFire region is just the key of the
+   * key/value pair.
+   *
+   * Each pair of elements of result RDD will be returned as a (t, v) tuple,
+   * where t is in `this` RDD and (k, v) is in the GemFire region.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element T
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K the key type of the GemFire region
+   * @tparam V the value type of the GemFire region
+   * @return RDD[(T, V)]
+   */
+  def joinGemfireRegion[K, V](regionPath: String, func: T => K,
+    connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[T, K, V] = {
+    new GemFireJoinRDD[T, K, V](rdd, func, regionPath, connConf)
+  }
+
+  /** This version of joinGemfireRegion(...) is just for Java API: it adapts the Spark Java `Function` via `func.call`. */
+  private[connector] def joinGemfireRegion[K, V](
+    regionPath: String, func: Function[T, K], connConf: GemFireConnectionConf): GemFireJoinRDD[T, K, V] = {
+    joinGemfireRegion(regionPath, func.call _, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the GemFire `Region[K, V]`.
+   * The join key from RDD element is generated by `func(T) => K`, and the
+   * key from region is just the key of the key/value pair.
+   *
+   * For each element (t) in `this` RDD, the resulting RDD will either contain
+   * all pairs (t, Some(v)) for v in the GemFire region, or the pair
+   * (t, None) if no element in the GemFire region has key `func(t)`.
+   *
+   * @param regionPath the region path of the GemFire region
+   * @param func the function that generates region key from RDD element T
+   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
+   * @tparam K the key type of the GemFire region
+   * @tparam V the value type of the GemFire region
+   * @return RDD[(T, Option[V])]
+   */
+  def outerJoinGemfireRegion[K, V](regionPath: String, func: T => K,
+    connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[T, K, V] = {
+    new GemFireOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf)
+  }
+
+  /** This version of outerJoinGemfireRegion(...) is just for Java API: it adapts the Spark Java `Function` via `func.call`. */
+  private[connector] def outerJoinGemfireRegion[K, V](
+    regionPath: String, func: Function[T, K], connConf: GemFireConnectionConf): GemFireOuterJoinRDD[T, K, V] = {
+    outerJoinGemfireRegion(regionPath, func.call _, connConf)
+  }
+
+  private[connector] def defaultConnectionConf: GemFireConnectionConf =
+    GemFireConnectionConf(rdd.sparkContext.getConf)
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala
new file mode 100644
index 0000000..2fb128a
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSQLContextFunctions.scala
@@ -0,0 +1,26 @@
+package io.pivotal.gemfire.spark.connector
+
+import io.pivotal.gemfire.spark.connector.internal.oql.{OQLRelation, QueryRDD}
+import org.apache.spark.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Provides GemFire OQL-specific functions on `SQLContext`
+ */
+class GemFireSQLContextFunctions(@transient sqlContext: SQLContext) extends Serializable with Logging {
+
+  /**
+   * Expose a GemFire OQL query result as a DataFrame
+   * @param query the OQL query string.
+   */
+  def gemfireOQL(
+    query: String,
+    connConf: GemFireConnectionConf = defaultConnectionConf): DataFrame = {
+    logInfo(s"OQL query = $query")
+    val rdd = new QueryRDD[Object](sqlContext.sparkContext, query, connConf)
+    sqlContext.baseRelationToDataFrame(OQLRelation(rdd)(sqlContext))
+  }
+
+  private[connector] def defaultConnectionConf: GemFireConnectionConf =
+    GemFireConnectionConf(sqlContext.sparkContext.getConf)
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala
new file mode 100644
index 0000000..ce13786
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireSparkContextFunctions.scala
@@ -0,0 +1,23 @@
+package io.pivotal.gemfire.spark.connector
+
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD
+import org.apache.spark.SparkContext
+
+import scala.reflect.ClassTag
+
+/** Provides GemFire specific methods on `SparkContext` */
+class GemFireSparkContextFunctions(@transient sc: SparkContext) extends Serializable {
+
+  /**
+   * Expose a GemFire region as a GemFireRegionRDD
+   * @param regionPath the full path of the region
+   * @param connConf the GemFireConnectionConf that can be used to access the region
+   * @param opConf use this to specify preferred partitioner
+   *        and its parameters. The implementation will use it if it's applicable
+   */
+  def gemfireRegion[K: ClassTag, V: ClassTag] (
+    regionPath: String, connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf),
+    opConf: Map[String, String] = Map.empty): GemFireRegionRDD[K, V] =
+    GemFireRegionRDD[K, V](sc, regionPath, connConf, opConf)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
new file mode 100644
index 0000000..e31186b
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnection.scala
@@ -0,0 +1,133 @@
+package io.pivotal.gemfire.spark.connector.internal
+
+import com.gemstone.gemfire.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut}
+import com.gemstone.gemfire.cache.execute.{FunctionException, FunctionService}
+import com.gemstone.gemfire.cache.query.Query
+import com.gemstone.gemfire.cache.{Region, RegionService}
+import com.gemstone.gemfire.internal.cache.execute.InternalExecution
+import io.pivotal.gemfire.spark.connector.internal.oql.QueryResultCollector
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartition
+import org.apache.spark.Logging
+import io.pivotal.gemfire.spark.connector.GemFireConnection
+import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions._
+import java.util.{Set => JSet, List => JList }
+
+/**
+ * Default GemFireConnection implementation. The instance of this should be
+ * created by DefaultGemFireConnectionFactory
+ * @param locators pairs of host/port of locators
+ * @param gemFireProps The initial gemfire properties to be used.
+ */
+private[connector] class DefaultGemFireConnection (
+  locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty)
+  extends GemFireConnection with Logging {
+
+  private val clientCache = initClientCache()
+  /** a lock object only used by getRegionProxy...() */
+  private val regionLock = new Object
+
+  /** Register all GemFire functions used by this connection once, at construction time */
+  FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance())
+  FunctionService.registerFunction(RetrieveRegionFunction.getInstance())
+  FunctionService.registerFunction(QueryFunction.getInstance())
+
+  private def initClientCache() : ClientCache = {
+    try {
+      import io.pivotal.gemfire.spark.connector.map2Properties
+      logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""")
+      val ccf = new ClientCacheFactory(gemFireProps)
+      locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) }
+      ccf.create()
+    } catch {
+      case e: Exception =>
+        logError(s"""Failed to init ClientCache, locators=${locators.mkString(",")}, Error: $e""")
+        throw new RuntimeException(e)
+    }
+  }
+
+  /** close the clientCache */
+  override def close(): Unit =
+    if (! clientCache.isClosed) clientCache.close()
+
+  /** ----------------------------------------- */
+  /** implementation of GemFireConnection trait */
+  /** ----------------------------------------- */
+
+  override def getQuery(queryString: String): Query =
+    clientCache.asInstanceOf[RegionService].getQueryService.newQuery(queryString)
+
+  override def validateRegion[K, V](regionPath: String): Unit = {
+    val md = getRegionMetadata[K, V](regionPath)
+    if (md.isEmpty) throw new RuntimeException(s"The region named $regionPath was not found")
+  }
+
+  /**
+   * Run RetrieveRegionMetadataFunction on bucket 0 of the region and return the first result.
+   * Returns None when the function fails with a "region not found" FunctionException; other failures are rethrown.
+   */
+  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] = {
+    import scala.collection.JavaConversions.setAsJavaSet
+    val region = getRegionProxy[K, V](regionPath)
+    val set0: JSet[Integer] = Set[Integer](0)
+    val exec = FunctionService.onRegion(region).asInstanceOf[InternalExecution].withBucketFilter(set0)
+    exec.setWaitOnExceptionFlag(true)
+    try {
+      val collector = exec.execute(RetrieveRegionMetadataFunction.ID)
+      val r = collector.getResult.asInstanceOf[JList[RegionMetadata]]
+      logDebug(r.get(0).toString)
+      Some(r.get(0))
+    } catch {
+      case e: FunctionException =>
+        // getMessage may be null; guard it so the original exception is rethrown instead of an NPE
+        if (Option(e.getMessage).exists(_.contains(s"The region named /$regionPath was not found"))) None
+        else throw e
+    }
+  }
+
+  /** Return the client cache's region, or create a PROXY region under `regionLock` (re-checking inside the lock). */
+  def getRegionProxy[K, V](regionPath: String): Region[K, V] = {
+    val region1: Region[K, V] = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
+    if (region1 != null) region1
+    else regionLock.synchronized {
+      val region2 = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
+      if (region2 != null) region2
+      else clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.PROXY).create(regionPath)
+    }
+  }
+
+  override def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GemFireRDDPartition): Iterator[(K, V)] = {
+    val region = getRegionProxy[K, V](regionPath)
+    val desc = s"""RDD($regionPath, "${whereClause.getOrElse("")}", ${split.index})"""
+    val args : Array[String] = Array[String](whereClause.getOrElse(""), desc)
+    val collector = new StructStreamingResultCollector(desc)
+    // each collected element is an Array[Object] holding (key, value); it is converted to a pair below
+    import scala.collection.JavaConversions.setAsJavaSet
+    val exec = FunctionService.onRegion(region).withArgs(args).withCollector(collector).asInstanceOf[InternalExecution]
+      .withBucketFilter(split.bucketSet.map(Integer.valueOf))
+    exec.setWaitOnExceptionFlag(true)
+    exec.execute(RetrieveRegionFunction.ID)
+    collector.getResult.map{objs: Array[Object] => (objs(0).asInstanceOf[K], objs(1).asInstanceOf[V])}
+  }
+
+  /** Run QueryFunction with args (queryString, bucketSet.toString) on the given buckets and return the collector's result. */
+  override def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String) = {
+    import scala.collection.JavaConversions.setAsJavaSet
+    val collector = new QueryResultCollector
+    val region = getRegionProxy(regionPath)
+    val args: Array[String] = Array[String](queryString, bucketSet.toString)
+    val exec = FunctionService.onRegion(region).withCollector(collector).asInstanceOf[InternalExecution]
+      .withBucketFilter(bucketSet.map(Integer.valueOf))
+      .withArgs(args)
+    exec.execute(QueryFunction.ID)
+    collector.getResult
+  }
+}
+
+
+/** Factory for DefaultGemFireConnection; it exists to make unit testing DefaultGemFireConnectionManager easier */
+class DefaultGemFireConnectionFactory {
+
+  def newConnection(locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) =
+    new DefaultGemFireConnection(locators, gemFireProps)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala
new file mode 100644
index 0000000..0463340
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManager.scala
@@ -0,0 +1,51 @@
+package io.pivotal.gemfire.spark.connector.internal
+
+import io.pivotal.gemfire.spark.connector.{GemFireConnection, GemFireConnectionConf, GemFireConnectionManager}
+
+import scala.collection.mutable
+
+/**
+ * Default implementation of GemFireConnectionManager. It delegates to the
+ * companion object, which owns the process-wide connection cache.
+ */
+class DefaultGemFireConnectionManager extends GemFireConnectionManager {
+
+  def getConnection(connConf: GemFireConnectionConf): GemFireConnection =
+    DefaultGemFireConnectionManager.getConnection(connConf)
+
+  def closeConnection(connConf: GemFireConnectionConf): Unit =
+    DefaultGemFireConnectionManager.closeConnection(connConf)
+
+}
+
+object DefaultGemFireConnectionManager {
+
+  /**
+   * connection cache, keyed by locator host:port pair; several keys may map to
+   * the same connection. mutable.Map is not thread-safe, so every access in
+   * this object happens inside `connections.synchronized`.
+   */
+  private[connector] val connections = mutable.Map[(String, Int), GemFireConnection]()
+
+  /** the cached connection registered under any of `connConf`'s locators, if any */
+  private def lookup(connConf: GemFireConnectionConf): Option[GemFireConnection] =
+    connConf.locators.flatMap(connections.get).headOption
+
+  /**
+   * use locator host:port pair to lookup connection. create new connection and add it
+   * to `connections` if it does not exist. Lookup and insertion happen under one lock,
+   * so concurrent callers cannot create two connections for the same locators.
+   */
+  def getConnection(connConf: GemFireConnectionConf)
+    (implicit factory: DefaultGemFireConnectionFactory = new DefaultGemFireConnectionFactory): GemFireConnection =
+    connections.synchronized {
+      lookup(connConf).getOrElse {
+        val conn = factory.newConnection(connConf.locators, connConf.gemfireProps)
+        connConf.locators.foreach(pair => connections += (pair -> conn))
+        conn
+      }
+    }
+
+  /**
+   * Close the connection and remove it from connection cache.
+   * Note: multiple entries may share the same connection, all those entries are removed.
+   */
+  def closeConnection(connConf: GemFireConnectionConf): Unit =
+    connections.synchronized {
+      lookup(connConf).foreach { conn =>
+        conn.close()
+        connections.retain((_, v) => v != conn)
+      }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
new file mode 100644
index 0000000..550e0bc
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/LocatorHelper.scala
@@ -0,0 +1,30 @@
+package io.pivotal.gemfire.spark.connector.internal
+
+import scala.util.{Failure, Success, Try}
+
+
+object LocatorHelper {
+
+  /** valid locator strings are: host[port] and host:port */
+  final val LocatorPattern1 = """([\w-_]+(\.[\w-_]+)*)\[([0-9]{2,5})\]""".r
+  final val LocatorPattern2 = """([\w-_]+(\.[\w-_]+)*):([0-9]{2,5})""".r
+
+  /** the largest valid TCP port number */
+  private final val MaxPort = 65535
+
+  /**
+   * convert single locator string to Try[(host, port)]. Surrounding whitespace
+   * is ignored; a null string or a port outside 1..65535 yields a Failure.
+   */
+  def locatorStr2HostPortPair(locatorStr: String): Try[(String, Int)] =
+    Option(locatorStr).fold("")(_.trim) match {
+      case LocatorPattern1(host, _, port) => toHostPortPair(locatorStr, host, port)
+      case LocatorPattern2(host, _, port) => toHostPortPair(locatorStr, host, port)
+      case _ => Failure(new Exception(s"invalid locator: $locatorStr"))
+    }
+
+  /** build the (host, port) pair; `port` is all digits, guaranteed by the patterns */
+  private def toHostPortPair(locatorStr: String, host: String, port: String): Try[(String, Int)] = {
+    val portNum = port.toInt
+    if (portNum > 0 && portNum <= MaxPort) Success((host, portNum))
+    else Failure(new Exception(s"invalid locator: $locatorStr"))
+  }
+
+  /**
+   * Parse locator strings and returns Seq of (hostname, port) pair.
+   * Valid locator string are one or more "host[port]" and/or "host:port"
+   * separated by `,`; whitespace around each locator is ignored. For example:
+   *    host1.mydomain.com[8888],host2.mydomain.com[8889]
+   *    host1.mydomain.com:8888, host2.mydomain.com:8889
+   * @throws Exception the first invalid locator's parse failure
+   */
+  def parseLocatorsString(locatorsStr: String): Seq[(String, Int)] =
+    locatorsStr.split(",").map(locatorStr2HostPortPair).map(_.get)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala
new file mode 100644
index 0000000..5d95cec
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultCollector.scala
@@ -0,0 +1,136 @@
+package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions
+
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
+import com.gemstone.gemfire.DataSerializer
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl
+import com.gemstone.gemfire.cache.query.types.StructType
+import com.gemstone.gemfire.distributed.DistributedMember
+import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
+import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions.StructStreamingResultSender.
+       {TYPE_CHUNK, DATA_CHUNK, ERROR_CHUNK, SER_DATA, UNSER_DATA, BYTEARR_DATA}
+
+/**
+ * StructStreamingResultCollector and StructStreamingResultSender are paired
+ * to transfer result of list of `com.gemstone.gemfire.cache.query.Struct`
+ * from GemFire server to Spark Connector (the client of GemFire server)
+ * in streaming, i.e., while sender sending the result, the collector can
+ * start processing the arrived result without waiting for full result to
+ * become available.
+ */
+class StructStreamingResultCollector(desc: String) extends ResultCollector[Array[Byte], Iterator[Array[Object]]] {
+
+  /** the constructor that provide default `desc` (description) */
+  def this() = this("StructStreamingResultCollector")
+
+  private val queue: BlockingQueue[Array[Byte]] = new LinkedBlockingQueue[Array[Byte]]()
+  var structType: StructType = null
+
+  /** ------------------------------------------ */
+  /** ResultCollector interface implementations */
+  /** ------------------------------------------ */
+
+  override def getResult: Iterator[Array[Object]] = resultIterator
+
+  override def getResult(timeout: Long, unit: TimeUnit): Iterator[Array[Object]] =
+    throw new UnsupportedOperationException()
+
+  /**
+   * addResult queues chunks longer than one byte (a chunk-type byte plus
+   * payload); null and shorter chunks are ignored
+   */
+  override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit =
+    if (chunk != null && chunk.size > 1) {
+      this.queue.add(chunk)
+    }
+
+  /** endResults add special `Array.empty` to the queue as marker of end of data */
+  override def endResults(): Unit = this.queue.add(Array.empty)
+
+  override def clearResults(): Unit = this.queue.clear()
+
+  /** ------------------------------------------ */
+  /**             Internal methods               */
+  /** ------------------------------------------ */
+
+  def getResultType: StructType = {
+    // trigger lazy resultIterator initialization if necessary
+    if (structType == null) resultIterator.hasNext
+    structType
+  }
+
+  /**
+   * Note: The data is sent in chunks, and each chunk contains multiple
+   * records. So the result iterator is an iterator (I) of iterator (II),
+   * i.e., go through each chunk (iterator (I)), and for each chunk, go
+   * through each record (iterator (II)).
+   */
+  private lazy val resultIterator = new Iterator[Array[Object]] {
+
+    private var currentIterator: Iterator[Array[Object]] = nextIterator()
+
+    /**
+     * Move past exhausted chunk iterators until one has a record or the
+     * end-of-data marker (Iterator.empty) is reached.
+     */
+    override def hasNext: Boolean = {
+      while (!currentIterator.hasNext && currentIterator != Iterator.empty) currentIterator = nextIterator()
+      currentIterator.hasNext
+    }
+
+    /** calls `hasNext` itself, so `next()` is valid without a preceding `hasNext` */
+    override def next(): Array[Object] =
+      if (hasNext) currentIterator.next()
+      else throw new NoSuchElementException(s"$desc: no more results")
+  }
+
+  /**
+   * Take the next chunk from the queue (blocking until one arrives) and return
+   * an iterator over its records. The end marker yields Iterator.empty, and a
+   * TYPE_CHUNK only records the struct type before moving on to the next chunk.
+   */
+  private def nextIterator(): Iterator[Array[Object]] = {
+    val chunk: Array[Byte] = queue.take()
+    if (chunk.isEmpty) {
+      Iterator.empty
+    } else {
+      val input = new ByteArrayDataInput()
+      input.initialize(chunk, Version.CURRENT)
+      val chunkType = input.readByte()
+      chunkType match {
+        case TYPE_CHUNK =>
+          if (structType == null)
+            structType = DataSerializer.readObject(input).asInstanceOf[StructTypeImpl]
+          nextIterator()
+        case DATA_CHUNK =>
+          // with no TYPE_CHUNK seen yet, fall back to StructStreamingResultSender.KeyValueType
+          if (structType == null) structType = StructStreamingResultSender.KeyValueType
+          chunkToIterator(input, structType.getFieldNames.length)
+        case ERROR_CHUNK =>
+          val error = DataSerializer.readObject(input).asInstanceOf[Exception]
+          errorPropagationIterator(error)
+        case _ => throw new RuntimeException(s"unknown chunk type: $chunkType")
+      }
+    }
+  }
+
+  /** create a iterator that propagate sender's exception */
+  private def errorPropagationIterator(ex: Exception) = new Iterator[Array[Object]] {
+    val re = new RuntimeException(ex)
+    override def hasNext: Boolean = throw re
+    override def next(): Array[Object] = throw re
+  }
+
+  /**
+   * convert a chunk of data to an iterator; each record is `rowSize` fields,
+   * each prefixed by a byte telling how the field was written
+   */
+  private def chunkToIterator(input: ByteArrayDataInput, rowSize: Int) = new Iterator[Array[Object]] {
+    override def hasNext: Boolean = input.available() > 0
+    val tmpInput = new ByteArrayDataInput()
+    override def next(): Array[Object] =
+      (0 until rowSize).map { _ =>
+        val b = input.readByte()
+        b match {
+          case SER_DATA =>
+            val arr: Array[Byte] = DataSerializer.readByteArray(input)
+            tmpInput.initialize(arr, Version.CURRENT)
+            DataSerializer.readObject(tmpInput).asInstanceOf[Object]
+          case UNSER_DATA =>
+            DataSerializer.readObject(input).asInstanceOf[Object]
+          case BYTEARR_DATA =>
+            DataSerializer.readByteArray(input).asInstanceOf[Object]
+          case _ =>
+            throw new RuntimeException(s"unknown data type $b")
+        }
+      }.toArray
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala
new file mode 100644
index 0000000..dd4a40b
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParser.scala
@@ -0,0 +1,42 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import scala.util.parsing.combinator.RegexParsers
+
+class QueryParser extends RegexParsers {
+
+  /**
+   * Parse an OQL SELECT statement. Only the FROM clause survives: the result is
+   * the `regions` parser's output, i.e. the List of region paths rendered with
+   * toString, e.g. "List(/r1, /r2)".
+   */
+  def query: Parser[String] = opt(rep(IMPORT ~ PACKAGE)) ~> select ~> opt(distinct) ~> projection ~> from ~> regions <~ opt(where ~ filter) ^^ {
+    _.toString
+  }
+
+  val IMPORT: Parser[String] = "[Ii][Mm][Pp][Oo][Rr][Tt]".r
+
+  val select: Parser[String] = "[Ss][Ee][Ll][Ee][Cc][Tt]".r
+
+  val distinct: Parser[String] = "[Dd][Ii][Ss][Tt][Ii][Nn][Cc][Tt]".r
+
+  val from: Parser[String] = "[Ff][Rr][Oo][Mm]".r
+
+  val where: Parser[String] = "[Ww][Hh][Ee][Rr][Ee]".r
+
+  def PACKAGE: Parser[String] = """[\w.]+""".r
+
+  def projection: Parser[String] = "*" | repsep("""["\w]+[.\w"]*""".r, ",") ^^ {
+    _.toString
+  }
+
+  /** comma separated region paths; any alias after a path is dropped */
+  def regions: Parser[String] = repsep(region <~ opt(alias), ",") ^^ {
+    _.toString
+  }
+
+  def region: Parser[String] = """/[\w.]+[/[\w.]+]*""".r | """[\w]+[.\w]*""".r
+
+  def alias: Parser[String] = not(where) ~> """[\w]+""".r
+
+  def filter: Parser[String] = """[\w.]+[[\s]+[<>=.'\w]+]*""".r
+}
+
+object QueryParser extends QueryParser {
+
+  def parseOQL(expression: String) = parseAll(query, expression)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala
new file mode 100644
index 0000000..7c0d6a9
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryRDD.scala
@@ -0,0 +1,67 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
+import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireRDDPartition, ServerSplitsPartitioner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{TaskContext, SparkContext, Partition}
+import scala.reflect.ClassTag
+
+/**
+ * An RDD that provides the functionality that read the OQL query result
+ *
+ * @param sc The SparkContext this RDD is associated with
+ * @param queryString The OQL query string
+ * @param connConf The GemFireConnectionConf that provide the GemFireConnection
+ */
+class QueryRDD[T](@transient sc: SparkContext,
+                  queryString: String,
+                  connConf: GemFireConnectionConf)
+                 (implicit ct: ClassTag[T])
+  extends RDD[T](sc, Seq.empty) {
+
+  /**
+   * One partition per server split for a partitioned region, otherwise a
+   * single partition with an empty bucket set.
+   * @throws RuntimeException when the region's metadata is not found
+   */
+  override def getPartitions: Array[Partition] = {
+    val conn = connConf.getConnection
+    val regionPath = getRegionPathFromQuery(queryString)
+    val md = conn.getRegionMetadata(regionPath)
+    md match {
+      case Some(metadata) =>
+        if (metadata.isPartitioned) {
+          val splits = ServerSplitsPartitioner.partitions(conn, metadata, Map.empty)
+          logInfo(s"QueryRDD.getPartitions():isPartitioned=true, partitions=${splits.mkString(",")}")
+          splits
+        }
+        else {
+          logInfo(s"QueryRDD.getPartitions():isPartitioned=false")
+          Array[Partition](new GemFireRDDPartition(0, Set.empty))
+        }
+      case None => throw new RuntimeException(s"Region $regionPath metadata was not found.")
+    }
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    val buckets = split.asInstanceOf[GemFireRDDPartition].bucketSet
+    val regionPath = getRegionPathFromQuery(queryString)
+    val result = connConf.getConnection.executeQuery(regionPath, buckets, queryString)
+    result match {
+      case it: Iterator[T] =>
+        logInfo(s"QueryRDD.compute():query=$queryString, partition=$split")
+        it
+      case _ =>
+        throw new RuntimeException("Unexpected OQL result: " + result.toString)
+    }
+  }
+
+  /**
+   * Extract the path of the first region in the query's FROM clause, without a
+   * leading "/" and without any ".xxx" suffix (e.g. "/r1.entrySet" gives "r1").
+   * QueryParser renders the FROM clause as a List string such as
+   * "List(/r1, /r2.values)".
+   * @throws IllegalArgumentException when the query cannot be parsed
+   */
+  private def getRegionPathFromQuery(queryString: String): String = {
+    val regions = QueryParser.parseOQL(queryString) match {
+      case QueryParser.Success(r, _) => r
+      case failure => throw new IllegalArgumentException(s"Invalid OQL query: $queryString, $failure")
+    }
+    // The leading "/" is optional in OQL, so strip it only when present; searching
+    // for the first "/" in the whole string broke on "List(region1)".
+    val firstRegion = regions.stripPrefix("List(").stripSuffix(")").split(",")(0).trim.stripPrefix("/")
+    val dot = firstRegion.indexOf('.')
+    if (dot > 0) firstRegion.substring(0, dot) else firstRegion
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala
new file mode 100644
index 0000000..6bbad26
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryResultCollector.scala
@@ -0,0 +1,53 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import com.gemstone.gemfire.DataSerializer
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.distributed.DistributedMember
+import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
+
+/**
+ * ResultCollector that receives OQL query results as byte-array chunks and
+ * exposes them as one Iterator. Each chunk holds a sequence of objects written
+ * with DataSerializer; the empty array queued by endResults() marks the end of
+ * data. Chunks can be consumed while later chunks are still arriving.
+ */
+class QueryResultCollector extends ResultCollector[Array[Byte], Iterator[Object]]{
+
+  private val queue = new LinkedBlockingDeque[Array[Byte]]()
+
+  override def getResult: Iterator[Object] = resultIterator
+
+  override def getResult(timeout: Long, unit: TimeUnit): Iterator[Object] = throw new UnsupportedOperationException
+
+  /** queue a non-empty chunk; null and empty chunks are ignored */
+  override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit =
+    if (chunk != null && chunk.size > 0) {
+      queue.add(chunk)
+    }
+
+  /** queue the empty-array end-of-data marker */
+  override def endResults(): Unit = queue.add(Array.empty)
+
+  override def clearResults(): Unit = queue.clear()
+
+  /**
+   * Iterator over all chunks: reads the current chunk's objects, then moves to
+   * the next queued chunk (blocking until one arrives) once it is exhausted.
+   */
+  private lazy val resultIterator = new Iterator[Object] {
+    private var currentIterator = nextIterator()
+
+    override def hasNext: Boolean = {
+      // skip exhausted chunks; Iterator.empty means the end marker was reached
+      while (!currentIterator.hasNext && currentIterator != Iterator.empty)
+        currentIterator = nextIterator()
+      currentIterator.hasNext
+    }
+
+    // calls hasNext itself, so next() is valid without a preceding hasNext
+    override def next(): Object =
+      if (hasNext) currentIterator.next()
+      else throw new NoSuchElementException("QueryResultCollector: no more results")
+  }
+
+  /** take the next chunk (blocking) and return an iterator over its objects */
+  private def nextIterator(): Iterator[Object] = {
+    val chunk = queue.take()
+    if (chunk.isEmpty) {
+      Iterator.empty
+    }
+    else {
+      val input = new ByteArrayDataInput
+      input.initialize(chunk, Version.CURRENT)
+      new Iterator[Object] {
+        override def hasNext: Boolean = input.available() > 0
+        override def next(): Object = DataSerializer.readObject(input).asInstanceOf[Object]
+      }
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala
new file mode 100644
index 0000000..e4f4152
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RDDConverter.scala
@@ -0,0 +1,24 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+
+// NOTE(review): DataFlowAnalysis is not used in this file, and importing from
+// scala.tools.nsc needs scala-compiler on the compile classpath; consider removing.
+import scala.tools.nsc.backend.icode.analysis.DataFlowAnalysis
+
+/**
+ * Spark SQL relation over a QueryRDD: the schema comes from SchemaBuilder and
+ * the rows from RowBuilder, both built from `queryRDD`.
+ */
+case class OQLRelation[T](queryRDD: QueryRDD[T])(@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {
+
+  override def schema: StructType = new SchemaBuilder(queryRDD).toSparkSchema()
+
+  override def buildScan(): RDD[Row] = new RowBuilder(queryRDD).toRowRDD()
+
+}
+
+object RDDConverter {
+
+  /** wrap `queryRDD` in an OQLRelation and convert it to a DataFrame via `sqlContext` */
+  def queryRDDToDataFrame[T](queryRDD: QueryRDD[T], sqlContext: SQLContext): DataFrame = {
+    sqlContext.baseRelationToDataFrame(OQLRelation(queryRDD)(sqlContext))
+  }
+}
\ No newline at end of file