http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala
new file mode 100644
index 0000000..08cd042
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/RowBuilder.scala
@@ -0,0 +1,22 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import com.gemstone.gemfire.cache.query.internal.StructImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+
+class RowBuilder[T](queryRDD: QueryRDD[T]) {
+
+  /**
+   * Convert QueryRDD to RDD of Row
+   * @return RDD of Rows
+   */
+  def toRowRDD(): RDD[Row] = {
+    val rowRDD = queryRDD.map(row => {
+      row match {
+        case si: StructImpl => Row.fromSeq(si.getFieldValues)
+        case obj: Object => Row.fromSeq(Seq(obj))
+      }
+    })
+    rowRDD
+  }
+}
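The two match arms above shape rows differently; a minimal sketch (not part of this commit, using only Spark's Row API) of what each arm produces:

  import org.apache.spark.sql.Row
  // StructImpl case: one column per projected OQL field, e.g. "select key, value from /region.entries"
  val structRow = Row.fromSeq(Seq("key-1", 42))
  // plain Object case: the whole value lands in a single column
  val scalarRow = Row.fromSeq(Seq("someValue"))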

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala
new file mode 100644
index 0000000..d878a2a
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/SchemaBuilder.scala
@@ -0,0 +1,57 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import com.gemstone.gemfire.cache.query.internal.StructImpl
+import org.apache.spark.sql.types._
+import scala.collection.mutable.ListBuffer
+import org.apache.spark.Logging
+
+class SchemaBuilder[T](queryRDD: QueryRDD[T]) extends Logging {
+
+  val nullStructType = StructType(Nil)
+  
+  val typeMap:Map[Class[_], DataType] = Map( 
+    (classOf[java.lang.String], StringType),
+    (classOf[java.lang.Integer], IntegerType),
+    (classOf[java.lang.Short], ShortType),
+    (classOf[java.lang.Long], LongType),
+    (classOf[java.lang.Double], DoubleType),
+    (classOf[java.lang.Float], FloatType),
+    (classOf[java.lang.Boolean], BooleanType),
+    (classOf[java.lang.Byte], ByteType),
+    (classOf[java.util.Date], DateType),
+    (classOf[java.lang.Object], nullStructType)
+  )
+  
+  /**
+   * Analyse QueryRDD to get the Spark schema
+   * @return The schema represented by Spark StructType
+   */
+  def toSparkSchema(): StructType = {
+    val row = queryRDD.first()
+    val tpe = row match {
+      case r: StructImpl => constructFromStruct(r)
+      case null => StructType(StructField("col1", NullType) :: Nil)
+      case default => 
+        val value = typeMap.getOrElse(default.getClass(), nullStructType)
+        StructType(StructField("col1", value) :: Nil)
+    }
+    logInfo(s"Schema: $tpe")
+    tpe
+  }
+  
+  def constructFromStruct(r:StructImpl) = {
+    val names = r.getFieldNames
+    val values = r.getFieldValues
+    val lb = new ListBuffer[StructField]()
+    for (i <- 0 until names.length) {
+      val name = names(i)
+      val value = values(i)
+      val dataType = value match {
+        case null => NullType
+        case default => typeMap.getOrElse(default.getClass,  nullStructType)
+      }
+      lb += StructField(name, dataType)
+    }
+    StructType(lb.toSeq)
+  }
+}
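A minimal sketch (not part of this commit) of how the two builders could be combined to expose an OQL result as a DataFrame, assuming a queryRDD: QueryRDD[Object] and a sqlContext: SQLContext are already in scope; createDataFrame(rowRDD, schema) is the standard Spark SQL call:

  val schema = new SchemaBuilder(queryRDD).toSparkSchema()
  val rowRDD = new RowBuilder(queryRDD).toRowRDD()
  val df     = sqlContext.createDataFrame(rowRDD, schema)
  df.registerTempTable("oqlResult")   // query the OQL result with Spark SQL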

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala
new file mode 100644
index 0000000..497aadf
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/oql/UndefinedSerializer.scala
@@ -0,0 +1,30 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import com.esotericsoftware.kryo.{Kryo, Serializer}
+import com.esotericsoftware.kryo.io.{Output, Input}
+import com.gemstone.gemfire.cache.query.QueryService
+import com.gemstone.gemfire.cache.query.internal.Undefined
+
+/**
+ * This is the customized serializer to serialize QueryService.UNDEFINED,
+ * i.e. com.gemstone.gemfire.cache.query.internal.Undefined, in order to
+ * guarantee the singleton Undefined after its deserialization within Spark.
+ */
+class UndefinedSerializer extends Serializer[Undefined] {
+
+  def write(kryo: Kryo, output: Output, u: Undefined) {
+    //Only serialize a byte for Undefined
+    output.writeByte(u.getDSFID)
+  }
+
+  def read (kryo: Kryo, input: Input, tpe: Class[Undefined]): Undefined = {
+    //Read DSFID of Undefined
+    input.readByte()
+    QueryService.UNDEFINED match {
+      case null => new Undefined
+      case _ =>
+        //Avoid calling Undefined constructor again.
+        QueryService.UNDEFINED.asInstanceOf[Undefined]
+    }
+  }
+}
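The singleton guarantee only applies once the serializer is registered with Kryo. A minimal sketch using Spark's standard KryoRegistrator hook; the registrator class name here is hypothetical and not part of this commit:

  import com.esotericsoftware.kryo.Kryo
  import com.gemstone.gemfire.cache.query.internal.Undefined
  import org.apache.spark.serializer.KryoRegistrator

  class ConnectorKryoRegistrator extends KryoRegistrator {
    // register the custom serializer so QueryService.UNDEFINED deserializes back to the singleton
    override def registerClasses(kryo: Kryo): Unit =
      kryo.register(classOf[Undefined], new UndefinedSerializer)
  }
  // spark.serializer=org.apache.spark.serializer.KryoSerializer
  // spark.kryo.registrator=<your package>.ConnectorKryoRegistrator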

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala
new file mode 100644
index 0000000..53b61b1
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireJoinRDD.scala
@@ -0,0 +1,51 @@
+package io.pivotal.gemfire.spark.connector.internal.rdd
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
+import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.rdd.RDD
+import scala.collection.JavaConversions._
+
+/**
+ * An `RDD[T, V]` that will represent the result of a join between `left` RDD[T]
+ * and the specified GemFire Region[K, V].
+ */
+class GemFireJoinRDD[T, K, V] private[connector]
+  ( left: RDD[T],
+    func: T => K,
+    val regionPath: String,
+    val connConf: GemFireConnectionConf
+  ) extends RDD[(T, V)](left.context, left.dependencies) {
+
+  /** validate region existence when GemFireRDD object is created */
+  validate()
+
+  /** Validate region, and make sure it exists. */
+  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
+
+  override protected def getPartitions: Array[Partition] = left.partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[(T, V)] = {
+    val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
+    if (func == null) computeWithoutFunc(split, context, region)
+    else computeWithFunc(split, context, region)
+  }
+
+  /** T is (K, V1) since there's no map function `func` */
+  private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
+    val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
+    val leftKeys = leftPairs.map { case (k, v) => k}.toSet
+    // Note: getAll will return (key, null) for non-existent entries, so remove those entries
+    val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null}
+    leftPairs.filter{case (k, v) => rightPairs.contains(k)}
+             .map {case (k, v) => ((k, v).asInstanceOf[T], rightPairs.get(k).get)}.toIterator
+  }
+  
+  private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
+    val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t)))
+    val leftKeys = leftPairs.map { case (t, k) => k}.toSet
+    // Note: getAll will return (key, null) for non-existent entries, so remove those entries
+    val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null}
+    leftPairs.filter { case (t, k) => rightPairs.contains(k)}.map {case (t, k) => (t, rightPairs.get(k).get)}.toIterator
+  }
+}
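A standalone sketch (not part of this commit) of the inner-join semantics of computeWithFunc, with a plain Map standing in for region.getAll; left elements whose key has no non-null value in the region are dropped:

  val left   = List("a", "bb", "ccc")            // left RDD elements (T = String)
  val func   = (s: String) => s.length           // T => K
  val region = Map(1 -> "one", 3 -> "three")     // the region as K -> V; key 2 is absent

  val leftPairs  = left.map(t => (t, func(t)))
  val rightPairs = region                        // stands in for getAll(keys) with nulls removed
  val joined = leftPairs.filter { case (_, k) => rightPairs.contains(k) }
                        .map    { case (t, k) => (t, rightPairs(k)) }
  // joined == List(("a", "one"), ("ccc", "three")); "bb" has no match and is dropped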

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala
new file mode 100644
index 0000000..bed291c
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireOuterJoinRDD.scala
@@ -0,0 +1,53 @@
+package io.pivotal.gemfire.spark.connector.internal.rdd
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
+import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.rdd.RDD
+import scala.collection.JavaConversions._
+
+/**
+ * An `RDD[ T, Option[V] ]` that represents the result of a left outer join 
+ * between `left` RDD[T] and the specified GemFire Region[K, V].
+ */
+class GemFireOuterJoinRDD[T, K, V] private[connector]
+ ( left: RDD[T],
+   func: T => K,
+   val regionPath: String,
+   val connConf: GemFireConnectionConf
+  ) extends RDD[(T, Option[V])](left.context, left.dependencies) {
+
+  /** validate region existence when GemFireRDD object is created */
+  validate()
+
+  /** Validate region, and make sure it exists. */
+  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
+
+  override protected def getPartitions: Array[Partition] = left.partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[(T, Option[V])] = {
+    val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
+    if (func == null) computeWithoutFunc(split, context, region)
+    else computeWithFunc(split, context, region)
+  }
+
+  /** T is (K1, V1), and K1 and K are the same type since there's no map function `func` */
+  private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
+    val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
+    val leftKeys = leftPairs.map { case (k, v) => k}.toSet
+    // Note: getAll will return (key, null) for non-existent entries
+    val rightPairs = region.getAll(leftKeys)
+    // rightPairs is a java.util.Map, not a Scala map, so convert map.get() to Option
+    leftPairs.map{ case (k, v) => ((k, v).asInstanceOf[T], Option(rightPairs.get(k))) }.toIterator
+  }
+
+  private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
+    val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t)))
+    val leftKeys = leftPairs.map { case (t, k) => k}.toSet
+    // Note: getAll will return (key, null) for non-existent entries
+    val rightPairs = region.getAll(leftKeys)
+    // rightPairs is a java.util.Map, not a Scala map, so convert map.get() to Option
+    leftPairs.map{ case (t, k) => (t, Option(rightPairs.get(k)))}.toIterator
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala
new file mode 100644
index 0000000..0895024
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartition.scala
@@ -0,0 +1,20 @@
+package io.pivotal.gemfire.spark.connector.internal.rdd
+
+import org.apache.spark.Partition
+
+/**
+ * This serializable class represents a GemFireRDD partition. Each partition is mapped
+ * to one or more buckets of the region. The GemFireRDD can materialize the data of the
+ * partition based on all information contained here.
+ * @param partitionId partition id, a 0 based number.
+ * @param bucketSet region bucket id set for this partition. Set.empty means whole
+ *                  region (used for replicated region)
+ * @param locations preferred location for this partition                  
+ */
+case class GemFireRDDPartition (
+  partitionId: Int, bucketSet: Set[Int], locations: Seq[String] = Nil)
+  extends Partition  {
+  
+  override def index: Int = partitionId
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala
new file mode 100644
index 0000000..7ec8d42
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitioner.scala
@@ -0,0 +1,43 @@
+package io.pivotal.gemfire.spark.connector.internal.rdd
+
+import io.pivotal.gemfire.spark.connector.GemFireConnection
+import io.pivotal.gemfire.spark.connector.internal.RegionMetadata
+import org.apache.spark.{Logging, Partition}
+
+import scala.reflect.ClassTag
+
+/**
+ * A GemFireRDD partitioner is used to partition the region into multiple RDD partitions.
+ */
+trait GemFireRDDPartitioner extends Serializable {
+
+  def name: String
+  
+  /** the function that generates partitions */
+  def partitions[K: ClassTag, V: ClassTag]
+    (conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition]
+}
+
+object GemFireRDDPartitioner extends Logging {
+
+  /** To add new partitioner, just add it to the following list */
+  final val partitioners: Map[String, GemFireRDDPartitioner] =
+    List(OnePartitionPartitioner, ServerSplitsPartitioner).map(e => (e.name, e)).toMap
+
+  /**
+   * Get a partitioner based on the given name. A default partitioner will be returned
+   * if there's no partitioner for the given name.
+   */
+  def apply(name: String = defaultPartitionedRegionPartitioner.name): GemFireRDDPartitioner = {
+    val p = partitioners.get(name)
+    if (p.isDefined) p.get else {
+      logWarning(s"Invalid preferred partitioner name $name.")
+      defaultPartitionedRegionPartitioner
+    }
+  }
+
+  val defaultReplicatedRegionPartitioner = OnePartitionPartitioner
+
+  val defaultPartitionedRegionPartitioner = ServerSplitsPartitioner
+
+}
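A small sketch (not part of this commit) of the lookup behavior implemented by apply: a known name returns that partitioner, an unknown name logs a warning and falls back to the partitioned-region default:

  val one      = GemFireRDDPartitioner("OnePartition")    // OnePartitionPartitioner
  val default  = GemFireRDDPartitioner()                  // ServerSplitsPartitioner (default)
  val fallback = GemFireRDDPartitioner("NoSuchName")      // warns, then ServerSplitsPartitioner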

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala
new file mode 100644
index 0000000..0c9c34f
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDPartitionerImpl.scala
@@ -0,0 +1,73 @@
+package io.pivotal.gemfire.spark.connector.internal.rdd
+
+import io.pivotal.gemfire.spark.connector.GemFireConnection
+import io.pivotal.gemfire.spark.connector.internal.RegionMetadata
+import io.pivotal.gemfire.spark.connector.NumberPartitionsPerServerPropKey
+import org.apache.spark.Partition
+import scala.collection.JavaConversions._
+import scala.collection.immutable.SortedSet
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+/** This partitioner maps whole region to one GemFireRDDPartition */
+object OnePartitionPartitioner extends GemFireRDDPartitioner {
+
+  override val name = "OnePartition"
+
+  override def partitions[K: ClassTag, V: ClassTag]
+    (conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] =
+    Array[Partition](new GemFireRDDPartition(0, Set.empty))
+}
+
+/**
+  * This partitioner maps the whole region to N * M GemFire RDD partitions, where M is the number of
+  * GemFire servers that contain the data for the given region. The default value of N is 1.
+  */
+object ServerSplitsPartitioner extends GemFireRDDPartitioner {
+
+  override val name = "ServerSplits"
+
+  override def partitions[K: ClassTag, V: ClassTag]
+  (conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = {
+    if (md == null) throw new RuntimeException("RegionMetadata is null")
+    val n = env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt
+    if (!md.isPartitioned || md.getServerBucketMap == null || md.getServerBucketMap.isEmpty)
+      Array[Partition](new GemFireRDDPartition(0, Set.empty))
+    else {
+      val map = mapAsScalaMap(md.getServerBucketMap)
+        .map { case (srv, set) => (srv, asScalaSet(set).map(_.toInt)) }.toList
+        .map { case (srv, set) => (srv.getHostName, set) }
+       doPartitions(map, md.getTotalBuckets, n)
+    }
+  }
+
+  /** Converts server to bucket ID set list to array of RDD partitions */
+  def doPartitions(serverBucketMap: List[(String, mutable.Set[Int])], totalBuckets: Int, n: Int)
+    : Array[Partition] = {
+
+    // method that calculates the group size for splitting "k" items into "g" groups
+    def groupSize(k: Int, g: Int): Int = scala.math.ceil(k / g.toDouble).toInt
+
+    // 1. convert list of server and bucket set pairs to a list of server and sorted bucket set pairs
+    val srvToSortedBucketSet = serverBucketMap.map { case (srv, set) => (srv, SortedSet[Int]() ++ set) }
+
+    // 2. split bucket set of each server into n splits if possible, and server to Seq(server)
+    val srvToSplitedBuckeSet = srvToSortedBucketSet.flatMap { case (host, set) =>
+      if (set.isEmpty) Nil else set.grouped(groupSize(set.size, n)).toList.map(s => (Seq(host), s)) }
+
+    // 3. calculate empty bucket IDs by removing all bucket sets of all servers from the full bucket sets
+    val emptyIDs = SortedSet[Int]() ++ ((0 until totalBuckets).toSet /: srvToSortedBucketSet) {case (s1, (k, s2)) => s1 &~ s2}
+
+    // 4. distribute empty bucket IDs to all partitions evenly.
+    //    The empty buckets do not contain data when partitions are created, but they may contain data
+    //    when the RDD is materialized, so those bucket IDs need to be included in the partitions.
+    val srvToFinalBucketSet = if (emptyIDs.isEmpty) srvToSplitedBuckeSet
+      else srvToSplitedBuckeSet.zipAll(
+        emptyIDs.grouped(groupSize(emptyIDs.size, srvToSplitedBuckeSet.size)).toList, (Nil, Set.empty), Set.empty).map
+          { case ((server, set1), set2) => (server, SortedSet[Int]() ++ set1 ++ set2) }
+
+    // 5. create array of partitions w/ 0-based index
+    (0 until srvToFinalBucketSet.size).toList.zip(srvToFinalBucketSet).map
+      { case (i, (srv, set)) => new GemFireRDDPartition(i, set, srv) }.toArray
+  }
+}
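A worked example (not part of this commit) of doPartitions with made-up host names: two servers hold buckets 0-2 and 3-5 of an 8-bucket region, and n = 2 splits per server:

  import scala.collection.mutable
  val serverBucketMap = List(
    "serverA" -> mutable.Set(0, 1, 2),
    "serverB" -> mutable.Set(3, 4, 5))
  val parts = ServerSplitsPartitioner.doPartitions(serverBucketMap, totalBuckets = 8, n = 2)
  // 4 partitions (2 per server); the currently empty buckets 6 and 7 are folded into the
  // first partitions, so every bucket id 0..7 ends up assigned to some partition.
  parts.foreach(println)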

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala
new file mode 100644
index 0000000..573902b
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRDDWriter.scala
@@ -0,0 +1,55 @@
+package io.pivotal.gemfire.spark.connector.internal.rdd
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
+import org.apache.spark.{Logging, TaskContext}
+
+import scala.collection.Iterator
+import collection.JavaConversions._
+
+/** This trait provides some common code for pair and non-pair RDD writers */
+private[rdd] trait GemFireRDDWriterTraceUtils {
+  
+  def mapDump(map: Map[_, _], num: Int): String = {
+    val firstNum = map.take(num + 1)
+    if (firstNum.size > num) s"$firstNum ..." else s"$firstNum"    
+  }  
+}
+
+/**
+ * Writer object that provides write function that saves non-pair RDD partitions to GemFire.
+ * Those functions will be executed on Spark executors.
+ * @param regionPath the full path of the region where the data is written to
+ */
+class GemFireRDDWriter[T, K, V]
+(regionPath: String, connConf: GemFireConnectionConf) extends Serializable with GemFireRDDWriterTraceUtils with Logging {
+
+  def write(func: T => (K, V))(taskContext: TaskContext, data: Iterator[T]): Unit = {
+    val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath)
+    // todo. optimize batch size of putAll
+    val map: Map[K, V] = data.map(func).toMap
+    region.putAll(map)
+    logDebug(s"${map.size} entries are saved to region $regionPath")
+    logTrace(mapDump(map, 10))
+  }
+}
+
+
+/**
+ * Writer object that provides write function that saves pair RDD partitions to GemFire.
+ * Those functions will be executed on Spark executors.
+ * @param regionPath the full path of the region where the data is written to
+ */
+class GemFirePairRDDWriter[K, V]
+(regionPath: String, connConf: GemFireConnectionConf) extends Serializable with GemFireRDDWriterTraceUtils with Logging {
+
+  def write(taskContext: TaskContext, data: Iterator[(K, V)]): Unit = {
+    val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath)
+    // todo. optimize batch size of putAll
+    val map: Map[K, V] = data.toMap
+    region.putAll(map)
+    logDebug(s"${map.size} entries are saved to region $regionPath")
+    logTrace(mapDump(map, 10))
+  }
+}
+
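A minimal sketch (not part of this commit) of how a writer is driven from the driver with SparkContext.runJob, mirroring the pattern GemFirePairDStreamFunctions.saveToGemfire uses later in this commit; rdd and connConf are assumed to be in scope:

  // rdd: RDD[(String, String)], connConf: GemFireConnectionConf
  val writer = new GemFirePairRDDWriter[String, String]("testregion", connConf)
  rdd.sparkContext.runJob(rdd, writer.write _)   // one putAll per partition, executed on the executors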

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
new file mode 100644
index 0000000..cff61d6
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/internal/rdd/GemFireRegionRDD.scala
@@ -0,0 +1,122 @@
+package io.pivotal.gemfire.spark.connector.internal.rdd
+
+import scala.collection.Seq
+import scala.reflect.ClassTag
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{TaskContext, Partition, SparkContext}
+import io.pivotal.gemfire.spark.connector.{GemFireConnectionConf, PreferredPartitionerPropKey}
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartitioner._
+
+/**
+ * This class exposes a GemFire region as an RDD.
+ * @param sc the Spark Context
+ * @param regionPath the full path of the region
+ * @param connConf the GemFireConnectionConf to access the region
+ * @param opConf the parameters for this operation, such as the preferred partitioner.
+ */
+class GemFireRegionRDD[K, V] private[connector]
+  (@transient sc: SparkContext,
+   val regionPath: String,
+   val connConf: GemFireConnectionConf,
+   val opConf: Map[String, String] = Map.empty,
+   val whereClause: Option[String] = None 
+  ) (implicit ctk: ClassTag[K], ctv: ClassTag[V])
+  extends RDD[(K, V)](sc, Seq.empty) {
+
+  /** validate region existence when GemFireRDD object is created */
+  validate()
+
+  /** Validate region, and make sure it exists. */
+  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
+
+  def kClassTag = ctk
+  
+  def vClassTag = ctv
+
+  /**
+   * method `copy` is used by method `where` that creates a new immutable
+   * GemFireRDD instance based on this instance.
+   */
+  private def copy(
+    regionPath: String = regionPath,
+    connConf: GemFireConnectionConf = connConf,
+    opConf: Map[String, String] = opConf,
+    whereClause: Option[String] = None
+  ): GemFireRegionRDD[K, V] = {
+
+    require(sc != null,
+    """RDD transformation requires a non-null SparkContext. Unfortunately
+      |SparkContext in this GemFireRDD is null. This can happen after 
+      |GemFireRDD has been deserialized. SparkContext is not Serializable,
+      |therefore it deserializes to null. RDD transformations are not allowed
+      |inside lambdas used in other RDD transformations.""".stripMargin )
+
+    new GemFireRegionRDD[K, V](sc, regionPath, connConf, opConf, whereClause)
+  }
+
+  /** When where clause is specified, OQL query
+    * `select key, value from /<region-path>.entries where <where clause> `
+    * is used to filter the dataset.
+    */
+  def where(whereClause: Option[String]): GemFireRegionRDD[K, V] = {
+    if (whereClause.isDefined) copy(whereClause = whereClause)
+    else this
+  }
+
+  /** this version is for Java API that doesn't use scala.Option */
+  def where(whereClause: String): GemFireRegionRDD[K, V] = {
+    if (whereClause == null || whereClause.trim.isEmpty) this
+    else copy(whereClause = Option(whereClause.trim))
+  }
+
+  /**
+   * Use the preferred partitioner to generate partitions. `defaultReplicatedRegionPartitioner`
+   * will be used if it's a replicated region.
+   */
+  override def getPartitions: Array[Partition] = {
+    val conn = connConf.getConnection
+    val md = conn.getRegionMetadata[K, V](regionPath)
+    md match {
+      case None => throw new RuntimeException(s"region $regionPath was not found.")
+      case Some(data) =>
+        logInfo(s"""RDD id=${this.id} region=$regionPath conn=${connConf.locators.mkString(",")}, env=$opConf""")
+        val p = if (data.isPartitioned) preferredPartitioner else defaultReplicatedRegionPartitioner
+        val splits = p.partitions[K, V](conn, data, opConf)
+        logDebug(s"""RDD id=${this.id} region=$regionPath partitions=${splits.mkString(",")}""")
+        splits
+    }
+  }
+
+  /**
+   * Provide the preferred location(s) (host name(s)) of the given partition.
+   * Only some partitioner implementations provide this info, which is
+   * useful when Spark cluster and GemFire cluster share some hosts.
+   */
+  override def getPreferredLocations(split: Partition) =
+    split.asInstanceOf[GemFireRDDPartition].locations
+
+  /**
+   * Get the preferred partitioner. Returns `defaultPartitionedRegionPartitioner` if no
+   * preference is specified.
+   */
+  private def preferredPartitioner = 
+    GemFireRDDPartitioner(opConf.getOrElse(
+      PreferredPartitionerPropKey, GemFireRDDPartitioner.defaultPartitionedRegionPartitioner.name))
+
+  /** materialize an RDD partition */
+  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val partition = split.asInstanceOf[GemFireRDDPartition]
+    logDebug(s"compute RDD id=${this.id} partition $partition")
+    connConf.getConnection.getRegionData[K,V](regionPath, whereClause, partition)
+    // new InterruptibleIterator(context, split.asInstanceOf[GemFireRDDPartition[K, V]].iterator)
+  }
+}
+
+object GemFireRegionRDD {
+
+  def apply[K: ClassTag, V: ClassTag](sc: SparkContext, regionPath: String,
+    connConf: GemFireConnectionConf, opConf: Map[String, String] = Map.empty)
+    : GemFireRegionRDD[K, V] =
+    new GemFireRegionRDD[K, V](sc, regionPath, connConf, opConf)
+
+}
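A usage sketch (not part of this commit): creating the RDD through the companion object and narrowing it with an OQL where clause; the region name and predicate are examples only:

  // sc: SparkContext, connConf: GemFireConnectionConf
  val rdd = GemFireRegionRDD[String, String](sc, "testregion", connConf)
              .where("value like 'a%'")
  rdd.collect().foreach(println)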

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala
new file mode 100644
index 0000000..6d8ed59
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRegionRDD.scala
@@ -0,0 +1,10 @@
+package io.pivotal.gemfire.spark.connector.javaapi
+
+import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD
+import org.apache.spark.api.java.JavaPairRDD
+
+class GemFireJavaRegionRDD[K, V](rdd: GemFireRegionRDD[K, V]) extends JavaPairRDD[K, V](rdd)(rdd.kClassTag, rdd.vClassTag) {
+  
+  def where(whereClause: String): GemFireJavaRegionRDD[K, V] = new GemFireJavaRegionRDD(rdd.where(whereClause))
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala
new file mode 100644
index 0000000..cf7b250
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/javaapi/JavaAPIHelper.scala
@@ -0,0 +1,35 @@
+package io.pivotal.gemfire.spark.connector.javaapi
+
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream}
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+/**
+ *  A helper class to make it possible to access components written in Scala from Java code.
+ */
+private[connector] object JavaAPIHelper {
+
+  /** Returns a `ClassTag` of a given runtime class. */
+  def getClassTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz)
+
+  /**
+   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+   * see JavaSparkContext.fakeClassTag in Spark for more info.
+   */
+  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+  /** Converts a Java `Properties` to a Scala immutable `Map[String, String]`. */
+  def propertiesToScalaMap[K, V](props: java.util.Properties): Map[String, String] =
+    Map(props.toSeq: _*)
+
+  /** convert a JavaRDD[(K,V)] to JavaPairRDD[K,V] */
+  def toJavaPairRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] =
+    JavaPairRDD.fromJavaRDD(rdd)
+
+  /** convert a JavaDStream[(K,V)] to JavaPairDStream[K,V] */
+  def toJavaPairDStream[K, V](ds: JavaDStream[(K, V)]): JavaPairDStream[K, V] =
+    JavaPairDStream.fromJavaDStream(ds)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
new file mode 100644
index 0000000..834d6a5
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala
@@ -0,0 +1,43 @@
+package io.pivotal.gemfire.spark
+
+import io.pivotal.gemfire.spark.connector.internal.rdd.{ServerSplitsPartitioner, OnePartitionPartitioner}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+
+import scala.reflect.ClassTag
+
+/**
+ * The root package of Gemfire connector for Apache Spark.
+ * Provides handy implicit conversions that add gemfire-specific
+ * methods to `SparkContext` and `RDD`.
+ */
+package object connector {
+
+  /** constants */
+  final val GemFireLocatorPropKey = "spark.gemfire.locators"
+  // partitioner related keys and values
+  final val PreferredPartitionerPropKey = "preferred.partitioner"
+  final val NumberPartitionsPerServerPropKey = "number.partitions.per.server"
+  final val OnePartitionPartitionerName = OnePartitionPartitioner.name
+  final val ServerSplitsPartitionerName = ServerSplitsPartitioner.name
+
+  implicit def toSparkContextFunctions(sc: SparkContext): GemFireSparkContextFunctions =
+    new GemFireSparkContextFunctions(sc)
+
+  implicit def toSQLContextFunctions(sqlContext: SQLContext): GemFireSQLContextFunctions =
+    new GemFireSQLContextFunctions(sqlContext)
+
+  implicit def toGemfirePairRDDFunctions[K: ClassTag, V: ClassTag]
+    (self: RDD[(K, V)]): GemFirePairRDDFunctions[K, V] = new GemFirePairRDDFunctions(self)
+
+  implicit def toGemfireRDDFunctions[T: ClassTag]
+    (self: RDD[T]): GemFireRDDFunctions[T] = new GemFireRDDFunctions(self)
+
+  /** utility implicits */
+  
+  /** convert Map[String, String] to java.util.Properties */
+  implicit def map2Properties(map: Map[String,String]): java.util.Properties =
+    (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props}
+
+}
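A usage sketch of what these implicits enable (not part of this commit; saveToGemfire and gemfireRegion are the method names exercised by JavaAPITest later in this commit, and their exact Scala signatures are assumed here):

  import org.apache.spark.SparkContext
  import io.pivotal.gemfire.spark.connector._

  def roundTrip(sc: SparkContext, connConf: GemFireConnectionConf): Unit = {
    val pairs = sc.parallelize(Seq(("k1", "v1"), ("k2", "v2")))
    pairs.saveToGemfire("testregion", connConf)                      // via toGemfirePairRDDFunctions (assumed signature)
    val fromRegion = sc.gemfireRegion[String, String]("testregion")  // via toSparkContextFunctions (assumed signature)
    println(fromRegion.count())
  }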

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala
new file mode 100644
index 0000000..91eb784
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala
@@ -0,0 +1,61 @@
+package io.pivotal.gemfire.spark.connector.streaming
+
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
+import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFirePairRDDWriter, GemFireRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.PairFunction
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Extra GemFire functions on DStream of non-pair elements through an implicit conversion.
+ * Import `io.pivotal.gemfire.spark.connector.streaming._` at the top of your program to
+ * use these functions.
+ */
+class GemFireDStreamFunctions[T](val dstream: DStream[T]) extends Serializable with Logging {
+
+  /**
+   * Save the DStream of non-pair elements to GemFire key-value store.
+   * @param regionPath the full path of the region where the DStream is stored
+   * @param func the function that converts elements of the DStream to key/value pairs
+   * @param connConf the GemFireConnectionConf object that provides the connection to the GemFire cluster
+   */
+  def saveToGemfire[K, V](
+    regionPath: String, func: T => (K, V), connConf: GemFireConnectionConf = defaultConnectionConf): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf)
+    logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""")
+    dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write(func) _))
+  }
+
+  /** this version of saveToGemfire is just for Java API */
+  def saveToGemfire[K, V](regionPath: String, func: PairFunction[T, K, V], connConf: GemFireConnectionConf): Unit = {
+    saveToGemfire[K, V](regionPath, func.call _, connConf)
+  }
+
+  private[connector] def defaultConnectionConf: GemFireConnectionConf =
+    GemFireConnectionConf(dstream.context.sparkContext.getConf)
+}
+
+
+/**
+ * Extra GemFire functions on DStream of (key, value) pairs through an implicit conversion.
+ * Import `io.pivotal.gemfire.spark.connector.streaming._` at the top of your program to
+ * use these functions.
+ */
+class GemFirePairDStreamFunctions[K, V](val dstream: DStream[(K,V)]) extends Serializable with Logging {
+
+  /**
+   * Save the DStream of pairs to GemFire key-value store without any conversion
+   * @param regionPath the full path of the region where the DStream is stored
+   * @param connConf the GemFireConnectionConf object that provides the connection to the GemFire cluster
+   */
+  def saveToGemfire(regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf)
+    logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""")
+    dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _))
+  }
+
+  private[connector] def defaultConnectionConf: GemFireConnectionConf =
+    GemFireConnectionConf(dstream.context.sparkContext.getConf)
+}
\ No newline at end of file
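A usage sketch (not part of this commit) of the two save functions defined above; the streaming source and region names are examples:

  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.dstream.DStream
  import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
  import io.pivotal.gemfire.spark.connector.streaming._

  def save(ssc: StreamingContext, connConf: GemFireConnectionConf): Unit = {
    val words: DStream[String] = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    words.saveToGemfire[String, Int]("wordRegion", w => (w, w.length), connConf)   // non-pair DStream
    words.map(w => (w, 1)).saveToGemfire("countRegion", connConf)                  // pair DStream
  }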

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala
new file mode 100644
index 0000000..c003561
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala
@@ -0,0 +1,16 @@
+package io.pivotal.gemfire.spark.connector
+
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Provides handy implicit conversions that add gemfire-specific methods to `DStream`.
+ */
+package object streaming {
+
+  implicit def toGemFireDStreamFunctions[T](ds: DStream[T]): GemFireDStreamFunctions[T] =
+    new GemFireDStreamFunctions[T](ds)
+
+  implicit def toGemFirePairDStreamFunctions[K, V](ds: DStream[(K, V)]): GemFirePairDStreamFunctions[K, V] =
+    new GemFirePairDStreamFunctions[K, V](ds)
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java b/gemfire-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java
new file mode 100644
index 0000000..43415fa
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java
@@ -0,0 +1,147 @@
+package io.pivotal.gemfire.spark.connector;
+
+import io.pivotal.gemfire.spark.connector.javaapi.*;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.SQLContext;
+//import org.apache.spark.sql.api.java.JavaSQLContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.dstream.DStream;
+import org.junit.Test;
+import org.scalatest.junit.JUnitSuite;
+import scala.Function1;
+import scala.Function2;
+import scala.Tuple2;
+import scala.Tuple3;
+import scala.collection.mutable.LinkedList;
+import scala.reflect.ClassTag;
+
+import static org.junit.Assert.*;
+import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+public class JavaAPITest extends JUnitSuite {
+
+  @SuppressWarnings( "unchecked" )
+  public Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> createCommonMocks() {
+    SparkContext mockSparkContext = mock(SparkContext.class);
+    GemFireConnectionConf mockConnConf = mock(GemFireConnectionConf.class);
+    GemFireConnection mockConnection = mock(GemFireConnection.class);
+    when(mockConnConf.getConnection()).thenReturn(mockConnection);
+    when(mockConnConf.locators()).thenReturn(new LinkedList());
+    return new Tuple3<>(mockSparkContext, mockConnConf, mockConnection);
+  }
+  
+  @Test
+  public void testSparkContextFunction() throws Exception {
+    Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks();
+    GemFireJavaSparkContextFunctions wrapper = javaFunctions(tuple3._1());
+    assertTrue(tuple3._1() == wrapper.sc);
+    String regionPath = "testregion";
+    JavaPairRDD<String, String> rdd = wrapper.gemfireRegion(regionPath, tuple3._2());
+    verify(tuple3._3()).validateRegion(regionPath);
+  }
+
+  @Test
+  public void testJavaSparkContextFunctions() throws Exception {
+    SparkContext mockSparkContext = mock(SparkContext.class);
+    JavaSparkContext mockJavaSparkContext = mock(JavaSparkContext.class);
+    when(mockJavaSparkContext.sc()).thenReturn(mockSparkContext);
+    GemFireJavaSparkContextFunctions wrapper = javaFunctions(mockJavaSparkContext);
+    assertTrue(mockSparkContext == wrapper.sc);
+  }
+  
+  @Test
+  @SuppressWarnings( "unchecked" )
+  public void testJavaPairRDDFunctions() throws Exception {
+    JavaPairRDD<String, Integer> mockPairRDD = mock(JavaPairRDD.class);
+    RDD<Tuple2<String, Integer>> mockTuple2RDD = mock(RDD.class);
+    when(mockPairRDD.rdd()).thenReturn(mockTuple2RDD);
+    GemFireJavaPairRDDFunctions wrapper = javaFunctions(mockPairRDD);
+    assertTrue(mockTuple2RDD == wrapper.rddf.rdd());
+
+    Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks();
+    when(mockTuple2RDD.sparkContext()).thenReturn(tuple3._1());
+    String regionPath = "testregion";
+    wrapper.saveToGemfire(regionPath, tuple3._2());
+    verify(mockTuple2RDD, times(1)).sparkContext();
+    verify(tuple3._1(), times(1)).runJob(eq(mockTuple2RDD), any(Function2.class), any(ClassTag.class));
+  }
+
+  @Test
+  @SuppressWarnings( "unchecked" )
+  public void testJavaRDDFunctions() throws Exception {
+    JavaRDD<String> mockJavaRDD = mock(JavaRDD.class);
+    RDD<String> mockRDD = mock(RDD.class);
+    when(mockJavaRDD.rdd()).thenReturn(mockRDD);
+    GemFireJavaRDDFunctions wrapper = javaFunctions(mockJavaRDD);
+    assertTrue(mockRDD == wrapper.rddf.rdd());
+
+    Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks();
+    when(mockRDD.sparkContext()).thenReturn(tuple3._1());
+    PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class);
+    String regionPath = "testregion";
+    wrapper.saveToGemfire(regionPath, mockPairFunc, tuple3._2());
+    verify(mockRDD, times(1)).sparkContext();
+    verify(tuple3._1(), times(1)).runJob(eq(mockRDD), any(Function2.class), any(ClassTag.class));
+  }
+
+  @Test
+  @SuppressWarnings( "unchecked" )
+  public void testJavaPairDStreamFunctions() throws Exception {
+    JavaPairDStream<String, String> mockJavaDStream = mock(JavaPairDStream.class);
+    DStream<Tuple2<String, String>> mockDStream = mock(DStream.class);
+    when(mockJavaDStream.dstream()).thenReturn(mockDStream);
+    GemFireJavaPairDStreamFunctions wrapper = javaFunctions(mockJavaDStream);
+    assertTrue(mockDStream == wrapper.dsf.dstream());
+
+    Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks();
+    String regionPath = "testregion";
+    wrapper.saveToGemfire(regionPath, tuple3._2());
+    verify(tuple3._2()).getConnection();
+    verify(tuple3._3()).validateRegion(regionPath);
+    verify(mockDStream).foreachRDD(any(Function1.class));
+  }
+
+  @Test
+  @SuppressWarnings( "unchecked" )
+  public void testJavaPairDStreamFunctionsWithTuple2DStream() throws Exception {
+    JavaDStream<Tuple2<String, String>> mockJavaDStream = mock(JavaDStream.class);
+    DStream<Tuple2<String, String>> mockDStream = mock(DStream.class);
+    when(mockJavaDStream.dstream()).thenReturn(mockDStream);
+    GemFireJavaPairDStreamFunctions wrapper = javaFunctions(toJavaPairDStream(mockJavaDStream));
+    assertTrue(mockDStream == wrapper.dsf.dstream());
+  }
+
+  @Test
+  @SuppressWarnings( "unchecked" )
+  public void testJavaDStreamFunctions() throws Exception {
+    JavaDStream<String> mockJavaDStream = mock(JavaDStream.class);
+    DStream<String> mockDStream = mock(DStream.class);
+    when(mockJavaDStream.dstream()).thenReturn(mockDStream);
+    GemFireJavaDStreamFunctions wrapper = javaFunctions(mockJavaDStream);
+    assertTrue(mockDStream == wrapper.dsf.dstream());
+
+    Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks();
+    PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class);
+    String regionPath = "testregion";
+    wrapper.saveToGemfire(regionPath, mockPairFunc, tuple3._2());
+    verify(tuple3._2()).getConnection();
+    verify(tuple3._3()).validateRegion(regionPath);
+    verify(mockDStream).foreachRDD(any(Function1.class));
+  }
+
+  @Test
+  public void testSQLContextFunction() throws Exception {
+    SQLContext mockSQLContext = mock(SQLContext.class);
+    GemFireJavaSQLContextFunctions wrapper = javaFunctions(mockSQLContext);
+    assertTrue(wrapper.scf.getClass() == GemFireSQLContextFunctions.class);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala
new file mode 100644
index 0000000..bdd4fbd
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala
@@ -0,0 +1,42 @@
+package io.pivotal.gemfire.spark.connector
+
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSuite, Matchers}
+import org.apache.commons.httpclient.HttpClient
+import java.io.File
+
+
+class GemFireFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar {
+  val mockHttpClient: HttpClient = mock[HttpClient]
+    
+  test("jmx url creation") {
+    val jmxHostAndPort = "localhost:7070"
+    val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed"
+    val gfd = new GemFireFunctionDeployer(mockHttpClient);
+    val urlString = gfd.constructURLString(jmxHostAndPort)
+    assert(urlString === expectedUrlString)
+  }
+    
+  test("missing jar file") {
+    val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
+    val gfd = new GemFireFunctionDeployer(mockHttpClient);
+    intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)}
+  }
+  
+  test("deploy with missing jar") {
+    val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist"
+    val gfd = new GemFireFunctionDeployer(mockHttpClient);
+    intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))}
+    intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))}
+  }
+  
+  test("successful mocked deploy") {
+    val gfd = new GemFireFunctionDeployer(mockHttpClient);
+    val jar = new File("README.md");
+    assert(gfd.deploy("localhost:7070", jar).contains("Deployed"))
+  }
+  
+
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala
new file mode 100644
index 0000000..c644aa9
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala
@@ -0,0 +1,66 @@
+package io.pivotal.gemfire.spark.connector.internal
+
+import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSuite, Matchers}
+
+class DefaultGemFireConnectionManagerTest extends FunSuite  with Matchers with MockitoSugar {
+
+  test("DefaultGemFireConnectionFactory get/closeConnection") {
+    // note: connConf 1-4 share the same set of locators
+    val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234)))
+    val connConf2 = new GemFireConnectionConf(Seq(("host2", 5678)))
+    val connConf3 = new GemFireConnectionConf(Seq(("host1", 1234), ("host2", 5678)))
+    val connConf4 = new GemFireConnectionConf(Seq(("host2", 5678), ("host1", 1234)))
+    val connConf5 = new GemFireConnectionConf(Seq(("host5", 3333)))
+
+    val props: Map[String, String] = Map.empty
+    val mockConnFactory: DefaultGemFireConnectionFactory = mock[DefaultGemFireConnectionFactory]
+    val mockConn1 = mock[DefaultGemFireConnection]
+    val mockConn2 = mock[DefaultGemFireConnection]
+    when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1)
+    when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2)
+
+    assert(DefaultGemFireConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1)
+    // note: following 3 lines do not trigger connFactory.newConnection(...)
+    assert(DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
+    assert(DefaultGemFireConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1)
+    assert(DefaultGemFireConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1)
+    assert(DefaultGemFireConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2)
+
+    // connFactory.newConnection(...) was invoked only twice
+    verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props)
+    verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props)
+    assert(DefaultGemFireConnectionManager.connections.size == 3)
+
+    DefaultGemFireConnectionManager.closeConnection(connConf1)
+    assert(DefaultGemFireConnectionManager.connections.size == 1)
+    DefaultGemFireConnectionManager.closeConnection(connConf5)
+    assert(DefaultGemFireConnectionManager.connections.isEmpty)
+  }
+  
+  test("DefaultGemFireConnectionFactory newConnection(...) throws 
RuntimeException") {
+    val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234)))
+    val props: Map[String, String] = Map.empty
+    val mockConnFactory: DefaultGemFireConnectionFactory = 
mock[DefaultGemFireConnectionFactory]
+    when(mockConnFactory.newConnection(connConf1.locators, 
props)).thenThrow(new RuntimeException())
+    intercept[RuntimeException] { 
DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) }
+    verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props)
+  }
+
+  test("DefaultGemFireConnectionFactory close() w/ non-exist connection") {
+    val props: Map[String, String] = Map.empty
+    val mockConnFactory: DefaultGemFireConnectionFactory = mock[DefaultGemFireConnectionFactory]
+    val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234)))
+    val connConf2 = new GemFireConnectionConf(Seq(("host2", 5678)))
+    val mockConn1 = mock[DefaultGemFireConnection]
+    when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1)
+    assert(DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1)
+    assert(DefaultGemFireConnectionManager.connections.size == 1)
+    // connection does not exist in the connection manager
+    DefaultGemFireConnectionManager.closeConnection(connConf2)
+    assert(DefaultGemFireConnectionManager.connections.size == 1)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
----------------------------------------------------------------------
diff --git a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
new file mode 100644
index 0000000..04294e7
--- /dev/null
+++ b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala
@@ -0,0 +1,238 @@
+package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions
+
+import com.gemstone.gemfire.DataSerializer
+import com.gemstone.gemfire.cache.execute.{ResultCollector, ResultSender}
+import com.gemstone.gemfire.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl}
+import com.gemstone.gemfire.cache.query.types.ObjectType
+import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput, HeapDataOutputStream}
+import com.gemstone.gemfire.internal.cache.{CachedDeserializable, CachedDeserializableFactory}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import scala.collection.JavaConversions._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter  {
+
+  /** 
+    * A test ResultSender that connects struct ResultSender and ResultCollector 
+    * Note: this test ResultSender has to copy the data (byte array) since the
+    *       StructStreamingResultSender will reuse the byte array.
+    */
+  class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] {
+
+    var finishedNum = 0
+    
+    override def sendResult(result: Object): Unit =
+      collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
+
+    /** exception should be sent via lastResult() */
+    override def sendException(throwable: Throwable): Unit = 
+      throw new UnsupportedOperationException("sendException is not 
supported.")
+
+    override def lastResult(result: Object): Unit = {
+      collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
+      this.synchronized {
+        finishedNum += 1
+        if (finishedNum == num)
+          collector.endResults()
+      }
+    }
+  }
+
+  /** common variables */
+  var collector: StructStreamingResultCollector = _
+  var baseSender: LocalResultSender = _
+  /** common types */
+  val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType]
+  val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, 
objType))
+  val OneColType = new StructTypeImpl(Array("value"), Array(objType))
+
+  before {
+    collector = new StructStreamingResultCollector
+    baseSender = new LocalResultSender(collector, 1)
+  }
+  
+  test("transfer simple data") {
+    verifySimpleTransfer(sendDataType = true)
+  }
+
+  test("transfer simple data with no type info") {
+    verifySimpleTransfer(sendDataType = false)
+  }
+
+  def verifySimpleTransfer(sendDataType: Boolean): Unit = {
+    val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 
5).asInstanceOf[Object])).toIterator
+    val dataType = if (sendDataType) TwoColType else null
+    new StructStreamingResultSender(baseSender, dataType, iter).send()
+    // println("type: " + collector.getResultType.toString)
+    assert(TwoColType.equals(collector.getResultType))
+    val iter2 = collector.getResult
+    (0 to 9).foreach { i =>
+      assert(iter2.hasNext)
+      val o = iter2.next()
+      assert(o.size == 2)
+      assert(o(0).asInstanceOf[Int] == i)
+      assert(o(1).asInstanceOf[String] == i.toString * 5)
+    }
+    assert(! iter2.hasNext)
+  }
+
+  
+  /**
+   * A test iterator that generates integer data
+   * @param start the 1st value
+   * @param n number of integers generated
+   * @param genExcp generates an Exception if true; used to test exception handling
+   */
+  def intIterator(start: Int, n: Int, genExcp: Boolean): 
Iterator[Array[Object]] = {
+    new Iterator[Array[Object]] {
+      val max = if (genExcp) start + n else start + n - 1
+      var index: Int = start - 1
+
+      override def hasNext: Boolean = index < max
+
+      override def next(): Array[Object] =
+        if (index < (start + n - 1)) {
+          index += 1
+          Array(index.asInstanceOf[Object])
+        } else throw new RuntimeException("simulated error")
+    }
+  }
+
+  test("transfer data with 0 row") {
+    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, 
genExcp = false)).send()
+    // println("type: " + collector.getResultType.toString)
+    assert(collector.getResultType == null)
+    val iter = collector.getResult
+    assert(! iter.hasNext)
+  }
+
+  test("transfer data with 10K rows") {
+    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 
10000, genExcp = false)).send()
+    // println("type: " + collector.getResultType.toString)
+    assert(OneColType.equals(collector.getResultType))
+    val iter = collector.getResult
+    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
+    (1 to 10000).foreach { i =>
+      assert(iter.hasNext)
+      val o = iter.next()
+      assert(o.size == 1)
+      assert(o(0).asInstanceOf[Int] == i)
+    }
+    assert(! iter.hasNext)
+  }
+
+  test("transfer data with 10K rows with 2 sender") {
+    baseSender = new LocalResultSender(collector, 2)
+    val total = 300
+    val sender1 = Future { new StructStreamingResultSender(baseSender, 
OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
+    val sender2 = Future { new StructStreamingResultSender(baseSender, 
OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()}
+    Await.result(sender1, 1.seconds)
+    Await.result(sender2, 1.seconds)
+
+    // println("type: " + collector.getResultType.toString)
+    assert(OneColType.equals(collector.getResultType))
+    val iter = collector.getResult
+    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
+    val set = scala.collection.mutable.Set[Int]()
+    (1 to total).foreach { i =>
+      assert(iter.hasNext)
+      val o = iter.next()
+      assert(o.size == 1)
+      assert(! set.contains(o(0).asInstanceOf[Int]))
+      set.add(o(0).asInstanceOf[Int])
+    }
+    assert(! iter.hasNext)
+  }
+
+  test("transfer data with 10K rows with 2 sender with error") {
+    baseSender = new LocalResultSender(collector, 2)
+    val total = 1000
+    val sender1 = Future { new StructStreamingResultSender(baseSender, 
OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
+    val sender2 = Future { new StructStreamingResultSender(baseSender, 
OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()}
+    Await.result(sender1, 1.seconds)
+    Await.result(sender2, 1.seconds)
+
+    // println("type: " + collector.getResultType.toString)
+    assert(OneColType.equals(collector.getResultType))
+    val iter = collector.getResult
+    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
+    val set = scala.collection.mutable.Set[Int]()
+    intercept[RuntimeException] {
+      (1 to total).foreach { i =>
+        assert(iter.hasNext)
+        val o = iter.next()
+        assert(o.size == 1)
+        assert(! set.contains(o(0).asInstanceOf[Int]))
+        set.add(o(0).asInstanceOf[Int])
+      }
+    }
+    // println(s"rows received: ${set.size}")
+  }
+
+  test("transfer data with Exception") {
+    new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 
200, genExcp = true)).send()
+    // println("type: " + collector.getResultType.toString)
+    val iter = collector.getResult
+    intercept[RuntimeException] ( iter.foreach(_.mkString(",")) )
+  }
+
+  def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] =
+    intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", 
s"value-${x(0)}"))
+
+  test("transfer string pair data with 200 rows") {
+    new StructStreamingResultSender(baseSender, TwoColType, 
stringPairIterator(1000, genExcp = false)).send()
+    // println("type: " + collector.getResultType.toString)
+    assert(TwoColType.equals(collector.getResultType))
+    val iter = collector.getResult
+    // println(iter.toList.map(list => list.mkString(",")).mkString("; "))
+    (1 to 1000).foreach { i =>
+      assert(iter.hasNext)
+      val o = iter.next()
+      assert(o.size == 2)
+      assert(o(0) == s"key-$i")
+      assert(o(1) == s"value-$i")
+    }
+    assert(! iter.hasNext)
+  }
+
+  /**
+   * Usage notes: There are 3 kinds of data to transfer:
+   * (1) object, (2) byte array of serialized object, and (3) byte array.
+   * This test shows how to handle all of them.
+   */
+  test("DataSerializer usage") {
+    val outBuf = new HeapDataOutputStream(1024, null)
+    val inBuf = new ByteArrayDataInput()
+
+    // 1. a regular object
+    val hello = "Hello World!" * 30
+    // serialize the data
+    DataSerializer.writeObject(hello, outBuf)
+    val bytesHello = outBuf.toByteArray.clone()
+    // de-serialize the data
+    inBuf.initialize(bytesHello, Version.CURRENT)
+    val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
+    assert(hello == hello2)
+    
+    // 2. byte array of serialized object
+    // serialize: byte array from `CachedDeserializable`
+    val cd: CachedDeserializable = 
CachedDeserializableFactory.create(bytesHello)
+    outBuf.reset()
+    DataSerializer.writeByteArray(cd.getSerializedValue, outBuf)
+    // de-serialize the data in 2 steps
+    inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
+    val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf)
+    inBuf.initialize(bytesHello2, Version.CURRENT)
+    val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
+    assert(hello == hello3)
+
+    // 3. byte array
+    outBuf.reset()
+    DataSerializer.writeByteArray(bytesHello, outBuf)
+    inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
+    val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf)
+    assert(bytesHello sameElements bytesHello3)
+  }
+}
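
As the LocalResultSender comment above notes, each received byte array must be cloned because the sender reuses its buffer. A tiny self-contained illustration of why keeping the reference is not enough (plain Scala, not connector code):

    object BufferReuseSketch extends App {
      val buffer = Array[Byte](1, 2, 3)
      val keptByReference = buffer      // aliases the buffer the producer will reuse
      val keptByCopy = buffer.clone()   // independent snapshot of the current content
      buffer(0) = 9                     // producer overwrites the buffer for the next chunk
      assert(keptByReference(0) == 9)   // the aliased "result" silently changed
      assert(keptByCopy(0) == 1)        // the cloned result is stable
    }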

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala
new file mode 100644
index 0000000..86210b6
--- /dev/null
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala
@@ -0,0 +1,67 @@
+package io.pivotal.gemfire.spark.connector.internal.oql
+
+import org.scalatest.FunSuite
+
+class QueryParserTest extends FunSuite {
+
+  test("select * from /r1") {
+    val r = QueryParser.parseOQL("select * from /r1").get
+    assert(r == "List(/r1)")
+  }
+
+  test("select c2 from /r1") {
+    val r = QueryParser.parseOQL("select c2 from /r1").get
+    assert(r == "List(/r1)")
+  }
+
+  test("select key, value from /r1.entries") {
+    val r = QueryParser.parseOQL("select key, value from /r1.entries").get
+    assert(r == "List(/r1.entries)")
+  }
+
+  test("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2") {
+    val r = QueryParser.parseOQL("select c1, c2 from /r1 where col1 > 100 and 
col2 <= 120 or c3 = 2").get
+    assert(r == "List(/r1)")
+  }
+
+  test("select * from /r1/r2 where c1 >= 200") {
+    val r = QueryParser.parseOQL("select * from /r1/r2 where c1 >= 200").get
+    assert(r == "List(/r1/r2)")
+  }
+
+  test("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 
and c2 = 100") {
+    val r = QueryParser.parseOQL("import io.pivotal select c1, c2, c3 from 
/r1/r2, /r3/r4 where c1 <= 15 and c2 = 100").get
+    assert(r == "List(/r1/r2, /r3/r4)")
+  }
+
+  test("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100") {
+    val r = QueryParser.parseOQL("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 
100").get
+    assert(r == "List(/r1/r2)")
+  }
+
+  test("IMPORT io.pivotal.gemfire IMPORT com.mypackage SELECT key,value FROM 
/root/sub.entries WHERE status = 'active' ORDER BY id desc") {
+    val r = QueryParser.parseOQL("IMPORT io.pivotal.gemfire IMPORT 
com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' 
ORDER BY id desc").get
+    assert(r == "List(/root/sub.entries)")
+  }
+
+  test("select distinct p.ID, p.status from /region p where p.ID > 5 order by 
p.status") {
+    val r = QueryParser.parseOQL("select distinct p.ID, p.status from /region 
p where p.ID > 5 order by p.status").get
+    assert(r == "List(/region)")
+  }
+
+  test("SELECT DISTINCT * FROM /QueryRegion1 r1,  /QueryRegion2 r2 WHERE r1.ID 
= r2.ID") {
+    val r = QueryParser.parseOQL("SELECT DISTINCT * FROM /QueryRegion1 r1,  
/QueryRegion2 r2 WHERE r1.ID = r2.ID").get
+    assert(r == "List(/QueryRegion1, /QueryRegion2)")
+  }
+
+  test("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE 
status = 'active'") {
+    val r = QueryParser.parseOQL("SELECT id, \"type\", positions, status FROM 
/obj_obj_region WHERE status = 'active'").get
+    println("r.type=" + r.getClass.getName + " r=" + r)
+    assert(r == "List(/obj_obj_region)")
+  }
+
+  test("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, 
r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'") {
+    val r = QueryParser.parseOQL("SELECT r.id, r.\"type\", r.positions, 
r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' 
and f.secId = 'MSFT'").get
+    assert(r == "List(/obj_obj_region, r.positions.values)")
+  }
+}
\ No newline at end of file
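
These cases all check one thing: parseOQL extracts the region paths in the FROM clause, and the tests compare the result against its string form (e.g. "List(/r1/r2, /r3/r4)"). A short usage sketch, assuming the connector is on the classpath:

    import io.pivotal.gemfire.spark.connector.internal.oql.QueryParser

    object QueryParserSketch extends App {
      // .get unwraps the parse result, as the tests above do
      val regions = QueryParser.parseOQL(
        "import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100").get
      println(regions)   // prints the FROM-clause regions, e.g. List(/r1/r2, /r3/r4)
    }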

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala
new file mode 100644
index 0000000..c50bbc7
--- /dev/null
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala
@@ -0,0 +1,34 @@
+package unittest.io.pivotal.gemfire.spark.connector
+
+import io.pivotal.gemfire.spark.connector._
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.Matchers
+
+class ConnectorImplicitsTest extends FunSuite with Matchers with MockitoSugar {
+
+  test("implicit map2Properties") {
+    verifyProperties(Map.empty)
+    verifyProperties(Map("One" -> "1", "Two" -> "2", "Three" ->"3"))
+  }
+  
+  def verifyProperties(map: Map[String, String]): Unit = {
+    val props: java.util.Properties = map
+    assert(props.size() == map.size)
+    map.foreach(p => assert(props.getProperty(p._1) == p._2))    
+  }
+
+  test("Test Implicit SparkContext Conversion") {
+    val mockSparkContext = mock[SparkContext]
+    val gfscf: GemFireSparkContextFunctions = mockSparkContext
+    assert(gfscf.isInstanceOf[GemFireSparkContextFunctions])
+  }
+
+  test("Test Implicit SQLContext Conversion") {
+    val mockSQLContext = mock[SQLContext]
+    val gfscf: GemFireSQLContextFunctions = mockSQLContext
+    assert(gfscf.isInstanceOf[GemFireSQLContextFunctions])
+  }
+}
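
The first test relies on an implicit Map-to-Properties conversion; the other two rely on implicit wrappers around SparkContext and SQLContext. A hypothetical sketch of what the map2Properties conversion could look like (the connector's actual definition may differ):

    import java.util.Properties
    import scala.language.implicitConversions

    object ImplicitsSketch {
      implicit def map2Properties(map: Map[String, String]): Properties = {
        val props = new Properties()
        map.foreach { case (k, v) => props.setProperty(k, v) }
        props
      }
    }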

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala
new file mode 100644
index 0000000..2c64e19
--- /dev/null
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala
@@ -0,0 +1,84 @@
+package unittest.io.pivotal.gemfire.spark.connector
+
+import org.apache.spark.SparkConf
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSuite}
+import io.pivotal.gemfire.spark.connector._
+
+class GemFireConnectionConfTest extends FunSuite with Matchers with 
MockitoSugar {
+
+  test("apply(SparkConf) w/ GemFireLocator property and empty gemfireProps") {
+    val (host1, port1) = ("host1", 1234)
+    val (host2, port2) = ("host2", 5678)
+    val conf = new SparkConf().set(GemFireLocatorPropKey, 
s"$host1[$port1],$host2[$port2]")
+    val connConf = GemFireConnectionConf(conf)
+    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
+    assert(connConf.gemfireProps.isEmpty)
+  }
+  
+  test("apply(SparkConf) w/ GemFireLocator property and gemfire properties") {
+    val (host1, port1) = ("host1", 1234)
+    val (host2, port2) = ("host2", 5678)
+    val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
+    val (propK2, propV2) = ("ack-wait-threshold", "10")
+    val conf = new SparkConf().set(GemFireLocatorPropKey, 
s"$host1[$port1],$host2[$port2]")
+                              .set(s"spark.gemfire.$propK1", 
propV1).set(s"spark.gemfire.$propK2", propV2)
+    val connConf = GemFireConnectionConf(conf)
+    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
+    assert(connConf.gemfireProps == Map(propK1 -> propV1, propK2 -> propV2))
+  }
+
+  test("apply(SparkConf) w/o GemFireLocator property") {
+    intercept[RuntimeException] { GemFireConnectionConf(new SparkConf()) }
+  }
+
+  test("apply(SparkConf) w/ invalid GemFireLocator property") {
+    val conf = new SparkConf().set(GemFireLocatorPropKey, "local^host:1234")
+    intercept[Exception] { GemFireConnectionConf(conf) }
+  }
+
+  test("apply(locatorStr, gemfireProps) w/ valid locatorStr and non 
gemfireProps") {
+    val (host1, port1) = ("host1", 1234)
+    val connConf = GemFireConnectionConf(s"$host1:$port1")
+    assert(connConf.locators == Seq((host1, port1)))
+    assert(connConf.gemfireProps.isEmpty)
+  }
+
+  test("apply(locatorStr, gemfireProps) w/ valid locatorStr and non-empty 
gemfireProps") {
+    val (host1, port1) = ("host1", 1234)
+    val (host2, port2) = ("host2", 5678)
+    val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
+    val (propK2, propV2) = ("ack-wait-threshold", "10")
+    val props = Map(propK1 -> propV1, propK2 -> propV2)
+    val connConf = GemFireConnectionConf(s"$host1:$port1,$host2:$port2", props)
+    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
+    assert(connConf.gemfireProps == props)
+  }
+
+  test("apply(locatorStr, gemfireProps) w/ invalid locatorStr") {
+    intercept[Exception] { GemFireConnectionConf("local~host:4321") }
+  }
+
+  test("constructor w/ empty (host,port) pairs") {
+    intercept[IllegalArgumentException] { new GemFireConnectionConf(Seq.empty) 
}
+  }
+
+  test("getConnection() normal") {
+    implicit val mockFactory = mock[GemFireConnectionManager]
+    val mockConnection = mock[GemFireConnection]
+    
when(mockFactory.getConnection(org.mockito.Matchers.any[GemFireConnectionConf])).thenReturn(mockConnection)
+    val connConf = GemFireConnectionConf("localhost:1234")
+    assert(connConf.getConnection == mockConnection)
+    verify(mockFactory).getConnection(connConf)
+  }
+
+  test("getConnection() failure") {
+    implicit val mockFactory = mock[GemFireConnectionManager]
+    
when(mockFactory.getConnection(org.mockito.Matchers.any[GemFireConnectionConf])).thenThrow(new
 RuntimeException)
+    val connConf = GemFireConnectionConf("localhost:1234")
+    intercept[RuntimeException] { connConf.getConnection }
+    verify(mockFactory).getConnection(connConf)
+  }
+
+}
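
For reference, the configuration shapes covered above look like this in an application (illustrative; host names and property values are placeholders):

    import org.apache.spark.SparkConf
    import io.pivotal.gemfire.spark.connector._

    object ConnectionConfSketch {
      val sparkConf = new SparkConf()
        .set(GemFireLocatorPropKey, "host1[10334],host2[10334]")   // required locator list
        .set("spark.gemfire.ack-wait-threshold", "10")             // forwarded as a gemfire property
      val fromSparkConf = GemFireConnectionConf(sparkConf)

      // or build it directly from a locator string plus gemfire properties
      val direct = GemFireConnectionConf("host1:10334,host2:10334",
                                         Map("ack-wait-threshold" -> "10"))
    }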

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala
new file mode 100644
index 0000000..ef4728c
--- /dev/null
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala
@@ -0,0 +1,63 @@
+package unittest.io.pivotal.gemfire.spark.connector
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.gemfire.spark.connector.{GemFireConnection, 
GemFireConnectionConf}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSuite}
+import org.mockito.Matchers.{eq => mockEq, any => mockAny}
+
+import scala.reflect.ClassTag
+
+class GemFireDStreamFunctionsTest extends FunSuite with Matchers with 
MockitoSugar {
+
+  test("test GemFirePairDStreamFunctions Implicit") {
+    import io.pivotal.gemfire.spark.connector.streaming._
+    val mockDStream = mock[DStream[(Int, String)]]
+    // the implicit makes the following line valid
+    val pairDStream: GemFirePairDStreamFunctions[Int, String] = mockDStream
+    pairDStream shouldBe a[GemFirePairDStreamFunctions[_, _]]
+  }
+
+  test("test GemFireDStreamFunctions Implicit") {
+    import io.pivotal.gemfire.spark.connector.streaming._
+    val mockDStream = mock[DStream[String]]
+    // the implicit makes the following line valid
+    val dstream: GemFireDStreamFunctions[String] = mockDStream
+    dstream shouldBe a[GemFireDStreamFunctions[_]]
+  }
+
+  def createMocks[K, V](regionPath: String)
+    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]])
+    : (String, GemFireConnectionConf, GemFireConnection, Region[K, V]) = {
+    val mockConnection = mock[GemFireConnection]
+    val mockConnConf = mock[GemFireConnectionConf]
+    val mockRegion = mock[Region[K, V]]
+    when(mockConnConf.getConnection).thenReturn(mockConnection)
+    when(mockConnConf.locators).thenReturn(Seq.empty)
+    (regionPath, mockConnConf, mockConnection, mockRegion)
+  }
+
+  test("test GemFirePairDStreamFunctions.saveToGemfire()") {
+    import io.pivotal.gemfire.spark.connector.streaming._
+    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, String]("test")
+    val mockDStream = mock[DStream[(String, String)]]
+    mockDStream.saveToGemfire(regionPath, mockConnConf)
+    verify(mockConnConf).getConnection
+    verify(mockConnection).validateRegion[String, String](regionPath)
+    verify(mockDStream).foreachRDD(mockAny[(RDD[(String, String)]) => Unit])
+  }
+
+  test("test GemFireDStreamFunctions.saveToGemfire()") {
+    import io.pivotal.gemfire.spark.connector.streaming._
+    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, Int]("test")
+    val mockDStream = mock[DStream[String]]
+    mockDStream.saveToGemfire[String, Int](regionPath,  (s: String) => (s, 
s.length), mockConnConf)
+    verify(mockConnConf).getConnection
+    verify(mockConnection).validateRegion[String, String](regionPath)
+    verify(mockDStream).foreachRDD(mockAny[(RDD[String]) => Unit])
+  }
+
+}
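
The two saveToGemfire tests above pin down the user-facing call shapes, roughly as follows (illustrative; the region names are hypothetical):

    import io.pivotal.gemfire.spark.connector.GemFireConnectionConf
    import io.pivotal.gemfire.spark.connector.streaming._
    import org.apache.spark.streaming.dstream.DStream

    object DStreamSaveSketch {
      def save(pairs: DStream[(String, String)],
               words: DStream[String],
               connConf: GemFireConnectionConf): Unit = {
        // pair DStream: elements are stored as-is
        pairs.saveToGemfire("pairRegion", connConf)
        // non-pair DStream: supply a function that maps each element to a (key, value)
        words.saveToGemfire("strRegion", (w: String) => (w, w.length), connConf)
      }
    }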

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
new file mode 100644
index 0000000..659fca2
--- /dev/null
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala
@@ -0,0 +1,99 @@
+package unittest.io.pivotal.gemfire.spark.connector
+
+import com.gemstone.gemfire.cache.Region
+import io.pivotal.gemfire.spark.connector._
+import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireRDDWriter, 
GemFirePairRDDWriter}
+import org.apache.spark.{TaskContext, SparkContext}
+import org.apache.spark.rdd.RDD
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSuite, Matchers}
+import collection.JavaConversions._
+import scala.reflect.ClassTag
+import org.mockito.Matchers.{eq => mockEq, any => mockAny}
+
+class GemFireRDDFunctionsTest extends FunSuite with Matchers with MockitoSugar 
{
+
+  test("test PairRDDFunction Implicit") {
+    import io.pivotal.gemfire.spark.connector._
+    val mockRDD = mock[RDD[(Int, String)]]
+    // the implicit makes the following line valid
+    val pairRDD: GemFirePairRDDFunctions[Int, String] = mockRDD
+    pairRDD shouldBe a [GemFirePairRDDFunctions[_, _]]
+  }
+  
+  test("test RDDFunction Implicit") {
+    import io.pivotal.gemfire.spark.connector._
+    val mockRDD = mock[RDD[String]]
+    // the implicit makes the following line valid
+    val nonPairRDD: GemFireRDDFunctions[String] = mockRDD
+    nonPairRDD shouldBe a [GemFireRDDFunctions[_]]
+  }
+
+  def createMocks[K, V](regionPath: String)
+    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]): 
(String, GemFireConnectionConf, GemFireConnection, Region[K, V]) = {
+    val mockConnection = mock[GemFireConnection]
+    val mockConnConf = mock[GemFireConnectionConf]
+    val mockRegion = mock[Region[K, V]]
+    when(mockConnConf.getConnection).thenReturn(mockConnection)
+    when(mockConnection.getRegionProxy[K, 
V](regionPath)).thenReturn(mockRegion)
+    // mockRegion shouldEqual mockConn.getRegionProxy[K, V](regionPath)
+    (regionPath, mockConnConf, mockConnection, mockRegion)
+  }
+
+  test("test GemFirePairRDDWriter") {
+    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, String]("test")
+    val writer = new GemFirePairRDDWriter[String, String](regionPath, 
mockConnConf)
+    val data = List(("1", "one"), ("2", "two"), ("3", "three"))
+    writer.write(null, data.toIterator)
+    val expectedMap: Map[String, String] = data.toMap
+    verify(mockRegion).putAll(expectedMap)
+  }
+
+  test("test GemFireNonPairRDDWriter") {
+    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[Int, String]("test")
+    val writer = new GemFireRDDWriter[String, Int, String](regionPath, 
mockConnConf)
+    val data = List("a", "ab", "abc")
+    val f: String => (Int, String) = s => (s.length, s)
+    writer.write(f)(null, data.toIterator)
+    val expectedMap: Map[Int, String] = data.map(f).toMap
+    verify(mockRegion).putAll(expectedMap)
+  }
+  
+  test("test PairRDDFunctions.saveToGemfire") {
+    import io.pivotal.gemfire.spark.connector._
+    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, String]("test")
+    val mockRDD = mock[RDD[(String, String)]]
+    val mockSparkContext = mock[SparkContext]
+    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
+    val result = mockRDD.saveToGemfire(regionPath, mockConnConf)
+    verify(mockConnection, times(1)).validateRegion[String, String](regionPath)
+    result === Unit
+    verify(mockSparkContext, times(1)).runJob[(String, String), Unit](
+      mockEq(mockRDD), mockAny[(TaskContext, Iterator[(String, String)]) => 
Unit])(mockAny(classOf[ClassTag[Unit]]))
+
+    // Note: the current implementation makes the following code not compilable,
+    //       so there is no negative test for this case
+    //  val rdd: RDD[(K, V)] = ...
+    //  rdd.saveToGemfire(regionPath, s => (s.length, s))
+  }
+
+  test("test RDDFunctions.saveToGemfire") {
+    import io.pivotal.gemfire.spark.connector._
+    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[Int, String]("test")
+    val mockRDD = mock[RDD[(String)]]
+    val mockSparkContext = mock[SparkContext]
+    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
+    val result = mockRDD.saveToGemfire(regionPath, s => (s.length, s), 
mockConnConf)
+    verify(mockConnection, times(1)).validateRegion[Int, String](regionPath)
+    result === Unit
+    verify(mockSparkContext, times(1)).runJob[String, Unit](
+      mockEq(mockRDD), mockAny[(TaskContext, Iterator[String]) => 
Unit])(mockAny(classOf[ClassTag[Unit]]))
+
+    // Note: the current implementation makes the following code not compilable,
+    //       so there is no negative test for this case
+    //  val rdd: RDD[T] = ...   // T is not a (K, V) tuple
+    //  rdd.saveToGemfire(regionPath)
+  }
+  
+}
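
The RDD variants mirror the DStream ones; the call shapes exercised above look roughly like this in user code (illustrative; the region names are placeholders):

    import io.pivotal.gemfire.spark.connector._
    import org.apache.spark.SparkContext

    object RddSaveSketch {
      def save(sc: SparkContext, connConf: GemFireConnectionConf): Unit = {
        // pair RDD: written to the region via putAll, as the writer tests verify
        sc.parallelize(Seq(("1", "one"), ("2", "two"), ("3", "three")))
          .saveToGemfire("pairRegion", connConf)
        // non-pair RDD: map each element to a (key, value) pair
        sc.parallelize(Seq("a", "ab", "abc"))
          .saveToGemfire("strRegion", (s: String) => (s.length, s), connConf)
      }
    }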

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/85d44f04/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
----------------------------------------------------------------------
diff --git 
a/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
new file mode 100644
index 0000000..508666a
--- /dev/null
+++ 
b/gemfire-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala
@@ -0,0 +1,75 @@
+package unittest.io.pivotal.gemfire.spark.connector
+
+import io.pivotal.gemfire.spark.connector.internal.LocatorHelper
+import org.scalatest.FunSuite
+
+class LocatorHelperTest extends FunSuite {
+
+  test("locatorStr2HostPortPair hostname w/o domain") {
+    val (host, port) = ("localhost", 10334)
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, 
port))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, 
port))
+  }
+
+  test("locatorStr2HostPortPair hostname w/ domain") {
+    val (host, port) = ("localhost.localdomain", 10334)
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, 
port))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, 
port))
+  }
+
+  test("locatorStr2HostPortPair w/ invalid host name") {
+    // empty or null locatorStr
+    assert(LocatorHelper.locatorStr2HostPortPair("").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair(null).isFailure)
+    // host name has leading `.`
+    assert(LocatorHelper.locatorStr2HostPortPair(".localhost.1234").isFailure)
+    // host name has leading and/or tail white space
+    assert(LocatorHelper.locatorStr2HostPortPair(" localhost.1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost .1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair(" localhost .1234").isFailure)
+    // host name contain invalid characters
+    assert(LocatorHelper.locatorStr2HostPortPair("local%host.1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost*.1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair("^localhost.1234").isFailure)
+  }
+
+  test("locatorStr2HostPortPair w/ valid port") {
+    val host = "192.168.0.1"
+    // port has 2, 3, 4, 5 digits
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:20").get ==(host, 20))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:300").get ==(host, 
300))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:4000").get ==(host, 
4000))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:50000").get ==(host, 
50000))
+  }
+  
+  test("locatorStr2HostPortPair w/ invalid port") {
+    // port number is less than 2 digits
+    assert(LocatorHelper.locatorStr2HostPortPair("locslhost.9").isFailure)
+    // port number is more than 5 digits
+    assert(LocatorHelper.locatorStr2HostPortPair("locslhost.100000").isFailure)
+    // port number is invalid
+    assert(LocatorHelper.locatorStr2HostPortPair("locslhost.1xx1").isFailure)
+  }
+  
+  test("parseLocatorsString with valid locator(s)") {
+    val (host1, port1) = ("localhost", 10334)
+    assert(LocatorHelper.parseLocatorsString(s"$host1:$port1") == Seq((host1, 
port1)))
+    val (host2, port2) = ("localhost2", 10335)
+    assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2") 
== Seq((host1, port1),(host2, port2)))
+    val (host3, port3) = ("localhost2", 10336)
+    
assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2,$host3:$port3")
 == 
+      Seq((host1, port1),(host2, port2),(host3, port3)))
+  }
+
+  test("parseLocatorsString with invalid locator(s)") {
+    // empty and null locatorsStr
+    intercept[Exception] { LocatorHelper.parseLocatorsString("") }
+    intercept[Exception] { LocatorHelper.parseLocatorsString(null) }
+    // 1 bad locatorStr
+    intercept[Exception] { 
LocatorHelper.parseLocatorsString("local%host.1234") }
+    // 1 good locatorStr and 1 bad locatorStr
+    intercept[Exception] { 
LocatorHelper.parseLocatorsString("localhost:2345,local%host.1234") }
+    intercept[Exception] { 
LocatorHelper.parseLocatorsString("local^host:2345,localhost.1234") }
+  }
+
+}
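
In short, both "host:port" and "host[port]" locator formats are accepted, single-entry parsing returns a Try, and parseLocatorsString throws if any entry in the comma-separated list is invalid. A small usage sketch (assumes the connector's internal LocatorHelper on the classpath):

    import io.pivotal.gemfire.spark.connector.internal.LocatorHelper

    object LocatorFormatsSketch extends App {
      val a = LocatorHelper.locatorStr2HostPortPair("host1:10334")      // Success((host1,10334))
      val b = LocatorHelper.locatorStr2HostPortPair("host1[10334]")     // Success((host1,10334))
      val c = LocatorHelper.locatorStr2HostPortPair("local%host:10334") // Failure(...): invalid character
      println(Seq(a, b, c))

      // comma-separated list; throws on any invalid entry
      val locators = LocatorHelper.parseLocatorsString("host1:10334,host2:10335")
      println(locators)   // Seq((host1,10334), (host2,10335))
    }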

