http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
new file mode 100644
index 0000000..3278a5b
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaPairRDDFunctions.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodePairRDDFunctions;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import scala.Option;
+import scala.Tuple2;
+import scala.reflect.ClassTag;
+
+import java.util.Properties;
+
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide Geode Spark
+ * Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in the {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaPairRDDFunctions<K, V> {
+
+  public final GeodePairRDDFunctions<K, V> rddf;
+
+  public GeodeJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) {
+    this.rddf = new GeodePairRDDFunctions<K, V>(rdd.rdd());
+  }
+
+  /**
+   * Save the pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param opConf the parameters for this operation
+   */
+  public void saveToGeode(String regionPath, GeodeConnectionConf connConf, Properties opConf) {
+    rddf.saveToGeode(regionPath, connConf, propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param opConf the parameters for this operation
+   */
+  public void saveToGeode(String regionPath, Properties opConf) {
+    rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   */
+  public void saveToGeode(String regionPath, GeodeConnectionConf connConf) {
+    rddf.saveToGeode(regionPath, connConf, emptyStrStrMap());
+  }
+
+  /**
+   * Save the pair RDD to the Geode key-value store with the default GeodeConnector.
+   * @param regionPath the full path of the region where the RDD is stored
+   */
+  public void saveToGeode(String regionPath) {
+    rddf.saveToGeode(regionPath, rddf.defaultConnectionConf(), emptyStrStrMap());
+  }
+
+  /**
+   * Return a JavaPairRDD containing all pairs of elements with matching keys in
+   * this RDD&lt;K, V> and the Geode `Region&lt;K, V2>`. Each pair of elements
+   * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
+   * (k, v2) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, V2>
+   */
+  public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(String regionPath) {
+    return joinGeodeRegion(regionPath, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Return a JavaPairRDD containing all pairs of elements with matching keys in
+   * this RDD&lt;K, V> and the Geode `Region&lt;K, V2>`. Each pair of elements
+   * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and
+   * (k, v2) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, V2>
+   */
+  public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
+    String regionPath, GeodeConnectionConf connConf) {
+    GeodeJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGeodeRegion(regionPath, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<V2> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD&lt;K, V> and the Geode `Region&lt;K2, V2>`. The join key from an RDD
+   * element is generated by `func(K, V) => K2`, and the key from the Geode
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a ((k, v), v2) tuple,
+   * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param <K2> the key type of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, V2>
+   */
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
+    String regionPath, Function<Tuple2<K, V>, K2> func) {
+    return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD&lt;K, V> and the Geode `Region&lt;K2, V2>`. The join key from an RDD
+   * element is generated by `func(K, V) => K2`, and the key from the Geode
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a ((k, v), v2) tuple,
+   * where (k, v) is in this RDD and (k2, v2) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <K2> the key type of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, V2>
+   */
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGeodeRegion(
+    String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
+    GeodeJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<V2> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;K, V> and the Geode `Region&lt;K, V2>`.
+   * For each element (k, v) in this RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key k.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V2>>
+   */
+  public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(String regionPath) {
+    return outerJoinGeodeRegion(regionPath, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;K, V> and the Geode `Region&lt;K, V2>`.
+   * For each element (k, v) in this RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key k.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V2>>
+   */
+  public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
+    String regionPath, GeodeConnectionConf connConf) {
+    GeodeOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<Option<V2>> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;K, V> and the Geode `Region&lt;K2, V2>`.
+   * The join key from an RDD element is generated by `func(K, V) => K2`, and the
+   * key from the region is just the key of the key/value pair.
+   *
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param <K2> the key type of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V2>>
+   */
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
+    String regionPath, Function<Tuple2<K, V>, K2> func) {
+    return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;K, V> and the Geode `Region&lt;K2, V2>`.
+   * The join key from an RDD element is generated by `func(K, V) => K2`, and the
+   * key from the region is just the key of the key/value pair.
+   *
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <K2> the key type of the Geode region
+   * @param <V2> the value type of the Geode region
+   * @return JavaPairRDD&lt;Tuple2&lt;K, V>, Option&lt;V2>>
+   */
+  public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGeodeRegion(
+    String regionPath, Function<Tuple2<K, V>, K2> func, GeodeConnectionConf connConf) {
+    GeodeOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
+    ClassTag<Tuple2<K, V>> kt = fakeClassTag();
+    ClassTag<Option<V2>> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+}
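
A minimal Java sketch of how this wrapper can be used (the locator address and
the region name "/str_str_region" are illustrative assumptions; a running Geode
cluster with a matching region is required):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    import java.util.Arrays;
    import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*;

    SparkConf conf = new SparkConf().setAppName("pair-rdd-demo")
        .set(GeodeLocatorPropKey, "localhost[10334]");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    JavaPairRDD<String, String> pairs = jsc.parallelizePairs(Arrays.asList(
        new Tuple2<>("1", "one"), new Tuple2<>("2", "two")));
    // write the pairs to the region, using the default connection conf
    javaFunctions(pairs).saveToGeode("/str_str_region");
    // join each (k, v) pair with the region value stored under the same key k
    JavaPairRDD<Tuple2<String, String>, String> joined =
        javaFunctions(pairs).<String>joinGeodeRegion("/str_str_region");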

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
new file mode 100644
index 0000000..e4f6f36
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRDDFunctions.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodeRDDFunctions;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeJoinRDD;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeOuterJoinRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Option;
+import scala.reflect.ClassTag;
+
+import java.util.Properties;
+
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+/**
+ * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide Geode Spark
+ * Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in the {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaRDDFunctions<T> {
+
+  public final GeodeRDDFunctions<T> rddf;
+
+  public GeodeJavaRDDFunctions(JavaRDD<T> rdd) {
+    this.rddf = new GeodeRDDFunctions<T>(rdd.rdd());
+  }
+
+  /**
+   * Save the non-pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param func the PairFunction that converts elements of the JavaRDD to key/value pairs
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param opConf the parameters for this operation
+   */
+  public <K, V> void saveToGeode(
+    String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf, Properties opConf) {
+    rddf.saveToGeode(regionPath, func, connConf, propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the non-pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param func the PairFunction that converts elements of the JavaRDD to key/value pairs
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   */
+  public <K, V> void saveToGeode(
+    String regionPath, PairFunction<T, K, V> func, GeodeConnectionConf connConf) {
+    rddf.saveToGeode(regionPath, func, connConf, emptyStrStrMap());
+  }
+
+  /**
+   * Save the non-pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param func the PairFunction that converts elements of the JavaRDD to key/value pairs
+   * @param opConf the parameters for this operation
+   */
+  public <K, V> void saveToGeode(
+    String regionPath, PairFunction<T, K, V> func, Properties opConf) {
+    rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf));
+  }
+
+  /**
+   * Save the non-pair RDD to the Geode key-value store with the default GeodeConnector.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param func the PairFunction that converts elements of the JavaRDD to key/value pairs
+   */
+  public <K, V> void saveToGeode(String regionPath, PairFunction<T, K, V> func) {
+    rddf.saveToGeode(regionPath, func, rddf.defaultConnectionConf(), emptyStrStrMap());
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD&lt;T> and the Geode `Region&lt;K, V>`. The join key from an RDD
+   * element is generated by `func(T) => K`, and the key from the Geode
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a (t, v) tuple,
+   * where t is from this RDD and v is from the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element T
+   * @param <K> the key type of the Geode region
+   * @param <V> the value type of the Geode region
+   * @return JavaPairRDD&lt;T, V>
+   */
+  public <K, V> JavaPairRDD<T, V> joinGeodeRegion(String regionPath, Function<T, K> func) {
+    return joinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in this
+   * RDD&lt;T> and the Geode `Region&lt;K, V>`. The join key from an RDD
+   * element is generated by `func(T) => K`, and the key from the Geode
+   * region is just the key of the key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a (t, v) tuple,
+   * where t is from this RDD and v is from the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element T
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <K> the key type of the Geode region
+   * @param <V> the value type of the Geode region
+   * @return JavaPairRDD&lt;T, V>
+   */
+  public <K, V> JavaPairRDD<T, V> joinGeodeRegion(
+    String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
+    GeodeJoinRDD<T, K, V> rdd = rddf.joinGeodeRegion(regionPath, func, connConf);
+    ClassTag<T> kt = fakeClassTag();
+    ClassTag<V> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;T> and the Geode `Region&lt;K, V>`.
+   * The join key from an RDD element is generated by `func(T) => K`, and the
+   * key from the region is just the key of the key/value pair.
+   *
+   * For each element (t) in this RDD, the resulting RDD will either contain
+   * all pairs (t, Some(v)) for v in the Geode region, or the pair
+   * (t, None) if no element in the Geode region has key `func(t)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element T
+   * @param <K> the key type of the Geode region
+   * @param <V> the value type of the Geode region
+   * @return JavaPairRDD&lt;T, Option&lt;V>>
+   */
+  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(
+    String regionPath, Function<T, K> func) {
+    return outerJoinGeodeRegion(regionPath, func, rddf.defaultConnectionConf());
+  }
+
+  /**
+   * Perform a left outer join of this RDD&lt;T> and the Geode `Region&lt;K, V>`.
+   * The join key from an RDD element is generated by `func(T) => K`, and the
+   * key from the region is just the key of the key/value pair.
+   *
+   * For each element (t) in this RDD, the resulting RDD will either contain
+   * all pairs (t, Some(v)) for v in the Geode region, or the pair
+   * (t, None) if no element in the Geode region has key `func(t)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element T
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param <K> the key type of the Geode region
+   * @param <V> the value type of the Geode region
+   * @return JavaPairRDD&lt;T, Option&lt;V>>
+   */
+  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGeodeRegion(
+    String regionPath, Function<T, K> func, GeodeConnectionConf connConf) {
+    GeodeOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGeodeRegion(regionPath, func, connConf);
+    ClassTag<T> kt = fakeClassTag();
+    ClassTag<Option<V>> vt = fakeClassTag();
+    return new JavaPairRDD<>(rdd, kt, vt);
+  }
+
+}
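
A short sketch of saving a non-pair JavaRDD through this wrapper, reusing the
JavaSparkContext from the sketch above (the region name and the key/value
derivation are illustrative assumptions):

    JavaRDD<String> words = jsc.parallelize(Arrays.asList("one", "two", "three"));
    // derive a (word, length) pair from each element before writing to Geode
    javaFunctions(words).saveToGeode("/str_int_region",
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<>(s, s.length());
          }
        });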

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
new file mode 100644
index 0000000..3471bf90
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSQLContextFunctions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.GeodeSQLContextFunctions;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+
+/**
+ * Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide Geode
+ * OQL functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in the {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaSQLContextFunctions {
+
+  public final GeodeSQLContextFunctions scf;
+
+  public GeodeJavaSQLContextFunctions(SQLContext sqlContext) {
+    scf = new GeodeSQLContextFunctions(sqlContext);
+  }
+
+  public <T> DataFrame geodeOQL(String query) {
+    DataFrame df = scf.geodeOQL(query, scf.defaultConnectionConf());
+    return df;
+  }
+
+  public <T> DataFrame geodeOQL(String query, GeodeConnectionConf connConf) {
+    DataFrame df = scf.geodeOQL(query, connConf);
+    return df;
+  }
+
+}
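
A sketch of running a Geode OQL query through this wrapper (the query and
region name are illustrative; this targets the Spark 1.x DataFrame API that
this connector builds against):

    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    SQLContext sqlContext = new SQLContext(jsc);
    DataFrame df = javaFunctions(sqlContext).geodeOQL("SELECT * FROM /str_str_region");
    df.show();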

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
new file mode 100644
index 0000000..ce6b1ff
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaSparkContextFunctions.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+
+import io.pivotal.geode.spark.connector.GeodeConnectionConf;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD;
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD$;
+import org.apache.spark.SparkContext;
+import static io.pivotal.geode.spark.connector.javaapi.JavaAPIHelper.*;
+
+import scala.reflect.ClassTag;
+import java.util.Properties;
+
+/**
+ * Java API wrapper over {@link org.apache.spark.SparkContext} to provide Geode
+ * Connector functionality.
+ *
+ * <p>To obtain an instance of this wrapper, use one of the factory methods in the {@link
+ * io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil} class.</p>
+ */
+public class GeodeJavaSparkContextFunctions {
+
+  public final SparkContext sc;
+
+  public GeodeJavaSparkContextFunctions(SparkContext sc) {
+    this.sc = sc;
+  }
+
+  /**
+   * Expose a Geode region as a JavaPairRDD.
+   * @param regionPath the full path of the region
+   * @param connConf the GeodeConnectionConf that can be used to access the region
+   * @param opConf the parameters for this operation, such as the preferred partitioner
+   */
+  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(
+    String regionPath, GeodeConnectionConf connConf, Properties opConf) {
+    ClassTag<K> kt = fakeClassTag();
+    ClassTag<V> vt = fakeClassTag();
+    GeodeRegionRDD<K, V> rdd = GeodeRegionRDD$.MODULE$.apply(
+      sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt);
+    return new GeodeJavaRegionRDD<>(rdd);
+  }
+
+  /**
+   * Expose a Geode region as a JavaPairRDD with the default GeodeConnector and no preferred partitioner.
+   * @param regionPath the full path of the region
+   */
+  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath) {
+    GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
+    return geodeRegion(regionPath, connConf, new Properties());
+  }
+
+  /**
+   * Expose a Geode region as a JavaPairRDD with no preferred partitioner.
+   * @param regionPath the full path of the region
+   * @param connConf the GeodeConnectionConf that can be used to access the region
+   */
+  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, GeodeConnectionConf connConf) {
+    return geodeRegion(regionPath, connConf, new Properties());
+  }
+
+  /**
+   * Expose a Geode region as a JavaPairRDD with the default GeodeConnector.
+   * @param regionPath the full path of the region
+   * @param opConf the parameters for this operation, such as the preferred partitioner
+   */
+  public <K, V> GeodeJavaRegionRDD<K, V> geodeRegion(String regionPath, Properties opConf) {
+    GeodeConnectionConf connConf = GeodeConnectionConf.apply(sc.getConf());
+    return geodeRegion(regionPath, connConf, opConf);
+  }
+
+}
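
A sketch of reading a whole region as an RDD (the region name is illustrative;
GeodeJavaRegionRDD is assumed here to behave as a JavaPairRDD over the
region's entries):

    GeodeJavaRegionRDD<String, String> regionRdd =
        javaFunctions(jsc).<String, String>geodeRegion("/str_str_region");
    long entryCount = regionRdd.count();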

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
new file mode 100644
index 0000000..41fe7e5
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/java/io/pivotal/geode/spark/connector/javaapi/GeodeJavaUtil.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.javaapi;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import scala.Tuple2;
+
+import io.pivotal.geode.spark.connector.package$;
+
+/**
+ * The main entry point to the Spark Geode Connector Java API.
+ *
+ * There are several helpful static factory methods which build useful wrappers
+ * around Spark Context, Streaming Context and RDD. There are also helper methods
+ * to convert JavaRDD&lt;Tuple2&lt;K, V>> to JavaPairRDD&lt;K, V>.
+ */
+public final class GeodeJavaUtil {
+  
+  /** constants */
+  public static String GeodeLocatorPropKey = package$.MODULE$.GeodeLocatorPropKey();
+  // partitioner related keys and values
+  public static String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey();
+  public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey();
+  public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName();
+  public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName();
+  public static String RDDSaveBatchSizePropKey = package$.MODULE$.RDDSaveBatchSizePropKey();
+  public static int RDDSaveBatchSizeDefault = package$.MODULE$.RDDSaveBatchSizeDefault();
+
+  /** The private constructor prevents users from creating instances of this class. */
+  private GeodeJavaUtil() { }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
+   * on an existing {@link SparkContext} instance.
+   */
+  public static GeodeJavaSparkContextFunctions javaFunctions(SparkContext sc) {
+    return new GeodeJavaSparkContextFunctions(sc);
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaSparkContextFunctions} based
+   * on an existing {@link JavaSparkContext} instance.
+   */
+  public static GeodeJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) {
+    return new GeodeJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc));
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaPairRDDFunctions} based on an
+   * existing {@link org.apache.spark.api.java.JavaPairRDD} instance.
+   */
+  public static <K, V> GeodeJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) {
+    return new GeodeJavaPairRDDFunctions<K, V>(rdd);
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaRDDFunctions} based on an
+   * existing {@link org.apache.spark.api.java.JavaRDD} instance.
+   */
+  public static <T> GeodeJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) {
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaPairDStreamFunctions} based on an
+   * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance.
+   */
+  public static <K, V> GeodeJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) {
+    return new GeodeJavaPairDStreamFunctions<>(ds);
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaDStreamFunctions} based on an
+   * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance.
+   */
+  public static <T> GeodeJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) {
+    return new GeodeJavaDStreamFunctions<>(ds);
+  }
+
+  /** Convert an instance of {@link org.apache.spark.api.java.JavaRDD}&lt;Tuple2&lt;K, V&gt;&gt;
+   * to a {@link org.apache.spark.api.java.JavaPairRDD}&lt;K, V&gt;.
+   */
+  public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) {
+    return JavaAPIHelper.toJavaPairRDD(rdd);
+  }
+
+  /** Convert an instance of {@link org.apache.spark.streaming.api.java.JavaDStream}&lt;Tuple2&lt;K, V&gt;&gt;
+   * to a {@link org.apache.spark.streaming.api.java.JavaPairDStream}&lt;K, V&gt;.
+   */
+  public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) {
+    return JavaAPIHelper.toJavaPairDStream(ds);
+  }
+
+  /**
+   * A static factory method to create a {@link GeodeJavaSQLContextFunctions} based
+   * on an existing {@link SQLContext} instance.
+   */
+  public static GeodeJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) {
+    return new GeodeJavaSQLContextFunctions(sqlContext);
+  }
+
+}
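
The toJavaPairRDD/toJavaPairDStream helpers convert tuple-typed RDDs and
DStreams into their pair-typed counterparts; a small sketch:

    JavaRDD<Tuple2<String, Integer>> tuples = jsc.parallelize(Arrays.asList(
        new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
    JavaPairRDD<String, Integer> byKey = GeodeJavaUtil.toJavaPairRDD(tuples);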

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
new file mode 100644
index 0000000..ff4cd17
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import com.gemstone.gemfire.cache.execute.ResultCollector
+import com.gemstone.gemfire.cache.query.Query
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.geode.spark.connector.internal.RegionMetadata
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition
+
+
+trait GeodeConnection {
+
+  /**
+   * Validate region existence and key/value type constraints; throws a RuntimeException
+   * if the region does not exist or the key and/or value type does not match.
+   * @param regionPath the full path of the region
+   */
+  def validateRegion[K, V](regionPath: String): Unit
+
+  /**
+   * Get a Region proxy for the given region.
+   * @param regionPath the full path of the region
+   */
+  def getRegionProxy[K, V](regionPath: String): Region[K, V]
+
+  /**
+   * Retrieve region metadata for the given region.
+   * @param regionPath: the full path of the region
+   * @return Some[RegionMetadata] if the region exists, None otherwise
+   */
+  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata]
+
+  /**
+   * Retrieve region data for the given region and bucket set.
+   * @param regionPath: the full path of the region
+   * @param whereClause: the optional where clause used to filter the data
+   * @param split: the Geode RDD partition instance
+   */
+  def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)]
+
+  def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object
+
+  /**
+   * Create a Geode OQL query.
+   * @param queryString the Geode OQL query string
+   */
+  def getQuery(queryString: String): Query
+
+  /** Close the connection */
+  def close(): Unit
+}
+
+
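
A hedged Java sketch of obtaining and using a connection (the region name is
illustrative; explicit type witnesses are given because the trait's type
parameters are unconstrained):

    GeodeConnectionConf connConf = GeodeConnectionConf.apply(jsc.sc().getConf());
    GeodeConnection conn = connConf.getConnection();
    Region<String, String> region = conn.<String, String>getRegionProxy("/str_str_region");
    region.put("3", "three");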

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
new file mode 100644
index 0000000..38d9e07
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import org.apache.spark.SparkConf
+import io.pivotal.geode.spark.connector.internal.{DefaultGeodeConnectionManager, LocatorHelper}
+
+/**
+ * Stores the configuration of a connection to a Geode cluster. It is serializable and can
+ * be safely sent over the network.
+ *
+ * @param locators Geode locator host:port pairs, the default is (localhost,10334)
+ * @param geodeProps the initial Geode properties to be used
+ * @param connectionManager the GeodeConnectionManager instance
+ */
+class GeodeConnectionConf(
+   val locators: Seq[(String, Int)], 
+   val geodeProps: Map[String, String] = Map.empty,
+   connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager
+  ) extends Serializable {
+
+  /** require at least 1 pair of (host,port) */
+  require(locators.nonEmpty)
+  
+  def getConnection: GeodeConnection = connectionManager.getConnection(this)
+  
+}
+
+object GeodeConnectionConf {
+
+  /**
+   * Create a GeodeConnectionConf object based on a locator string and an optional GeodeConnectionManager.
+   * @param locatorStr the Geode cluster locator string
+   * @param geodeProps the initial Geode properties to be used
+   * @param connectionManager the GeodeConnection factory
+   */
+  def apply(locatorStr: String, geodeProps: Map[String, String] = Map.empty)
+    (implicit connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager): GeodeConnectionConf = {
+    new GeodeConnectionConf(LocatorHelper.parseLocatorsString(locatorStr), geodeProps, connectionManager)
+  }
+
+  /**
+   * Create a GeodeConnectionConf object based on a SparkConf. Note that an implicit can
+   * be used to control which GeodeConnectionManager instance to use if desired.
+   * @param conf a SparkConf instance
+   */
+  def apply(conf: SparkConf): GeodeConnectionConf = {
+    val locatorStr = conf.getOption(GeodeLocatorPropKey).getOrElse(
+      throw new RuntimeException(s"SparkConf does not contain property $GeodeLocatorPropKey"))
+    // SparkConf only holds properties whose keys start with "spark.". In order to
+    // put Geode properties in SparkConf, all Geode properties are prefixed with
+    // "spark.geode.". This prefix is removed before the properties are put in `geodeProps`.
+    val prefix = "spark.geode."
+    val geodeProps = conf.getAll.filter {
+        case (k, v) => k.startsWith(prefix) && k != GeodeLocatorPropKey
+      }.map { case (k, v) => (k.substring(prefix.length), v) }.toMap
+    apply(locatorStr, geodeProps)
+  }
+
+}
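
From Java, the SparkConf-based factory is the simplest construction path (the
locator value below is an illustrative assumption):

    SparkConf conf = new SparkConf()
        .set("spark.geode.locators", "localhost[10334]");
    GeodeConnectionConf connConf = GeodeConnectionConf.apply(conf);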

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
new file mode 100644
index 0000000..bf678f0
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionManager.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+/**
+ * A GeodeConnectionManager provides a common interface for managing Geode
+ * connections, and it is serializable. Each manager instance will handle
+ * connection instance creation and connection pool management.
+ */
+trait GeodeConnectionManager extends Serializable {
+
+  /** get connection for the given connector */
+  def getConnection(connConf: GeodeConnectionConf): GeodeConnection
+
+  /** close the connection */
+  def closeConnection(connConf: GeodeConnectionConf): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
new file mode 100644
index 0000000..6e93b05
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployer.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import java.io.File
+import java.net.URL
+import org.apache.commons.httpclient.methods.PostMethod
+import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity}
+import org.apache.commons.httpclient.HttpClient
+import org.apache.spark.Logging
+
+object GeodeFunctionDeployer {
+  def main(args: Array[String]) {
+    new GeodeFunctionDeployer(new HttpClient()).commandLineRun(args)
+  }
+}
+
+class GeodeFunctionDeployer(val httpClient: HttpClient) extends Logging {
+
+  def deploy(host: String, port: Int, jarLocation: String): String =
+    deploy(host + ":" + port, jarLocation)
+  
+  def deploy(host: String, port: Int, jar: File): String =
+    deploy(host + ":" + port, jar)
+  
+  def deploy(jmxHostAndPort: String, jarLocation: String): String =
+    deploy(jmxHostAndPort, jarFileHandle(jarLocation))
+  
+  def deploy(jmxHostAndPort: String, jar: File): String = {
+    val urlString = constructURLString(jmxHostAndPort)
+    val filePost: PostMethod = new PostMethod(urlString)
+    val parts: Array[Part] = new Array[Part](1)
+    parts(0) = new FilePart("resources", jar)
+    filePost.setRequestEntity(new MultipartRequestEntity(parts, filePost.getParams))
+    val status: Int = httpClient.executeMethod(filePost)
+    "Deployed Jar with status:" + status
+  }
+
+  private[connector] def constructURLString(jmxHostAndPort: String) =
+    "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
+
+  private[connector] def jarFileHandle(jarLocation: String) = {
+    val f: File = new File(jarLocation)
+    if (!f.exists()) {
+      val errorMessage: String = "Invalid jar file: " + f.getAbsolutePath
+      logInfo(errorMessage)
+      throw new RuntimeException(errorMessage)
+    }
+    f
+  }
+  
+  def commandLineRun(args: Array[String]): Unit = {
+    val (hostPort: String, jarFile: String) =
+    if (args.length < 2) {
+      logInfo("JMX Manager Host and Port (example: localhost:7070):")
+      val bufferedReader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in))
+      val jmxHostAndPort = bufferedReader.readLine()
+      logInfo("Location of geode-functions.jar:")
+      val functionJarLocation = bufferedReader.readLine()
+      (jmxHostAndPort, functionJarLocation)
+    } else {
+      (args(0), args(1))
+    }
+    val status = deploy(hostPort, jarFile)
+    logInfo(status)
+  }
+}
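
A sketch of deploying the functions jar from Java (the host:port and jar path
are illustrative; the deployer POSTs the jar to the JMX manager's REST
endpoint shown in constructURLString above):

    GeodeFunctionDeployer deployer = new GeodeFunctionDeployer(new HttpClient());
    String status = deployer.deploy("localhost:7070", "/path/to/geode-functions.jar");
    System.out.println(status);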

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
new file mode 100644
index 0000000..8c0aeca
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeKryoRegistrator.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import com.esotericsoftware.kryo.Kryo
+import io.pivotal.geode.spark.connector.internal.oql.UndefinedSerializer
+import org.apache.spark.serializer.KryoRegistrator
+import com.gemstone.gemfire.cache.query.internal.Undefined
+
+class GeodeKryoRegistrator extends KryoRegistrator {
+
+  override def registerClasses(kryo: Kryo): Unit = {
+    kryo.addDefaultSerializer(classOf[Undefined], classOf[UndefinedSerializer])
+  }
+}
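
To activate the registrator, point Spark's standard Kryo configuration at it:

    SparkConf conf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator",
             "io.pivotal.geode.spark.connector.GeodeKryoRegistrator");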

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
new file mode 100644
index 0000000..ba5d2df
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodePairRDDFunctions.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodePairRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.Function
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra Geode functions on RDDs of (key, value) pairs through an implicit conversion.
+ * Import `io.pivotal.geode.spark.connector._` at the top of your program to
+ * use these functions.
+ */
+class GeodePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging {
+
+  /**
+   * Save the RDD of pairs to the Geode key-value store without any conversion.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @param opConf the optional parameters for this operation
+   */
+  def saveToGeode(
+      regionPath: String, 
+      connConf: GeodeConnectionConf = defaultConnectionConf, 
+      opConf: Map[String, String] = Map.empty): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    if (log.isDebugEnabled)
+      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n  ${getRddPartitionsInfo(rdd)}""")
+    else
+      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
+    val writer = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf)
+    rdd.sparkContext.runJob(rdd, writer.write _)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this`
+   * RDD and the Geode `Region[K, V2]`. Each pair of elements will be returned
+   * as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in the
+   * Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return GeodeJoinRDD[(K, V), K, V2]
+   */
+  def joinGeodeRegion[K2 <: K, V2](
+    regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K, V2] = {
+    new GeodeJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` RDD
+   * and the Geode `Region[K2, V2]`. The join key from an RDD element is generated by
+   * `func(K, V) => K2`, and the key from the Geode region is just the key of the
+   * key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a ((k, v), v2) tuple,
+   * where (k, v) is in `this` RDD and (k2, v2) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return GeodeJoinRDD[(K, V), K2, V2]
+   */
+  def joinGeodeRegion[K2, V2](
+    regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K2, V2] =
+    new GeodeJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+
+  /** This version of joinGeodeRegion(...) is just for the Java API. */
+  private[connector] def joinGeodeRegion[K2, V2](
+    regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeJoinRDD[(K, V), K2, V2] = {
+    new GeodeJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the Geode `Region[K, V2]`.
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key k.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return GeodeOuterJoinRDD[(K, V), K, V2]
+   */
+  def outerJoinGeodeRegion[K2 <: K, V2](
+    regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K, V2] = {
+    new GeodeOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the Geode `Region[K2, V2]`.
+   * The join key from an RDD element is generated by `func(K, V) => K2`, and the
+   * key from the region is just the key of the key/value pair.
+   *
+   * For each element (k, v) in `this` RDD, the resulting RDD will either contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from an RDD element (K, V)
+   * @param connConf the GeodeConnectionConf object that provides the connection to the Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return GeodeOuterJoinRDD[(K, V), K2, V2]
+   */
+  def outerJoinGeodeRegion[K2, V2](
+    regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
+    new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+  }
+
+  /** This version of outerJoinGeodeRegion(...) is just for the Java API. */
+  private[connector] def outerJoinGeodeRegion[K2, V2](
+    regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
+    new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+  }
+
+  private[connector] def defaultConnectionConf: GeodeConnectionConf =
+    GeodeConnectionConf(rdd.sparkContext.getConf)
+
+}
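
A minimal usage sketch of the pair-RDD join above. The region name "emps", the
locator address, the "spark.geode.locators" property key, and the implicit
conversion to GeodePairRDDFunctions are illustrative assumptions, not part of
this commit:

  import io.pivotal.geode.spark.connector._          // assumed implicit conversions
  import org.apache.spark.{SparkConf, SparkContext}

  // hypothetical setup; the locator host/port is a placeholder
  val conf = new SparkConf().setAppName("pair-join-sketch")
    .set("spark.geode.locators", "localhost[10334]")
  val sc = new SparkContext(conf)

  // (name, age) pairs; assume a Geode region "emps" keyed by name
  val pairs = sc.parallelize(Seq(("alice", 30), ("bob", 25)))

  // the join key is generated from each pair: func: ((K, V)) => K2
  val joined = pairs.joinGeodeRegion[String, String]("emps", _._1)
  joined.collect().foreach(println)                  // ((name, age), regionValue)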

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
new file mode 100644
index 0000000..2e5c92a
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeRDDFunctions.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, 
GeodeJoinRDD, GeodeRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.{PairFunction, Function}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra Geode functions on non-Pair RDDs through an implicit conversion.
+ * Import `io.pivotal.geode.spark.connector._` at the top of your program to
+ * use these functions.
+ */
+class GeodeRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {
+
+  /**
+   * Save the non-pair RDD to Geode key-value store.
+   * @param regionPath the full path of the region that the RDD is stored in
+   * @param func the function that converts elements of RDD to key/value pairs
+   * @param connConf the GeodeConnectionConf object that provides connection 
to Geode cluster
+   * @param opConf the optional parameters for this operation
+   */
+  def saveToGeode[K, V](
+      regionPath: String, 
+      func: T => (K, V), 
+      connConf: GeodeConnectionConf = defaultConnectionConf,
+      opConf: Map[String, String] = Map.empty): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    if (log.isDebugEnabled)
+      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n  
${getRddPartitionsInfo(rdd)}""")
+    else
+      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
+    val writer = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf)
+    rdd.sparkContext.runJob(rdd, writer.write(func) _)
+  }
+
+  /** This version of saveToGeode(...) is just for the Java API. */
+  private[connector] def saveToGeode[K, V](
+      regionPath: String, 
+      func: PairFunction[T, K, V], 
+      connConf: GeodeConnectionConf, 
+      opConf: Map[String, String]): Unit = {
+    saveToGeode[K, V](regionPath, func.call _, connConf, opConf)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` RDD
+   * and the Geode `Region[K, V]`. The join key for an RDD element is generated by
+   * `func(T) => K`, and the key from the Geode region is just the key of the
+   * key/value pair.
+   *
+   * Each pair of elements in the result RDD is returned as a (t, v) tuple,
+   * where t is in `this` RDD and (k, v) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from RDD element T
+   * @param connConf the GeodeConnectionConf object that provides connection 
to Geode cluster
+   * @tparam K the key type of the Geode region
+   * @tparam V the value type of the Geode region
+   * @return RDD[T, V]
+   */
+  def joinGeodeRegion[K, V](regionPath: String, func: T => K, 
+    connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[T, K, 
V] = {
+    new GeodeJoinRDD[T, K, V](rdd, func, regionPath, connConf)    
+  }
+
+  /** This version of joinGeodeRegion(...) is just for the Java API. */
+  private[connector] def joinGeodeRegion[K, V](
+    regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): 
GeodeJoinRDD[T, K, V] = {
+    joinGeodeRegion(regionPath, func.call _, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the Geode `Region[K, V]`.
+   * The join key for an RDD element is generated by `func(T) => K`, and the
+   * key from the region is just the key of the key/value pair.
+   *
+   * For each element t in `this` RDD, the resulting RDD will either contain
+   * all pairs (t, Some(v)) for v in the Geode region, or the pair
+   * (t, None) if no element in the Geode region has key `func(t)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from RDD element T
+   * @param connConf the GeodeConnectionConf object that provides connection 
to Geode cluster
+   * @tparam K the key type of the Geode region
+   * @tparam V the value type of the Geode region
+   * @return RDD[ T, Option[V] ]
+   */
+  def outerJoinGeodeRegion[K, V](regionPath: String, func: T => K,
+    connConf: GeodeConnectionConf = defaultConnectionConf): 
GeodeOuterJoinRDD[T, K, V] = {
+    new GeodeOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf)
+  }
+
+  /** This version of outerJoinGeodeRegion(...) is just for the Java API. */
+  private[connector] def outerJoinGeodeRegion[K, V](
+    regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): 
GeodeOuterJoinRDD[T, K, V] = {
+    outerJoinGeodeRegion(regionPath, func.call _, connConf)
+  }
+
+  private[connector] def defaultConnectionConf: GeodeConnectionConf =
+    GeodeConnectionConf(rdd.sparkContext.getConf)
+
+}
+
+
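
A matching sketch for the non-pair functions above, reusing the SparkContext
sc from the previous sketch; the region name "str_str_region" and the implicit
conversion to GeodeRDDFunctions are assumptions for illustration:

  import io.pivotal.geode.spark.connector._   // assumed implicit conversion

  val words = sc.parallelize(Seq("apple", "banana", "cherry"))

  // derive a (key, value) pair from each element: func: T => (K, V)
  words.saveToGeode("str_str_region", w => (w, w.toUpperCase))

  // inner join: each element pairs with the region value under key `w`
  val joined = words.joinGeodeRegion[String, String]("str_str_region", w => w)

  // left outer join: elements without a matching region key pair with None
  val outer = words.outerJoinGeodeRegion[String, String]("str_str_region", w => w)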

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala
new file mode 100644
index 0000000..83aab7a
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSQLContextFunctions.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.oql.{OQLRelation, QueryRDD}
+import org.apache.spark.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Provide Geode OQL specific functions
+ */
+class GeodeSQLContextFunctions(@transient sqlContext: SQLContext) extends 
Serializable with Logging {
+
+  /**
+   * Expose a Geode OQL query result as a DataFrame.
+   * @param query the OQL query string
+   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
+   */
+  def geodeOQL(
+    query: String,
+    connConf: GeodeConnectionConf = 
GeodeConnectionConf(sqlContext.sparkContext.getConf)): DataFrame = {
+    logInfo(s"OQL query = $query")
+    val rdd = new QueryRDD[Object](sqlContext.sparkContext, query, connConf)
+    sqlContext.baseRelationToDataFrame(OQLRelation(rdd)(sqlContext))
+  }
+
+  private[connector] def defaultConnectionConf: GeodeConnectionConf =
+    GeodeConnectionConf(sqlContext.sparkContext.getConf)
+}
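
A short sketch of the OQL entry point above; the implicit conversion from
SQLContext to GeodeSQLContextFunctions and the /emps region with name/age
fields are assumptions for illustration:

  import io.pivotal.geode.spark.connector._
  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)
  // run the OQL query on the Geode cluster and expose the result as a DataFrame
  val df = sqlContext.geodeOQL("SELECT e.name, e.age FROM /emps e WHERE e.age > 21")
  df.show()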

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala
new file mode 100644
index 0000000..617cb33
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeSparkContextFunctions.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD
+import org.apache.spark.SparkContext
+
+import scala.reflect.ClassTag
+
+/** Provides Geode specific methods on `SparkContext` */
+class GeodeSparkContextFunctions(@transient sc: SparkContext) extends 
Serializable {
+
+  /**
+   * Expose a Geode region as a GeodeRDD.
+   * @param regionPath the full path of the region
+   * @param connConf the GeodeConnectionConf that can be used to access the region
+   * @param opConf optional parameters, e.g. the preferred partitioner and its
+   *        parameters; the implementation will use them where applicable
+   */
+  def geodeRegion[K: ClassTag, V: ClassTag] (
+    regionPath: String, connConf: GeodeConnectionConf = 
GeodeConnectionConf(sc.getConf),
+    opConf: Map[String, String] = Map.empty): GeodeRegionRDD[K, V] =
+    GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf)
+
+}
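
Likewise, a sketch of reading a whole region through the SparkContext function
above (the implicit conversion to GeodeSparkContextFunctions and the region
contents are assumed):

  import io.pivotal.geode.spark.connector._

  // expose the region as an RDD[(String, Int)] of (name, age) entries
  val emps = sc.geodeRegion[String, Int]("emps")
  val adults = emps.filter { case (_, age) => age > 21 }.count()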

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala
new file mode 100644
index 0000000..52f9961
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal
+
+import java.net.InetAddress
+
+import com.gemstone.gemfire.cache.client.{ClientCache, ClientCacheFactory, 
ClientRegionShortcut}
+import com.gemstone.gemfire.cache.execute.{FunctionException, FunctionService}
+import com.gemstone.gemfire.cache.query.Query
+import com.gemstone.gemfire.cache.{Region, RegionService}
+import com.gemstone.gemfire.internal.cache.execute.InternalExecution
+import io.pivotal.geode.spark.connector.internal.oql.QueryResultCollector
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition
+import org.apache.spark.{SparkEnv, Logging}
+import io.pivotal.geode.spark.connector.GeodeConnection
+import io.pivotal.geode.spark.connector.internal.geodefunctions._
+import java.util.{Set => JSet, List => JList }
+
+/**
+ * Default GeodeConnection implementation. Instances of this class should be
+ * created by DefaultGeodeConnectionFactory.
+ * @param locators host/port pairs of the locators
+ * @param gemFireProps the initial Geode properties to be used
+ */
+private[connector] class DefaultGeodeConnection (
+  locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) 
+  extends GeodeConnection with Logging {
+
+  private val clientCache = initClientCache()
+
+  /** Register Geode functions with the Geode cluster */
+  
FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance())
+  FunctionService.registerFunction(RetrieveRegionFunction.getInstance())
+
+  private def initClientCache() : ClientCache = {
+    try {
+      val ccf = getClientCacheFactory
+      ccf.create()
+    } catch {
+      case e: Exception =>
+        logError(s"""Failed to init ClientCache, 
locators=${locators.mkString(",")}, Error: $e""")
+        throw new RuntimeException(e)
+    }
+  }
+  
+  private def getClientCacheFactory: ClientCacheFactory = {
+    import io.pivotal.geode.spark.connector.map2Properties
+    val ccf = new ClientCacheFactory(gemFireProps)
+    ccf.setPoolReadTimeout(30000)
+    val servers = LocatorHelper.getAllGeodeServers(locators)
+    if (servers.isDefined && servers.get.size > 0) {
+      val sparkIp = System.getenv("SPARK_LOCAL_IP")
+      val hostName = if (sparkIp != null) 
InetAddress.getByName(sparkIp).getCanonicalHostName
+                     else InetAddress.getLocalHost.getCanonicalHostName
+      val executorId = SparkEnv.get.executorId      
+      val pickedServers = LocatorHelper.pickPreferredGeodeServers(servers.get, 
hostName, executorId)
+      logInfo(s"""Init ClientCache: severs=${pickedServers.mkString(",")}, 
host=$hostName executor=$executorId props=$gemFireProps""")
+      logDebug(s"""Init ClientCache: 
all-severs=${pickedServers.mkString(",")}""")
+      pickedServers.foreach{ case (host, port)  => ccf.addPoolServer(host, 
port) }
+    } else {
+      logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, 
props=$gemFireProps""")
+      locators.foreach { case (host, port)  => ccf.addPoolLocator(host, port) }
+    }
+    ccf
+  }
+
+  /** close the clientCache */
+  override def close(): Unit =
+    if (! clientCache.isClosed) clientCache.close()
+
+  /** ----------------------------------------- */
+  /** implementation of GeodeConnection trait */
+  /** ----------------------------------------- */
+
+  override def getQuery(queryString: String): Query =
+    
clientCache.asInstanceOf[RegionService].getQueryService.newQuery(queryString)
+
+  override def validateRegion[K, V](regionPath: String): Unit = {
+    val md = getRegionMetadata[K, V](regionPath)
+    if (md.isEmpty) throw new RuntimeException(s"The region named $regionPath was not found")
+  }
+
+  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] = {
+    import scala.collection.JavaConversions.setAsJavaSet
+    val region = getRegionProxy[K, V](regionPath)
+    val set0: JSet[Integer] = Set[Integer](0)
+    val exec = 
FunctionService.onRegion(region).asInstanceOf[InternalExecution].withBucketFilter(set0)
+    exec.setWaitOnExceptionFlag(true)
+    try {
+      val collector = exec.execute(RetrieveRegionMetadataFunction.ID)
+      val r = collector.getResult.asInstanceOf[JList[RegionMetadata]]
+      logDebug(r.get(0).toString)
+      Some(r.get(0))
+    } catch {
+      case e: FunctionException => 
+        if (e.getMessage.contains(s"The region named /$regionPath was not 
found")) None
+        else throw e
+    }
+  }
+
+  def getRegionProxy[K, V](regionPath: String): Region[K, V] = {
+    val region1: Region[K, V] = 
clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
+    if (region1 != null) region1
+    else DefaultGeodeConnection.regionLock.synchronized {
+      val region2 = clientCache.getRegion(regionPath).asInstanceOf[Region[K, 
V]]
+      if (region2 != null) region2
+      else clientCache.createClientRegionFactory[K, 
V](ClientRegionShortcut.PROXY).create(regionPath)
+    }
+  }
+
+  override def getRegionData[K, V](regionPath: String, whereClause: 
Option[String], split: GeodeRDDPartition): Iterator[(K, V)] = {
+    val region = getRegionProxy[K, V](regionPath)
+    val desc = s"""RDD($regionPath, "${whereClause.getOrElse("")}", 
${split.index})"""
+    val args : Array[String] = Array[String](whereClause.getOrElse(""), desc)
+    val collector = new StructStreamingResultCollector(desc)
+        // RetrieveRegionResultCollector[(K, V)]
+    import scala.collection.JavaConversions.setAsJavaSet
+    val exec = 
FunctionService.onRegion(region).withArgs(args).withCollector(collector).asInstanceOf[InternalExecution]
+      .withBucketFilter(split.bucketSet.map(Integer.valueOf))
+    exec.setWaitOnExceptionFlag(true)
+    exec.execute(RetrieveRegionFunction.ID)
+    collector.getResult.map{objs: Array[Object] => (objs(0).asInstanceOf[K], 
objs(1).asInstanceOf[V])}
+  }
+
+  override def executeQuery(regionPath: String, bucketSet: Set[Int], 
queryString: String) = {
+    import scala.collection.JavaConversions.setAsJavaSet
+    FunctionService.registerFunction(QueryFunction.getInstance())
+    val collector = new QueryResultCollector
+    val region = getRegionProxy(regionPath)
+    val args: Array[String] = Array[String](queryString, bucketSet.toString)
+    val exec = 
FunctionService.onRegion(region).withCollector(collector).asInstanceOf[InternalExecution]
+      .withBucketFilter(bucketSet.map(Integer.valueOf))
+      .withArgs(args)
+    exec.execute(QueryFunction.ID)
+    collector.getResult
+  }
+}
+
+private[connector] object DefaultGeodeConnection {
+  /** a lock object only used by getRegionProxy...() */
+  private val regionLock = new Object
+}
+
+/** The purpose of this class is to make unit testing DefaultGeodeConnectionManager easier. */
+class DefaultGeodeConnectionFactory {
+
+  def newConnection(locators: Seq[(String, Int)], gemFireProps: Map[String, 
String] = Map.empty) =
+    new DefaultGeodeConnection(locators, gemFireProps)
+
+}
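
getRegionProxy above uses a check-then-lock pattern: the fast path does a
lock-free lookup, and only on a miss does it synchronize and re-check before
creating the proxy. A standalone sketch of that pattern with hypothetical
lookup/create functions (this relies on lookup reading a thread-safe store,
as the client cache's region map is):

  // generic check-then-lock helper; create() runs at most once per miss
  def getOrCreate[T <: AnyRef](lookup: () => T, create: () => T, lock: AnyRef): T = {
    val first = lookup()
    if (first != null) first        // fast path, no lock taken
    else lock.synchronized {
      val second = lookup()         // re-check: another thread may have created it
      if (second != null) second
      else create()
    }
  }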

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala
new file mode 100644
index 0000000..eb67cda
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal
+
+import io.pivotal.geode.spark.connector.{GeodeConnection, GeodeConnectionConf, 
GeodeConnectionManager}
+
+import scala.collection.mutable
+
+/**
+ * Default implementation of GeodeConnectionManager
+ */
+class DefaultGeodeConnectionManager extends GeodeConnectionManager {
+
+  def getConnection(connConf: GeodeConnectionConf): GeodeConnection =
+    DefaultGeodeConnectionManager.getConnection(connConf)
+
+  def closeConnection(connConf: GeodeConnectionConf): Unit =
+    DefaultGeodeConnectionManager.closeConnection(connConf)
+
+}
+
+object DefaultGeodeConnectionManager  {
+
+  /** connection cache, keyed by host:port pair */
+  private[connector] val connections = mutable.Map[(String, Int), 
GeodeConnection]()
+
+  /**
+   * Use a locator host:port pair to look up a cached connection. Create a new
+   * connection and add it to the cache `connections` if none exists.
+   */
+  def getConnection(connConf: GeodeConnectionConf)
+    (implicit factory: DefaultGeodeConnectionFactory = new 
DefaultGeodeConnectionFactory): GeodeConnection = {
+
+    def getCachedConnection(locators: Seq[(String, Int)]): GeodeConnection = {
+      // look up via the `locators` parameter; entries default to null and are filtered out
+      val conns = locators.map(connections withDefaultValue null).filter(_ != null)
+      if (conns.nonEmpty) conns.head else null
+    }
+
+    val conn1 = getCachedConnection(connConf.locators)
+    if (conn1 != null) conn1
+    else connections.synchronized {
+      val conn2 = getCachedConnection(connConf.locators)
+      if (conn2 != null) conn2
+      else {
+        val conn3 = factory.newConnection(connConf.locators, 
connConf.geodeProps)
+        connConf.locators.foreach(pair => connections += (pair -> conn3))
+        conn3
+      }
+    }
+  }
+
+  /**
+   * Close the connection and remove it from the connection cache.
+   * Note: multiple entries may share the same connection; all such entries are removed.
+   */
+  def closeConnection(connConf: GeodeConnectionConf): Unit = {
+    val conns = connConf.locators.map(connections withDefaultValue 
null).filter(_ != null)
+    if (conns.nonEmpty) connections.synchronized {
+      conns(0).close()
+      connections.retain((k,v) => v != conns(0))
+    }
+  }
+}
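
To make the cache semantics above concrete: getConnection registers one
connection under every one of its locator pairs, so a later lookup through any
single locator is a cache hit, and closeConnection's retain(...) drops all of
those aliases at once. A toy model with strings standing in for connections:

  import scala.collection.mutable

  val cache = mutable.Map[(String, Int), String]()
  val locators = Seq(("host1", 10334), ("host2", 10334))
  val conn = new String("shared-connection")   // stand-in for a GeodeConnection
  locators.foreach(pair => cache += (pair -> conn))

  // a lookup via either locator returns the same shared instance
  assert(cache(("host1", 10334)) eq cache(("host2", 10334)))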

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala
new file mode 100644
index 0000000..71fed52
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal
+
+import java.net.InetSocketAddress
+import java.util.{ArrayList => JArrayList}
+
+import 
com.gemstone.gemfire.cache.client.internal.locator.{GetAllServersResponse, 
GetAllServersRequest}
+import com.gemstone.gemfire.distributed.internal.ServerLocation
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient
+import org.apache.spark.Logging
+
+import scala.util.{Failure, Success, Try}
+
+
+object LocatorHelper extends Logging {
+
+  /** valid locator strings are: host[port] and host:port */
+  final val LocatorPattern1 = """([\w-_]+(\.[\w-_]+)*)\[([0-9]{2,5})\]""".r
+  final val LocatorPattern2 = """([\w-_]+(\.[\w-_]+)*):([0-9]{2,5})""".r
+
+  /** convert single locator string to Try[(host, port)] */
+  def locatorStr2HostPortPair(locatorStr: String): Try[(String, Int)] =
+    locatorStr match {
+      case LocatorPattern1(host, domain, port) => Success((host, port.toInt))
+      case LocatorPattern2(host, domain, port) => Success((host, port.toInt))
+      case _ => Failure(new Exception(s"invalid locator: $locatorStr"))
+    }
+
+  /** 
+   * Parse locator strings and return a Seq of (hostname, port) pairs.
+   * Valid locator strings are one or more "host[port]" and/or "host:port"
+   * entries separated by `,`. For example:
+   *    host1.mydomain.com[8888],host2.mydomain.com[8889]
+   *    host1.mydomain.com:8888,host2.mydomain.com:8889
+   */
+  def parseLocatorsString(locatorsStr: String): Seq[(String, Int)] =
+    locatorsStr.split(",").map(locatorStr2HostPortPair).map(_.get)
+
+
+  /**
+   * Return the list of live Geode servers for the given locators.
+   * @param locators locators for the given Geode cluster
+   * @param serverGroup optional server group name, default is "" (empty 
string)
+   */
+  def getAllGeodeServers(locators: Seq[(String, Int)], serverGroup: String = 
""): Option[Seq[(String, Int)]] = {
+    var result: Option[Seq[(String, Int)]] = None
+    locators.find { case (host, port) =>
+      try {
+        val addr = new InetSocketAddress(host, port)
+        val req = new GetAllServersRequest(serverGroup)
+        val res = TcpClient.requestToServer(addr.getAddress, addr.getPort, 
req, 2000)
+        if (res != null) {
+          import scala.collection.JavaConverters._
+          val servers = 
res.asInstanceOf[GetAllServersResponse].getServers.asInstanceOf[JArrayList[ServerLocation]]
+          if (servers.size > 0)
+            result = Some(servers.asScala.map(e => (e.getHostName, e.getPort)))
+        }
+      } catch { case e: Exception => logWarning("getAllGeodeServers error", e)
+      }
+      result.isDefined
+    }
+    result
+  }
+
+  /**
+   * Pick up at most 3 preferred servers from all available servers based on
+   * host name and Spark executor id.
+   *
+   * This method is used by DefaultGeodeConnection to create a ClientCache. Usually
+   * one server is enough to initialize ClientCacheFactory, but this provides two
+   * backup servers in case the 1st server can't be connected to.
+   *
+   * @param servers all available servers in the form of (hostname, port) pairs
+   * @param hostName the host name of the Spark executor
+   * @param executorId the Spark executor Id, such as "<driver>", "0", "1", ...
+   * @return Seq[(hostname, port)] of preferred servers
+   */
+  def pickPreferredGeodeServers(
+    servers: Seq[(String, Int)], hostName: String, executorId: String): 
Seq[(String, Int)] = {
+
+    // pick up `length` items from the Seq, starting at the `start` position.
+    // The Seq is treated as a ring, so at most `seq.size` items can be picked
+    def circularTake[T](seq: Seq[T], start: Int, length: Int): Seq[T] = {
+      val size = math.min(seq.size, length)
+      (start until start + size).map(x => seq(x % seq.size))
+    }
+
+    // map executor id to int: "<driver>" (or any non-numeric string) to 0, and "n" to n + 1
+    val id = try { executorId.toInt + 1 } catch { case _: NumberFormatException => 0 }
+    
+    // algorithm: 
+    // 1. sort server list
+    // 2. split sorted server list into 3 sub-lists a, b, and c:
+    //      list-a: servers on the given host
+    //      list-b: servers that are in front of list-a on the sorted server 
list
+    //      list-c: servers that are behind list-a on the sorted server list
+    //    then rotate list-a based on executor id, then create new server list:
+    //      modified list-a ++ list-c ++ list-b
+    // 3. if there's no server on the given host, then create new server list
+    //    by rotating sorted server list based on executor id.
+    // 4. take up to 3 servers from the new server list
+    val sortedServers = servers.sorted
+    val firstIdx = sortedServers.indexWhere(p => p._1 == hostName)
+    val lastIdx = if (firstIdx < 0) -1 else sortedServers.lastIndexWhere(p => 
p._1 == hostName)
+
+    if (firstIdx < 0) { // no local server
+      circularTake(sortedServers, id, 3)
+    } else {
+      val (seq1, seq2) = sortedServers.splitAt(firstIdx)
+      val seq = if (firstIdx == lastIdx) {  // one local server
+        seq2 ++ seq1
+      } else { // multiple local servers
+        val (seq3, seq4) = seq2.splitAt(lastIdx - firstIdx + 1)
+        val seq3b = if (id % seq3.size == 0) seq3 else circularTake(seq3, id, 
seq3.size)
+        seq3b ++ seq4 ++ seq1
+      }
+      circularTake(seq, 0, 3)
+    }
+  }  
+}
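
Both helpers are pure functions and can be exercised directly; the host names
and ports below are made up:

  // both locator syntaxes parse to (host, port) pairs
  val pairs = LocatorHelper.parseLocatorsString(
    "host1.mydomain.com[8888],host2.mydomain.com:8889")
  // => Seq(("host1.mydomain.com", 8888), ("host2.mydomain.com", 8889))

  // executor "0" maps to id 1, which rotates the two local "s2" servers
  val servers = Seq(("s1", 40404), ("s2", 40404), ("s2", 40405), ("s3", 40404))
  val picked = LocatorHelper.pickPreferredGeodeServers(servers, "s2", "0")
  // => Seq(("s2", 40405), ("s2", 40404), ("s3", 40404))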
