http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala new file mode 100644 index 0000000..a8666fc --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.geodefunctions + +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue} +import com.gemstone.gemfire.DataSerializer +import com.gemstone.gemfire.cache.execute.ResultCollector +import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl +import com.gemstone.gemfire.cache.query.types.StructType +import com.gemstone.gemfire.distributed.DistributedMember +import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput} +import io.pivotal.geode.spark.connector.internal.geodefunctions.StructStreamingResultSender. + {TYPE_CHUNK, DATA_CHUNK, ERROR_CHUNK, SER_DATA, UNSER_DATA, BYTEARR_DATA} + +/** + * StructStreamingResultCollector and StructStreamingResultSender are paired + * to transfer result of list of `com.gemstone.gemfire.cache.query.Struct` + * from Geode server to Spark Connector (the client of Geode server) + * in streaming, i.e., while sender sending the result, the collector can + * start processing the arrived result without waiting for full result to + * become available. + */ +class StructStreamingResultCollector(desc: String) extends ResultCollector[Array[Byte], Iterator[Array[Object]]] { + + /** the constructor that provide default `desc` (description) */ + def this() = this("StructStreamingResultCollector") + + private val queue: BlockingQueue[Array[Byte]] = new LinkedBlockingQueue[Array[Byte]]() + var structType: StructType = null + + /** ------------------------------------------ */ + /** ResultCollector interface implementations */ + /** ------------------------------------------ */ + + override def getResult: Iterator[Array[Object]] = resultIterator + + override def getResult(timeout: Long, unit: TimeUnit): Iterator[Array[Object]] = + throw new UnsupportedOperationException() + + /** addResult add non-empty byte array (chunk) to the queue */ + override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit = + if (chunk != null && chunk.size > 1) { + this.queue.add(chunk) + // println(s"""$desc receive from $memberID: ${chunk.mkString(" ")}""") + } + + /** endResults add special `Array.empty` to the queue as marker of end of data */ + override def endResults(): Unit = this.queue.add(Array.empty) + + override def clearResults(): Unit = this.queue.clear() + + /** ------------------------------------------ */ + /** Internal methods */ + /** ------------------------------------------ */ + + def getResultType: StructType = { + // trigger lazy resultIterator initialization if necessary + if (structType == null) resultIterator.hasNext + structType + } + + /** + * Note: The data is sent in chunks, and each chunk contains multiple + * records. So the result iterator is an iterator (I) of iterator (II), + * i.e., go through each chunk (iterator (I)), and for each chunk, go + * through each record (iterator (II)). + */ + private lazy val resultIterator = new Iterator[Array[Object]] { + + private var currentIterator: Iterator[Array[Object]] = nextIterator() + + override def hasNext: Boolean = { + if (!currentIterator.hasNext && currentIterator != Iterator.empty) currentIterator = nextIterator() + currentIterator.hasNext + } + + /** Note: make sure call `hasNext` first to adjust `currentIterator` */ + override def next(): Array[Object] = currentIterator.next() + } + + /** get the iterator for the next chunk of data */ + private def nextIterator(): Iterator[Array[Object]] = { + val chunk: Array[Byte] = queue.take + if (chunk.isEmpty) { + Iterator.empty + } else { + val input = new ByteArrayDataInput() + input.initialize(chunk, Version.CURRENT) + val chunkType = input.readByte() + // println(s"chunk type $chunkType") + chunkType match { + case TYPE_CHUNK => + if (structType == null) + structType = DataSerializer.readObject(input).asInstanceOf[StructTypeImpl] + nextIterator() + case DATA_CHUNK => + // require(structType != null && structType.getFieldNames.length > 0) + if (structType == null) structType = StructStreamingResultSender.KeyValueType + chunkToIterator(input, structType.getFieldNames.length) + case ERROR_CHUNK => + val error = DataSerializer.readObject(input).asInstanceOf[Exception] + errorPropagationIterator(error) + case _ => throw new RuntimeException(s"unknown chunk type: $chunkType") + } + } + } + + /** create a iterator that propagate sender's exception */ + private def errorPropagationIterator(ex: Exception) = new Iterator[Array[Object]] { + val re = new RuntimeException(ex) + override def hasNext: Boolean = throw re + override def next(): Array[Object] = throw re + } + + /** convert a chunk of data to an iterator */ + private def chunkToIterator(input: ByteArrayDataInput, rowSize: Int) = new Iterator[Array[Object]] { + override def hasNext: Boolean = input.available() > 0 + val tmpInput = new ByteArrayDataInput() + override def next(): Array[Object] = + (0 until rowSize).map { ignore => + val b = input.readByte() + b match { + case SER_DATA => + val arr: Array[Byte] = DataSerializer.readByteArray(input) + tmpInput.initialize(arr, Version.CURRENT) + DataSerializer.readObject(tmpInput).asInstanceOf[Object] + case UNSER_DATA => + DataSerializer.readObject(input).asInstanceOf[Object] + case BYTEARR_DATA => + DataSerializer.readByteArray(input).asInstanceOf[Object] + case _ => + throw new RuntimeException(s"unknown data type $b") + } + }.toArray + } + +} +
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala new file mode 100644 index 0000000..3f6dfad --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.oql + +import scala.util.parsing.combinator.RegexParsers + +class QueryParser extends RegexParsers { + + def query: Parser[String] = opt(rep(IMPORT ~ PACKAGE)) ~> select ~> opt(distinct) ~> projection ~> from ~> regions <~ opt(where ~ filter) ^^ { + _.toString + } + + val IMPORT: Parser[String] = "[Ii][Mm][Pp][Oo][Rr][Tt]".r + + val select: Parser[String] = "[Ss][Ee][Ll][Ee][Cc][Tt]".r + + val distinct: Parser[String] = "[Dd][Ii][Ss][Tt][Ii][Nn][Cc][Tt]".r + + val from: Parser[String] = "[Ff][Rr][Oo][Mm]".r + + val where: Parser[String] = "[Ww][Hh][Ee][Rr][Ee]".r + + def PACKAGE: Parser[String] = """[\w.]+""".r + + def projection: Parser[String] = "*" | repsep("""["\w]+[.\w"]*""".r, ",") ^^ { + _.toString + } + + def regions: Parser[String] = repsep(region <~ opt(alias), ",") ^^ { + _.toString + } + + def region: Parser[String] = """/[\w.]+[/[\w.]+]*""".r | """[\w]+[.\w]*""".r + + def alias: Parser[String] = not(where) ~> """[\w]+""".r + + def filter: Parser[String] = """[\w.]+[[\s]+[<>=.'\w]+]*""".r +} + +object QueryParser extends QueryParser { + + def parseOQL(expression: String) = parseAll(query, expression) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala new file mode 100644 index 0000000..474aa6a --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.oql + +import io.pivotal.geode.spark.connector.GeodeConnectionConf +import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDPartition, ServerSplitsPartitioner} +import org.apache.spark.rdd.RDD +import org.apache.spark.{TaskContext, SparkContext, Partition} +import scala.reflect.ClassTag + +/** + * An RDD that provides the functionality that read the OQL query result + * + * @param sc The SparkContext this RDD is associated with + * @param queryString The OQL query string + * @param connConf The GeodeConnectionConf that provide the GeodeConnection + */ +class QueryRDD[T](@transient sc: SparkContext, + queryString: String, + connConf: GeodeConnectionConf) + (implicit ct: ClassTag[T]) + extends RDD[T](sc, Seq.empty) { + + override def getPartitions: Array[Partition] = { + val conn = connConf.getConnection + val regionPath = getRegionPathFromQuery(queryString) + val md = conn.getRegionMetadata(regionPath) + md match { + case Some(metadata) => + if (metadata.isPartitioned) { + val splits = ServerSplitsPartitioner.partitions(conn, metadata, Map.empty) + logInfo(s"QueryRDD.getPartitions():isPartitioned=true, partitions=${splits.mkString(",")}") + splits + } + else { + logInfo(s"QueryRDD.getPartitions():isPartitioned=false") + Array[Partition](new GeodeRDDPartition(0, Set.empty)) + + } + case None => throw new RuntimeException(s"Region $regionPath metadata was not found.") + } + } + + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + val buckets = split.asInstanceOf[GeodeRDDPartition].bucketSet + val regionPath = getRegionPathFromQuery(queryString) + val result = connConf.getConnection.executeQuery(regionPath, buckets, queryString) + result match { + case it: Iterator[T] => + logInfo(s"QueryRDD.compute():query=$queryString, partition=$split") + it + case _ => + throw new RuntimeException("Unexpected OQL result: " + result.toString) + } + } + + private def getRegionPathFromQuery(queryString: String): String = { + val r = QueryParser.parseOQL(queryString).get + r match { + case r: String => + val start = r.indexOf("/") + 1 + var end = r.indexOf(")") + if (r.indexOf(".") > 0) end = math.min(r.indexOf("."), end) + if (r.indexOf(",") > 0) end = math.min(r.indexOf(","), end) + val regionPath = r.substring(start, end) + regionPath + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala new file mode 100644 index 0000000..bedc58d --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.oql + +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import com.gemstone.gemfire.DataSerializer +import com.gemstone.gemfire.cache.execute.ResultCollector +import com.gemstone.gemfire.distributed.DistributedMember +import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput} + +class QueryResultCollector extends ResultCollector[Array[Byte], Iterator[Object]]{ + + private val queue = new LinkedBlockingDeque[Array[Byte]]() + + override def getResult = resultIterator + + override def getResult(timeout: Long, unit: TimeUnit) = throw new UnsupportedOperationException + + override def addResult(memberID: DistributedMember , chunk: Array[Byte]) = + if (chunk != null && chunk.size > 0) { + queue.add(chunk) + } + + override def endResults = queue.add(Array.empty) + + + override def clearResults = queue.clear + + private lazy val resultIterator = new Iterator[Object] { + private var currentIterator = nextIterator + def hasNext = { + if (!currentIterator.hasNext && currentIterator != Iterator.empty) + currentIterator = nextIterator + currentIterator.hasNext + } + def next = currentIterator.next + } + + private def nextIterator: Iterator[Object] = { + val chunk = queue.take + if (chunk.isEmpty) { + Iterator.empty + } + else { + val input = new ByteArrayDataInput + input.initialize(chunk, Version.CURRENT) + new Iterator[Object] { + override def hasNext: Boolean = input.available() > 0 + override def next: Object = DataSerializer.readObject(input).asInstanceOf[Object] + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala new file mode 100644 index 0000000..6a1611c --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.oql + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.sources.{BaseRelation, TableScan} + +import scala.tools.nsc.backend.icode.analysis.DataFlowAnalysis + +case class OQLRelation[T](queryRDD: QueryRDD[T])(@transient val sqlContext: SQLContext) extends BaseRelation with TableScan { + + override def schema: StructType = new SchemaBuilder(queryRDD).toSparkSchema() + + override def buildScan(): RDD[Row] = new RowBuilder(queryRDD).toRowRDD() + +} + +object RDDConverter { + + def queryRDDToDataFrame[T](queryRDD: QueryRDD[T], sqlContext: SQLContext): DataFrame = { + sqlContext.baseRelationToDataFrame(OQLRelation(queryRDD)(sqlContext)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala new file mode 100644 index 0000000..e54411c --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RowBuilder.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.oql + +import com.gemstone.gemfire.cache.query.internal.StructImpl +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ + +class RowBuilder[T](queryRDD: QueryRDD[T]) { + + /** + * Convert QueryRDD to RDD of Row + * @return RDD of Rows + */ + def toRowRDD(): RDD[Row] = { + val rowRDD = queryRDD.map(row => { + row match { + case si: StructImpl => Row.fromSeq(si.getFieldValues) + case obj: Object => Row.fromSeq(Seq(obj)) + } + }) + rowRDD + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala new file mode 100644 index 0000000..3ca20b7 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/SchemaBuilder.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.oql + +import com.gemstone.gemfire.cache.query.internal.StructImpl +import org.apache.spark.sql.types._ +import scala.collection.mutable.ListBuffer +import org.apache.spark.Logging + +class SchemaBuilder[T](queryRDD: QueryRDD[T]) extends Logging { + + val nullStructType = StructType(Nil) + + val typeMap:Map[Class[_], DataType] = Map( + (classOf[java.lang.String], StringType), + (classOf[java.lang.Integer], IntegerType), + (classOf[java.lang.Short], ShortType), + (classOf[java.lang.Long], LongType), + (classOf[java.lang.Double], DoubleType), + (classOf[java.lang.Float], FloatType), + (classOf[java.lang.Boolean], BooleanType), + (classOf[java.lang.Byte], ByteType), + (classOf[java.util.Date], DateType), + (classOf[java.lang.Object], nullStructType) + ) + + /** + * Analyse QueryRDD to get the Spark schema + * @return The schema represented by Spark StructType + */ + def toSparkSchema(): StructType = { + val row = queryRDD.first() + val tpe = row match { + case r: StructImpl => constructFromStruct(r) + case null => StructType(StructField("col1", NullType) :: Nil) + case default => + val value = typeMap.getOrElse(default.getClass(), nullStructType) + StructType(StructField("col1", value) :: Nil) + } + logInfo(s"Schema: $tpe") + tpe + } + + def constructFromStruct(r:StructImpl) = { + val names = r.getFieldNames + val values = r.getFieldValues + val lb = new ListBuffer[StructField]() + for (i <- 0 until names.length) { + val name = names(i) + val value = values(i) + val dataType = value match { + case null => NullType + case default => typeMap.getOrElse(default.getClass, nullStructType) + } + lb += StructField(name, dataType) + } + StructType(lb.toSeq) + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala new file mode 100644 index 0000000..37dec42 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/UndefinedSerializer.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.oql + +import com.esotericsoftware.kryo.{Kryo, Serializer} +import com.esotericsoftware.kryo.io.{Output, Input} +import com.gemstone.gemfire.cache.query.QueryService +import com.gemstone.gemfire.cache.query.internal.Undefined + +/** + * This is the customized serializer to serialize QueryService.UNDEFINED, + * i.e. com.gemstone.gemfire.cache.query.internal.Undefined, in order to + * guarantee the singleton Undefined after its deserialization within Spark. + */ +class UndefinedSerializer extends Serializer[Undefined] { + + def write(kryo: Kryo, output: Output, u: Undefined) { + //Only serialize a byte for Undefined + output.writeByte(u.getDSFID) + } + + def read (kryo: Kryo, input: Input, tpe: Class[Undefined]): Undefined = { + //Read DSFID of Undefined + input.readByte() + QueryService.UNDEFINED match { + case null => new Undefined + case _ => + //Avoid calling Undefined constructor again. + QueryService.UNDEFINED.asInstanceOf[Undefined] + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala new file mode 100644 index 0000000..e9dd658 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.rdd + +import com.gemstone.gemfire.cache.Region +import io.pivotal.geode.spark.connector.GeodeConnectionConf +import org.apache.spark.{TaskContext, Partition} +import org.apache.spark.rdd.RDD +import scala.collection.JavaConversions._ + +/** + * An `RDD[T, V]` that will represent the result of a join between `left` RDD[T] + * and the specified Geode Region[K, V]. + */ +class GeodeJoinRDD[T, K, V] private[connector] + ( left: RDD[T], + func: T => K, + val regionPath: String, + val connConf: GeodeConnectionConf + ) extends RDD[(T, V)](left.context, left.dependencies) { + + /** validate region existence when GeodeRDD object is created */ + validate() + + /** Validate region, and make sure it exists. */ + private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath) + + override protected def getPartitions: Array[Partition] = left.partitions + + override def compute(split: Partition, context: TaskContext): Iterator[(T, V)] = { + val region = connConf.getConnection.getRegionProxy[K, V](regionPath) + if (func == null) computeWithoutFunc(split, context, region) + else computeWithFunc(split, context, region) + } + + /** T is (K, V1) since there's no map function `func` */ + private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = { + val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]] + val leftKeys = leftPairs.map { case (k, v) => k}.toSet + // Note: get all will return (key, null) for non-exist entry, so remove those entries + val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null} + leftPairs.filter{case (k, v) => rightPairs.contains(k)} + .map {case (k, v) => ((k, v).asInstanceOf[T], rightPairs.get(k).get)}.toIterator + } + + private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = { + val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t))) + val leftKeys = leftPairs.map { case (t, k) => k}.toSet + // Note: get all will return (key, null) for non-exist entry, so remove those entries + val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null} + leftPairs.filter { case (t, k) => rightPairs.contains(k)}.map {case (t, k) => (t, rightPairs.get(k).get)}.toIterator + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala new file mode 100644 index 0000000..3d61d47 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.rdd + +import com.gemstone.gemfire.cache.Region +import io.pivotal.geode.spark.connector.GeodeConnectionConf +import org.apache.spark.{TaskContext, Partition} +import org.apache.spark.rdd.RDD +import scala.collection.JavaConversions._ + +/** + * An `RDD[ T, Option[V] ]` that represents the result of a left outer join + * between `left` RDD[T] and the specified Geode Region[K, V]. + */ +class GeodeOuterJoinRDD[T, K, V] private[connector] + ( left: RDD[T], + func: T => K, + val regionPath: String, + val connConf: GeodeConnectionConf + ) extends RDD[(T, Option[V])](left.context, left.dependencies) { + + /** validate region existence when GeodeRDD object is created */ + validate() + + /** Validate region, and make sure it exists. */ + private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath) + + override protected def getPartitions: Array[Partition] = left.partitions + + override def compute(split: Partition, context: TaskContext): Iterator[(T, Option[V])] = { + val region = connConf.getConnection.getRegionProxy[K, V](regionPath) + if (func == null) computeWithoutFunc(split, context, region) + else computeWithFunc(split, context, region) + } + + /** T is (K1, V1), and K1 and K are the same type since there's no map function `func` */ + private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = { + val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]] + val leftKeys = leftPairs.map { case (k, v) => k}.toSet + // Note: get all will return (key, null) for non-exist entry + val rightPairs = region.getAll(leftKeys) + // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option + leftPairs.map{ case (k, v) => ((k, v).asInstanceOf[T], Option(rightPairs.get(k))) }.toIterator + } + + private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = { + val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t))) + val leftKeys = leftPairs.map { case (t, k) => k}.toSet + // Note: get all will return (key, null) for non-exist entry + val rightPairs = region.getAll(leftKeys) + // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option + leftPairs.map{ case (t, k) => (t, Option(rightPairs.get(k)))}.toIterator + } +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala new file mode 100644 index 0000000..24fe72e --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.rdd + +import org.apache.spark.Partition + +/** + * This serializable class represents a GeodeRDD partition. Each partition is mapped + * to one or more buckets of region. The GeodeRDD can materialize the data of the + * partition based on all information contained here. + * @param partitionId partition id, a 0 based number. + * @param bucketSet region bucket id set for this partition. Set.empty means whole + * region (used for replicated region) + * @param locations preferred location for this partition + */ +case class GeodeRDDPartition ( + partitionId: Int, bucketSet: Set[Int], locations: Seq[String] = Nil) + extends Partition { + + override def index: Int = partitionId + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala new file mode 100644 index 0000000..d960cab --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.rdd + +import io.pivotal.geode.spark.connector.GeodeConnection +import io.pivotal.geode.spark.connector.internal.RegionMetadata +import org.apache.spark.{Logging, Partition} + +import scala.reflect.ClassTag + +/** + * A GeodeRDD partitioner is used to partition the region into multiple RDD partitions. + */ +trait GeodeRDDPartitioner extends Serializable { + + def name: String + + /** the function that generates partitions */ + def partitions[K: ClassTag, V: ClassTag] + (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] +} + +object GeodeRDDPartitioner extends Logging { + + /** To add new partitioner, just add it to the following list */ + final val partitioners: Map[String, GeodeRDDPartitioner] = + List(OnePartitionPartitioner, ServerSplitsPartitioner).map(e => (e.name, e)).toMap + + /** + * Get a partitioner based on given name, a default partitioner will be returned if there's + * no partitioner for the given name. + */ + def apply(name: String = defaultPartitionedRegionPartitioner.name): GeodeRDDPartitioner = { + val p = partitioners.get(name) + if (p.isDefined) p.get else { + logWarning(s"Invalid preferred partitioner name $name.") + defaultPartitionedRegionPartitioner + } + } + + val defaultReplicatedRegionPartitioner = OnePartitionPartitioner + + val defaultPartitionedRegionPartitioner = ServerSplitsPartitioner + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala new file mode 100644 index 0000000..4606114 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.rdd + +import io.pivotal.geode.spark.connector.GeodeConnection +import io.pivotal.geode.spark.connector.internal.RegionMetadata +import io.pivotal.geode.spark.connector.NumberPartitionsPerServerPropKey +import org.apache.spark.Partition +import scala.collection.JavaConversions._ +import scala.collection.immutable.SortedSet +import scala.collection.mutable +import scala.reflect.ClassTag + +/** This partitioner maps whole region to one GeodeRDDPartition */ +object OnePartitionPartitioner extends GeodeRDDPartitioner { + + override val name = "OnePartition" + + override def partitions[K: ClassTag, V: ClassTag] + (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = + Array[Partition](new GeodeRDDPartition(0, Set.empty)) +} + +/** + * This partitioner maps whole region to N * M Geode RDD partitions, where M is the number of + * Geode servers that contain the data for the given region. Th default value of N is 1. + */ +object ServerSplitsPartitioner extends GeodeRDDPartitioner { + + override val name = "ServerSplits" + + override def partitions[K: ClassTag, V: ClassTag] + (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = { + if (md == null) throw new RuntimeException("RegionMetadata is null") + val n = try { env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt } catch { case e: NumberFormatException => 2 } + if (!md.isPartitioned || md.getServerBucketMap == null || md.getServerBucketMap.isEmpty) + Array[Partition](new GeodeRDDPartition(0, Set.empty)) + else { + val map = mapAsScalaMap(md.getServerBucketMap) + .map { case (srv, set) => (srv, asScalaSet(set).map(_.toInt)) }.toList + .map { case (srv, set) => (srv.getHostName, set) } + doPartitions(map, md.getTotalBuckets, n) + } + } + + /** Converts server to bucket ID set list to array of RDD partitions */ + def doPartitions(serverBucketMap: List[(String, mutable.Set[Int])], totalBuckets: Int, n: Int) + : Array[Partition] = { + + // method that calculates the group size for splitting "k" items into "g" groups + def groupSize(k: Int, g: Int): Int = scala.math.ceil(k / g.toDouble).toInt + + // 1. convert list of server and bucket set pairs to a list of server and sorted bucket set pairs + val srvToSortedBucketSet = serverBucketMap.map { case (srv, set) => (srv, SortedSet[Int]() ++ set) } + + // 2. split bucket set of each server into n splits if possible, and server to Seq(server) + val srvToSplitedBuckeSet = srvToSortedBucketSet.flatMap { case (host, set) => + if (set.isEmpty) Nil else set.grouped(groupSize(set.size, n)).toList.map(s => (Seq(host), s)) } + + // 3. calculate empty bucket IDs by removing all bucket sets of all servers from the full bucket sets + val emptyIDs = SortedSet[Int]() ++ ((0 until totalBuckets).toSet /: srvToSortedBucketSet) {case (s1, (k, s2)) => s1 &~ s2} + + // 4. distribute empty bucket IDs to all partitions evenly. + // The empty buckets do not contain data when partitions are created, but they may contain data + // when RDD is materialized, so need to include those bucket IDs in the partitions. + val srvToFinalBucketSet = if (emptyIDs.isEmpty) srvToSplitedBuckeSet + else srvToSplitedBuckeSet.zipAll( + emptyIDs.grouped(groupSize(emptyIDs.size, srvToSplitedBuckeSet.size)).toList, (Nil, Set.empty), Set.empty).map + { case ((server, set1), set2) => (server, SortedSet[Int]() ++ set1 ++ set2) } + + // 5. create array of partitions w/ 0-based index + (0 until srvToFinalBucketSet.size).toList.zip(srvToFinalBucketSet).map + { case (i, (srv, set)) => new GeodeRDDPartition(i, set, srv) }.toArray + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala new file mode 100644 index 0000000..dba15f3 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.rdd + +import com.gemstone.gemfire.cache.Region +import io.pivotal.geode.spark.connector._ +import org.apache.spark.{Logging, TaskContext} + +import scala.collection.Iterator +import java.util.{HashMap => JMap} + +/** This trait provide some common code for pair and non-pair RDD writer */ +private[rdd] abstract class GeodeRDDWriterBase (opConf: Map[String, String]) extends Serializable { + + val batchSize = try { opConf.getOrElse(RDDSaveBatchSizePropKey, RDDSaveBatchSizeDefault.toString).toInt} + catch { case e: NumberFormatException => RDDSaveBatchSizeDefault } + + def mapDump(map: Map[_, _], num: Int): String = { + val firstNum = map.take(num + 1) + if (firstNum.size > num) s"$firstNum ..." else s"$firstNum" + } +} + +/** + * Writer object that provides write function that saves non-pair RDD partitions to Geode. + * Those functions will be executed on Spark executors. + * @param regionPath the full path of the region where the data is written to + */ +class GeodeRDDWriter[T, K, V] + (regionPath: String, connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty) + extends GeodeRDDWriterBase(opConf) with Serializable with Logging { + + def write(func: T => (K, V))(taskContext: TaskContext, data: Iterator[T]): Unit = { + val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath) + var count = 0 + val chunks = data.grouped(batchSize) + chunks.foreach { chunk => + val map = chunk.foldLeft(new JMap[K, V]()){case (m, t) => val (k, v) = func(t); m.put(k, v); m} + region.putAll(map) + count += chunk.length + } + logDebug(s"$count entries (batch.size = $batchSize) are saved to region $regionPath") + } +} + + +/** + * Writer object that provides write function that saves pair RDD partitions to Geode. + * Those functions will be executed on Spark executors. + * @param regionPath the full path of the region where the data is written to + */ +class GeodePairRDDWriter[K, V] + (regionPath: String, connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty) + extends GeodeRDDWriterBase(opConf) with Serializable with Logging { + + def write(taskContext: TaskContext, data: Iterator[(K, V)]): Unit = { + val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath) + var count = 0 + val chunks = data.grouped(batchSize) + chunks.foreach { chunk => + val map = chunk.foldLeft(new JMap[K, V]()){case (m, (k,v)) => m.put(k,v); m} + region.putAll(map) + count += chunk.length + } + logDebug(s"$count entries (batch.batch = $batchSize) are saved to region $regionPath") + } +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala new file mode 100644 index 0000000..6980c0f --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.rdd + +import scala.collection.Seq +import scala.reflect.ClassTag +import org.apache.spark.rdd.RDD +import org.apache.spark.{TaskContext, Partition, SparkContext} +import io.pivotal.geode.spark.connector.{GeodeConnectionConf, PreferredPartitionerPropKey} +import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._ + +/** + * This class exposes Geode region as a RDD. + * @param sc the Spark Context + * @param regionPath the full path of the region + * @param connConf the GeodeConnectionConf to access the region + * @param opConf the parameters for this operation, such as preferred partitioner. + */ +class GeodeRegionRDD[K, V] private[connector] + (@transient sc: SparkContext, + val regionPath: String, + val connConf: GeodeConnectionConf, + val opConf: Map[String, String] = Map.empty, + val whereClause: Option[String] = None + ) (implicit ctk: ClassTag[K], ctv: ClassTag[V]) + extends RDD[(K, V)](sc, Seq.empty) { + + /** validate region existence when GeodeRDD object is created */ + validate() + + /** Validate region, and make sure it exists. */ + private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath) + + def kClassTag = ctk + + def vClassTag = ctv + + /** + * method `copy` is used by method `where` that creates new immutable + * GeodeRDD instance based this instance. + */ + private def copy( + regionPath: String = regionPath, + connConf: GeodeConnectionConf = connConf, + opConf: Map[String, String] = opConf, + whereClause: Option[String] = None + ): GeodeRegionRDD[K, V] = { + + require(sc != null, + """RDD transformation requires a non-null SparkContext. Unfortunately + |SparkContext in this GeodeRDD is null. This can happen after + |GeodeRDD has been deserialized. SparkContext is not Serializable, + |therefore it deserializes to null. RDD transformations are not allowed + |inside lambdas used in other RDD transformations.""".stripMargin ) + + new GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf, whereClause) + } + + /** When where clause is specified, OQL query + * `select key, value from /<region-path>.entries where <where clause> ` + * is used to filter the dataset. + */ + def where(whereClause: Option[String]): GeodeRegionRDD[K, V] = { + if (whereClause.isDefined) copy(whereClause = whereClause) + else this + } + + /** this version is for Java API that doesn't use scala.Option */ + def where(whereClause: String): GeodeRegionRDD[K, V] = { + if (whereClause == null || whereClause.trim.isEmpty) this + else copy(whereClause = Option(whereClause.trim)) + } + + /** + * Use preferred partitioner generate partitions. `defaultReplicatedRegionPartitioner` + * will be used if it's a replicated region. + */ + override def getPartitions: Array[Partition] = { + val conn = connConf.getConnection + val md = conn.getRegionMetadata[K, V](regionPath) + md match { + case None => throw new RuntimeException(s"region $regionPath was not found.") + case Some(data) => + logInfo(s"""RDD id=${this.id} region=$regionPath conn=${connConf.locators.mkString(",")}, env=$opConf""") + val p = if (data.isPartitioned) preferredPartitioner else defaultReplicatedRegionPartitioner + val splits = p.partitions[K, V](conn, data, opConf) + logDebug(s"""RDD id=${this.id} region=$regionPath partitions=\n ${splits.mkString("\n ")}""") + splits + } + } + + /** + * provide preferred location(s) (host name(s)) of the given partition. + * Only some partitioner implementation(s) provides this info, which is + * useful when Spark cluster and Geode cluster share some hosts. + */ + override def getPreferredLocations(split: Partition) = + split.asInstanceOf[GeodeRDDPartition].locations + + /** + * Get preferred partitioner. return `defaultPartitionedRegionPartitioner` if none + * preference is specified. + */ + private def preferredPartitioner = + GeodeRDDPartitioner(opConf.getOrElse( + PreferredPartitionerPropKey, GeodeRDDPartitioner.defaultPartitionedRegionPartitioner.name)) + + /** materialize a RDD partition */ + override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { + val partition = split.asInstanceOf[GeodeRDDPartition] + logDebug(s"compute RDD id=${this.id} partition $partition") + connConf.getConnection.getRegionData[K,V](regionPath, whereClause, partition) + // new InterruptibleIterator(context, split.asInstanceOf[GeodeRDDPartition[K, V]].iterator) + } +} + +object GeodeRegionRDD { + + def apply[K: ClassTag, V: ClassTag](sc: SparkContext, regionPath: String, + connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty) + : GeodeRegionRDD[K, V] = + new GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala new file mode 100644 index 0000000..f859173 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.javaapi + +import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD +import org.apache.spark.api.java.JavaPairRDD + +class GeodeJavaRegionRDD[K, V](rdd: GeodeRegionRDD[K, V]) extends JavaPairRDD[K, V](rdd)(rdd.kClassTag, rdd.vClassTag) { + + def where(whereClause: String): GeodeJavaRegionRDD[K, V] = new GeodeJavaRegionRDD(rdd.where(whereClause)) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala new file mode 100644 index 0000000..ffa6195 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.javaapi + +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD} +import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream} + +import scala.reflect.ClassTag +import scala.collection.JavaConversions._ + +/** + * A helper class to make it possible to access components written in Scala from Java code. + */ +private[connector] object JavaAPIHelper { + + /** Returns a `ClassTag` of a given runtime class. */ + def getClassTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz) + + /** + * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. + * see JavaSparkContext.fakeClassTag in Spark for more info. + */ + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] + + /** Converts a Java `Properties` to a Scala immutable `Map[String, String]`. */ + def propertiesToScalaMap[K, V](props: java.util.Properties): Map[String, String] = + Map(props.toSeq: _*) + + /** convert a JavaRDD[(K,V)] to JavaPairRDD[K,V] */ + def toJavaPairRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = + JavaPairRDD.fromJavaRDD(rdd) + + /** convert a JavaDStream[(K,V)] to JavaPairDStream[K,V] */ + def toJavaPairDStream[K, V](ds: JavaDStream[(K, V)]): JavaPairDStream[K, V] = + JavaPairDStream.fromJavaDStream(ds) + + /** an empty Map[String, String] for default opConf **/ + val emptyStrStrMap: Map[String, String] = Map.empty +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala new file mode 100644 index 0000000..6f9a780 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark + +import io.pivotal.geode.spark.connector.internal.rdd.{ServerSplitsPartitioner, OnePartitionPartitioner} +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext + +import scala.reflect.ClassTag + +/** + * The root package of Geode connector for Apache Spark. + * Provides handy implicit conversions that add geode-specific + * methods to `SparkContext` and `RDD`. + */ +package object connector { + + /** constants */ + final val GeodeLocatorPropKey = "spark.geode.locators" + // partitioner related keys and values + final val PreferredPartitionerPropKey = "preferred.partitioner" + final val NumberPartitionsPerServerPropKey = "number.partitions.per.server" + final val OnePartitionPartitionerName = OnePartitionPartitioner.name + final val ServerSplitsPartitionerName = ServerSplitsPartitioner.name + + final val RDDSaveBatchSizePropKey = "rdd.save.batch.size" + final val RDDSaveBatchSizeDefault = 10000 + + /** implicits */ + + implicit def toSparkContextFunctions(sc: SparkContext): GeodeSparkContextFunctions = + new GeodeSparkContextFunctions(sc) + + implicit def toSQLContextFunctions(sqlContext: SQLContext): GeodeSQLContextFunctions = + new GeodeSQLContextFunctions(sqlContext) + + implicit def toGeodePairRDDFunctions[K: ClassTag, V: ClassTag] + (self: RDD[(K, V)]): GeodePairRDDFunctions[K, V] = new GeodePairRDDFunctions(self) + + implicit def toGeodeRDDFunctions[T: ClassTag] + (self: RDD[T]): GeodeRDDFunctions[T] = new GeodeRDDFunctions(self) + + /** utility implicits */ + + /** convert Map[String, String] to java.util.Properties */ + implicit def map2Properties(map: Map[String,String]): java.util.Properties = + (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props} + + /** internal util methods */ + + private[connector] def getRddPartitionsInfo(rdd: RDD[_], sep: String = "\n "): String = + rdd.partitions.zipWithIndex.map{case (p,i) => s"$i: $p loc=${rdd.preferredLocations(p)}"}.mkString(sep) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala new file mode 100644 index 0000000..4d46429 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.streaming + +import io.pivotal.geode.spark.connector.GeodeConnectionConf +import io.pivotal.geode.spark.connector.internal.rdd.{GeodePairRDDWriter, GeodeRDDWriter} +import org.apache.spark.Logging +import org.apache.spark.api.java.function.PairFunction +import org.apache.spark.streaming.dstream.DStream + +/** + * Extra geode functions on DStream of non-pair elements through an implicit conversion. + * Import `io.pivotal.geode.spark.connector.streaming._` at the top of your program to + * use these functions. + */ +class GeodeDStreamFunctions[T](val dstream: DStream[T]) extends Serializable with Logging { + + /** + * Save the DStream of non-pair elements to Geode key-value store. + * @param regionPath the full path of region that the DStream is stored + * @param func the function that converts elements of the DStream to key/value pairs + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param opConf the optional parameters for this operation + */ + def saveToGeode[K, V]( + regionPath: String, + func: T => (K, V), + connConf: GeodeConnectionConf = defaultConnectionConf, + opConf: Map[String, String] = Map.empty): Unit = { + connConf.getConnection.validateRegion[K, V](regionPath) + val writer = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf) + logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""") + dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write(func) _)) + } + + /** this version of saveToGeode is just for Java API */ + def saveToGeode[K, V]( + regionPath: String, + func: PairFunction[T, K, V], + connConf: GeodeConnectionConf, + opConf: Map[String, String] ): Unit = { + saveToGeode[K, V](regionPath, func.call _, connConf, opConf) + } + + private[connector] def defaultConnectionConf: GeodeConnectionConf = + GeodeConnectionConf(dstream.context.sparkContext.getConf) +} + + +/** + * Extra geode functions on DStream of (key, value) pairs through an implicit conversion. + * Import `io.pivotal.geode.spark.connector.streaming._` at the top of your program to + * use these functions. + */ +class GeodePairDStreamFunctions[K, V](val dstream: DStream[(K,V)]) extends Serializable with Logging { + + /** + * Save the DStream of pairs to Geode key-value store without any conversion + * @param regionPath the full path of region that the DStream is stored + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param opConf the optional parameters for this operation + */ + def saveToGeode( + regionPath: String, + connConf: GeodeConnectionConf = defaultConnectionConf, + opConf: Map[String, String] = Map.empty): Unit = { + connConf.getConnection.validateRegion[K, V](regionPath) + val writer = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf) + logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""") + dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _)) + } + + private[connector] def defaultConnectionConf: GeodeConnectionConf = + GeodeConnectionConf(dstream.context.sparkContext.getConf) +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala new file mode 100644 index 0000000..0d1f1eb --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import org.apache.spark.streaming.dstream.DStream + +/** + * Provides handy implicit conversions that add gemfire-specific methods to `DStream`. + */ +package object streaming { + + implicit def toGeodeDStreamFunctions[T](ds: DStream[T]): GeodeDStreamFunctions[T] = + new GeodeDStreamFunctions[T](ds) + + implicit def toGeodePairDStreamFunctions[K, V](ds: DStream[(K, V)]): GeodePairDStreamFunctions[K, V] = + new GeodePairDStreamFunctions[K, V](ds) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java b/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java new file mode 100644 index 0000000..142907e --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/java/io/pivotal/geode/spark/connector/JavaAPITest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector; + +import io.pivotal.geode.spark.connector.javaapi.*; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.SQLContext; +//import org.apache.spark.sql.api.java.JavaSQLContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.dstream.DStream; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import scala.Function1; +import scala.Function2; +import scala.Tuple2; +import scala.Tuple3; +import scala.collection.mutable.LinkedList; +import scala.reflect.ClassTag; + +import static org.junit.Assert.*; +import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; + +public class JavaAPITest extends JUnitSuite { + + @SuppressWarnings( "unchecked" ) + public Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> createCommonMocks() { + SparkContext mockSparkContext = mock(SparkContext.class); + GeodeConnectionConf mockConnConf = mock(GeodeConnectionConf.class); + GeodeConnection mockConnection = mock(GeodeConnection.class); + when(mockConnConf.getConnection()).thenReturn(mockConnection); + when(mockConnConf.locators()).thenReturn(new LinkedList()); + return new Tuple3<>(mockSparkContext, mockConnConf, mockConnection); + } + + @Test + public void testSparkContextFunction() throws Exception { + Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks(); + GeodeJavaSparkContextFunctions wrapper = javaFunctions(tuple3._1()); + assertTrue(tuple3._1() == wrapper.sc); + String regionPath = "testregion"; + JavaPairRDD<String, String> rdd = wrapper.geodeRegion(regionPath, tuple3._2()); + verify(tuple3._3()).validateRegion(regionPath); + } + + @Test + public void testJavaSparkContextFunctions() throws Exception { + SparkContext mockSparkContext = mock(SparkContext.class); + JavaSparkContext mockJavaSparkContext = mock(JavaSparkContext.class); + when(mockJavaSparkContext.sc()).thenReturn(mockSparkContext); + GeodeJavaSparkContextFunctions wrapper = javaFunctions(mockJavaSparkContext); + assertTrue(mockSparkContext == wrapper.sc); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void testJavaPairRDDFunctions() throws Exception { + JavaPairRDD<String, Integer> mockPairRDD = mock(JavaPairRDD.class); + RDD<Tuple2<String, Integer>> mockTuple2RDD = mock(RDD.class); + when(mockPairRDD.rdd()).thenReturn(mockTuple2RDD); + GeodeJavaPairRDDFunctions wrapper = javaFunctions(mockPairRDD); + assertTrue(mockTuple2RDD == wrapper.rddf.rdd()); + + Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks(); + when(mockTuple2RDD.sparkContext()).thenReturn(tuple3._1()); + String regionPath = "testregion"; + wrapper.saveToGeode(regionPath, tuple3._2()); + verify(mockTuple2RDD, times(1)).sparkContext(); + verify(tuple3._1(), times(1)).runJob(eq(mockTuple2RDD), any(Function2.class), any(ClassTag.class)); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void testJavaRDDFunctions() throws Exception { + JavaRDD<String> mockJavaRDD = mock(JavaRDD.class); + RDD<String> mockRDD = mock(RDD.class); + when(mockJavaRDD.rdd()).thenReturn(mockRDD); + GeodeJavaRDDFunctions wrapper = javaFunctions(mockJavaRDD); + assertTrue(mockRDD == wrapper.rddf.rdd()); + + Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks(); + when(mockRDD.sparkContext()).thenReturn(tuple3._1()); + PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class); + String regionPath = "testregion"; + wrapper.saveToGeode(regionPath, mockPairFunc, tuple3._2()); + verify(mockRDD, times(1)).sparkContext(); + verify(tuple3._1(), times(1)).runJob(eq(mockRDD), any(Function2.class), any(ClassTag.class)); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void testJavaPairDStreamFunctions() throws Exception { + JavaPairDStream<String, String> mockJavaDStream = mock(JavaPairDStream.class); + DStream<Tuple2<String, String>> mockDStream = mock(DStream.class); + when(mockJavaDStream.dstream()).thenReturn(mockDStream); + GeodeJavaPairDStreamFunctions wrapper = javaFunctions(mockJavaDStream); + assertTrue(mockDStream == wrapper.dsf.dstream()); + + Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks(); + String regionPath = "testregion"; + wrapper.saveToGeode(regionPath, tuple3._2()); + verify(tuple3._2()).getConnection(); + verify(tuple3._3()).validateRegion(regionPath); + verify(mockDStream).foreachRDD(any(Function1.class)); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void testJavaPairDStreamFunctionsWithTuple2DStream() throws Exception { + JavaDStream<Tuple2<String, String>> mockJavaDStream = mock(JavaDStream.class); + DStream<Tuple2<String, String>> mockDStream = mock(DStream.class); + when(mockJavaDStream.dstream()).thenReturn(mockDStream); + GeodeJavaPairDStreamFunctions wrapper = javaFunctions(toJavaPairDStream(mockJavaDStream)); + assertTrue(mockDStream == wrapper.dsf.dstream()); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void testJavaDStreamFunctions() throws Exception { + JavaDStream<String> mockJavaDStream = mock(JavaDStream.class); + DStream<String> mockDStream = mock(DStream.class); + when(mockJavaDStream.dstream()).thenReturn(mockDStream); + GeodeJavaDStreamFunctions wrapper = javaFunctions(mockJavaDStream); + assertTrue(mockDStream == wrapper.dsf.dstream()); + + Tuple3<SparkContext, GeodeConnectionConf, GeodeConnection> tuple3 = createCommonMocks(); + PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class); + String regionPath = "testregion"; + wrapper.saveToGeode(regionPath, mockPairFunc, tuple3._2()); + verify(tuple3._2()).getConnection(); + verify(tuple3._3()).validateRegion(regionPath); + verify(mockDStream).foreachRDD(any(Function1.class)); + } + + @Test + public void testSQLContextFunction() throws Exception { + SQLContext mockSQLContext = mock(SQLContext.class); + GeodeJavaSQLContextFunctions wrapper = javaFunctions(mockSQLContext); + assertTrue(wrapper.scf.getClass() == GeodeSQLContextFunctions.class); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala new file mode 100644 index 0000000..4e45dc2 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FunSuite, Matchers} +import org.apache.commons.httpclient.HttpClient +import java.io.File + + +class GeodeFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar { + val mockHttpClient: HttpClient = mock[HttpClient] + + test("jmx url creation") { + val jmxHostAndPort = "localhost:7070" + val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed" + val gfd = new GeodeFunctionDeployer(mockHttpClient); + val urlString = gfd.constructURLString(jmxHostAndPort) + assert(urlString === expectedUrlString) + } + + test("missing jar file") { + val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist" + val gfd = new GeodeFunctionDeployer(mockHttpClient); + intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)} + } + + test("deploy with missing jar") { + val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist" + val gfd = new GeodeFunctionDeployer(mockHttpClient); + intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))} + intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))} + } + + test("successful mocked deploy") { + val gfd = new GeodeFunctionDeployer(mockHttpClient); + val jar = new File("README.md"); + assert(gfd.deploy("localhost:7070", jar).contains("Deployed")) + } + + + +}