http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
new file mode 100644
index 0000000..3278a5b
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodePairRDDFunctions;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import scala.Option;
+import scala.Tuple2;
+import scala.reflect.ClassTag;
+
+import java.util.Properties;
+
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide Geode Spark
+ * Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in the {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaPairRDDFunctions<K, V> {
+
+  public final GeodePairRDDFunctions<K, V> rddf;
+
+  public GeodeJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) {
+    this.rddf = new GeodePairRDDFunctions<K, V>(rdd.rdd());
+  }
+
+  /**
+   * Save the pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param opConf the parameters for this operation
+   */
+  public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) {
+    rddf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param opConf the parameters for this operation
+   */
+  public void saveToGeode(String regionPath, Properties opConf) {
+    rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the pair RDD to the Geode key-value store.
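+   *
+   * <p>A hypothetical usage sketch (assumes an existing {@code JavaPairRDD<String, String>} named
+   * {@code rdd} and a {@code GeodeConnectionConf} named {@code connConf}; the names and region path
+   * are illustrative only):
+   * <pre>
+   *   GeodeJavaUtil.javaFunctions(rdd).saveToGeode("/exampleRegion", connConf);
+   * </pre></p>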
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   */
+  public void saveToGeode(String regionPath, GeodeConnectionConf connConf) {
+    rddf.saveToGeode(regionPath, connConf, emptyStrStrMap());
+  }
+
+  /**
+   * Save the pair RDD to the Geode key-value store with the default GeodeConnector.
+   * @param regionPath the full path of the region where the RDD is stored
+   */
+  public void saveToGeode(String regionPath) {
+    rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), emptyStrStrMap());
+  }
+
+  /**
+   * Return a JavaPairRDD containing all pairs of elements with matching keys in
+   * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements
+   * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
+   * (k, v2) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD<Tuple2<K, V>, V2>
+   */
+  public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(String regionPath) {
+    return joinGeodeRegion(regionPath, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Return a JavaPairRDD containing all pairs of elements with matching keys in
+   * this RDD<K, V> and the Geode `Region<K, V2>`. Each pair of elements
+   * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
+   * (k, v2) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD<Tuple2<K, V>, V2>
+   */
+  public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
+      String regionPath, GeodeConnectionConf connConf) {
+    GeodeJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGeodeRegion(regionPath, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<V2> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from an RDD
+   * element is generated by `func(K, V) => K2`, and the key from the Geode
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a ((k, v), v2) tuple,
+   * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
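+   *
+   * <p>A hypothetical sketch (assumes {@code rdd} is a {@code JavaPairRDD<String, Integer>} and
+   * {@code /products} is a String-keyed region; {@code Product} is an illustrative value class):
+   * <pre>
+   *   JavaPairRDD<Tuple2<String, Integer>, Product> joined =
+   *       GeodeJavaUtil.javaFunctions(rdd).joinGeodeRegion("/products", pair -> pair._1());
+   * </pre></p>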
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param <K2> the key type of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD<Tuple2<K, V>, V2>
+   */
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
+      String regionPath, Function<Tuple2<K, V>, K2> func) {
+    return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD<K, V> and the Geode `Region<K2, V2>`. The join key from an RDD
+   * element is generated by `func(K, V) => K2`, and the key from the Geode
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a ((k, v), v2) tuple,
+   * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <K2> the key type of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD<Tuple2<K, V>, V2>
+   */
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
+      String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
+    GeodeJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<V2> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`.
+   * For each element (k, v) in this RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key k.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD<Tuple2<K, V>, Option<V2>>
+   */
+  public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(String regionPath) {
+    return outerJoinGeodeRegion(regionPath, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Perform a left outer join of this RDD<K, V> and the Geode `Region<K, V2>`.
+   * For each element (k, v) in this RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key k.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD<Tuple2<K, V>, Option<V2>>
+   */
+  public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
+      String regionPath, GeodeConnectionConf connConf) {
+    GeodeOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<Option<V2>> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`.
+   * The join key from an RDD element is generated by `func(K, V) => K2`, and the
+   * key from the region is just the key of the key/value pair.
+   *
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param <K2> the key type of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD<Tuple2<K, V>, Option<V2>>
+   */
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
+      String regionPath, Function<Tuple2<K, V>, K2> func) {
+    return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Perform a left outer join of this RDD<K, V> and the Geode `Region<K2, V2>`.
+   * The join key from an RDD element is generated by `func(K, V) => K2`, and the
+   * key from the region is just the key of the key/value pair.
+   *
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <K2> the key type of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD<Tuple2<K, V>, Option<V2>>
+   */
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
+      String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
+    GeodeOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<Option<V2>> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
new file mode 100644
index 0000000..e4f6f36
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodeRDDFunctions;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Option;
+import scala.reflect.ClassTag;
+
+import java.util.Properties;
+
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide Geode Spark
+ * Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in the {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaRDDFunctions<T> {
+
+  public final GeodeRDDFunctions<T> rddf;
+
+  public GeodeJavaRDDFunctions(JavaRDD<T> rdd) {
+    this.rddf = new GeodeRDDFunctions<T>(rdd.rdd());
+  }
+
+  /**
+   * Save the non-pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param func the PairFunction that converts elements of the JavaRDD to key/value pairs
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param opConf the parameters for this operation
+   */
+  public <K, V> void saveToGeode(
+      String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) {
+    rddf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the non-pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param func the PairFunction that converts elements of the JavaRDD to key/value pairs
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   */
+  public <K, V> void saveToGeode(
+      String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) {
+    rddf.saveToGeode(regionPath, func, connConf, emptyStrStrMap());
+  }
+
+  /**
+   * Save the non-pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param func the PairFunction that converts elements of the JavaRDD to key/value pairs
+   * @param opConf the parameters for this operation
+   */
+  public <K, V> void saveToGeode(
+      String regionPath, PairFunction<T, K, V> func, Properties opConf) {
+    rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the non-pair RDD to the Geode key-value store with the default GeodeConnector.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param func the PairFunction that converts elements of the JavaRDD to key/value pairs
+   */
+  public <K, V> void saveToGeode(String regionPath, PairFunction<T, K, V> func) {
+    rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), emptyStrStrMap());
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD<T> and the Geode `Region<K, V>`. The join key from an RDD
+   * element is generated by `func(T) => K`, and the key from the Geode
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a (t, v) tuple,
+   * where t is from this RDD and v is from the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element T
+   * @param <K> the key type of the Geode region
+   * @param <V> the value type of the Geode region
+   * @return JavaPairRDD<T, V>
+   */
+  public <K, V> JavaPairRDD<T, V> joinGeodeRegion(String regionPath, Function<T, K> func) {
+    return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD<T> and the Geode `Region<K, V>`. The join key from an RDD
+   * element is generated by `func(T) => K`, and the key from the Geode
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a (t, v) tuple,
+   * where t is from this RDD and v is from the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element T
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <K> the key type of the Geode region
+   * @param <V> the value type of the Geode region
+   * @return JavaPairRDD<T, V>
+   */
+  public <K, V> JavaPairRDD<T, V> joinGeodeRegion(
+      String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
+    GeodeJoinRDD<T, K, V> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
+    ClassTag<T> kt = fakeClassTag();
+    ClassTag<V> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`.
+   * The join key from an RDD element is generated by `func(T) => K`, and the
+   * key from the region is just the key of the key/value pair.
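+   *
+   * <p>A hypothetical sketch (assumes {@code rdd} is a {@code JavaRDD<String>} joined against a
+   * String-keyed region {@code /emps}; {@code Employee} is an illustrative value class):
+   * <pre>
+   *   JavaPairRDD<String, Option<Employee>> result =
+   *       GeodeJavaUtil.javaFunctions(rdd).outerJoinGeodeRegion("/emps", s -> s);
+   * </pre></p>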
+   *
+   * For each element t in this RDD, the resulting RDD will either contain
+   * all pairs (t, Some(v)) for v in the Geode region, or the pair
+   * (t, None) if no element in the Geode region has key `func(t)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element T
+   * @param <K> the key type of the Geode region
+   * @param <V> the value type of the Geode region
+   * @return JavaPairRDD<T, Option<V>>
+   */
+  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(String regionPath, Function<T, K> func) {
+    return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Perform a left outer join of this RDD<T> and the Geode `Region<K, V>`.
+   * The join key from an RDD element is generated by `func(T) => K`, and the
+   * key from the region is just the key of the key/value pair.
+   *
+   * For each element t in this RDD, the resulting RDD will either contain
+   * all pairs (t, Some(v)) for v in the Geode region, or the pair
+   * (t, None) if no element in the Geode region has key `func(t)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element T
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <K> the key type of the Geode region
+   * @param <V> the value type of the Geode region
+   * @return JavaPairRDD<T, Option<V>>
+   */
+  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(
+      String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
+    GeodeOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
+    ClassTag<T> kt = fakeClassTag();
+    ClassTag<Option<V>> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
new file mode 100644
index 0000000..3471bf90
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodeSQLContextFunctions;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide Geode
+ * OQL functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in the {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaSQLContextFunctions {
+
+  public final GeodeSQLContextFunctions scf;
+
+  public GeodeJavaSQLContextFunctions(SQLContext sqlContext) {
+    scf = new GeodeSQLContextFunctions(sqlContext);
+  }
+
+  public <T> DataFrame geodeOQL(String query) {
+    DataFrame df = scf.geodeOQL(query, scf.defaultConnectionConf());
+    return df;
+  }
+
+  public <T> DataFrame geodeOQL(String query, GeodeConnectionConf connConf) {
+    DataFrame df = scf.geodeOQL(query, connConf);
+    return df;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
new file mode 100644
index 0000000..ce6b1ff
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD$;
+import org.apache.spark.SparkContext;
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+import scala.reflect.ClassTag;
+import java.util.Properties;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.SparkContext} to provide Geode
+ * Connector functionality.
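+ *
+ * <p>A hypothetical sketch of exposing a region as an RDD (assumes a SparkContext {@code sc}
+ * whose configuration carries the Geode locator property and an existing region
+ * {@code /exampleRegion}; names are illustrative):
+ * <pre>
+ *   GeodeJavaRegionRDD<String, String> rdd =
+ *       GeodeJavaUtil.javaFunctions(sc).geodeRegion("/exampleRegion");
+ * </pre></p>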
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in the {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaSparkContextFunctions {
+
+  public final SparkContext sc;
+
+  public GeodeJavaSparkContextFunctions(SparkContext sc) {
+    this.sc = sc;
+  }
+
+  /**
+   * Expose a Geode region as a JavaPairRDD.
+   * @param regionPath the full path of the region
+   * @param connConf the GeodeConnectionConf that can be used to access the region
+   * @param opConf the parameters for this operation, such as the preferred partitioner.
+   */
+  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(
+      String regionPath, GeodeConnectionConf connConf, Properties opConf) {
+    ClassTag<K> kt = fakeClassTag();
+    ClassTag<V> vt = fakeClassTag();
+    GeodeRegionRDD<K, V> rdd = GeodeRegionRDD$.MODULE$.apply(
+        sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt);
+    return new GeodeJavaRegionRDD<>(rdd);
+  }
+
+  /**
+   * Expose a Geode region as a JavaPairRDD with the default GeodeConnector and no preferred partitioner.
+   * @param regionPath the full path of the region
+   */
+  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath) {
+    GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
+    return geodeRegion(regionPath, connConf, new Properties());
+  }
+
+  /**
+   * Expose a Geode region as a JavaPairRDD with no preferred partitioner.
+   * @param regionPath the full path of the region
+   * @param connConf the GeodeConnectionConf that can be used to access the region
+   */
+  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, GeodeConnectionConf connConf) {
+    return geodeRegion(regionPath, connConf, new Properties());
+  }
+
+  /**
+   * Expose a Geode region as a JavaPairRDD with the default GeodeConnector.
+   * @param regionPath the full path of the region
+   * @param opConf the parameters for this operation, such as the preferred partitioner.
+   */
+  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, Properties opConf) {
+    GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
+    return geodeRegion(regionPath, connConf, opConf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
new file mode 100644
index 0000000..41fe7e5
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import scala.Tuple2;
+
+import io.pivotal.geode.spark.connector.package$;
+
+/**
+ * The main entry point to the Spark Geode Connector Java API.
+ *
+ * There are several helpful static factory methods which build useful wrappers
+ * around Spark Context, Streaming Context and RDDs. There are also helper methods
+ * to convert JavaRDD<Tuple2<K, V>> to JavaPairRDD<K, V>.
+ */
+public final class GeodeJavaUtil {
+
+  /** constants */
+  public static String GeodeLocatorPropKey = package$.MODULE$.GeodeLocatorPropKey();
+  // partitioner related keys and values
+  public static String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey();
+  public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey();
+  public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName();
+  public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName();
+  public static String RDDSaveBatchSizePropKey = package$.MODULE$.RDDSaveBatchSizePropKey();
+  public static int RDDSaveBatchSizeDefault = package$.MODULE$.RDDSaveBatchSizeDefault();
+
+  /** The private constructor prevents users from creating instances of this class. */
+  private GeodeJavaUtil() { }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
+   * on an existing {@link SparkContext} instance.
+   */
+  public static GeodeJavaSparkContextFunctions javaFunctions(SparkContext sc) {
+    return new GeodeJavaSparkContextFunctions(sc);
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
+   * on an existing {@link JavaSparkContext} instance.
+   */
+  public static GeodeJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) {
+    return new GeodeJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc));
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaPairRDDFunctions} based on an
+   * existing {@link org.apache.spark.api.java.JavaPairRDD} instance.
+   */
+  public static <K, V> GeodeJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) {
+    return new GeodeJavaPairRDDFunctions<K, V>(rdd);
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaRDDFunctions} based on an
+   * existing {@link org.apache.spark.api.java.JavaRDD} instance.
+   */
+  public static <T> GeodeJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) {
+    return new GeodeJavaRDDFunctions<T>(rdd);
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaPairDStreamFunctions} based on an
+   * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance.
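+   *
+   * <p>A hypothetical sketch (assumes {@code pairDStream} is a {@code JavaPairDStream<String, String>}
+   * and that the returned wrapper exposes a {@code saveToGeode} analogous to the RDD wrappers; the
+   * DStream wrapper class itself is not part of this excerpt):
+   * <pre>
+   *   GeodeJavaUtil.javaFunctions(pairDStream).saveToGeode("/exampleRegion", connConf);
+   * </pre></p>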
+   */
+  public static <K, V> GeodeJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) {
+    return new GeodeJavaPairDStreamFunctions<>(ds);
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaDStreamFunctions} based on an
+   * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance.
+   */
+  public static <T> GeodeJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) {
+    return new GeodeJavaDStreamFunctions<>(ds);
+  }
+
+  /** Convert an instance of {@link org.apache.spark.api.java.JavaRDD}<Tuple2<K, V>>
+   * to a {@link org.apache.spark.api.java.JavaPairRDD}<K, V>.
+   */
+  public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) {
+    return JavaAPIHelper.toJavaPairRDD(rdd);
+  }
+
+  /** Convert an instance of {@link org.apache.spark.streaming.api.java.JavaDStream}<Tuple2<K, V>>
+   * to a {@link org.apache.spark.streaming.api.java.JavaPairDStream}<K, V>.
+   */
+  public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) {
+    return JavaAPIHelper.toJavaPairDStream(ds);
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaSQLContextFunctions} based
+   * on an existing {@link SQLContext} instance.
+   */
+  public static GeodeJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) {
+    return new GeodeJavaSQLContextFunctions(sqlContext);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
new file mode 100644
index 0000000..ff4cd17
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.cache.query.Query
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.internal.RegionMetadata
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition
+
+
+trait GeodeConnection {
+
+  /**
+   * Validate region existence and key/value type constraints; throws a RuntimeException
+   * if the region does not exist or the key and/or value types do not match.
+   * @param regionPath the full path of the region
+   */
+  def validateRegion[K, V](regionPath: String): Unit
+
+  /**
+   * Get a Region proxy for the given region.
+   * @param regionPath the full path of the region
+   */
+  def getRegionProxy[K, V](regionPath: String): Region[K, V]
+
+  /**
+   * Retrieve region metadata for the given region.
+   * @param regionPath the full path of the region
+   * @return Some[RegionMetadata] if the region exists, None otherwise
+   */
+  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata]
+
+  /**
+   * Retrieve region data for the given region and bucket set.
+   * @param regionPath the full path of the region
+   * @param whereClause the optional where clause used to filter the region data
+   * @param split the Geode RDD partition instance
+   */
+  def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)]
+
+  def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object
+
+  /**
+   * Create a Geode OQL query.
+   * @param queryString the Geode OQL query string
+   */
+  def getQuery(queryString: String): Query
+
+  /** Close the connection */
+  def close(): Unit
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
new file mode 100644
index 0000000..38d9e07
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import org.apache.spark.SparkConf
+import io.pivotal.geode.spark.connector.internal.{DefaultGeodeConnectionManager, LocatorHelper}
+
+/**
+ * Stores the configuration of a connection to a Geode cluster. It is serializable and can
+ * be safely sent over the network.
+ *
+ * @param locators Geode locator host:port pairs, the default is (localhost,10334)
+ * @param geodeProps the initial Geode properties to be used
+ * @param connectionManager the GeodeConnectionManager instance
+ */
+class GeodeConnectionConf(
+    val locators: Seq[(String, Int)],
+    val geodeProps: Map[String, String] = Map.empty,
+    connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager
+  ) extends Serializable {
+
+  /** require at least one (host, port) pair */
+  require(locators.nonEmpty)
+
+  def getConnection: GeodeConnection = connectionManager.getConnection(this)
+
+}
+
+object GeodeConnectionConf {
+
+  /**
+   * Create a GeodeConnectionConf object based on a locator string and an optional GeodeConnectionManager.
+   * @param locatorStr the Geode cluster locator string
+   * @param geodeProps the initial Geode properties to be used
+   * @param connectionManager the GeodeConnection factory
+   */
+  def apply(locatorStr: String, geodeProps: Map[String, String] = Map.empty)
+    (implicit connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager): GeodeConnectionConf = {
+    new GeodeConnectionConf(LocatorHelper.parseLocatorsString(locatorStr), geodeProps, connectionManager)
+  }
+
+  /**
+   * Create a GeodeConnectionConf object based on a SparkConf. Note that an implicit can
+   * be used to control which GeodeConnectionManager instance to use if desired.
+   * @param conf a SparkConf instance
+   */
+  def apply(conf: SparkConf): GeodeConnectionConf = {
+    val locatorStr = conf.getOption(GeodeLocatorPropKey).getOrElse(
+      throw new RuntimeException(s"SparkConf does not contain property $GeodeLocatorPropKey"))
+    // SparkConf only holds properties whose keys start with "spark.". In order to
+    // put Geode properties in SparkConf, all Geode properties are prefixed with
+    // "spark.geode.". This prefix is removed before the properties are put in `geodeProps`.
+    val prefix = "spark.geode."
+    val geodeProps = conf.getAll.filter {
+        case (k, v) => k.startsWith(prefix) && k != GeodeLocatorPropKey
+      }.map { case (k, v) => (k.substring(prefix.length), v) }.toMap
+    apply(locatorStr, geodeProps)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
new file mode 100644
index 0000000..bf678f0
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+/**
+ * GeodeConnectionManager provides a common interface for managing Geode
+ * connections, and it is serializable. Each factory instance will handle
+ * connection instance creation and connection pool management.
+ */
+trait GeodeConnectionManager extends Serializable {
+
+  /** get a connection for the given connector */
+  def getConnection(connConf: GeodeConnectionConf): GeodeConnection
+
+  /** close the connection */
+  def closeConnection(connConf: GeodeConnectionConf): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
new file mode 100644
index 0000000..6e93b05
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import java.io.File
+import java.net.URL
+import org.apache.commons.httpclient.methods.PostMethod
+import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity}
+import org.apache.commons.httpclient.HttpClient
+import org.apache.spark.Logging
+
+object GeodeFunctionDeployer {
+  def main(args: Array[String]) {
+    new GeodeFunctionDeployer(new HttpClient()).commandLineRun(args)
+  }
+}
+
+class GeodeFunctionDeployer(val httpClient: HttpClient) extends Logging {
+
+  def deploy(host: String, port: Int, jarLocation: String): String =
+    deploy(host + ":" + port, jarLocation)
+
+  def deploy(host: String, port: Int, jar: File): String =
+    deploy(host + ":" + port, jar)
+
+  def deploy(jmxHostAndPort: String, jarLocation: String): String =
+    deploy(jmxHostAndPort, jarFileHandle(jarLocation))
+
+  def deploy(jmxHostAndPort: String, jar: File): String = {
+    val urlString = constructURLString(jmxHostAndPort)
+    val filePost: PostMethod = new PostMethod(urlString)
+    val parts: Array[Part] = new Array[Part](1)
+    parts(0) = new FilePart("resources", jar)
+    filePost.setRequestEntity(new MultipartRequestEntity(parts, filePost.getParams))
+    val status: Int = httpClient.executeMethod(filePost)
+    "Deployed jar with status: " + status
+  }
+
+  private[connector] def constructURLString(jmxHostAndPort: String) =
+    "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
+
+  private[connector] def jarFileHandle(jarLocation: String) = {
+    val f: File = new File(jarLocation)
+    if (!f.exists()) {
+      val errorMessage: String = "Invalid jar file: " + f.getAbsolutePath
+      logInfo(errorMessage)
+      throw new RuntimeException(errorMessage)
+    }
+    f
+  }
+
+  def commandLineRun(args: Array[String]): Unit = {
+    val (hostPort: String, jarFile: String) =
+      if (args.length < 2) {
+        logInfo("JMX Manager Host and Port (example: localhost:7070):")
+        val bufferedReader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in))
+        val jmxHostAndPort = bufferedReader.readLine()
+        logInfo("Location of geode-functions.jar:")
+        val functionJarLocation = bufferedReader.readLine()
+        (jmxHostAndPort, functionJarLocation)
+      } else {
+        (args(0), args(1))
+      }
+    val status = deploy(hostPort, jarFile)
+    logInfo(status)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
new file mode 100644
index 0000000..8c0aeca
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import com.esotericsoftware.kryo.Kryo
+import io.pivotal.geode.spark.connector.internal.oql.UndefinedSerializer
+import org.apache.spark.serializer.KryoRegistrator
+import com.gemstone.gemfire.cache.query.internal.Undefined
+
+class GeodeKryoRegistrator extends KryoRegistrator {
+
+  override def registerClasses(kryo: Kryo): Unit = {
+    kryo.addDefaultSerializer(classOf[Undefined], classOf[UndefinedSerializer])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
new file mode 100644
index 0000000..ba5d2df
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodePairRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.Function
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra Geode functions on RDDs of (key, value) pairs through an implicit conversion.
+ * Import `io.pivotal.geode.spark.connector._` at the top of your program to
+ * use these functions.
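+ *
+ * A hypothetical usage sketch (assumes a SparkContext `sc` whose conf sets `spark.geode.locators`
+ * and an existing region `/exampleRegion`; names are illustrative):
+ * {{{
+ *   import io.pivotal.geode.spark.connector._
+ *   val rdd = sc.parallelize(Seq(("k1", "v1"), ("k2", "v2")))
+ *   rdd.saveToGeode("/exampleRegion")
+ * }}}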
+ */
+class GeodePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging {
+
+  /**
+   * Save the RDD of pairs to the Geode key-value store without any conversion.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param opConf the optional parameters for this operation
+   */
+  def saveToGeode(
+      regionPath: String,
+      connConf: GeodeConnectionConf = defaultConnectionConf,
+      opConf: Map[String, String] = Map.empty): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    if (log.isDebugEnabled)
+      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n  ${getRddPartitionsInfo(rdd)}""")
+    else
+      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
+    val writer = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf)
+    rdd.sparkContext.runJob(rdd, writer.write _)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this`
+   * RDD and the Geode `Region[K, V2]`. Each pair of elements will be returned
+   * as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in the
+   * Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return RDD[((K, V), V2)]
+   */
+  def joinGeodeRegion[K2 <: K, V2](
+      regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K, V2] = {
+    new GeodeJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` RDD
+   * and the Geode `Region[K2, V2]`. The join key from an RDD element is generated by
+   * `func(K, V) => K2`, and the key from the Geode region is just the key of the
+   * key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a ((k, v), v2) tuple,
+   * where (k, v) is in `this` RDD and (k2, v2) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return RDD[((K, V), V2)]
+   */
+  def joinGeodeRegion[K2, V2](
+      regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K2, V2] =
+    new GeodeJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+
+  /** This version of joinGeodeRegion(...) is just for the Java API. */
+  private[connector] def joinGeodeRegion[K2, V2](
+      regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeJoinRDD[(K, V), K2, V2] = {
+    new GeodeJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the Geode `Region[K, V2]`.
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key k.
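+   *
+   * A hypothetical sketch (assumes `rdd: RDD[(String, Int)]` and a String-keyed region `/emps`;
+   * `Emp` is an illustrative value class):
+   * {{{
+   *   val joined: RDD[((String, Int), Option[Emp])] =
+   *     rdd.outerJoinGeodeRegion[String, Emp]("/emps")
+   * }}}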
+   *
+   * @param regionPath the region path of the Geode region
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return RDD[((K, V), Option[V2])]
+   */
+  def outerJoinGeodeRegion[K2 <: K, V2](
+      regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K, V2] = {
+    new GeodeOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the Geode `Region[K2, V2]`.
+   * The join key from an RDD element is generated by `func(K, V) => K2`, and the
+   * key from the region is just the key of the key/value pair.
+   *
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return RDD[((K, V), Option[V2])]
+   */
+  def outerJoinGeodeRegion[K2, V2](
+      regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
+    new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+  }
+
+  /** This version of outerJoinGeodeRegion(...) is just for the Java API. */
+  private[connector] def outerJoinGeodeRegion[K2, V2](
+      regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
+    new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+  }
+
+  private[connector] def defaultConnectionConf: GeodeConnectionConf =
+    GeodeConnectionConf(rdd.sparkContext.getConf)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
new file mode 100644
index 0000000..2e5c92a
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodeRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.{PairFunction, Function}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra Geode functions on non-pair RDDs through an implicit conversion.
+ * Import `io.pivotal.geode.spark.connector._` at the top of your program to
+ * use these functions.
+ */
+class GeodeRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {
+
+  /**
+   * Save the non-pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param func the function that converts elements of the RDD to key/value pairs
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param opConf the optional parameters for this operation
+   */
+  def saveToGeode[K, V](
+      regionPath: String,
+      func: T => (K, V),
+      connConf: GeodeConnectionConf = defaultConnectionConf,
+      opConf: Map[String, String] = Map.empty): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    if (log.isDebugEnabled)
+      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n  ${getRddPartitionsInfo(rdd)}""")
+    else
+      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
+    val writer = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf)
+    rdd.sparkContext.runJob(rdd, writer.write(func) _)
+  }
+
+  /** This version of saveToGeode(...) is just for the Java API. */
+  private[connector] def saveToGeode[K, V](
+      regionPath: String,
+      func: PairFunction[T, K, V],
+      connConf: GeodeConnectionConf,
+      opConf: Map[String, String]): Unit = {
+    saveToGeode[K, V](regionPath, func.call _, connConf, opConf)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` RDD
+   * and the Geode `Region[K, V]`. The join key from an RDD element is generated by
+   * `func(T) => K`, and the key from the Geode region is just the key of the
+   * key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a (t, v) tuple,
+   * where t is in `this` RDD and (k, v) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element T
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @tparam K the key type of the Geode region
+   * @tparam V the value type of the Geode region
+   * @return RDD[(T, V)]
+   */
+  def joinGeodeRegion[K, V](regionPath: String, func: T => K,
+      connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[T, K, V] = {
+    new GeodeJoinRDD[T, K, V](rdd, func, regionPath, connConf)
+  }
+
+  /** This version of joinGeodeRegion(...) is just for the Java API. */
+  private[connector] def joinGeodeRegion[K, V](
+      regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): GeodeJoinRDD[T, K, V] = {
+    joinGeodeRegion(regionPath, func.call _, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the Geode `Region[K, V]`.
+   * The join key from an RDD element is generated by `func(T) => K`, and the
+   * key from the region is just the key of the key/value pair.
+ + /** + * Perform a left outer join of `this` RDD and the Geode `Region[K, V]`. + * The join key from RDD element is generated by `func(T) => K`, and the + * key from region is just the key of the key/value pair. + * + * For each element t in `this` RDD, the resulting RDD will either contain + * all pairs (t, Some(v)) for v in the Geode region, or the pair + * (t, None) if no element in the Geode region has key `func(t)`. + * + * @param regionPath the region path of the Geode region + * @param func the function that generates region key from RDD element T + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @tparam K the key type of the Geode region + * @tparam V the value type of the Geode region + * @return RDD[ T, Option[V] ] + */ + def outerJoinGeodeRegion[K, V](regionPath: String, func: T => K, + connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[T, K, V] = { + new GeodeOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf) + } + + /** This version of outerJoinGeodeRegion(...) is just for Java API. */ + private[connector] def outerJoinGeodeRegion[K, V]( + regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): GeodeOuterJoinRDD[T, K, V] = { + outerJoinGeodeRegion(regionPath, func.call _, connConf) + } + + private[connector] def defaultConnectionConf: GeodeConnectionConf = + GeodeConnectionConf(rdd.sparkContext.getConf) + +} + +
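Continuing the sketch above, the outer-join counterpart keeps unmatched elements and surfaces misses as None:

    // outer is an RDD of (word, Option[regionValue]) pairs
    val outer = words.outerJoinGeodeRegion[String, String]("/strings", w => w.head.toString)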
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala new file mode 100644 index 0000000..83aab7a --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import io.pivotal.geode.spark.connector.internal.oql.{OQLRelation, QueryRDD} +import org.apache.spark.Logging +import org.apache.spark.sql.{DataFrame, SQLContext} + +/** + * Provides Geode OQL-specific functions + */ +class GeodeSQLContextFunctions(@transient sqlContext: SQLContext) extends Serializable with Logging { + + /** + * Expose a Geode OQL query result as a DataFrame + * @param query the OQL query string + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + */ + def geodeOQL( + query: String, + connConf: GeodeConnectionConf = GeodeConnectionConf(sqlContext.sparkContext.getConf)): DataFrame = { + logInfo(s"OQL query = $query") + val rdd = new QueryRDD[Object](sqlContext.sparkContext, query, connConf) + sqlContext.baseRelationToDataFrame(OQLRelation(rdd)(sqlContext)) + } + + private[connector] def defaultConnectionConf: GeodeConnectionConf = + GeodeConnectionConf(sqlContext.sparkContext.getConf) +}
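A usage sketch for geodeOQL (the query and region /emps are hypothetical; this assumes the connector's implicit conversion that adds geodeOQL to SQLContext):

    import io.pivotal.geode.spark.connector._
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    // run the OQL query on the Geode cluster and expose the result as a DataFrame
    val df = sqlContext.geodeOQL("SELECT e.id, e.name FROM /emps e WHERE e.age >= 21")
    df.show()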
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala new file mode 100644 index 0000000..617cb33 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD +import org.apache.spark.SparkContext + +import scala.reflect.ClassTag + +/** Provides Geode-specific methods on `SparkContext` */ +class GeodeSparkContextFunctions(@transient sc: SparkContext) extends Serializable { + + /** + * Expose a Geode region as a GeodeRDD + * @param regionPath the full path of the region + * @param connConf the GeodeConnectionConf that can be used to access the region + * @param opConf use this to specify the preferred partitioner + * and its parameters. The implementation will use it if it's applicable. + */ + def geodeRegion[K: ClassTag, V: ClassTag] ( + regionPath: String, connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf), + opConf: Map[String, String] = Map.empty): GeodeRegionRDD[K, V] = + GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf) + +}
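And a sketch for geodeRegion (the region name and key/value types are illustrative; assumes the implicit conversion on SparkContext):

    import io.pivotal.geode.spark.connector._

    // expose region /emps as an RDD of (key, value) pairs
    val emps = sc.geodeRegion[Int, String]("/emps")
    println(s"region /emps has ${emps.count} entries")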
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala new file mode 100644 index 0000000..52f9961 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal + +import java.net.InetAddress + +import com.gemstone.gemfire.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut} +import com.gemstone.gemfire.cache.execute.{FunctionException, FunctionService} +import com.gemstone.gemfire.cache.query.Query +import com.gemstone.gemfire.cache.{Region, RegionService} +import com.gemstone.gemfire.internal.cache.execute.InternalExecution +import io.pivotal.geode.spark.connector.internal.oql.QueryResultCollector +import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition +import org.apache.spark.{SparkEnv, Logging} +import io.pivotal.geode.spark.connector.GeodeConnection +import io.pivotal.geode.spark.connector.internal.geodefunctions._ +import java.util.{Set => JSet, List => JList } + +/** + * Default GeodeConnection implementation. Instances should be created by + * DefaultGeodeConnectionFactory. + * @param locators pairs of host/port of locators + * @param gemFireProps the initial Geode properties to be used + */ +private[connector] class DefaultGeodeConnection ( + locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) + extends GeodeConnection with Logging { + + private val clientCache = initClientCache() + + /** Register Geode functions with the Geode cluster */ + FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance()) + FunctionService.registerFunction(RetrieveRegionFunction.getInstance()) + + private def initClientCache() : ClientCache = { + try { + val ccf = getClientCacheFactory + ccf.create() + } catch { + case e: Exception => + logError(s"""Failed to init ClientCache, locators=${locators.mkString(",")}, Error: $e""") + throw new RuntimeException(e) + } + } + + private def getClientCacheFactory: ClientCacheFactory = { + import io.pivotal.geode.spark.connector.map2Properties + val ccf = new ClientCacheFactory(gemFireProps) + ccf.setPoolReadTimeout(30000) + val servers = LocatorHelper.getAllGeodeServers(locators) + if (servers.isDefined && servers.get.size > 0) { + val sparkIp = System.getenv("SPARK_LOCAL_IP") + val hostName = if (sparkIp != null) InetAddress.getByName(sparkIp).getCanonicalHostName + else InetAddress.getLocalHost.getCanonicalHostName + val executorId = SparkEnv.get.executorId + val pickedServers = LocatorHelper.pickPreferredGeodeServers(servers.get, hostName, executorId) + logInfo(s"""Init ClientCache: servers=${pickedServers.mkString(",")}, host=$hostName executor=$executorId props=$gemFireProps""") + logDebug(s"""Init ClientCache: all-servers=${servers.get.mkString(",")}""") + pickedServers.foreach{ case (host, port) => ccf.addPoolServer(host, port) } + } else { + logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""") + locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) } + } + ccf + } + + /** close the clientCache */ + override def close(): Unit = + if (! clientCache.isClosed) clientCache.close() + + /** ----------------------------------------- */ + /** implementation of GeodeConnection trait */ + /** ----------------------------------------- */ + + override def getQuery(queryString: String): Query = + clientCache.asInstanceOf[RegionService].getQueryService.newQuery(queryString) +
+ override def validateRegion[K, V](regionPath: String): Unit = { + val md = getRegionMetadata[K, V](regionPath) + if (md.isEmpty) throw new RuntimeException(s"The region named $regionPath was not found") + } + + def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] = { + import scala.collection.JavaConversions.setAsJavaSet + val region = getRegionProxy[K, V](regionPath) + val set0: JSet[Integer] = Set[Integer](0) + val exec = FunctionService.onRegion(region).asInstanceOf[InternalExecution].withBucketFilter(set0) + exec.setWaitOnExceptionFlag(true) + try { + val collector = exec.execute(RetrieveRegionMetadataFunction.ID) + val r = collector.getResult.asInstanceOf[JList[RegionMetadata]] + logDebug(r.get(0).toString) + Some(r.get(0)) + } catch { + case e: FunctionException => + if (e.getMessage.contains(s"The region named /$regionPath was not found")) None + else throw e + } + } + + def getRegionProxy[K, V](regionPath: String): Region[K, V] = { + val region1: Region[K, V] = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]] + if (region1 != null) region1 + else DefaultGeodeConnection.regionLock.synchronized { + val region2 = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]] + if (region2 != null) region2 + else clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.PROXY).create(regionPath) + } + } + + override def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)] = { + val region = getRegionProxy[K, V](regionPath) + val desc = s"""RDD($regionPath, "${whereClause.getOrElse("")}", ${split.index})""" + val args : Array[String] = Array[String](whereClause.getOrElse(""), desc) + val collector = new StructStreamingResultCollector(desc) + // RetrieveRegionResultCollector[(K, V)] + import scala.collection.JavaConversions.setAsJavaSet + val exec = FunctionService.onRegion(region).withArgs(args).withCollector(collector).asInstanceOf[InternalExecution] + .withBucketFilter(split.bucketSet.map(Integer.valueOf)) + exec.setWaitOnExceptionFlag(true) + exec.execute(RetrieveRegionFunction.ID) + collector.getResult.map{objs: Array[Object] => (objs(0).asInstanceOf[K], objs(1).asInstanceOf[V])} + } + + override def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String) = { + import scala.collection.JavaConversions.setAsJavaSet + FunctionService.registerFunction(QueryFunction.getInstance()) + val collector = new QueryResultCollector + val region = getRegionProxy(regionPath) + val args: Array[String] = Array[String](queryString, bucketSet.toString) + val exec = FunctionService.onRegion(region).withCollector(collector).asInstanceOf[InternalExecution] + .withBucketFilter(bucketSet.map(Integer.valueOf)) + .withArgs(args) + exec.execute(QueryFunction.ID) + collector.getResult + } +} + +private[connector] object DefaultGeodeConnection { + /** a lock object only used by getRegionProxy...() */ + private val regionLock = new Object +} + +/** The purpose of this class is to make unit testing DefaultGeodeConnectionManager easier */ +class DefaultGeodeConnectionFactory { + + def newConnection(locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) = + new DefaultGeodeConnection(locators, gemFireProps) + +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala new file mode 100644 index 0000000..eb67cda --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal + +import io.pivotal.geode.spark.connector.{GeodeConnection, GeodeConnectionConf, GeodeConnectionManager} + +import scala.collection.mutable + +/** + * Default implementation of GeodeConnectionManager + */ +class DefaultGeodeConnectionManager extends GeodeConnectionManager { + + def getConnection(connConf: GeodeConnectionConf): GeodeConnection = + DefaultGeodeConnectionManager.getConnection(connConf) + + def closeConnection(connConf: GeodeConnectionConf): Unit = + DefaultGeodeConnectionManager.closeConnection(connConf) + +} + +object DefaultGeodeConnectionManager { + + /** connection cache, keyed by host:port pair */ + private[connector] val connections = mutable.Map[(String, Int), GeodeConnection]() + + /** + * Use locator host:port pairs to look up a cached connection. Create a new + * connection and add it to the cache `connections` if none exists. + */ + def getConnection(connConf: GeodeConnectionConf) + (implicit factory: DefaultGeodeConnectionFactory = new DefaultGeodeConnectionFactory): GeodeConnection = { + + def getCachedConnection(locators: Seq[(String, Int)]): GeodeConnection = { + val conns = locators.map(connections withDefaultValue null).filter(_ != null) + if (conns.nonEmpty) conns(0) else null + } + + val conn1 = getCachedConnection(connConf.locators) + if (conn1 != null) conn1 + else connections.synchronized { + val conn2 = getCachedConnection(connConf.locators) + if (conn2 != null) conn2 + else { + val conn3 = factory.newConnection(connConf.locators, connConf.geodeProps) + connConf.locators.foreach(pair => connections += (pair -> conn3)) + conn3 + } + } + }
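Because `factory` is an implicit parameter, a test can inject a stub factory; a minimal sketch, assuming Mockito is on the test classpath and some connConf is at hand (all names here are illustrative):

    import org.mockito.Mockito.mock

    val conn = mock(classOf[DefaultGeodeConnection])
    // a factory whose newConnection always returns the canned connection
    implicit val factory = new DefaultGeodeConnectionFactory {
      override def newConnection(locators: Seq[(String, Int)],
                                 gemFireProps: Map[String, String] = Map.empty) = conn
    }
    // the first call creates (and caches) the stub; later calls hit the cache
    assert(DefaultGeodeConnectionManager.getConnection(connConf) eq conn)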
+ + /** + * Close the connection and remove it from connection cache. + * Note: multiple entries may share the same connection, all those entries are removed. + */ + def closeConnection(connConf: GeodeConnectionConf): Unit = { + val conns = connConf.locators.map(connections withDefaultValue null).filter(_ != null) + if (conns.nonEmpty) connections.synchronized { + conns(0).close() + connections.retain((k,v) => v != conns(0)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala new file mode 100644 index 0000000..71fed52 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal + +import java.net.InetSocketAddress +import java.util.{ArrayList => JArrayList} + +import com.gemstone.gemfire.cache.client.internal.locator.{GetAllServersResponse, GetAllServersRequest} +import com.gemstone.gemfire.distributed.internal.ServerLocation +import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient +import org.apache.spark.Logging + +import scala.util.{Failure, Success, Try} + + +object LocatorHelper extends Logging { + + /** valid locator strings are: host[port] and host:port */ + final val LocatorPattern1 = """([\w-_]+(\.[\w-_]+)*)\[([0-9]{2,5})\]""".r + final val LocatorPattern2 = """([\w-_]+(\.[\w-_]+)*):([0-9]{2,5})""".r + + /** Convert a single locator string to Try[(host, port)] */ + def locatorStr2HostPortPair(locatorStr: String): Try[(String, Int)] = + locatorStr match { + case LocatorPattern1(host, domain, port) => Success((host, port.toInt)) + case LocatorPattern2(host, domain, port) => Success((host, port.toInt)) + case _ => Failure(new Exception(s"invalid locator: $locatorStr")) + } + + /** + * Parse locator strings and return a Seq of (hostname, port) pairs. + * Valid locator strings are one or more "host[port]" and/or "host:port" + * separated by `,`. For example: + * host1.mydomain.com[8888],host2.mydomain.com[8889] + * host1.mydomain.com:8888,host2.mydomain.com:8889 + */ + def parseLocatorsString(locatorsStr: String): Seq[(String, Int)] = + locatorsStr.split(",").map(locatorStr2HostPortPair).map(_.get) +
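For example, both accepted locator syntaxes (and a mix of them) parse to the same pairs:

    LocatorHelper.parseLocatorsString("host1.mydomain.com[8888],host2.mydomain.com:8889")
    // => Seq(("host1.mydomain.com", 8888), ("host2.mydomain.com", 8889))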
+ + /** + * Return the list of live Geode servers for the given locators. + * @param locators locators for the given Geode cluster + * @param serverGroup optional server group name, default is "" (empty string) + */ + def getAllGeodeServers(locators: Seq[(String, Int)], serverGroup: String = ""): Option[Seq[(String, Int)]] = { + var result: Option[Seq[(String, Int)]] = None + locators.find { case (host, port) => + try { + val addr = new InetSocketAddress(host, port) + val req = new GetAllServersRequest(serverGroup) + val res = TcpClient.requestToServer(addr.getAddress, addr.getPort, req, 2000) + if (res != null) { + import scala.collection.JavaConverters._ + val servers = res.asInstanceOf[GetAllServersResponse].getServers.asInstanceOf[JArrayList[ServerLocation]] + if (servers.size > 0) + result = Some(servers.asScala.map(e => (e.getHostName, e.getPort))) + } + } catch { case e: Exception => logWarning("getAllGeodeServers error", e) + } + result.isDefined + } + result + } + + /** + * Pick up at most 3 preferred servers from all available servers based on + * host name and Spark executor id. + * + * This method is used by DefaultGeodeConnection to create ClientCache. Usually + * one server is enough to initialize ClientCacheFactory, but this provides two + * backup servers in case the first server can't be connected. + * + * @param servers all available servers in the form of (hostname, port) pairs + * @param hostName the host name of the Spark executor + * @param executorId the Spark executor Id, such as "<driver>", "0", "1", ... + * @return Seq[(hostname, port)] of preferred servers + */ + def pickPreferredGeodeServers( + servers: Seq[(String, Int)], hostName: String, executorId: String): Seq[(String, Int)] = { + + // pick `length` items from the Seq starting at the `start` position. + // The Seq is treated as a ring, so at most `Seq.size` items can be picked + def circularTake[T](seq: Seq[T], start: Int, length: Int): Seq[T] = { + val size = math.min(seq.size, length) + (start until start + size).map(x => seq(x % seq.size)) + } + + // map executor id to int: "<driver>" (or non-number string) to 0, and "n" to n + 1 + val id = try { executorId.toInt + 1 } catch { case e: NumberFormatException => 0 } + + // algorithm: + // 1. sort server list + // 2. split sorted server list into 3 sub-lists a, b, and c: + // list-a: servers on the given host + // list-b: servers that are in front of list-a on the sorted server list + // list-c: servers that are behind list-a on the sorted server list + // then rotate list-a based on executor id, then create new server list: + // modified list-a ++ list-c ++ list-b + // 3. if there's no server on the given host, then create new server list + // by rotating sorted server list based on executor id. + // 4. take up to 3 servers from the new server list + val sortedServers = servers.sorted + val firstIdx = sortedServers.indexWhere(p => p._1 == hostName) + val lastIdx = if (firstIdx < 0) -1 else sortedServers.lastIndexWhere(p => p._1 == hostName) + + if (firstIdx < 0) { // no local server + circularTake(sortedServers, id, 3) + } else { + val (seq1, seq2) = sortedServers.splitAt(firstIdx) + val seq = if (firstIdx == lastIdx) { // one local server + seq2 ++ seq1 + } else { // multiple local servers + val (seq3, seq4) = seq2.splitAt(lastIdx - firstIdx + 1) + val seq3b = if (id % seq3.size == 0) seq3 else circularTake(seq3, id, seq3.size) + seq3b ++ seq4 ++ seq1 + } + circularTake(seq, 0, 3) + } + } +}
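A worked example of the selection above (hostnames are illustrative): executor "0" maps to id 1; when it runs on host-b, the local server is rotated to the front, while a driver on an unknown host simply takes the first three servers of the sorted list.

    val servers = Seq(("host-a", 40404), ("host-b", 40404), ("host-c", 40404))

    LocatorHelper.pickPreferredGeodeServers(servers, "host-b", "0")
    // => Seq(("host-b", 40404), ("host-c", 40404), ("host-a", 40404))

    LocatorHelper.pickPreferredGeodeServers(servers, "unknown-host", "<driver>")
    // => Seq(("host-a", 40404), ("host-b", 40404), ("host-c", 40404))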