http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java
new file mode 100644
index 0000000..bb75c7a
--- /dev/null
+++ b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/JavaApiIntegrationTest.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark.connector;
+
+import com.gemstone.gemfire.cache.Region;
+import io.pivotal.gemfire.spark.connector.GemFireConnection;
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf$;
+import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager$;
+import io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaRegionRDD;
+import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster$;
+import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.scalatest.junit.JUnitSuite;
+import io.pivotal.gemfire.spark.connector.package$;
+import scala.Tuple2;
+import scala.Option;
+import scala.Some;
+
+import java.util.*;
+
+import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.RDDSaveBatchSizePropKey;
+import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.javaFunctions;
+import static org.junit.Assert.*;
+
+public class JavaApiIntegrationTest extends JUnitSuite {
+
+  static JavaSparkContext jsc = null;
+  static GemFireConnectionConf connConf = null;
+  
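+  // "pr_str_int_region" is a PARTITION region defined in src/it/resources/test-retrieve-regions.xml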
+  static int numServers = 2;
+  static int numObjects = 1000;
+  static String regionPath = "pr_str_int_region";
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // start gemfire cluster, and spark context
+    Properties settings = new Properties();
+    settings.setProperty("cache-xml-file", 
"src/it/resources/test-retrieve-regions.xml");
+    settings.setProperty("num-of-servers", Integer.toString(numServers));
+    int locatorPort = GemFireCluster$.MODULE$.start(settings);
+
+    // start spark context in local mode
+    Properties props = new Properties();
+    props.put("log4j.logger.org.apache.spark", "INFO");
+    props.put("log4j.logger.io.pivotal.gemfire.spark.connector","DEBUG");
+    IOUtils.configTestLog4j("ERROR", props);
+    SparkConf conf = new SparkConf()
+            .setAppName("RetrieveRegionIntegrationTest")
+            .setMaster("local[2]")
+            .set(package$.MODULE$.GemFireLocatorPropKey(), "localhost:" + locatorPort);
+    // sc = new SparkContext(conf);
+    jsc = new JavaSparkContext(conf);
+    connConf = GemFireConnectionConf.apply(jsc.getConf());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    // stop connection, spark context, and gemfire cluster
+    DefaultGemFireConnectionManager$.MODULE$.closeConnection(GemFireConnectionConf$.MODULE$.apply(jsc.getConf()));
+    jsc.stop();
+    GemFireCluster$.MODULE$.stop();
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   utility methods
+  // --------------------------------------------------------------------------------------------
+
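+  // assert that the map and the list of pairs contain exactly the same entries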
+  private <K,V> void matchMapAndPairList(Map<K,V> map, List<Tuple2<K,V>> list) {
+    assertTrue("size mismatch \nmap: " + map.toString() + "\nlist: " + list.toString(), map.size() == list.size());
+    for (Tuple2<K, V> p : list) {
+      assertTrue("value mismatch: k=" + p._1() + " v1=" + p._2() + " v2=" + map.get(p._1()),
+                 p._2().equals(map.get(p._1())));
+    }
+  }
+
+  private Region<String, Integer> prepareStrIntRegion(String regionPath, int start, int stop) {
+    HashMap<String, Integer> entriesMap = new HashMap<>();
+    for (int i = start; i < stop; i ++) {
+      entriesMap.put("k_" + i, i);
+    }
+
+    GemFireConnection conn = connConf.getConnection();
+    Region<String, Integer> region = conn.getRegionProxy(regionPath);
+    region.removeAll(region.keySetOnServer());
+    region.putAll(entriesMap);
+    return region;
+  }
+
+  private JavaPairRDD<String, Integer> prepareStrIntJavaPairRDD(int start, int stop) {
+    List<Tuple2<String, Integer>> data = new ArrayList<>();
+    for (int i = start; i < stop; i ++) {
+      data.add(new Tuple2<>("k_" + i, i));
+    }
+    return jsc.parallelizePairs(data);
+  }
+
+  private JavaPairRDD<Integer, Integer> prepareIntIntJavaPairRDD(int start, int stop) {
+    List<Tuple2<Integer, Integer>> data = new ArrayList<>();
+    for (int i = start; i < stop; i ++) {
+      data.add(new Tuple2<>(i, i * 2));
+    }
+    return jsc.parallelizePairs(data);
+  }
+
+  private JavaRDD<Integer> prepareIntJavaRDD(int start, int stop) {
+    List<Integer> data = new ArrayList<>();
+    for (int i = start; i < stop; i ++) {
+      data.add(i);
+    }
+    return jsc.parallelize(data);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaRDD.saveToGemfire
+  // --------------------------------------------------------------------------------------------
+
+  static class IntToStrIntPairFunction implements PairFunction<Integer, String, Integer> {
+    @Override public Tuple2<String, Integer> call(Integer x) throws Exception {
+      return new Tuple2<>("k_" + x, x);
+    }
+  }
+
+  @Test
+  public void testRDDSaveToGemfireWithDefaultConnConfAndOpConf() throws Exception {
+    verifyRDDSaveToGemfire(true, true);
+  }
+
+  @Test
+  public void testRDDSaveToGemfireWithDefaultConnConf() throws Exception {
+    verifyRDDSaveToGemfire(true, false);
+  }
+  
+  @Test
+  public void testRDDSaveToGemfireWithConnConfAndOpConf() throws Exception {
+    verifyRDDSaveToGemfire(false, true);
+  }
+
+  @Test
+  public void testRDDSaveToGemfireWithConnConf() throws Exception {
+    verifyRDDSaveToGemfire(false, false);
+  }
+  
+  public void verifyRDDSaveToGemfire(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
+    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  // remove all entries
+    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(0, numObjects);
+
+    PairFunction<Integer, String, Integer> func = new IntToStrIntPairFunction();
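+    // RDDSaveBatchSizePropKey limits how many entries each putAll batch writes to GemFire (200 here)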
+    Properties opConf = new Properties();
+    opConf.put(RDDSaveBatchSizePropKey, "200");
+
+    if (useDefaultConnConf) {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, func, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath, func);
+    } else {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath, func, connConf);
+    }
+    
+    Set<String> keys = region.keySetOnServer();
+    Map<String, Integer> map = region.getAll(keys);
+
+    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
+
+    for (int i = 0; i < numObjects; i ++) {
+      expectedList.add(new Tuple2<>("k_" + i, i));
+    }
+    matchMapAndPairList(map, expectedList);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaPairRDD.saveToGemfire
+  // --------------------------------------------------------------------------------------------
+
+  @Test
+  public void testPairRDDSaveToGemfireWithDefaultConnConfAndOpConf() throws Exception {
+    verifyPairRDDSaveToGemfire(true, true);
+  }
+
+  @Test
+  public void testPairRDDSaveToGemfireWithDefaultConnConf() throws Exception {
+    verifyPairRDDSaveToGemfire(true, false);
+  }
+  
+  @Test
+  public void testPairRDDSaveToGemfireWithConnConfAndOpConf() throws Exception {
+    verifyPairRDDSaveToGemfire(false, true);
+  }
+
+  @Test
+  public void testPairRDDSaveToGemfireWithConnConf() throws Exception {
+    verifyPairRDDSaveToGemfire(false, false);
+  }
+  
+  public void verifyPairRDDSaveToGemfire(boolean useDefaultConnConf, boolean useOpConf) throws Exception {
+    Region<String, Integer> region = prepareStrIntRegion(regionPath, 0, 0);  // remove all entries
+    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(0, numObjects);
+    Properties opConf = new Properties();
+    opConf.put(RDDSaveBatchSizePropKey, "200");
+
+    if (useDefaultConnConf) {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath);
+    } else {
+      if (useOpConf)
+        javaFunctions(rdd1).saveToGemfire(regionPath, connConf, opConf);
+      else
+        javaFunctions(rdd1).saveToGemfire(regionPath, connConf);
+    }
+
+    Set<String> keys = region.keySetOnServer();
+    Map<String, Integer> map = region.getAll(keys);
+
+    List<Tuple2<String, Integer>> expectedList = new ArrayList<>();
+    for (int i = 0; i < numObjects; i ++) {
+      expectedList.add(new Tuple2<>("k_" + i, i));
+    }
+    matchMapAndPairList(map, expectedList);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaSparkContext.gemfireRegion and where clause
+  // --------------------------------------------------------------------------------------------
+
+  @Test
+  public void testJavaSparkContextGemfireRegion() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);  // populate the region with test entries
+    Properties emptyProps = new Properties();
+    GemFireJavaRegionRDD<String, Integer> rdd1 = javaFunctions(jsc).gemfireRegion(regionPath);
+    GemFireJavaRegionRDD<String, Integer> rdd2 = javaFunctions(jsc).gemfireRegion(regionPath, emptyProps);
+    GemFireJavaRegionRDD<String, Integer> rdd3 = javaFunctions(jsc).gemfireRegion(regionPath, connConf);
+    GemFireJavaRegionRDD<String, Integer> rdd4 = javaFunctions(jsc).gemfireRegion(regionPath, connConf, emptyProps);
+    GemFireJavaRegionRDD<String, Integer> rdd5 = rdd1.where("value.intValue() < 50");
+
+    HashMap<String, Integer> expectedMap = new HashMap<>();
+    for (int i = 0; i < numObjects; i ++) {
+      expectedMap.put("k_" + i, i);
+    }
+
+    matchMapAndPairList(expectedMap, rdd1.collect());
+    matchMapAndPairList(expectedMap, rdd2.collect());
+    matchMapAndPairList(expectedMap, rdd3.collect());
+    matchMapAndPairList(expectedMap, rdd4.collect());
+
+    HashMap<String, Integer> expectedMap2 = new HashMap<>();
+    for (int i = 0; i < 50; i ++) {
+      expectedMap2.put("k_" + i, i);
+    }
+
+    matchMapAndPairList(expectedMap2, rdd5.collect());
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaPairRDD.joinGemfireRegion
+  // --------------------------------------------------------------------------------------------
+
+  @Test
+  public void testPairRDDJoinWithSameKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);
+
+    JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGemfireRegion(regionPath);
+    JavaPairRDD<Tuple2<String, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGemfireRegion(regionPath, connConf);
+    // System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+    HashMap<Tuple2<String, Integer>, Integer> expectedMap = new HashMap<>();
+    for (int i = 0; i < 10; i ++) {
+      expectedMap.put(new Tuple2<>("k_" + i, i), i);
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+  static class IntIntPairToStrKeyFunction implements Function<Tuple2<Integer, Integer>, String> {
+    @Override public String call(Tuple2<Integer, Integer> pair) throws Exception {
+      return "k_" + pair._1();
+    }
+  }
+
+  @Test
+  public void testPairRDDJoinWithDiffKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
+    Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction();
+
+    JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2a = javaFunctions(rdd1).joinGemfireRegion(regionPath, func);
+    JavaPairRDD<Tuple2<Integer, Integer>, Integer> rdd2b = javaFunctions(rdd1).joinGemfireRegion(regionPath, func, connConf);
+    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+    HashMap<Tuple2<Integer, Integer>, Integer> expectedMap = new HashMap<>();
+    for (int i = 0; i < 10; i ++) {
+      expectedMap.put(new Tuple2<>(i, i * 2), i);
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaPairRDD.outerJoinGemfireRegion
+  // --------------------------------------------------------------------------------------------
+
+  @Test
+  public void testPairRDDOuterJoinWithSameKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaPairRDD<String, Integer> rdd1 = prepareStrIntJavaPairRDD(-5, 10);
+
+    JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath);
+    JavaPairRDD<Tuple2<String, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, connConf);
+    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
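+    // keys k_-5 .. k_-1 are not in the region, so the outer join should yield None for them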
+    HashMap<Tuple2<String, Integer>, Option<Integer>> expectedMap = new HashMap<>();
+    for (int i = -5; i < 10; i ++) {
+      if (i < 0)
+        expectedMap.put(new Tuple2<>("k_" + i, i), Option.apply((Integer) null));
+      else
+        expectedMap.put(new Tuple2<>("k_" + i, i), Some.apply(i));
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+  @Test
+  public void testPairRDDOuterJoinWithDiffKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaPairRDD<Integer, Integer> rdd1 = prepareIntIntJavaPairRDD(-5, 10);
+    Function<Tuple2<Integer, Integer>, String> func = new IntIntPairToStrKeyFunction();
+
+    JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func);
+    JavaPairRDD<Tuple2<Integer, Integer>, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func, connConf);
+    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+    HashMap<Tuple2<Integer, Integer>, Option<Integer>> expectedMap = new HashMap<>();
+    for (int i = -5; i < 10; i ++) {
+      if (i < 0)
+        expectedMap.put(new Tuple2<>(i, i * 2), Option.apply((Integer) null));
+      else
+        expectedMap.put(new Tuple2<>(i, i * 2), Some.apply(i));
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaRDD.joinGemfireRegion
+  // --------------------------------------------------------------------------------------------
+
+  static class IntToStrKeyFunction implements Function<Integer, String> {
+    @Override public String call(Integer x) throws Exception {
+      return "k_" + x;
+    }
+  }
+
+  @Test
+  public void testRDDJoinWithSameKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);
+
+    Function<Integer, String> func = new IntToStrKeyFunction();
+    JavaPairRDD<Integer, Integer> rdd2a = javaFunctions(rdd1).joinGemfireRegion(regionPath, func);
+    JavaPairRDD<Integer, Integer> rdd2b = javaFunctions(rdd1).joinGemfireRegion(regionPath, func, connConf);
+    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+    HashMap<Integer, Integer> expectedMap = new HashMap<>();
+    for (int i = 0; i < 10; i ++) {
+      expectedMap.put(i, i);
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //   JavaRDD.outerJoinGemfireRegion
+  // --------------------------------------------------------------------------------------------
+
+  @Test
+  public void testRDDOuterJoinWithSameKeyType() throws Exception {
+    prepareStrIntRegion(regionPath, 0, numObjects);
+    JavaRDD<Integer> rdd1 = prepareIntJavaRDD(-5, 10);
+
+    Function<Integer, String> func = new IntToStrKeyFunction();
+    JavaPairRDD<Integer, Option<Integer>> rdd2a = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func);
+    JavaPairRDD<Integer, Option<Integer>> rdd2b = javaFunctions(rdd1).outerJoinGemfireRegion(regionPath, func, connConf);
+    //System.out.println("=== Result RDD =======\n" + rdd2a.collect() + "\n=========================");
+
+    HashMap<Integer, Option<Integer>> expectedMap = new HashMap<>();
+    for (int i = -5; i < 10; i ++) {
+      if (i < 0)
+        expectedMap.put(i, Option.apply((Integer) null));
+      else
+        expectedMap.put(i, Some.apply(i));
+    }
+    matchMapAndPairList(expectedMap, rdd2a.collect());
+    matchMapAndPairList(expectedMap, rdd2b.collect());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java
new file mode 100644
index 0000000..5fa03c6
--- /dev/null
+++ b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Portfolio.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark.connector;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+import com.gemstone.gemfire.cache.Declarable;
+
+/**
+ * A stock portfolio that consists of multiple {@link Position} objects that
+ * represent shares of stock (a "security").  Instances of
+ * <code>Portfolio</code> can be stored in a GemFire <code>Region</code> and
+ * their contents can be queried using the GemFire query service.
+ * <p>
+ * This class is <code>Serializable</code> because we want it to be distributed
+ * to multiple members of a distributed system.  Because this class is
+ * <code>Declarable</code>, we can describe instances of it in a GemFire
+ * <code>cache.xml</code> file.
+ * <p>
+ *
+ */
+public class Portfolio implements Declarable, Serializable {
+
+  private static final long serialVersionUID = 9097335119586059309L;
+
+  private int id;  /* id is used as the entry key and is stored in the entry */
+  private String type;
+  private Map<String,Position> positions = new LinkedHashMap<String,Position>();
+  private String status;
+
+  public Portfolio(Properties props) {
+    init(props);
+  }
+
+  @Override
+  public void init(Properties props) {
+    this.id = Integer.parseInt(props.getProperty("id"));
+    this.type = props.getProperty("type", "type1");
+    this.status = props.getProperty("status", "active");
+
+    // get the positions. These are stored in the properties object
+    // as Positions, not String, so use Hashtable protocol to get at them.
+    // the keys are named "positionN", where N is an integer.
+    for (Map.Entry<Object, Object> entry: props.entrySet()) {
+      String key = (String)entry.getKey();
+      if (key.startsWith("position")) {
+        Position pos = (Position)entry.getValue();
+        this.positions.put(pos.getSecId(), pos);
+      }
+    }
+  }
+
+  public void setType(String t) {this.type = t; }
+
+  public String getStatus(){
+    return status;
+  }
+
+  public int getId(){
+    return this.id;
+  }
+
+  public Map<String,Position> getPositions(){
+    return this.positions;
+  }
+
+  public String getType() {
+    return this.type;
+  }
+
+  public boolean isActive(){
+    return status.equals("active");
+  }
+
+  @Override
+  public String toString(){
+    StringBuilder buf = new StringBuilder();
+    buf.append("\n\tPortfolio [id=" + this.id + " status=" + this.status);
+    buf.append(" type=" + this.type);
+    boolean firstTime = true;
+    for (Map.Entry<String, Position> entry: positions.entrySet()) {
+      if (!firstTime) {
+        buf.append(", ");
+      }
+      buf.append("\n\t\t");
+      buf.append(entry.getKey() + ":" + entry.getValue());
+      firstTime = false;
+    }
+    buf.append("]");
+    return buf.toString();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java
new file mode 100644
index 0000000..b8e8be9
--- /dev/null
+++ b/geode-spark-connector/gemfire-spark-connector/src/it/java/ittest/io/pivotal/gemfire/spark/connector/Position.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark.connector;
+
+import java.io.Serializable;
+import java.util.Properties;
+import com.gemstone.gemfire.cache.Declarable;
+
+/**
+ * Represents a number of shares of a stock ("security") held in a {@link
+ * Portfolio}.
+ * <p>
+ * This class is <code>Serializable</code> because we want it to be distributed
+ * to multiple members of a distributed system.  Because this class is
+ * <code>Declarable</code>, we can describe instances of it in a GemFire
+ * <code>cache.xml</code> file.
+ * <p>
+ *
+ */
+public class Position implements Declarable, Serializable {
+
+  private static final long serialVersionUID = -8229531542107983344L;
+
+  private String secId;
+  private double qty;
+  private double mktValue;
+
+  public Position(Properties props) {
+    init(props);
+  }
+
+  @Override
+  public void init(Properties props) {
+    this.secId = props.getProperty("secId");
+    this.qty = Double.parseDouble(props.getProperty("qty"));
+    this.mktValue = Double.parseDouble(props.getProperty("mktValue"));
+  }
+
+  public String getSecId(){
+    return this.secId;
+  }
+
+  public double getQty(){
+    return this.qty;
+  }
+
+  public double getMktValue() {
+    return this.mktValue;
+  }
+
+  @Override
+  public String toString(){
+    return new StringBuilder()
+            .append("Position [secId=").append(secId)
+            .append(" qty=").append(this.qty)
+            .append(" mktValue=").append(mktValue).append("]").toString();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml b/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml
new file mode 100644
index 0000000..79893d6
--- /dev/null
+++ b/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-regions.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE cache PUBLIC
+  "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN"
+  "http://www.gemstone.com/dtd/cache6_5.dtd"; >
+
+<cache>
+  <!-- test region for OQL test -->
+  <region name="obj_obj_region" refid="PARTITION_REDUNDANT" />
+
+  <region name="obj_obj_rep_region" refid="REPLICATE" />
+
+  <region name="str_int_region" refid="PARTITION_REDUNDANT">
+    <region-attributes>
+      <key-constraint>java.lang.String</key-constraint>
+      <value-constraint>java.lang.Integer</value-constraint>
+    </region-attributes>
+  </region>
+
+  <region name="str_str_region" refid="PARTITION_REDUNDANT">
+    <region-attributes>
+      <key-constraint>java.lang.String</key-constraint>
+      <value-constraint>java.lang.String</value-constraint>
+    </region-attributes>
+  </region>
+
+  <region name="str_str_rep_region" refid="REPLICATE">
+    <region-attributes>
+      <key-constraint>java.lang.String</key-constraint>
+      <value-constraint>java.lang.String</value-constraint>
+    </region-attributes>
+  </region>
+</cache>

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml b/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml
new file mode 100644
index 0000000..3023959
--- /dev/null
+++ b/geode-spark-connector/gemfire-spark-connector/src/it/resources/test-retrieve-regions.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE cache PUBLIC
+  "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN"
+  "http://www.gemstone.com/dtd/cache6_5.dtd"; >
+
+<cache>
+  <!-- combinations of key, value types with region types -->
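+  <!-- naming convention: pr = partitioned, rr = replicated; _r = redundant, _p = persistent -->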
+  <region name="pr_r_obj_obj_region" refid="PARTITION_REDUNDANT" />
+  <region name="pr_obj_obj_region" refid="PARTITION" />
+  <region name="rr_obj_obj_region" refid="REPLICATE" />
+  <region name="rr_p_obj_obj_region" refid="REPLICATE_PERSISTENT" />
+
+  <region name="pr_r_str_int_region" refid="PARTITION_REDUNDANT">
+    <region-attributes>
+      <key-constraint>java.lang.String</key-constraint>
+      <value-constraint>java.lang.Integer</value-constraint>
+    </region-attributes>
+  </region>
+  
+  <region name="pr_str_int_region" refid="PARTITION">
+    <region-attributes>
+      <key-constraint>java.lang.String</key-constraint>
+      <value-constraint>java.lang.Integer</value-constraint>
+    </region-attributes>
+  </region>
+
+  <region name="rr_str_int_region" refid="REPLICATE">
+    <region-attributes>
+      <key-constraint>java.lang.String</key-constraint>
+      <value-constraint>java.lang.Integer</value-constraint>
+    </region-attributes>
+  </region>
+  
+  <region name="rr_p_str_int_region" refid="REPLICATE_PERSISTENT">
+    <region-attributes>
+      <key-constraint>java.lang.String</key-constraint>
+      <value-constraint>java.lang.Integer</value-constraint>
+    </region-attributes>
+  </region>
+</cache>

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala
new file mode 100644
index 0000000..10c7eaf
--- /dev/null
+++ b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/BasicIntegrationTest.scala
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark.connector
+
+import java.util.Properties
+import com.gemstone.gemfire.cache.query.QueryService
+import com.gemstone.gemfire.cache.query.internal.StructImpl
+import io.pivotal.gemfire.spark.connector._
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.gemfire.spark.connector.internal.{RegionMetadata, DefaultGemFireConnectionManager}
+import io.pivotal.gemfire.spark.connector.internal.oql.{RDDConverter, QueryRDD}
+import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster
+import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils
+import org.apache.spark.streaming.{Seconds, StreamingContext, TestInputDStream}
+import org.apache.spark.{SparkContext, SparkConf}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import scala.collection.JavaConversions
+import scala.reflect.ClassTag
+
+case class Number(str: String, len: Int)
+
+class BasicIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster {
+
+  var sc: SparkContext = null
+
+  override def beforeAll() {
+    // start gemfire cluster, and spark context
+    val settings = new Properties()
+    settings.setProperty("cache-xml-file", "src/it/resources/test-regions.xml")
+    settings.setProperty("num-of-servers", "2")
+    val locatorPort = GemFireCluster.start(settings)
+
+    // start spark context in local mode
+    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
+                            "log4j.logger.io.pivotal.gemfire.spark.connector" 
-> "DEBUG")
+    val conf = new SparkConf()
+      .setAppName("BasicIntegrationTest")
+      .setMaster("local[2]")
+      .set("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
+      .set(GemFireLocatorPropKey, s"localhost[$locatorPort]")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", 
"io.pivotal.gemfire.spark.connector.GemFireKryoRegistrator")
+
+    sc = new SparkContext(conf)
+  }
+
+  override def afterAll() {
+    // stop connection, spark context, and gemfire cluster
+    DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(sc.getConf))
+    sc.stop()
+    GemFireCluster.stop()
+  }
+
+  //Convert Map[Object, Object] to java.util.Properties
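+  // ("/:" is Scala's foldLeft: starting from an empty Properties, put each map entry into it)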
+  private def map2Props(map: Map[Object, Object]): java.util.Properties =
+    (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props}
+
+  // ===========================================================
+  //       DefaultGemFireConnection functional tests
+  // ===========================================================
+
+  test("DefaultGemFireConnection.validateRegion()") {
+    val conn = GemFireConnectionConf(sc.getConf).getConnection
+
+    // normal exist-region
+    var regionPath: String = "str_str_region"
+    conn.validateRegion[String, String](regionPath)
+
+    // non-exist region
+    regionPath = "non_exist_region"
+    try {
+      conn.validateRegion[String, String](regionPath)
+      fail("validateRegion failed to catch non-exist region error")
+    } catch {
+      case e: RuntimeException => 
+        if (! e.getMessage.contains(s"The region named $regionPath was not found"))
+          fail("validateRegion gives wrong exception on non-exist region", e)
+      case e: Throwable => 
+        fail("validateRegion gives wrong exception on non-exist region", e)
+    }
+
+    // Note: currently, can't catch type mismatch error
+    conn.validateRegion[String, Integer]("str_str_region")
+  }
+
+  test("DefaultGemFireConnection.getRegionMetadata()") {
+    val conn = GemFireConnectionConf(sc.getConf).getConnection
+
+    // exist region
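+    // 113 is GemFire's default total bucket count for partitioned regions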
+    validateRegionMetadata(conn, "obj_obj_region", true, 113, null, null, 
false)
+    validateRegionMetadata(conn, "str_int_region", true, 113, 
"java.lang.String", "java.lang.Integer", false)
+    validateRegionMetadata(conn, "str_str_rep_region", false, 0, 
"java.lang.String", "java.lang.String", true)
+
+    // non-exist region
+    assert(! conn.getRegionMetadata("no_exist_region").isDefined)
+  }
+    
+  def validateRegionMetadata(
+    conn: GemFireConnection, regionPath: String, partitioned: Boolean, buckets: Int,
+    keyType: String, valueType: String, emptyMap: Boolean): Unit = {
+
+    val mdOption = conn.getRegionMetadata(regionPath)
+    val md = mdOption.get
+   
+    assert(md.getRegionPath == s"/$regionPath")
+    assert(md.isPartitioned == partitioned)
+    assert(md.getKeyTypeName == keyType)
+    assert(md.getValueTypeName == valueType)
+    assert(md.getTotalBuckets == buckets)
+    if (emptyMap) assert(md.getServerBucketMap == null) 
+    else assert(md.getServerBucketMap != null)
+  }
+
+  test("DefaultGemFireConnection.getRegionProxy()") {
+    val conn = GemFireConnectionConf(sc.getConf).getConnection
+
+    val region1 = conn.getRegionProxy[String, String]("str_str_region")
+    region1.put("1", "One")
+    assert(region1.get("1") == "One")
+    region1.remove("1")
+    assert(region1.get("1") == null)
+    
+    // getRegionProxy doesn't fail when region doesn't exist
+    val region2 = conn.getRegionProxy[String, String]("non_exist_region")
+    try {
+      region2.put("1", "One")
+      fail("getRegionProxy failed to catch non-exist region error")
+    } catch {
+      case e: Exception =>
+        if (e.getCause == null || ! e.getCause.getMessage.contains(s"Region 
named /non_exist_region was not found")) {
+          e.printStackTrace()
+          fail("validateRegion gives wrong exception on non-exist region", e)
+        }
+    }
+  }
+  
+  // Note: DefaultGemFireConnection.getQuery() and getRegionData() are covered by
+  //       RetrieveRegionIntegrationTest.scala and the following OQL tests.
+  
+  // ===========================================================
+  //                OQL functional tests
+  // ===========================================================
+  
+  private def initRegion(regionName: String): Unit = {
+
+    //Populate some data in the region
+    val conn = GemFireConnectionConf(sc.getConf).getConnection
+    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+    rgn.removeAll(rgn.keySetOnServer())
+
+    //This will call the implicit conversion map2Properties in the connector package object, since it is Map[String, String]
+    var position1 = new Position(Map("secId" -> "SUN", "qty" -> "34000", "mktValue" -> "24.42"))
+    var position2 = new Position(Map("secId" -> "IBM", "qty" -> "8765", "mktValue" -> "34.29"))
+    val portfolio1 = new Portfolio(map2Props(Map("id" -> "1", "type" -> "type1", "status" -> "active",
+      "position1" -> position1, "position2" -> position2)))
+    rgn.put("1", portfolio1)
+
+    position1 = new Position(Map("secId" -> "YHOO", "qty" -> "9834", 
"mktValue" -> "12.925"))
+    position2 = new Position(Map("secId" -> "GOOG", "qty" -> "12176", 
"mktValue" -> "21.972"))
+    val portfolio2 = new Portfolio(map2Props(Map("id" -> "2", "type" -> 
"type2", "status" -> "inactive",
+      "position1" -> position1, "position2" -> position2)))
+    rgn.put("2", portfolio2)
+
+    position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", 
"mktValue" -> "23.32"))
+    position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" 
-> "40.373"))
+    val portfolio3 = new Portfolio(map2Props(Map("id" -> "3", "type" -> 
"type3", "status" -> "active",
+      "position1" -> position1, "position2" -> position2)))
+    rgn.put("3", portfolio3)
+
+    position1 = new Position(Map("secId" -> "APPL", "qty" -> "67", "mktValue" 
-> "67.356572"))
+    position2 = new Position(Map("secId" -> "ORCL", "qty" -> "376", "mktValue" 
-> "101.34"))
+    val portfolio4 = new Portfolio(map2Props(Map("id" -> "4", "type" -> 
"type1", "status" -> "inactive",
+      "position1" -> position1, "position2" -> position2)))
+    rgn.put("4", portfolio4)
+
+    position1 = new Position(Map("secId" -> "SAP", "qty" -> "90", "mktValue" 
-> "67.356572"))
+    position2 = new Position(Map("secId" -> "DELL", "qty" -> "376", "mktValue" 
-> "101.34"))
+    val portfolio5 = new Portfolio(map2Props(Map("id" -> "5", "type" -> 
"type2", "status" -> "active",
+      "position1" -> position1, "position2" -> position2)))
+    rgn.put("5", portfolio5)
+
+    position1 = new Position(Map("secId" -> "RHAT", "qty" -> "90", "mktValue" 
-> "67.356572"))
+    position2 = new Position(Map("secId" -> "NOVL", "qty" -> "376", "mktValue" 
-> "101.34"))
+    val portfolio6 = new Portfolio(map2Props(Map("id" -> "6", "type" -> 
"type3", "status" -> "inactive",
+      "position1" -> position1, "position2" -> position2)))
+    rgn.put("6", portfolio6)
+
+    position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", 
"mktValue" -> "23.32"))
+    position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" 
-> "40.373"))
+    val portfolio7 = new Portfolio(map2Props(Map("id" -> "7", "type" -> 
"type4", "status" -> "active",
+      "position1" -> position1, "position2" -> position2)))
+    //Not using null, due to intermittent query failure on column containing 
null, likely a Spark SQL bug
+    //portfolio7.setType(null)
+    rgn.put("7", portfolio7)
+  }
+
+  private def getQueryRDD[T: ClassTag](
+    query: String, connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)): QueryRDD[T] =
+      new QueryRDD[T](sc, query, connConf)
+
+  test("Run GemFire OQL query and convert the returned QueryRDD to DataFrame: 
Partitioned Region") {
+    simpleQuery("obj_obj_region")
+  }
+
+  test("Run GemFire OQL query and convert the returned QueryRDD to DataFrame: 
Replicated Region") {
+    simpleQuery("obj_obj_rep_region")
+  }
+
+  private def simpleQuery(regionName: String) {
+    //Populate some data in the region
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, String] = conn.getRegionProxy(regionName)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", 
"3" -> "three")))
+
+    //Create QueryRDD using OQL
+    val OQLResult: QueryRDD[String] = getQueryRDD[String](s"select * from 
/$regionName")
+
+    //verify the QueryRDD
+    val oqlRS: Array[String] = OQLResult.collect()
+    oqlRS should have length 3
+    oqlRS should contain theSameElementsAs List("one", "two", "three")
+
+    //Convert QueryRDD to DataFrame
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    // this is used to implicitly convert an RDD to a DataFrame.
+    import sqlContext.implicits._
+    val dataFrame = OQLResult.map(x => Number(x, x.length)).toDF()
+    //Register dataFrame as a table of two columns of type String and Int respectively
+    dataFrame.registerTempTable("numberTable")
+
+    //Issue SQL query against the table
+    val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
+    //Verify the SQL query result; r(0) means column 0
+    val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
+    sqlRS should have length 3
+    sqlRS should contain theSameElementsAs List("one", "two", "three")
+
+    //Convert QueryRDD to DataFrame using RDDConverter
+    val dataFrame2 = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+    //Register dataFrame2 as a table of two columns of type String and Int respectively
+    dataFrame2.registerTempTable("numberTable2")
+
+    //Issue SQL query against the table
+    val SQLResult2 = sqlContext.sql("SELECT * FROM numberTable2")
+    //Verify the SQL query result; r(0) means column 0
+    val sqlRS2: Array[Any] = SQLResult2.map(r => r(0)).collect()
+    sqlRS2 should have length 3
+    sqlRS2 should contain theSameElementsAs List("one", "two", "three")
+
+    //Remove the region entries, because other tests might use the same region as well
+    List("1", "2", "3").foreach(rgn.remove)
+  }
+
+  test("Run GemFire OQL query and directly return DataFrame: Partitioned 
Region") {
+    simpleQueryDataFrame("obj_obj_region")
+  }
+
+  test("Run GemFire OQL query and directly return DataFrame: Replicated 
Region") {
+    simpleQueryDataFrame("obj_obj_rep_region")
+  }
+
+  private def simpleQueryDataFrame(regionName: String) {
+    //Populate some data in the region
+    val conn = GemFireConnectionConf(sc.getConf).getConnection
+    val rgn: Region[String, String] = conn.getRegionProxy(regionName)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", 
"3" -> "three")))
+
+    //Create DataFrame using GemFire OQL
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    val dataFrame = sqlContext.gemfireOQL(s"select * from /$regionName")
+    dataFrame.registerTempTable("numberTable")
+
+    //Issue SQL query against the table
+    val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
+    //Verify the SQL query result; r(0) means column 0
+    val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect()
+    sqlRS should have length 3
+    sqlRS should contain theSameElementsAs List("one", "two", "three")
+
+    //Remove the region entries, because other tests might use the same region as well
+    List("1", "2", "3").foreach(rgn.remove)
+  }
+
+  test("GemFire OQL query with UDT: Partitioned Region") {
+    queryUDT("obj_obj_region")
+  }
+
+  test("GemFire OQL query with UDT: Replicated Region") {
+    queryUDT("obj_obj_rep_region")
+  }
+
+  private def queryUDT(regionName: String) {
+
+    //Populate some data in the region
+    val conn = GemFireConnectionConf(sc.getConf).getConnection
+    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+    rgn.removeAll(rgn.keySetOnServer())
+    val e1: Employee = new Employee("hello", 123)
+    val e2: Employee = new Employee("world", 456)
+    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))
+
+    //Create QueryRDD using OQL
+    val OQLResult: QueryRDD[Object] = getQueryRDD(s"select name, age from 
/$regionName")
+
+    //verify the QueryRDD
+    val oqlRS: Array[Object] = OQLResult.collect()
+    oqlRS should have length 2
+    oqlRS.map(e => e.asInstanceOf[StructImpl].getFieldValues.apply(1)) should contain theSameElementsAs List(123, 456)
+
+    //Convert QueryRDD to DataFrame
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+    //Convert QueryRDD to DataFrame using RDDConverter
+    val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+    dataFrame.registerTempTable("employee")
+    val SQLResult = sqlContext.sql("SELECT * FROM employee")
+
+    //Verify the SQL query result
+    val sqlRS = SQLResult.map(r => r(0)).collect()
+    sqlRS should have length 2
+    sqlRS should contain theSameElementsAs List("hello", "world")
+
+    List("1", "2").foreach(rgn.remove)
+  }
+
+  test("GemFire OQL query with UDT and directly return DataFrame: Partitioned 
Region") {
+    queryUDTDataFrame("obj_obj_region")
+  }
+
+  test("GemFire OQL query with UDT and directly return DataFrame: Replicated 
Region") {
+    queryUDTDataFrame("obj_obj_rep_region")
+  }
+
+  private def queryUDTDataFrame(regionName: String) {
+    //Populate some data in the region
+    val conn = GemFireConnectionConf(sc.getConf).getConnection
+    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+    rgn.removeAll(rgn.keySetOnServer())
+    val e1: Employee = new Employee("hello", 123)
+    val e2: Employee = new Employee("world", 456)
+    rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))
+
+    //Create DataFrame using GemFire OQL
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    val dataFrame = sqlContext.gemfireOQL(s"select name, age from 
/$regionName")
+
+    dataFrame.registerTempTable("employee")
+    val SQLResult = sqlContext.sql("SELECT * FROM employee")
+
+    //Verify the SQL query result
+    val sqlRS = SQLResult.map(r => r(0)).collect()
+    sqlRS should have length 2
+    sqlRS should contain theSameElementsAs List("hello", "world")
+
+    List("1", "2").foreach(rgn.remove)
+  }
+
+  test("GemFire OQL query with more complex UDT: Partitioned Region") {
+    complexUDT("obj_obj_region")
+  }
+
+  test("GemFire OQL query with more complex UDT: Replicated Region") {
+    complexUDT("obj_obj_rep_region")
+  }
+
+  private def complexUDT(regionName: String) {
+
+    initRegion(regionName)
+
+    //Create QueryRDD using OQL
+    val OQLResult: QueryRDD[Object] = getQueryRDD(s"SELECT DISTINCT * FROM 
/$regionName WHERE status = 'active'")
+
+    //verify the QueryRDD
+    val oqlRS: Array[Int] = OQLResult.collect().map(r => r.asInstanceOf[Portfolio].getId)
+    oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
+
+    //Convert QueryRDD to DataFrame
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+    //Convert QueryRDD to DataFrame using RDDConverter
+    val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+
+    dataFrame.registerTempTable("Portfolio")
+
+    val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
+
+    //Verify the SQL query result
+    val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
+    sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
+  }
+
+  test("GemFire OQL query with more complex UDT and directly return DataFrame: 
Partitioned Region") {
+    complexUDTDataFrame("obj_obj_region")
+  }
+
+  test("GemFire OQL query with more complex UDT and directly return DataFrame: 
Replicated Region") {
+    complexUDTDataFrame("obj_obj_rep_region")
+  }
+
+  private def complexUDTDataFrame(regionName: String) {
+
+    initRegion(regionName)
+
+    //Create DataFrame using GemFire OQL
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    val dataFrame = sqlContext.gemfireOQL(s"SELECT DISTINCT * FROM 
/$regionName WHERE status = 'active'")
+    dataFrame.registerTempTable("Portfolio")
+
+    val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
+
+    //Verify the SQL query result
+    val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
+    sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
+  }
+
+  test("GemFire OQL query with more complex UDT with Projection: Partitioned 
Region") {
+    queryComplexUDTProjection("obj_obj_region")
+  }
+
+  test("GemFire OQL query with more complex UDT with Projection: Replicated 
Region") {
+    queryComplexUDTProjection("obj_obj_rep_region")
+  }
+
+  private def queryComplexUDTProjection(regionName: String) {
+
+    initRegion(regionName)
+
+    //Create QueryRDD using OQL
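+    // "type" is double-quoted below because type is a reserved word in OQL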
+    val OQLResult: QueryRDD[Object] = getQueryRDD[Object](s"""SELECT id, 
"type", positions, status FROM /$regionName WHERE status = 'active'""")
+
+    //verify the QueryRDD
+    val oqlRS: Array[Int] = OQLResult.collect().map(si => si.asInstanceOf[StructImpl].getFieldValues.apply(0).asInstanceOf[Int])
+    oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
+
+    //Convert QueryRDD to DataFrame
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+    //Convert QueryRDD to DataFrame using RDDConverter
+    val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
+
+    dataFrame.registerTempTable("Portfolio")
+
+    val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type 
= 'type3'")
+
+    //Verify the SQL query result
+    val sqlRS = SQLResult.collect().map(r => r(0))
+    sqlRS should contain theSameElementsAs List(3)
+  }
+
+  test("GemFire OQL query with more complex UDT with Projection and directly 
return DataFrame: Partitioned Region") {
+    queryComplexUDTProjectionDataFrame("obj_obj_region")
+  }
+
+  test("GemFire OQL query with more complex UDT with Projection and directly 
return DataFrame: Replicated Region") {
+    queryComplexUDTProjectionDataFrame("obj_obj_rep_region")
+  }
+
+  private def queryComplexUDTProjectionDataFrame(regionName: String) {
+
+    initRegion(regionName)
+
+    //Create DataFrame using GemFire OQL
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    val dataFrame = sqlContext.gemfireOQL(s"""SELECT id, "type", positions, 
status FROM /$regionName WHERE status = 'active'""")
+    dataFrame.registerTempTable("Portfolio")
+
+    val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type 
= 'type3'")
+
+    //Verify the SQL query result
+    val sqlRS = SQLResult.collect().map(r => r(0))
+    sqlRS should contain theSameElementsAs List(3)
+  }
+
+  test("GemFire OQL query with more complex UDT with nested Projection and 
directly return DataFrame: Partitioned Region") {
+    queryComplexUDTNestProjectionDataFrame("obj_obj_region")
+  }
+
+  test("GemFire OQL query with more complex UDT with nested Projection and 
directly return DataFrame: Replicated Region") {
+    queryComplexUDTNestProjectionDataFrame("obj_obj_rep_region")
+  }
+
+  private def queryComplexUDTNestProjectionDataFrame(regionName: String) {
+
+    initRegion(regionName)
+
+    //Create DataFrame using GemFire OQL
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    val dataFrame = sqlContext.gemfireOQL(s"""SELECT r.id, r."type", 
r.positions, r.status FROM /$regionName r, r.positions.values f WHERE r.status 
= 'active' and f.secId = 'MSFT'""")
+    dataFrame.registerTempTable("Portfolio")
+
+    val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type 
= 'type3'")
+
+    //Verify the SQL query result
+    val sqlRS = SQLResult.collect().map(r => r(0))
+    sqlRS should contain theSameElementsAs List(3)
+  }
+
+  test("Undefined instance deserialization: Partitioned Region") {
+    undefinedInstanceDeserialization("obj_obj_region")
+  }
+
+  test("Undefined instance deserialization: Replicated Region") {
+    undefinedInstanceDeserialization("obj_obj_rep_region")
+  }
+
+  private def undefinedInstanceDeserialization(regionName: String) {
+
+    val conn = GemFireConnectionConf(sc.getConf).getConnection
+    val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
+    rgn.removeAll(rgn.keySetOnServer())
+
+    //Put some new data
+    rgn.put("1", "one")
+
+    //Query some non-existent columns, which should return UNDEFINED
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    val dataFrame = sqlContext.gemfireOQL(s"SELECT col100, col200 FROM 
/$regionName")
+    val col1 = dataFrame.first().apply(0)
+    val col2 = dataFrame.first().apply(1)
+    assert(col1 == QueryService.UNDEFINED)
+    assert(col2 == QueryService.UNDEFINED)
+    //Verify that col1 and col2 refer to the same Undefined object
+    assert(col1.asInstanceOf[AnyRef] eq col2.asInstanceOf[AnyRef])
+  }
+
+  test("RDD.saveToGemFire") {
+    val regionName = "str_str_region"
+    // generate: Vector((1,11), (2,22), (3,33), (4,44), (5,55), (6,66))
+    val data = (1 to 6).map(_.toString).map(e => (e, e * 2))
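+    // note: e * 2 on a String repeats it, so "1" becomes "11", "2" becomes "22", etc.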
+    val rdd = sc.parallelize(data)
+    rdd.saveToGemfire(regionName)
+
+    // verify
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val region: Region[String, String] = connConf.getConnection.getRegionProxy(regionName)
+    println("region key set on server: " + region.keySetOnServer())
+    assert((1 to 6).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
+    (1 to 6).map(_.toString).foreach(e => assert(e*2 == region.get(e)))
+  }
+
+  // ===========================================================
+  //        DStream.saveToGemfire() functional tests
+  // ===========================================================
+
+  test("Basic DStream test") {
+    import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener}
+    import io.pivotal.gemfire.spark.connector.streaming._
+    import org.apache.spark.streaming.ManualClockHelper
+
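+    // Counts completed micro-batches so the test can wait until every input batch has been processed.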
+    class TestStreamListener extends StreamingListener {
+      var count = 0
+      override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = count += 1
+    }
+
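+    // Use a 1-second batch interval and drive three input batches through Spark's manual clock.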
+    def batchDuration = Seconds(1)
+    val ssc = new StreamingContext(sc, batchDuration)
+    val input = Seq(1 to 4, 5 to 8, 9 to 12)
+    val dstream = new TestInputDStream(ssc, input, 2)
+    dstream.saveToGemfire[String, Int]("str_int_region", (e: Int) => (e.toString, e))
+    try {
+      val listener = new TestStreamListener
+      ssc.addStreamingListener(listener)
+      ssc.start()
+      ManualClockHelper.addToTime(ssc, batchDuration.milliseconds * input.length)
+      while (listener.count < input.length) ssc.awaitTerminationOrTimeout(50)
+    } catch {
+      case e: Exception => e.printStackTrace(); throw e
+//    } finally {
+//      ssc.stop()
+    }
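+    // Note: ssc.stop() is left commented out above; by default it would also stop the shared SparkContext used for the verification below.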
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val region: Region[String, Int] = conn.getRegionProxy("str_int_region")
+
+    // verify gemfire region contents
+    println("region key set on server: " + region.keySetOnServer())
+    assert((1 to 12).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
+    (1 to 12).foreach(e => assert(e == region.get(e.toString)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala
 
b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala
new file mode 100644
index 0000000..c286491
--- /dev/null
+++ 
b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RDDJoinRegionIntegrationTest.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark.connector
+
+import java.util.Properties
+
+import io.pivotal.gemfire.spark.connector._
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager
+import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster
+import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils
+import org.apache.spark.{SparkContext, SparkConf}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import java.util.{HashMap => JHashMap}
+
+class RDDJoinRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster {
+
+  var sc: SparkContext = null
+  val numServers = 3
+  val numObjects = 1000
+
+  override def beforeAll() {
+    // start gemfire cluster, and spark context
+    val settings = new Properties()
+    settings.setProperty("cache-xml-file", 
"src/it/resources/test-retrieve-regions.xml")
+    settings.setProperty("num-of-servers", numServers.toString)
+    val locatorPort = GemFireCluster.start(settings)
+
+    // start spark context in local mode
+    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
+      "log4j.logger.io.pivotal.gemfire.spark.connector" -> "DEBUG")
+    val conf = new SparkConf()
+      .setAppName("RDDJoinRegionIntegrationTest")
+      .setMaster("local[2]")
+      .set(GemFireLocatorPropKey, s"localhost[$locatorPort]")
+    sc = new SparkContext(conf)
+  }
+
+  override def afterAll() {
+    // stop connection, spark context, and gemfire cluster
+    DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(sc.getConf))
+    sc.stop()
+    GemFireCluster.stop()
+  }
+
+//  def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
+//    assert(map1.size == map2.size)
+//    map1.foreach(e => {
+//      assert(map2.contains(e._1))
+//      assert (e._2 == map2.get(e._1).get)
+//    })
+//  }
+  
+  // --------------------------------------------------------------------------------------------
+  // PairRDD.joinGemfireRegion[K2 <: K, V2](regionPath, connConf): GemFireJoinRDD[(K, V), K, V2]
+  // --------------------------------------------------------------------------------------------
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], replicated 
region", JoinTest) {
+    verifyPairRDDJoinRegionWithSameKeyType("rr_str_int_region")
+  }
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned 
region", JoinTest) {
+    verifyPairRDDJoinRegionWithSameKeyType("pr_str_int_region")
+  }
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned 
redundant region", JoinTest) {
+    verifyPairRDDJoinRegionWithSameKeyType("pr_r_str_int_region")
+  }
+
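+  // Seeds the region with k_0..k_999 -> 0..999, joins a 55-entry RDD (keys k_-5..k_49) against it,
+  // and expects only the 50 matching keys in the result.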
+  def verifyPairRDDJoinRegionWithSameKeyType(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => ("k_" + x, x*2))
+    val rdd = sc.parallelize(data)
+
+    val rdd2 = rdd.joinGemfireRegion[String, Int](regionPath, connConf)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (0 until 50).map(i => ((s"k_$i", i*2), i)).toMap
+    // matchMaps[(String, Int), Int](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+
+  // ------------------------------------------------------------------------------------------------------
+  // PairRDD.joinGemfireRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GemFireJoinRDD[(K, V), K2, V2]
+  // ------------------------------------------------------------------------------------------------------
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], replicated 
region", JoinTest) {
+    verifyPairRDDJoinRegionWithDiffKeyType("rr_str_int_region")
+  }
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned 
region", JoinTest) {
+    verifyPairRDDJoinRegionWithDiffKeyType("pr_str_int_region")
+  }
+
+  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned 
redundant region", JoinTest) {
+    verifyPairRDDJoinRegionWithDiffKeyType("pr_r_str_int_region")
+  }
+
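+  // Same as above, except the RDD keys are Ints and func derives the region key "k_<i>" from each pair.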
+  def verifyPairRDDJoinRegionWithDiffKeyType(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => (x, x*2))
+    val rdd = sc.parallelize(data)
+
+    val func: ((Int, Int)) => String = pair => s"k_${pair._1}"
+
+    val rdd2 = rdd.joinGemfireRegion[String, Int](regionPath, func /*, connConf*/)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (0 until 50).map(i => ((i, i*2), i)).toMap
+    // matchMaps[(Int, Int), Int](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+
+  // ------------------------------------------------------------------------------------------------
+  // PairRDD.outerJoinGemfireRegion[K2 <: K, V2](regionPath, connConf): GemFireJoinRDD[(K, V), K, V2]
+  // ------------------------------------------------------------------------------------------------
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], 
replicated region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithSameKeyType("rr_str_int_region")
+  }
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], 
partitioned region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithSameKeyType("pr_str_int_region")
+  }
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], 
partitioned redundant region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithSameKeyType("pr_r_str_int_region")
+  }
+
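+  // Outer join keeps every RDD entry: unmatched keys (k_-5..k_-1) pair with None, matched keys with Some(value).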
+  def verifyPairRDDOuterJoinRegionWithSameKeyType(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => ("k_" + x, x*2))
+    val rdd = sc.parallelize(data)
+
+    val rdd2 = rdd.outerJoinGemfireRegion[String, Int](regionPath /*, connConf*/)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (-5 until 50).map {
+      i => if (i < 0) ((s"k_$i", i * 2), None)
+      else ((s"k_$i", i*2), Some(i))}.toMap
+    // matchMaps[(String, Int), Option[Int]](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+
+  // ------------------------------------------------------------------------------------------------------
+  // PairRDD.outerJoinGemfireRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GemFireJoinRDD[(K, V), K2, V2]
+  // ------------------------------------------------------------------------------------------------------
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], 
replicated region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithDiffKeyType("rr_str_int_region")
+  }
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], 
partitioned region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_str_int_region")
+  }
+
+  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], 
partitioned redundant region", OuterJoinTest) {
+    verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_r_str_int_region")
+  }
+
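+  // Outer join with a key-mapping function: entries whose derived key is absent from the region pair with None.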
+  def verifyPairRDDOuterJoinRegionWithDiffKeyType(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => (x, x*2))
+    val rdd = sc.parallelize(data)
+
+    val func: ((Int, Int)) => String = pair => s"k_${pair._1}"
+
+    val rdd2 = rdd.outerJoinGemfireRegion[String, Int](regionPath, func, connConf)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (-5 until 50).map {
+      i => if (i < 0) ((i, i * 2), None)
+      else ((i, i*2), Some(i))}.toMap
+    // matchMaps[(Int, Int), Option[Int]](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // RDD.joinGemfireRegion[K, V](regionPath, T => K, connConf): GemFireJoinRDD[T, K, V]
+  // --------------------------------------------------------------------------------------------
+
+  test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], replicated region", 
JoinTest) {
+    verifyRDDJoinRegion("rr_str_int_region")
+  }
+
+  test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], partitioned region", 
JoinTest) {
+    verifyRDDJoinRegion("pr_str_int_region")
+  }
+
+  test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], partitioned redundant 
region", JoinTest) {
+    verifyRDDJoinRegion("pr_r_str_int_region")
+  }
+
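+  // Joins a plain RDD[String] against the region using the identity key function; non-existent keys drop out.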
+  def verifyRDDJoinRegion(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => s"k_$x")
+    val rdd = sc.parallelize(data)
+
+    val rdd2 = rdd.joinGemfireRegion[String, Int](regionPath, x => x, connConf)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (0 until 50).map(i => (s"k_$i", i)).toMap
+    // matchMaps[String, Int](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // RDD.outerJoinGemfireRegion[K, V](regionPath, T => K, connConf): GemFireJoinRDD[T, K, V]
+  // --------------------------------------------------------------------------------------------
+
+  test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], replicated 
region", OnlyTest) {
+    verifyRDDOuterJoinRegion("rr_str_int_region")
+  }
+
+  test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], partitioned 
region", OnlyTest) {
+    verifyRDDOuterJoinRegion("pr_str_int_region")
+  }
+
+  test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], partitioned 
redundant region", OnlyTest) {
+    verifyRDDOuterJoinRegion("pr_r_str_int_region")
+  }
+
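+  // Outer join of a plain RDD[String]: keys k_-5..k_-1 are not in the region, so they pair with None.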
+  def verifyRDDOuterJoinRegion(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("k_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val data = (-5 until 50).map(x => s"k_$x")
+    val rdd = sc.parallelize(data)
+
+    val rdd2 = rdd.outerJoinGemfireRegion[String, Int](regionPath, x => x /*, connConf */)
+    val rdd2Content = rdd2.collect()
+
+    val expectedMap = (-5 until 50).map {
+      i => if (i < 0) (s"k_$i", None)
+           else (s"k_$i", Some(i))}.toMap
+    // matchMaps[String, Option[Int]](expectedMap, rdd2Content.toMap)
+    assert(expectedMap == rdd2Content.toMap)
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala
 
b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala
new file mode 100644
index 0000000..0ab8110
--- /dev/null
+++ 
b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/RetrieveRegionIntegrationTest.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark.connector
+
+import java.util.Properties
+
+import io.pivotal.gemfire.spark.connector._
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager
+import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster
+import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils
+import org.apache.spark.{SparkContext, SparkConf}
+import org.scalatest.{Tag, BeforeAndAfterAll, FunSuite, Matchers}
+import java.util.{HashMap => JHashMap}
+
+
+class RetrieveRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster {
+
+  var sc: SparkContext = null
+  val numServers = 4
+  val numObjects = 1000
+
+  override def beforeAll() {
+    // start gemfire cluster, and spark context
+    val settings = new Properties()
+    settings.setProperty("cache-xml-file", 
"src/it/resources/test-retrieve-regions.xml")
+    settings.setProperty("num-of-servers", numServers.toString)
+    val locatorPort = GemFireCluster.start(settings)
+
+    // start spark context in local mode
+    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
+                            "log4j.logger.io.pivotal.gemfire.spark.connector" -> "DEBUG")
+    val conf = new SparkConf()
+      .setAppName("RetrieveRegionIntegrationTest")
+      .setMaster("local[2]")
+      .set(GemFireLocatorPropKey, s"localhost[$locatorPort]")
+    sc = new SparkContext(conf)
+  }
+
+  override def afterAll() {
+    // stop connection, spark context, and gemfire cluster
+    DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(sc.getConf))
+    sc.stop()
+    GemFireCluster.stop()
+  }
+  
+  def executeTest[K,V](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,V]) = {
+    //Populate some data in the region
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[K, V] = conn.getRegionProxy(regionName)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+    verifyRetrieveRegion[K,V](regionName, entriesMap)
+  }
+    
+  def verifyRetrieveRegion[K,V](regionName:String, entriesMap:java.util.Map[K,V]) = {
+    val rdd = sc.gemfireRegion(regionName)
+    val collectedObjs = rdd.collect()
+    collectedObjs should have length entriesMap.size
+    import scala.collection.JavaConverters._
+    matchMaps[K,V](entriesMap.asScala.toMap, collectedObjs.toMap)
+  }
+ 
+  def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
+    assert(map1.size == map2.size)
+    map1.foreach(e => {
+      assert(map2.contains(e._1))
+      assert(e._2 == map2.get(e._1).get)
+    })
+  }
+  
+  //Retrieve region for Partitioned Region where some nodes are empty (empty iterator)
+  //This test has to run first...the rest of the tests always use the same num objects
+  test("Retrieve Region for PR where some nodes are empty (Empty Iterator)") {
+    val numObjects = numServers - 1
+    val entriesMap:JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
+    executeTest[String, Int]("rr_str_int_region", numObjects, entriesMap)
+  }
+
+  //Test for retrieving from region containing string key and string value
+  def verifyRetrieveStringStringRegion(regionName:String) = {
+    val entriesMap:JHashMap[String, String] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, "value_" + i))
+    executeTest[String, String](regionName, numObjects, entriesMap)
+  }
+
+  test("Retrieve Region with replicate redundant string string") {
+    verifyRetrieveStringStringRegion("rr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partitioned string string") {
+    verifyRetrieveStringStringRegion("pr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partitioned redundant string string") {
+    verifyRetrieveStringStringRegion("pr_r_obj_obj_region")
+  }
+  
+
+  //Test for retrieving from region containing string key and int value
+  def verifyRetrieveStringIntRegion(regionName:String) = {
+    val entriesMap:JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
+    executeTest[String, Int](regionName, numObjects, entriesMap)
+  }
+
+  test("Retrieve Region with replicate string int region") {
+    verifyRetrieveStringIntRegion("rr_str_int_region")
+  }
+
+  test("Retrieve Region with partitioned string int region") {
+    verifyRetrieveStringIntRegion("pr_str_int_region")
+  }
+
+  test("Retrieve Region with partitioned redundant string int region") {
+    verifyRetrieveStringIntRegion("pr_r_str_int_region")
+  }
+
+  //Tests for retrieving from region containing string key and object value
+  def verifyRetrieveStringObjectRegion(regionName:String) = {
+    val entriesMap:JHashMap[String, Object] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, new Employee("ename" + i, i)))
+    executeTest[String, Object](regionName, numObjects, entriesMap)
+  }
+
+  test("Retrieve Region with replicate string obj") {
+    verifyRetrieveStringObjectRegion("rr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partitioned string obj") {
+    verifyRetrieveStringObjectRegion("pr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partitioned redundant string obj") {
+    verifyRetrieveStringObjectRegion("pr_r_obj_obj_region")
+  }
+
+  //Test for retrieving from region containing string key and map value
+  def verifyRetrieveStringMapRegion(regionName:String) = {
+    val entriesMap:JHashMap[String,JHashMap[String,String]] = new JHashMap()
+    (0 until numObjects).map(i => {
+      val hashMap:JHashMap[String, String] = new JHashMap()
+      hashMap.put("mapKey:" + i, "mapValue:" + i)
+      entriesMap.put("key_" + i, hashMap)
+    })
+    executeTest(regionName, numObjects, entriesMap)
+  }
+
+  test("Retrieve Region with replicate string map region") {
+    verifyRetrieveStringMapRegion("rr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partitioned string map region") {
+    verifyRetrieveStringMapRegion("pr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partitioned redundant string map region") {
+    verifyRetrieveStringMapRegion("pr_r_obj_obj_region")
+  }
+  
+  //Test and helpers specific for retrieving from region containing string key and byte[] value
+  def executeTestWithByteArrayValues[K](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,Array[Byte]]) = {
+    //Populate some data in the region
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[K, Array[Byte]] = conn.getRegionProxy(regionName)
+    rgn.putAll(entriesMap)
+    verifyRetrieveRegionWithByteArrayValues[K](regionName, entriesMap)
+  }
+  
+  def verifyRetrieveRegionWithByteArrayValues[K](regionName:String, entriesMap:java.util.Map[K,Array[Byte]]) = {
+    val rdd = sc.gemfireRegion(regionName)
+    val collectedObjs = rdd.collect()
+    collectedObjs should have length entriesMap.size
+    import scala.collection.JavaConverters._
+    matchByteArrayMaps[K](entriesMap.asScala.toMap, collectedObjs.toMap)
+  }
+  
+  def matchByteArrayMaps[K](map1:Map[K,Array[Byte]], map2:Map[K,Array[Byte]]) = {
+    assert(map1.size == map2.size)
+    map1.foreach(e => {
+      assert(map2.contains(e._1))
+      assert(java.util.Arrays.equals(e._2, map2.get(e._1).get))
+    })
+  }
+  
+  def verifyRetrieveStringByteArrayRegion(regionName:String) = {
+    val entriesMap:JHashMap[String, Array[Byte]] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, Array[Byte](192.toByte, 168.toByte, 0, i.toByte)))
+    executeTestWithByteArrayValues[String](regionName, numObjects, entriesMap)
+  }
+      
+  test("Retrieve Region with replicate region string byte[] region") {
+    verifyRetrieveStringByteArrayRegion("rr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partition region string byte[] region") {
+    verifyRetrieveStringByteArrayRegion("pr_obj_obj_region")
+  }
+
+  test("Retrieve Region with partition redundant region string byte[] region") 
{
+    verifyRetrieveStringByteArrayRegion("pr_r_obj_obj_region")
+  }
+
+  test("Retrieve Region with where clause on partitioned redundant region", 
FilterTest) {
+    verifyRetrieveRegionWithWhereClause("pr_r_str_int_region")
+  }
+
+  test("Retrieve Region with where clause on partitioned region", FilterTest) {
+    verifyRetrieveRegionWithWhereClause("pr_str_int_region")
+  }
+
+  test("Retrieve Region with where clause on replicated region", FilterTest) {
+    verifyRetrieveRegionWithWhereClause("rr_str_int_region")
+  }
+
+  def verifyRetrieveRegionWithWhereClause(regionPath: String): Unit = {
+    val entriesMap: JHashMap[String, Int] = new JHashMap()
+    (0 until numObjects).map(i => entriesMap.put("key_" + i, i))
+
+    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
+    val conn = connConf.getConnection
+    val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
+    rgn.removeAll(rgn.keySetOnServer())
+    rgn.putAll(entriesMap)
+
+    val rdd = sc.gemfireRegion(regionPath).where("value.intValue() < 50")
+    val expectedMap = (0 until 50).map(i => (s"key_$i", i)).toMap
+    val collectedObjs = rdd.collect()
+    // collectedObjs should have length expectedMap.size
+    matchMaps[String, Int](expectedMap, collectedObjs.toMap)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala
 
b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala
new file mode 100644
index 0000000..298dc4a
--- /dev/null
+++ 
b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark
+
+import org.scalatest.Tag
+
+package object connector {
+
+  object OnlyTest extends Tag("OnlyTest")
+  object FetchDataTest extends Tag("FetchDataTest")
+  object FilterTest extends Tag("FilterTest")
+  object JoinTest extends Tag("JoinTest")
+  object OuterJoinTest extends Tag("OuterJoinTest")  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala
 
b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala
new file mode 100644
index 0000000..d8e07f5
--- /dev/null
+++ 
b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireCluster.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ittest.io.pivotal.gemfire.spark.connector.testkit
+
+import java.util.Properties
+
+trait GemFireCluster {
+  def startGemFireCluster(settings: Properties): Int = {
+    println("=== GemFireCluster start()")
+    GemFireCluster.start(settings)
+  }
+}
+
+object GemFireCluster {
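+  // Holds the single running cluster for the test JVM; start() replaces any previous instance.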
+  private var gemfire: Option[GemFireRunner] = None
+
+  def start(settings: Properties): Int = {
+    gemfire.map(_.stopGemFireCluster()) // Clean up any old running GemFire instances
+    val runner = new GemFireRunner(settings)
+    gemfire = Some(runner)
+    runner.getLocatorPort
+  }
+
+  def stop(): Unit = {
+    println("=== GemFireCluster shutdown: " + gemfire.toString)
+    gemfire match {
+      case None => println("Nothing to shutdown.")
+      case Some(runner) => runner.stopGemFireCluster()
+    }
+    gemfire = None
+    println("=== GemFireCluster shutdown finished.")
+  }
+}
