http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala deleted file mode 100644 index ff4cd17..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnection.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package io.pivotal.geode.spark.connector

import com.gemstone.gemfire.cache.execute.ResultCollector
import com.gemstone.gemfire.cache.query.Query
import com.gemstone.gemfire.cache.Region
import io.pivotal.geode.spark.connector.internal.RegionMetadata
import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition

/**
 * Operations that can be performed over one connection to a Geode cluster.
 *
 * This trait only declares the contract; all behavior lives in the implementations.
 */
trait GeodeConnection {

  /**
   * Check that the region exists and that its key/value type constraints match
   * `K` and `V`.
   *
   * Implementations are expected to throw a RuntimeException when the region does
   * not exist or when the key and/or value type does not match.
   *
   * @param regionPath the full path of the region
   * @tparam K the expected key type
   * @tparam V the expected value type
   */
  def validateRegion[K, V](regionPath: String): Unit

  /**
   * Obtain the region proxy for the given region.
   *
   * @param regionPath the full path of the region
   * @tparam K the region key type
   * @tparam V the region value type
   * @return the `Region[K, V]` proxy
   */
  def getRegionProxy[K, V](regionPath: String): Region[K, V]

  /**
   * Look up the metadata of the given region.
   *
   * @param regionPath the full path of the region
   * @return `Some(metadata)` if the region exists, `None` otherwise
   */
  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata]

  /**
   * Retrieve the key/value pairs of the given region for one RDD partition.
   *
   * @param regionPath  the full path of the region
   * @param whereClause an optional string clause.
   *                    NOTE(review): the previous comment described this parameter as
   *                    "the set of bucket IDs", which does not match its `Option[String]`
   *                    type; confirm the exact semantics against the implementation.
   * @param split       the Geode RDD partition instance the data is fetched for
   * @return an iterator over the `(key, value)` pairs
   */
  def getRegionData[K, V](
      regionPath: String,
      whereClause: Option[String],
      split: GeodeRDDPartition): Iterator[(K, V)]

  /**
   * Execute a query string for the given region and bucket set.
   *
   * NOTE(review): this method was undocumented; the description is derived from the
   * signature only. The result is untyped (`Object`) - check the implementation for
   * the concrete type that is returned.
   *
   * @param regionPath  the full path of the region
   * @param bucketSet   the set of bucket ids passed along with the query
   * @param queryString the query string to execute
   */
  def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object

  /**
   * Create a Geode OQL query object.
   *
   * @param queryString the Geode OQL query string
   * @return the `Query` built from `queryString`
   */
  def getQuery(queryString: String): Query

  /** Close the connection. */
  def close(): Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala deleted file mode 100644 index 38d9e07..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/GeodeConnectionConf.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector - -import org.apache.spark.SparkConf -import io.pivotal.geode.spark.connector.internal.{DefaultGeodeConnectionManager, LocatorHelper} - -/** - * Stores configuration of a connection to Geode cluster. It is serializable and can - * be safely sent over network. - * - * @param locators Geode locator host:port pairs, the default is (localhost,10334) - * @param geodeProps The initial geode properties to be used. 
/**
 * Configuration of a connection to a Geode cluster. The class is `Serializable`
 * so that it can be shipped over the network.
 *
 * @param locators          Geode locator (host, port) pairs; at least one pair is required
 * @param geodeProps        the initial geode properties to be used
 * @param connectionManager the [[GeodeConnectionManager]] that `getConnection` delegates to
 */
class GeodeConnectionConf(
    val locators: Seq[(String, Int)],
    val geodeProps: Map[String, String] = Map.empty,
    connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager
  ) extends Serializable {

  // A configuration without any (host, port) pair cannot be used to connect.
  require(locators.nonEmpty)

  /** Ask the connection manager for the connection that belongs to this configuration. */
  def getConnection: GeodeConnection = connectionManager.getConnection(this)

}

object GeodeConnectionConf {

  /**
   * Build a [[GeodeConnectionConf]] from a locator string.
   *
   * @param locatorStr        Geode cluster locator string, parsed by
   *                          `LocatorHelper.parseLocatorsString`
   * @param geodeProps        the initial geode properties to be used
   * @param connectionManager the connection manager to use; can be supplied implicitly,
   *                          otherwise a new `DefaultGeodeConnectionManager` is created
   */
  def apply(locatorStr: String, geodeProps: Map[String, String] = Map.empty)
    (implicit connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager): GeodeConnectionConf = {
    val parsedLocators = LocatorHelper.parseLocatorsString(locatorStr)
    new GeodeConnectionConf(parsedLocators, geodeProps, connectionManager)
  }

  /**
   * Build a [[GeodeConnectionConf]] from a `SparkConf`.
   *
   * The locator string is read from the `GeodeLocatorPropKey` property. Every other
   * property whose key starts with "spark.geode." is forwarded as a geode property
   * with that prefix removed (SparkConf only holds keys starting with "spark.", which
   * is why geode properties carry the prefix there).
   *
   * @param conf a SparkConf instance
   * @throws RuntimeException if `conf` does not contain the `GeodeLocatorPropKey` property
   */
  def apply(conf: SparkConf): GeodeConnectionConf = {
    val locatorStr = conf.getOption(GeodeLocatorPropKey) match {
      case Some(value) => value
      case None =>
        throw new RuntimeException(s"SparkConf does not contain property $GeodeLocatorPropKey")
    }
    val geodeKeyPrefix = "spark.geode."
    // Keep only the prefixed keys (except the locator key itself) and strip the prefix.
    val geodeProps = conf.getAll.collect {
      case (key, value) if key.startsWith(geodeKeyPrefix) && key != GeodeLocatorPropKey =>
        (key.substring(geodeKeyPrefix.length), value)
    }.toMap
    apply(locatorStr, geodeProps)
  }

}
/**
 * Common, serializable interface for objects that manage Geode connections.
 * Each implementation is expected to take care of creating connection instances
 * and of managing the connection pool.
 */
trait GeodeConnectionManager extends Serializable {

  /**
   * Get the connection for the given connection configuration.
   *
   * @param connConf the configuration describing the Geode cluster to connect to
   */
  def getConnection(connConf: GeodeConnectionConf): GeodeConnection

  /**
   * Close the connection that belongs to the given connection configuration.
   *
   * @param connConf the configuration whose connection should be closed
   */
  def closeConnection(connConf: GeodeConnectionConf): Unit
}
package io.pivotal.geode.spark.connector

import java.io.File
import java.net.URL
import org.apache.commons.httpclient.methods.PostMethod
import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity}
import org.apache.commons.httpclient.HttpClient
import org.apache.spark.Logging

/** Command line entry point that deploys a jar through a [[GeodeFunctionDeployer]]. */
object GeodeFunctionDeployer {

  /**
   * Run the deployer with a fresh `HttpClient`.
   *
   * @param args optional `<jmxHost:port> <jar location>`; when fewer than two
   *             arguments are given both values are read from standard input
   */
  def main(args: Array[String]): Unit = {
    new GeodeFunctionDeployer(new HttpClient()).commandLineRun(args)
  }
}

/**
 * Uploads a jar file with an HTTP multipart POST to
 * `http://<host:port>/gemfire/v1/deployed`.
 *
 * @param httpClient the HTTP client used to execute the POST request
 */
class GeodeFunctionDeployer(val httpClient: HttpClient) extends Logging {

  /** Deploy the jar found at `jarLocation` to `host:port`. */
  def deploy(host: String, port: Int, jarLocation: String): String =
    deploy(s"$host:$port", jarLocation)

  /** Deploy the given jar file to `host:port`. */
  def deploy(host: String, port: Int, jar: File): String =
    deploy(s"$host:$port", jar)

  /**
   * Deploy the jar found at `jarLocation`.
   *
   * @throws RuntimeException if no file exists at `jarLocation`
   */
  def deploy(jmxHostAndPort: String, jarLocation: String): String =
    deploy(jmxHostAndPort, jarFileHandle(jarLocation))

  /**
   * POST the jar as a single multipart part named "resources".
   *
   * @param jmxHostAndPort the `host:port` the request is sent to
   * @param jar            the jar file to upload
   * @return a message containing the HTTP status code of the response
   */
  def deploy(jmxHostAndPort: String, jar: File): String = {
    val filePost = new PostMethod(constructURLString(jmxHostAndPort))
    try {
      val parts = Array[Part](new FilePart("resources", jar))
      filePost.setRequestEntity(new MultipartRequestEntity(parts, filePost.getParams))
      val status: Int = httpClient.executeMethod(filePost)
      "Deployed Jar with status:" + status
    } finally {
      // The connection must be handed back to the connection manager whether or not
      // the request succeeded; the previous code never released it.
      filePost.releaseConnection()
    }
  }

  /** Build the deployment URL for the given `host:port`. */
  private[connector] def constructURLString(jmxHostAndPort: String) =
    s"http://$jmxHostAndPort/gemfire/v1/deployed"

  /**
   * Turn a jar location into a `File`, failing when nothing exists at that path.
   *
   * @throws RuntimeException if the file does not exist
   */
  private[connector] def jarFileHandle(jarLocation: String) = {
    val f: File = new File(jarLocation)
    if (!f.exists()) {
      val errorMessage: String = "Invalid jar file:" + f.getAbsolutePath
      // This is a failure path, so it is reported at error level.
      logError(errorMessage)
      throw new RuntimeException(errorMessage)
    }
    f
  }

  /**
   * Deploy using command line arguments, or prompt on standard input when fewer
   * than two arguments are supplied, then log the resulting status message.
   *
   * @param args `args(0)` is the JMX manager `host:port`, `args(1)` the jar location
   */
  def commandLineRun(args: Array[String]): Unit = {
    val (hostPort, jarFile) =
      if (args.length < 2) {
        // The prompts are emitted through the logger, as before.
        val stdin = new java.io.BufferedReader(new java.io.InputStreamReader(System.in))
        logInfo("JMX Manager Host and Port (example: localhost:7070):")
        val jmxHostAndPort = stdin.readLine()
        logInfo("Location of geode-functions.jar:")
        val functionJarLocation = stdin.readLine()
        (jmxHostAndPort, functionJarLocation)
      } else {
        (args(0), args(1))
      }
    val status = deploy(hostPort, jarFile)
    logInfo(status)
  }
}
package io.pivotal.geode.spark.connector

import com.esotericsoftware.kryo.Kryo
import io.pivotal.geode.spark.connector.internal.oql.UndefinedSerializer
import org.apache.spark.serializer.KryoRegistrator
import com.gemstone.gemfire.cache.query.internal.Undefined

/**
 * Kryo registrator for the connector: it makes `UndefinedSerializer` the default
 * Kryo serializer for the Geode `Undefined` class.
 */
class GeodeKryoRegistrator extends KryoRegistrator {

  /**
   * Register the default serializer for `Undefined` on the given Kryo instance.
   *
   * @param kyro the Kryo instance to configure
   */
  override def registerClasses(kyro: Kryo): Unit = {
    val undefinedClass = classOf[Undefined]
    val serializerClass = classOf[UndefinedSerializer]
    kyro.addDefaultSerializer(undefinedClass, serializerClass)
  }
}
package io.pivotal.geode.spark.connector

import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodePairRDDWriter}
import org.apache.spark.Logging
import org.apache.spark.api.java.function.Function
import org.apache.spark.rdd.RDD

/**
 * Extra Geode functions on RDDs of (key, value) pairs, made available through an
 * implicit conversion. Import `io.pivotal.geode.spark.connector._` at the top of
 * your program to use these functions.
 *
 * @param rdd the pair RDD these functions operate on
 */
class GeodePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging {

  /**
   * Save the RDD of pairs to a Geode region without any conversion.
   *
   * The region is validated through the connection first, then one Spark job is
   * run that writes the partitions with a `GeodePairRDDWriter`.
   *
   * @param regionPath the full path of the region the RDD is stored in
   * @param connConf   the GeodeConnectionConf object that provides connection to Geode cluster
   * @param opConf     the optional parameters for this operation
   */
  def saveToGeode(
      regionPath: String,
      connConf: GeodeConnectionConf = defaultConnectionConf,
      opConf: Map[String, String] = Map.empty): Unit = {
    connConf.getConnection.validateRegion[K, V](regionPath)
    // Partition details are only computed when debug logging is switched on.
    if (!log.isDebugEnabled) {
      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
    } else {
      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""")
    }
    val pairWriter = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf)
    rdd.sparkContext.runJob(rdd, pairWriter.write _)
  }

  /**
   * Join `this` RDD with the Geode `Region[K, V2]` on matching keys. According to the
   * original documentation each result element is a ((k, v), v2) tuple, where (k, v)
   * is in `this` RDD and (k, v2) is in the Geode region.
   *
   * @param regionPath the region path of the Geode region
   * @param connConf   the GeodeConnectionConf object that provides connection to Geode cluster
   * @tparam K2 the key type of the Geode region
   * @tparam V2 the value type of the Geode region
   * @return a `GeodeJoinRDD[(K, V), K, V2]`
   */
  def joinGeodeRegion[K2 <: K, V2](
      regionPath: String,
      connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K, V2] = {
    // No key function is supplied here (null is passed to GeodeJoinRDD).
    // NOTE(review): presumably GeodeJoinRDD then joins on the pair's own key - confirm there.
    new GeodeJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
  }

  /**
   * Join `this` RDD with the Geode `Region[K2, V2]`. The join key of an RDD element
   * is produced by `func`; the key on the region side is just the key of the
   * key/value pair.
   *
   * According to the original documentation each result element is a ((k, v), v2)
   * tuple, where (k, v) is in `this` RDD and (k2, v2) is in the Geode region.
   *
   * @param regionPath the region path of the Geode region
   * @param func       the function that generates the region key from an RDD element (K, V)
   * @param connConf   the GeodeConnectionConf object that provides connection to Geode cluster
   * @tparam K2 the key type of the Geode region
   * @tparam V2 the value type of the Geode region
   * @return a `GeodeJoinRDD[(K, V), K2, V2]`
   */
  def joinGeodeRegion[K2, V2](
      regionPath: String,
      func: ((K, V)) => K2,
      connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K2, V2] = {
    new GeodeJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
  }

  /** This version of joinGeodeRegion(...) is just for Java API. */
  private[connector] def joinGeodeRegion[K2, V2](
      regionPath: String,
      func: Function[(K, V), K2],
      connConf: GeodeConnectionConf): GeodeJoinRDD[(K, V), K2, V2] = {
    // Adapt the Java function object to a Scala function.
    val keyOf: ((K, V)) => K2 = (pair: (K, V)) => func.call(pair)
    new GeodeJoinRDD[(K, V), K2, V2](rdd, keyOf, regionPath, connConf)
  }

  /**
   * Left outer join of `this` RDD with the Geode `Region[K, V2]`. According to the
   * original documentation, for each element (k, v) in `this` RDD the result contains
   * ((k, v), Some(v2)) for v2 in the Geode region, or ((k, v), None) if no element in
   * the Geode region has key k.
   *
   * @param regionPath the region path of the Geode region
   * @param connConf   the GeodeConnectionConf object that provides connection to Geode cluster
   * @tparam K2 the key type of the Geode region
   * @tparam V2 the value type of the Geode region
   * @return a `GeodeOuterJoinRDD[(K, V), K, V2]`
   */
  def outerJoinGeodeRegion[K2 <: K, V2](
      regionPath: String,
      connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K, V2] = {
    // No key function is supplied here (null is passed to GeodeOuterJoinRDD).
    // NOTE(review): presumably the pair's own key is used for the join - confirm there.
    new GeodeOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
  }

  /**
   * Left outer join of `this` RDD with the Geode `Region[K2, V2]`. The join key of
   * an RDD element is produced by `func`; the key on the region side is just the key
   * of the key/value pair.
   *
   * According to the original documentation, for each element (k, v) in `this` RDD
   * the result contains ((k, v), Some(v2)) for v2 in the Geode region, or
   * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
   *
   * @param regionPath the region path of the Geode region
   * @param func       the function that generates the region key from an RDD element (K, V)
   * @param connConf   the GeodeConnectionConf object that provides connection to Geode cluster
   * @tparam K2 the key type of the Geode region
   * @tparam V2 the value type of the Geode region
   * @return a `GeodeOuterJoinRDD[(K, V), K2, V2]`
   */
  def outerJoinGeodeRegion[K2, V2](
      regionPath: String,
      func: ((K, V)) => K2,
      connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] =
    new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)

  /** This version of outerJoinGeodeRegion(...) is just for Java API. */
  private[connector] def outerJoinGeodeRegion[K2, V2](
      regionPath: String,
      func: Function[(K, V), K2],
      connConf: GeodeConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
    // Adapt the Java function object to a Scala function.
    val keyOf: ((K, V)) => K2 = (pair: (K, V)) => func.call(pair)
    new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, keyOf, regionPath, connConf)
  }

  /** Connection configuration derived from the SparkConf of this RDD's SparkContext. */
  private[connector] def defaultConnectionConf: GeodeConnectionConf = {
    val sparkConf = rdd.sparkContext.getConf
    GeodeConnectionConf(sparkConf)
  }

}
package io.pivotal.geode.spark.connector

import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodeRDDWriter}
import org.apache.spark.Logging
import org.apache.spark.api.java.function.{PairFunction, Function}
import org.apache.spark.rdd.RDD

/**
 * Extra Geode functions on non-pair RDDs, made available through an implicit
 * conversion. Import `io.pivotal.geode.spark.connector._` at the top of your
 * program to use these functions.
 *
 * @param rdd the RDD these functions operate on
 */
class GeodeRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {

  /**
   * Save the non-pair RDD to a Geode region.
   *
   * The region is validated through the connection first, then one Spark job is
   * run that writes the partitions with a `GeodeRDDWriter`, using `func` to turn
   * elements into key/value pairs.
   *
   * @param regionPath the full path of the region the RDD is stored in
   * @param func       the function that converts elements of the RDD to key/value pairs
   * @param connConf   the GeodeConnectionConf object that provides connection to Geode cluster
   * @param opConf     the optional parameters for this operation
   */
  def saveToGeode[K, V](
      regionPath: String,
      func: T => (K, V),
      connConf: GeodeConnectionConf = defaultConnectionConf,
      opConf: Map[String, String] = Map.empty): Unit = {
    connConf.getConnection.validateRegion[K, V](regionPath)
    // Partition details are only computed when debug logging is switched on.
    if (!log.isDebugEnabled) {
      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
    } else {
      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""")
    }
    val rddWriter = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf)
    rdd.sparkContext.runJob(rdd, rddWriter.write(func) _)
  }

  /** This version of saveToGeode(...) is just for Java API. */
  private[connector] def saveToGeode[K, V](
      regionPath: String,
      func: PairFunction[T, K, V],
      connConf: GeodeConnectionConf,
      opConf: Map[String, String]): Unit = {
    // Adapt the Java function object to a Scala function.
    val toPair: T => (K, V) = (element: T) => func.call(element)
    saveToGeode[K, V](regionPath, toPair, connConf, opConf)
  }

  /**
   * Join `this` RDD with the Geode `Region[K, V]`. The join key of an RDD element is
   * produced by `func`; the key on the region side is just the key of the key/value
   * pair.
   *
   * According to the original documentation each result element is a (t, v) tuple,
   * where t is in `this` RDD and (k, v) is in the Geode region.
   *
   * @param regionPath the region path of the Geode region
   * @param func       the function that generates the region key from an RDD element T
   * @param connConf   the GeodeConnectionConf object that provides connection to Geode cluster
   * @tparam K the key type of the Geode region
   * @tparam V the value type of the Geode region
   * @return a `GeodeJoinRDD[T, K, V]`
   */
  def joinGeodeRegion[K, V](
      regionPath: String,
      func: T => K,
      connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[T, K, V] =
    new GeodeJoinRDD[T, K, V](rdd, func, regionPath, connConf)

  /** This version of joinGeodeRegion(...) is just for Java API. */
  private[connector] def joinGeodeRegion[K, V](
      regionPath: String,
      func: Function[T, K],
      connConf: GeodeConnectionConf): GeodeJoinRDD[T, K, V] = {
    // Adapt the Java function object to a Scala function.
    val keyOf: T => K = (element: T) => func.call(element)
    joinGeodeRegion[K, V](regionPath, keyOf, connConf)
  }

  /**
   * Left outer join of `this` RDD with the Geode `Region[K, V]`. The join key of an
   * RDD element is produced by `func`; the key on the region side is just the key of
   * the key/value pair.
   *
   * According to the original documentation, for each element t in `this` RDD the
   * result contains (t, Some(v)) for v in the Geode region, or (t, None) if no
   * element in the Geode region has key `func(t)`.
   *
   * @param regionPath the region path of the Geode region
   * @param func       the function that generates the region key from an RDD element T
   * @param connConf   the GeodeConnectionConf object that provides connection to Geode cluster
   * @tparam K the key type of the Geode region
   * @tparam V the value type of the Geode region
   * @return a `GeodeOuterJoinRDD[T, K, V]`
   */
  def outerJoinGeodeRegion[K, V](
      regionPath: String,
      func: T => K,
      connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[T, K, V] =
    new GeodeOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf)

  /** This version of outerJoinGeodeRegion(...) is just for Java API. */
  private[connector] def outerJoinGeodeRegion[K, V](
      regionPath: String,
      func: Function[T, K],
      connConf: GeodeConnectionConf): GeodeOuterJoinRDD[T, K, V] = {
    // Adapt the Java function object to a Scala function.
    val keyOf: T => K = (element: T) => func.call(element)
    outerJoinGeodeRegion[K, V](regionPath, keyOf, connConf)
  }

  /** Connection configuration derived from the SparkConf of this RDD's SparkContext. */
  private[connector] def defaultConnectionConf: GeodeConnectionConf = {
    val sparkConf = rdd.sparkContext.getConf
    GeodeConnectionConf(sparkConf)
  }

}
package io.pivotal.geode.spark.connector

import io.pivotal.geode.spark.connector.internal.oql.{OQLRelation, QueryRDD}
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}

/**
 * Geode OQL specific functions on a `SQLContext`.
 *
 * @param sqlContext the SQLContext these functions are attached to
 */
class GeodeSQLContextFunctions(@transient sqlContext: SQLContext) extends Serializable with Logging {

  /**
   * Expose the result of a Geode OQL query as a DataFrame.
   *
   * The query is logged, wrapped in a `QueryRDD`, and turned into a DataFrame via
   * an `OQLRelation`.
   *
   * @param query    the OQL query string
   * @param connConf the connection configuration; defaults to the one built from the
   *                 SparkConf of the SQLContext's SparkContext
   * @return the DataFrame backed by the query result
   */
  def geodeOQL(
      query: String,
      connConf: GeodeConnectionConf = defaultConnectionConf): DataFrame = {
    logInfo(s"OQL query = $query")
    val queryRdd = new QueryRDD[Object](sqlContext.sparkContext, query, connConf)
    val relation = OQLRelation(queryRdd)(sqlContext)
    sqlContext.baseRelationToDataFrame(relation)
  }

  /** Connection configuration derived from the SparkConf of the SQLContext's SparkContext. */
  private[connector] def defaultConnectionConf: GeodeConnectionConf = {
    val sparkConf = sqlContext.sparkContext.getConf
    GeodeConnectionConf(sparkConf)
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.pivotal.geode.spark.connector

import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD
import org.apache.spark.SparkContext

import scala.reflect.ClassTag

/**
 * Geode specific methods for a `SparkContext`.
 *
 * @param sc the wrapped SparkContext (annotated `@transient`)
 */
class GeodeSparkContextFunctions(@transient sc: SparkContext) extends Serializable {

  /**
   * Expose a Geode region as a GeodeRDD.
   *
   * @param regionPath the full path of the region
   * @param connConf   the GeodeConnectionConf that can be used to access the region;
   *                   by default it is built from the SparkContext's configuration
   * @param opConf     use this to specify the preferred partitioner and its parameters;
   *                   the implementation will use it if it is applicable
   * @return the `GeodeRegionRDD` created for the region
   */
  def geodeRegion[K: ClassTag, V: ClassTag](
      regionPath: String,
      connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf),
      opConf: Map[String, String] = Map.empty): GeodeRegionRDD[K, V] = {
    val regionRdd = GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf)
    regionRdd
  }

}

// ----------------------------------------------------------------------
// Next file in this patch (deleted file mode 100644, index 52f9961..0000000):
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnection.scala
// ----------------------------------------------------------------------
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.pivotal.geode.spark.connector.internal

import java.net.InetAddress

import com.gemstone.gemfire.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut}
import com.gemstone.gemfire.cache.execute.{FunctionException, FunctionService}
import com.gemstone.gemfire.cache.query.Query
import com.gemstone.gemfire.cache.{Region, RegionService}
import com.gemstone.gemfire.internal.cache.execute.InternalExecution
import io.pivotal.geode.spark.connector.internal.oql.QueryResultCollector
import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition
import org.apache.spark.{SparkEnv, Logging}
import io.pivotal.geode.spark.connector.GeodeConnection
import io.pivotal.geode.spark.connector.internal.geodefunctions._
import java.util.{Set => JSet, List => JList }

/**
 * Default GeodeConnection implementation. The instance of this should be
 * created by DefaultGeodeConnectionFactory
 * @param locators pairs of host/port of locators
 * @param gemFireProps The initial geode properties to be used.
 */
private[connector] class DefaultGeodeConnection (
    locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty)
  extends GeodeConnection with Logging {

  private val clientCache = initClientCache()

  /** Register the functions that are later executed by ID (see getRegionMetadata / getRegionData) */
  FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance())
  FunctionService.registerFunction(RetrieveRegionFunction.getInstance())

  /**
   * Create the ClientCache. Any exception is logged together with the
   * locator list and re-thrown wrapped in a RuntimeException.
   */
  private def initClientCache(): ClientCache =
    try {
      getClientCacheFactory.create()
    } catch {
      case e: Exception =>
        logError(s"""Failed to init ClientCache, locators=${locators.mkString(",")}, Error: $e""")
        throw new RuntimeException(e)
    }

  /**
   * Build the ClientCacheFactory. When the locators report a non-empty server
   * list, the pool is pointed directly at the servers picked by
   * `LocatorHelper.pickPreferredGeodeServers`; otherwise the pool is
   * configured with the locators themselves.
   */
  private def getClientCacheFactory: ClientCacheFactory = {
    import io.pivotal.geode.spark.connector.map2Properties
    val ccf = new ClientCacheFactory(gemFireProps)
    ccf.setPoolReadTimeout(30000)
    LocatorHelper.getAllGeodeServers(locators).filter(_.nonEmpty) match {
      case Some(allServers) =>
        // SPARK_LOCAL_IP, when set, takes precedence over the local host lookup
        val hostName = Option(System.getenv("SPARK_LOCAL_IP")) match {
          case Some(sparkIp) => InetAddress.getByName(sparkIp).getCanonicalHostName
          case None => InetAddress.getLocalHost.getCanonicalHostName
        }
        val executorId = SparkEnv.get.executorId
        val pickedServers = LocatorHelper.pickPreferredGeodeServers(allServers, hostName, executorId)
        logInfo(s"""Init ClientCache: servers=${pickedServers.mkString(",")}, host=$hostName executor=$executorId props=$gemFireProps""")
        // log the complete server list here (the picked subset is already logged above)
        logDebug(s"""Init ClientCache: all-servers=${allServers.mkString(",")}""")
        pickedServers.foreach { case (host, port) => ccf.addPoolServer(host, port) }
      case None =>
        logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""")
        locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) }
    }
    ccf
  }

  /** close the clientCache (no-op if it is already closed) */
  override def close(): Unit =
    if (!clientCache.isClosed) clientCache.close()

  /** ----------------------------------------- */
  /** implementation of GeodeConnection trait   */
  /** ----------------------------------------- */

  /** Create an OQL query object through the client cache's query service. */
  override def getQuery(queryString: String): Query =
    clientCache.asInstanceOf[RegionService].getQueryService.newQuery(queryString)

  /** Throw a RuntimeException if no metadata can be retrieved for the region. */
  override def validateRegion[K, V](regionPath: String): Unit =
    if (getRegionMetadata[K, V](regionPath).isEmpty)
      throw new RuntimeException(s"The region named $regionPath was not found")

  /**
   * Execute RetrieveRegionMetadataFunction on the region (bucket filter {0})
   * and return the first result.
   *
   * @return Some(metadata), or None when the function fails with a
   *         "region ... was not found" FunctionException; any other
   *         FunctionException is re-thrown
   */
  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] = {
    import scala.collection.JavaConverters._
    val region = getRegionProxy[K, V](regionPath)
    val bucketFilter: JSet[Integer] = Set[Integer](0).asJava
    val exec = FunctionService.onRegion(region).asInstanceOf[InternalExecution].withBucketFilter(bucketFilter)
    exec.setWaitOnExceptionFlag(true)
    try {
      val collector = exec.execute(RetrieveRegionMetadataFunction.ID)
      val results = collector.getResult.asInstanceOf[JList[RegionMetadata]]
      val metadata = results.get(0)
      logDebug(metadata.toString)
      Some(metadata)
    } catch {
      case e: FunctionException =>
        // getMessage may be null; treat that as "not a region-not-found error"
        val regionNotFound =
          Option(e.getMessage).exists(_.contains(s"The region named /$regionPath was not found"))
        if (regionNotFound) None else throw e
    }
  }

  /**
   * Return the client-side region for `regionPath`, creating a PROXY region
   * under `DefaultGeodeConnection.regionLock` when the cache does not have
   * one yet. The lookup is repeated inside the lock before creating.
   */
  def getRegionProxy[K, V](regionPath: String): Region[K, V] = {
    def existing: Option[Region[K, V]] =
      Option(clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]])

    existing.getOrElse {
      DefaultGeodeConnection.regionLock.synchronized {
        existing.getOrElse(
          clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.PROXY).create(regionPath))
      }
    }
  }

  /**
   * Execute RetrieveRegionFunction for the partition's bucket set and stream
   * the result back as (key, value) pairs.
   *
   * @param regionPath  the full path of the region
   * @param whereClause optional where clause; an empty string is sent when absent
   * @param split       the partition whose `bucketSet` is used as bucket filter
   */
  override def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)] = {
    import scala.collection.JavaConverters._
    val region = getRegionProxy[K, V](regionPath)
    val desc = s"""RDD($regionPath, "${whereClause.getOrElse("")}", ${split.index})"""
    val args: Array[String] = Array[String](whereClause.getOrElse(""), desc)
    val collector = new StructStreamingResultCollector(desc)
    val bucketFilter: JSet[Integer] = split.bucketSet.map(b => Integer.valueOf(b)).asJava
    val exec = FunctionService.onRegion(region).withArgs(args).withCollector(collector).asInstanceOf[InternalExecution]
      .withBucketFilter(bucketFilter)
    exec.setWaitOnExceptionFlag(true)
    exec.execute(RetrieveRegionFunction.ID)
    // each record is an Object array whose first two elements are used as key and value
    collector.getResult.map { objs: Array[Object] => (objs(0).asInstanceOf[K], objs(1).asInstanceOf[V]) }
  }

  /**
   * Register and execute QueryFunction on the region, restricted to
   * `bucketSet`, and return the collector's result.
   */
  override def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String) = {
    import scala.collection.JavaConverters._
    FunctionService.registerFunction(QueryFunction.getInstance())
    val collector = new QueryResultCollector
    val region = getRegionProxy(regionPath)
    val args: Array[String] = Array[String](queryString, bucketSet.toString)
    val bucketFilter: JSet[Integer] = bucketSet.map(b => Integer.valueOf(b)).asJava
    val exec = FunctionService.onRegion(region).withCollector(collector).asInstanceOf[InternalExecution]
      .withBucketFilter(bucketFilter)
      .withArgs(args)
    exec.execute(QueryFunction.ID)
    collector.getResult
  }
}

private[connector] object DefaultGeodeConnection {
  /** a lock object only used by getRegionProxy...() */
  private val regionLock = new Object
}

/** The purpose of this class is making unit test DefaultGeodeConnectionManager easier */
class DefaultGeodeConnectionFactory {

  /** Create a new DefaultGeodeConnection for the given locators and properties. */
  def newConnection(locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) =
    new DefaultGeodeConnection(locators, gemFireProps)

}

// ----------------------------------------------------------------------
// Next file in this patch:
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala
// ----------------------------------------------------------------------
a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala deleted file mode 100644 index eb67cda..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManager.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.pivotal.geode.spark.connector.internal

import io.pivotal.geode.spark.connector.{GeodeConnection, GeodeConnectionConf, GeodeConnectionManager}

import scala.collection.mutable

/**
 * Default implementation of GeodeConnectionManager. Both methods delegate to
 * the connection cache kept in the companion object.
 */
class DefaultGeodeConnectionManager extends GeodeConnectionManager {

  def getConnection(connConf: GeodeConnectionConf): GeodeConnection =
    DefaultGeodeConnectionManager.getConnection(connConf)

  def closeConnection(connConf: GeodeConnectionConf): Unit =
    DefaultGeodeConnectionManager.closeConnection(connConf)

}

object DefaultGeodeConnectionManager {

  /** connection cache, keyed by host:port pair */
  private[connector] val connections = mutable.Map[(String, Int), GeodeConnection]()

  /** The cached connection of the first locator (in the given order) that has one. */
  private def cachedConnection(locators: Seq[(String, Int)]): Option[GeodeConnection] =
    locators.flatMap(locator => connections.get(locator)).headOption

  /**
   * use locator host:port pair to lookup cached connection. create new connection
   * and add it to the cache `connections` if it does not exist.
   *
   * The first lookup is done without holding the lock; on a miss the lookup is
   * repeated while synchronized on `connections` before a connection is created
   * and registered under every locator of `connConf`.
   * NOTE(review): the unlocked first lookup reads a plain mutable.Map that other
   * threads may be updating - confirm this fast path is acceptable.
   *
   * @param connConf supplies the locators and the geode properties
   * @param factory  creates the connection on a cache miss
   */
  def getConnection(connConf: GeodeConnectionConf)
    (implicit factory: DefaultGeodeConnectionFactory = new DefaultGeodeConnectionFactory): GeodeConnection =
    cachedConnection(connConf.locators).getOrElse {
      connections.synchronized {
        cachedConnection(connConf.locators).getOrElse {
          val created = factory.newConnection(connConf.locators, connConf.geodeProps)
          connConf.locators.foreach(pair => connections += (pair -> created))
          created
        }
      }
    }

  /**
   * Close the connection and remove it from connection cache.
   * Note: multiple entries may share the same connection, all those entries are removed.
   *
   * The lookup, the close and the removal all happen while synchronized on
   * `connections`, so the cache is not read while another thread is changing it.
   */
  def closeConnection(connConf: GeodeConnectionConf): Unit = connections.synchronized {
    cachedConnection(connConf.locators).foreach { conn =>
      conn.close()
      connections.retain((_, cached) => cached != conn)
    }
  }
}

// ----------------------------------------------------------------------
// Next file in this patch (deleted file mode 100644, index 71fed52..0000000):
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/LocatorHelper.scala
// ----------------------------------------------------------------------
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.pivotal.geode.spark.connector.internal

import java.net.InetSocketAddress
import java.util.{ArrayList => JArrayList}

import com.gemstone.gemfire.cache.client.internal.locator.{GetAllServersResponse, GetAllServersRequest}
import com.gemstone.gemfire.distributed.internal.ServerLocation
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient
import org.apache.spark.Logging

import scala.util.{Failure, Success, Try}


object LocatorHelper extends Logging {

  /** valid locator strings are: host[port] and host:port */
  final val LocatorPattern1 = """([\w-_]+(\.[\w-_]+)*)\[([0-9]{2,5})\]""".r
  final val LocatorPattern2 = """([\w-_]+(\.[\w-_]+)*):([0-9]{2,5})""".r

  /**
   * convert single locator string to Try[(host, port)]
   *
   * The first capture group is the whole host name and the third is the port;
   * the second group (the repeated ".segment" part) is not used.
   */
  def locatorStr2HostPortPair(locatorStr: String): Try[(String, Int)] =
    locatorStr match {
      case LocatorPattern1(host, _, port) => Success((host, port.toInt))
      case LocatorPattern2(host, _, port) => Success((host, port.toInt))
      case _ => Failure(new Exception(s"invalid locator: $locatorStr"))
    }

  /**
   * Parse locator strings and returns Seq of (hostname, port) pair.
   * Valid locator string are one or more "host[port]" and/or "host:port"
   * separated by `,`. For example:
   *    host1.mydomain.com[8888],host2.mydomain.com[8889]
   *    host1.mydomain.com:8888,host2.mydomain.com:8889
   *
   * Throws the "invalid locator" exception of the first entry that does not parse.
   */
  def parseLocatorsString(locatorsStr: String): Seq[(String, Int)] =
    locatorsStr.split(",").map(locatorStr2HostPortPair).map(_.get)


  /**
   * Return the list of live Geode servers for the given locators.
   *
   * The locators are asked one after another; the first non-empty answer wins.
   * A locator that fails with an exception is logged and skipped.
   *
   * @param locators locators for the given Geode cluster
   * @param serverGroup optional server group name, default is "" (empty string)
   * @return Some(servers) from the first locator that reports at least one
   *         server, None when no locator does
   */
  def getAllGeodeServers(locators: Seq[(String, Int)], serverGroup: String = ""): Option[Seq[(String, Int)]] = {
    import scala.collection.JavaConverters._

    // ask a single locator; None means "no usable answer, try the next one"
    def serversFrom(host: String, port: Int): Option[Seq[(String, Int)]] =
      try {
        val addr = new InetSocketAddress(host, port)
        val req = new GetAllServersRequest(serverGroup)
        val res = TcpClient.requestToServer(addr.getAddress, addr.getPort, req, 2000)
        if (res == null) None
        else {
          val servers = res.asInstanceOf[GetAllServersResponse].getServers.asInstanceOf[JArrayList[ServerLocation]]
          if (servers.size > 0) Some(servers.asScala.map(e => (e.getHostName, e.getPort))) else None
        }
      } catch {
        case e: Exception =>
          logWarning("getAllGeodeServers error", e)
          None
      }

    // the iterator keeps this lazy: locators after the first success are not contacted
    locators.iterator
      .map { case (host, port) => serversFrom(host, port) }
      .collectFirst { case Some(servers) => servers }
  }

  /**
   * Pick up at most 3 preferred servers from all available servers based on
   * host name and Spark executor id.
   *
   * This method is used by DefaultGeodeConnection to create ClientCache. Usually
   * one server is enough to initialize ClientCacheFactory, but this provides two
   * backup servers in case of the 1st server can't be connected.
   *
   * @param servers all available servers in the form of (hostname, port) pairs
   * @param hostName the host name of the Spark executor
   * @param executorId the Spark executor Id, such as "<driver>", "0", "1", ...
   * @return Seq[(hostname, port)] of preferred servers
   */
  def pickPreferredGeodeServers(
    servers: Seq[(String, Int)], hostName: String, executorId: String): Seq[(String, Int)] = {

    // pick up `length` items from the Seq starting at the `start` position.
    // The Seq is treated as a ring, so at most `Seq.size` items can be picked.
    // `start` is first normalized into [0, size), so a negative or very large
    // start neither indexes out of bounds nor overflows `start + size`.
    def circularTake[T](seq: Seq[T], start: Int, length: Int): Seq[T] =
      if (seq.isEmpty) Seq.empty
      else {
        val n = seq.size
        val first = ((start % n) + n) % n
        (first until first + math.min(n, length)).map(i => seq(i % n))
      }

    // map executor id to int: "<driver>" (or non-number string) to 0, and "n" to n + 1
    val id = try { executorId.toInt + 1 } catch { case _: NumberFormatException => 0 }

    // algorithm:
    // 1. sort server list
    // 2. split sorted server list into 3 sub-lists a, b, and c:
    //      list-a: servers on the given host
    //      list-b: servers that are in front of list-a on the sorted server list
    //      list-c: servers that are behind list-a on the sorted server list
    //    then rotate list-a based on executor id, then create new server list:
    //      modified list-a ++ list-c ++ list-b
    // 3. if there's no server on the given host, then create new server list
    //    by rotating sorted server list based on executor id.
    // 4. take up to 3 servers from the new server list
    val sortedServers = servers.sorted
    val firstIdx = sortedServers.indexWhere(_._1 == hostName)

    if (firstIdx < 0) { // no local server
      circularTake(sortedServers, id, 3)
    } else {
      val lastIdx = sortedServers.lastIndexWhere(_._1 == hostName)
      val (before, fromLocal) = sortedServers.splitAt(firstIdx)
      val (local, after) = fromLocal.splitAt(lastIdx - firstIdx + 1)
      // rotating a single local server (or rotating by a multiple of the size) is a no-op
      val rotatedLocal = circularTake(local, id, local.size)
      circularTake(rotatedLocal ++ after ++ before, 0, 3)
    }
  }
}

// ----------------------------------------------------------------------
// Next file in this patch:
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultCollector.scala
// ----------------------------------------------------------------------
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.pivotal.geode.spark.connector.internal.geodefunctions

import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
import com.gemstone.gemfire.DataSerializer
import com.gemstone.gemfire.cache.execute.ResultCollector
import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl
import com.gemstone.gemfire.cache.query.types.StructType
import com.gemstone.gemfire.distributed.DistributedMember
import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
import io.pivotal.geode.spark.connector.internal.geodefunctions.StructStreamingResultSender.{
  TYPE_CHUNK, DATA_CHUNK, ERROR_CHUNK, SER_DATA, UNSER_DATA, BYTEARR_DATA}

/**
 * StructStreamingResultCollector and StructStreamingResultSender are paired
 * to transfer result of list of `com.gemstone.gemfire.cache.query.Struct`
 * from Geode server to Spark Connector (the client of Geode server)
 * in streaming, i.e., while sender sending the result, the collector can
 * start processing the arrived result without waiting for full result to
 * become available.
 *
 * @param desc description of this collector (only referenced by the
 *             commented-out debug output below)
 */
class StructStreamingResultCollector(desc: String) extends ResultCollector[Array[Byte], Iterator[Array[Object]]] {

  /** the constructor that provide default `desc` (description) */
  def this() = this("StructStreamingResultCollector")

  /** chunks in arrival order; an empty array marks the end of the data */
  private val queue: BlockingQueue[Array[Byte]] = new LinkedBlockingQueue[Array[Byte]]()

  /** struct type of the records; null until a TYPE_CHUNK or the first DATA_CHUNK has been processed */
  var structType: StructType = null

  /** ------------------------------------------ */
  /**  ResultCollector interface implementations */
  /** ------------------------------------------ */

  override def getResult: Iterator[Array[Object]] = resultIterator

  /** Timed retrieval is not supported. */
  override def getResult(timeout: Long, unit: TimeUnit): Iterator[Array[Object]] =
    throw new UnsupportedOperationException()

  /** addResult add non-empty byte array (chunk) to the queue; chunks of 0 or 1 byte are dropped */
  override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit =
    if (chunk != null && chunk.length > 1) {
      this.queue.add(chunk)
      // println(s"""$desc receive from $memberID: ${chunk.mkString(" ")}""")
    }

  /** endResults add special `Array.empty` to the queue as marker of end of data */
  override def endResults(): Unit = this.queue.add(Array.empty)

  override def clearResults(): Unit = this.queue.clear()

  /** ------------------------------------------ */
  /**             Internal methods               */
  /** ------------------------------------------ */

  /**
   * The struct type of the result. If it is not known yet this forces the
   * result iterator, which takes chunks from the blocking queue until the
   * first one has been processed.
   */
  def getResultType: StructType = {
    // trigger lazy resultIterator initialization if necessary
    if (structType == null) resultIterator.hasNext
    structType
  }

  /**
   * Note: The data is sent in chunks, and each chunk contains multiple
   * records. So the result iterator is an iterator (I) of iterator (II),
   * i.e., go through each chunk (iterator (I)), and for each chunk, go
   * through each record (iterator (II)).
   */
  private lazy val resultIterator = new Iterator[Array[Object]] {

    private var currentIterator: Iterator[Array[Object]] = nextIterator()

    /** Move on to the following chunk(s) while the current one is exhausted and the end marker has not been seen. */
    override def hasNext: Boolean = {
      while (!currentIterator.hasNext && currentIterator != Iterator.empty) currentIterator = nextIterator()
      currentIterator.hasNext
    }

    /**
     * Goes through `hasNext` first, so the chunk iterator is advanced even when
     * the caller did not call `hasNext`; throws NoSuchElementException at the
     * end of the data.
     */
    override def next(): Array[Object] =
      if (hasNext) currentIterator.next() else Iterator.empty.next()
  }

  /**
   * get the iterator for the next chunk of data
   *
   * Blocks on the queue until a chunk is available. The end marker yields
   * `Iterator.empty`; a TYPE_CHUNK only records the struct type and the
   * following chunk is returned instead.
   */
  private def nextIterator(): Iterator[Array[Object]] = {
    val chunk: Array[Byte] = queue.take
    if (chunk.isEmpty) {
      Iterator.empty
    } else {
      val input = new ByteArrayDataInput()
      input.initialize(chunk, Version.CURRENT)
      val chunkType = input.readByte()
      // println(s"chunk type $chunkType")
      chunkType match {
        case TYPE_CHUNK =>
          if (structType == null)
            structType = DataSerializer.readObject(input).asInstanceOf[StructTypeImpl]
          nextIterator()
        case DATA_CHUNK =>
          // require(structType != null && structType.getFieldNames.length > 0)
          // no type chunk seen before the data: fall back to the sender's key/value type
          if (structType == null) structType = StructStreamingResultSender.KeyValueType
          chunkToIterator(input, structType.getFieldNames.length)
        case ERROR_CHUNK =>
          val error = DataSerializer.readObject(input).asInstanceOf[Exception]
          errorPropagationIterator(error)
        case _ => throw new RuntimeException(s"unknown chunk type: $chunkType")
      }
    }
  }

  /** create a iterator that propagate sender's exception: both methods throw it wrapped in a RuntimeException */
  private def errorPropagationIterator(ex: Exception) = new Iterator[Array[Object]] {
    val re = new RuntimeException(ex)
    override def hasNext: Boolean = throw re
    override def next(): Array[Object] = throw re
  }

  /**
   * convert a chunk of data to an iterator
   *
   * @param input   the chunk, positioned after the chunk-type byte
   * @param rowSize number of fields that make up one record
   */
  private def chunkToIterator(input: ByteArrayDataInput, rowSize: Int) = new Iterator[Array[Object]] {
    // reused for every field that arrives as a serialized byte array
    val tmpInput = new ByteArrayDataInput()

    override def hasNext: Boolean = input.available() > 0

    override def next(): Array[Object] = Array.fill[Object](rowSize)(readField())

    /** read one field: a marker byte followed by the field in the encoding the marker names */
    private def readField(): Object = {
      val marker = input.readByte()
      marker match {
        case SER_DATA =>
          val arr: Array[Byte] = DataSerializer.readByteArray(input)
          tmpInput.initialize(arr, Version.CURRENT)
          DataSerializer.readObject(tmpInput).asInstanceOf[Object]
        case UNSER_DATA =>
          DataSerializer.readObject(input).asInstanceOf[Object]
        case BYTEARR_DATA =>
          DataSerializer.readByteArray(input).asInstanceOf[Object]
        case _ =>
          throw new RuntimeException(s"unknown data type $marker")
      }
    }
  }

}

// ----------------------------------------------------------------------
// Next file in this patch (deleted file mode 100644, index 3f6dfad..0000000):
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParser.scala
// ----------------------------------------------------------------------
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.pivotal.geode.spark.connector.internal.oql

import scala.util.parsing.combinator.RegexParsers

/**
 * Regex based parser for the shape of OQL query this connector handles:
 * optional imports, `select [distinct] <projection> from <regions> [where <filter>]`.
 * Only the region list is kept as the parse result.
 */
class QueryParser extends RegexParsers {

  /**
   * The whole query. Everything except the region list is matched and
   * discarded; the result is the string form of the region list,
   * e.g. `List(/region)`.
   */
  def query: Parser[String] = {
    val imports = opt(rep(IMPORT ~ PACKAGE))
    val whereClause = opt(where ~ filter)
    (imports ~> select ~> opt(distinct) ~> projection ~> from ~> regions <~ whereClause) ^^ { parsed =>
      parsed.toString
    }
  }

  // keywords, matched case-insensitively by spelling out both cases of every letter

  val IMPORT: Parser[String] = "[Ii][Mm][Pp][Oo][Rr][Tt]".r

  val select: Parser[String] = "[Ss][Ee][Ll][Ee][Cc][Tt]".r

  val distinct: Parser[String] = "[Dd][Ii][Ss][Tt][Ii][Nn][Cc][Tt]".r

  val from: Parser[String] = "[Ff][Rr][Oo][Mm]".r

  val where: Parser[String] = "[Ww][Hh][Ee][Rr][Ee]".r

  /** what follows an import keyword: word characters and dots */
  def PACKAGE: Parser[String] = """[\w.]+""".r

  /** either `*` or a comma separated list of projected names (rendered as the list's string form) */
  def projection: Parser[String] =
    "*" | (repsep("""["\w]+[.\w"]*""".r, ",") ^^ { names => names.toString })

  /** comma separated regions, each optionally followed by an alias that is dropped */
  def regions: Parser[String] =
    repsep(region <~ opt(alias), ",") ^^ { paths => paths.toString }

  /** a region reference, with or without a leading `/` */
  def region: Parser[String] = """/[\w.]+[/[\w.]+]*""".r | """[\w]+[.\w]*""".r

  /** a word after a region, unless that word is the where keyword */
  def alias: Parser[String] = not(where) ~> """[\w]+""".r

  /** the text after the where keyword */
  def filter: Parser[String] = """[\w.]+[[\s]+[<>=.'\w]+]*""".r
}

object QueryParser extends QueryParser {

  /** Parse a complete OQL expression with the `query` parser (the whole input must match). */
  def parseOQL(expression: String) = parseAll(query, expression)

}

// ----------------------------------------------------------------------
// Next file in this patch:
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala
// ----------------------------------------------------------------------
a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala deleted file mode 100644 index 474aa6a..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryRDD.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.pivotal.geode.spark.connector.internal.oql

import io.pivotal.geode.spark.connector.GeodeConnectionConf
import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDPartition, ServerSplitsPartitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.{TaskContext, SparkContext, Partition}
import scala.reflect.ClassTag

/**
 * An RDD that provides the functionality that read the OQL query result
 *
 * @param sc The SparkContext this RDD is associated with
 * @param queryString The OQL query string
 * @param connConf The GeodeConnectionConf that provide the GeodeConnection
 */
class QueryRDD[T](@transient sc: SparkContext,
                  queryString: String,
                  connConf: GeodeConnectionConf)
                 (implicit ct: ClassTag[T])
  extends RDD[T](sc, Seq.empty) {

  /**
   * For a partitioned region the partitions come from ServerSplitsPartitioner;
   * otherwise a single partition with an empty bucket set is used.
   * Throws a RuntimeException when the region's metadata cannot be found.
   */
  override def getPartitions: Array[Partition] = {
    val conn = connConf.getConnection
    val regionPath = getRegionPathFromQuery(queryString)
    conn.getRegionMetadata(regionPath) match {
      case Some(metadata) if metadata.isPartitioned =>
        val splits = ServerSplitsPartitioner.partitions(conn, metadata, Map.empty)
        logInfo(s"QueryRDD.getPartitions():isPartitioned=true, partitions=${splits.mkString(",")}")
        splits
      case Some(_) =>
        logInfo(s"QueryRDD.getPartitions():isPartitioned=false")
        Array[Partition](new GeodeRDDPartition(0, Set.empty))
      case None =>
        throw new RuntimeException(s"Region $regionPath metadata was not found.")
    }
  }

  /**
   * Execute the query for the buckets of the given partition.
   * Throws a RuntimeException when the connection returns something that is not an Iterator.
   */
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val buckets = split.asInstanceOf[GeodeRDDPartition].bucketSet
    val regionPath = getRegionPathFromQuery(queryString)
    connConf.getConnection.executeQuery(regionPath, buckets, queryString) match {
      // the element type is erased at runtime, so only "is an Iterator" is actually checked
      case it: Iterator[T @unchecked] =>
        logInfo(s"QueryRDD.compute():query=$queryString, partition=$split")
        it
      case other =>
        // interpolation also copes with a null result
        throw new RuntimeException(s"Unexpected OQL result: $other")
    }
  }

  /**
   * Extract the path of the first region named in the query's from clause,
   * without the leading "/" and cut at the first "." or ",".
   *
   * The parser returns the region list in its string form, e.g.
   * "List(/orders, /customers)"; this yields "orders" for that input.
   * Throws a RuntimeException carrying the parser's message when the query
   * cannot be parsed.
   */
  private def getRegionPathFromQuery(queryString: String): String = {
    val parsed = QueryParser.parseOQL(queryString)
    if (!parsed.successful)
      throw new RuntimeException(s"Failed to parse OQL query '$queryString': $parsed")
    val regions = parsed.get
    // text after "List(" up to the first terminator
    val firstRegion = regions
      .substring(regions.indexOf("(") + 1)
      .takeWhile(c => c != ')' && c != '.' && c != ',')
    firstRegion.stripPrefix("/")
  }
}

// ----------------------------------------------------------------------
// Next file in this patch (deleted file mode 100644, index bedc58d..0000000):
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ddee87fe/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/QueryResultCollector.scala
// ----------------------------------------------------------------------
/**
 * ResultCollector that queues the serialized result chunks it receives and
 * exposes them as one iterator of deserialized objects.
 *
 * An empty byte array, queued by `endResults`, marks the end of the results.
 */
class QueryResultCollector extends ResultCollector[Array[Byte], Iterator[Object]] {

  private val queue = new LinkedBlockingDeque[Array[Byte]]()

  /** Returns the iterator over all deserialized results. */
  override def getResult: Iterator[Object] = resultIterator

  /** Not supported: always throws UnsupportedOperationException. */
  override def getResult(timeout: Long, unit: TimeUnit): Iterator[Object] =
    throw new UnsupportedOperationException

  /** Queues a chunk; null and empty chunks are dropped (empty is the end marker). */
  override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit =
    if (chunk != null && chunk.nonEmpty) {
      queue.add(chunk)
    }

  /** Queues the empty end-of-results marker. */
  override def endResults: Unit = queue.add(Array.empty[Byte])

  /** Discards every chunk still waiting in the queue. */
  override def clearResults: Unit = queue.clear()

  private lazy val resultIterator: Iterator[Object] = new Iterator[Object] {
    private var currentIterator = nextIterator

    def hasNext: Boolean = {
      // Move on to the following chunk(s) once the current one is exhausted.
      // Iterator.empty is the sentinel returned for the end marker; after it no
      // further chunk is taken, because queue.take would wait forever.
      while (!currentIterator.hasNext && currentIterator != Iterator.empty)
        currentIterator = nextIterator
      currentIterator.hasNext
    }

    def next(): Object = {
      // Go through hasNext so that next() also works when the caller did not
      // call hasNext first and the current chunk has just been exhausted.
      if (!hasNext) throw new NoSuchElementException("next on empty iterator")
      currentIterator.next()
    }
  }

  /**
   * Takes the next chunk from the queue (waiting until one is available) and
   * returns an iterator that deserializes its objects one by one, or
   * Iterator.empty when the chunk is the end marker.
   */
  private def nextIterator: Iterator[Object] = {
    val chunk = queue.take
    if (chunk.isEmpty) {
      Iterator.empty
    } else {
      val input = new ByteArrayDataInput
      input.initialize(chunk, Version.CURRENT)
      new Iterator[Object] {
        override def hasNext: Boolean = input.available() > 0
        override def next(): Object = DataSerializer.readObject(input).asInstanceOf[Object]
      }
    }
  }

}
b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala deleted file mode 100644 index 6a1611c..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/oql/RDDConverter.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Spark SQL relation backed by a QueryRDD: the schema comes from SchemaBuilder
 * and the rows from RowBuilder.
 *
 * @param queryRDD   the RDD holding the OQL query result
 * @param sqlContext the SQLContext this relation belongs to
 */
case class OQLRelation[T](queryRDD: QueryRDD[T])(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  // Cached: SchemaBuilder.toSparkSchema() calls queryRDD.first(), so a plain `def`
  // would evaluate the query RDD again on every access to the schema.
  override lazy val schema: StructType = new SchemaBuilder(queryRDD).toSparkSchema()

  /** Returns the query result as an RDD of Rows. */
  override def buildScan(): RDD[Row] = new RowBuilder(queryRDD).toRowRDD()

}

object RDDConverter {

  /**
   * Wraps the given QueryRDD in an OQLRelation and turns it into a DataFrame.
   *
   * @param queryRDD   the RDD holding the OQL query result
   * @param sqlContext the SQLContext used to create the DataFrame
   */
  def queryRDDToDataFrame[T](queryRDD: QueryRDD[T], sqlContext: SQLContext): DataFrame =
    sqlContext.baseRelationToDataFrame(OQLRelation(queryRDD)(sqlContext))
}
/**
 * Converts the elements of a QueryRDD into Spark SQL Rows.
 */
class RowBuilder[T](queryRDD: QueryRDD[T]) {

  /**
   * Convert QueryRDD to RDD of Row: a StructImpl becomes a Row of its field
   * values, anything else becomes a single-column Row.
   *
   * @return RDD of Rows
   */
  def toRowRDD(): RDD[Row] =
    queryRDD.map {
      case si: StructImpl => Row.fromSeq(si.getFieldValues)
      // Catch-all instead of `case obj: Object`: a typed pattern never matches
      // null, so a null element used to fail with a MatchError. SchemaBuilder maps
      // a null first row to one NullType column, which a one-column Row(null) fits.
      case other => Row.fromSeq(Seq(other))
    }
}
/**
 * Derives a Spark SQL schema for a QueryRDD by inspecting its first element.
 */
class SchemaBuilder[T](queryRDD: QueryRDD[T]) extends Logging {

  /** Empty struct type, used for values whose class has no entry in `typeMap`. */
  val nullStructType = StructType(Nil)

  /** Supported Java classes and the Spark SQL data type each one is mapped to. */
  val typeMap: Map[Class[_], DataType] = Map(
    classOf[java.lang.String] -> StringType,
    classOf[java.lang.Integer] -> IntegerType,
    classOf[java.lang.Short] -> ShortType,
    classOf[java.lang.Long] -> LongType,
    classOf[java.lang.Double] -> DoubleType,
    classOf[java.lang.Float] -> FloatType,
    classOf[java.lang.Boolean] -> BooleanType,
    classOf[java.lang.Byte] -> ByteType,
    classOf[java.util.Date] -> DateType,
    classOf[java.lang.Object] -> nullStructType
  )

  /**
   * Analyse QueryRDD to get the Spark schema. Only the first element of the RDD
   * is looked at: a StructImpl yields one field per struct field, any other value
   * (including null) yields a single field named "col1".
   *
   * @return The schema represented by Spark StructType
   */
  def toSparkSchema(): StructType = {
    val schema = queryRDD.first() match {
      case struct: StructImpl =>
        constructFromStruct(struct)
      case null =>
        StructType(List(StructField("col1", NullType)))
      case single =>
        val dataType = typeMap.getOrElse(single.getClass(), nullStructType)
        StructType(List(StructField("col1", dataType)))
    }
    logInfo(s"Schema: $schema")
    schema
  }

  /**
   * Builds a struct type with one field per field of `r`, named after the struct
   * field. The data type is NullType for a null value, otherwise the `typeMap`
   * entry for the value's exact class, falling back to `nullStructType`.
   */
  def constructFromStruct(r: StructImpl) = {
    val fieldNames = r.getFieldNames
    val fieldValues = r.getFieldValues
    val fields = fieldNames.indices.map { idx =>
      val fieldName = fieldNames(idx)
      val dataType = Option(fieldValues(idx)).fold[DataType](NullType) { value =>
        typeMap.getOrElse(value.getClass, nullStructType)
      }
      StructField(fieldName, dataType)
    }
    StructType(fields)
  }
}
/**
 * Custom Kryo serializer for QueryService.UNDEFINED, i.e.
 * com.gemstone.gemfire.cache.query.internal.Undefined. It is meant to keep
 * Undefined a singleton after it has been deserialized within Spark.
 */
class UndefinedSerializer extends Serializer[Undefined] {

  /** Writes nothing but the DSFID of `u`, as a single byte. */
  def write(kryo: Kryo, output: Output, u: Undefined): Unit =
    output.writeByte(u.getDSFID)

  /**
   * Consumes the byte written by `write` and returns QueryService.UNDEFINED,
   * creating a new Undefined only if that constant is null.
   */
  def read(kryo: Kryo, input: Input, tpe: Class[Undefined]): Undefined = {
    // The DSFID byte carries no information needed here; it is read and dropped.
    input.readByte()
    // Reuse the existing instance whenever there is one, so the Undefined
    // constructor is not called again.
    Option(QueryService.UNDEFINED).fold(new Undefined)(_.asInstanceOf[Undefined])
  }
}
/**
 * An `RDD[(T, V)]` that will represent the result of a join between `left` RDD[T]
 * and the specified Geode Region[K, V].
 *
 * `func` maps an element of `left` to its region key. It may be null, in which
 * case the elements of `left` are treated as (key, value) pairs and the key is
 * taken from the pair itself.
 */
class GeodeJoinRDD[T, K, V] private[connector]
  ( left: RDD[T],
    func: T => K,
    val regionPath: String,
    val connConf: GeodeConnectionConf
  ) extends RDD[(T, V)](left.context, left.dependencies) {

  // Explicit asJava/asScala decorators, so that no Scala <-> Java collection
  // conversion in this class happens implicitly.
  import scala.collection.JavaConverters._

  /** validate region existence when GeodeRDD object is created */
  validate()

  /** Validate region, and make sure it exists. */
  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)

  /** The partitions are exactly those of `left`. */
  override protected def getPartitions: Array[Partition] = left.partitions

  /** Joins one partition of `left` with the region, with or without `func`. */
  override def compute(split: Partition, context: TaskContext): Iterator[(T, V)] = {
    val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
    if (func == null) computeWithoutFunc(split, context, region)
    else computeWithFunc(split, context, region)
  }

  /**
   * Fetches the given keys from the region in one getAll call.
   * Note: get all will return (key, null) for non-exist entry, so those entries
   * are removed from the result.
   */
  private def fetchExisting(region: Region[K, V], keys: Set[K]): collection.Map[K, V] =
    region.getAll(keys.asJava).asScala.filter { case (_, v) => v != null }

  /** T is (K, V1) since there's no map function `func` */
  private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
    // The whole partition is materialized because the keys are fetched in one batch.
    val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
    val leftKeys = leftPairs.map { case (k, _) => k }.toSet
    val rightPairs = fetchExisting(region, leftKeys)
    // A single lookup per key: pairs without a region entry simply drop out.
    leftPairs.flatMap { case (k, v) =>
      rightPairs.get(k).map(rightValue => ((k, v).asInstanceOf[T], rightValue))
    }.iterator
  }

  /** The key of each element of `left` is computed with `func`. */
  private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
    // The whole partition is materialized because the keys are fetched in one batch.
    val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t)))
    val leftKeys = leftPairs.map { case (_, k) => k }.toSet
    val rightPairs = fetchExisting(region, leftKeys)
    // A single lookup per key: elements without a region entry simply drop out.
    leftPairs.flatMap { case (t, k) =>
      rightPairs.get(k).map(rightValue => (t, rightValue))
    }.iterator
  }
}
a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala deleted file mode 100644 index 3d61d47..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal.rdd - -import com.gemstone.gemfire.cache.Region -import io.pivotal.geode.spark.connector.GeodeConnectionConf -import org.apache.spark.{TaskContext, Partition} -import org.apache.spark.rdd.RDD -import scala.collection.JavaConversions._ - -/** - * An `RDD[ T, Option[V] ]` that represents the result of a left outer join - * between `left` RDD[T] and the specified Geode Region[K, V]. 
/**
 * An `RDD[(T, Option[V])]` that represents the result of a left outer join
 * between `left` RDD[T] and the specified Geode Region[K, V].
 *
 * `func` maps an element of `left` to its region key. It may be null, in which
 * case the elements of `left` are treated as (key, value) pairs and the key is
 * taken from the pair itself.
 */
class GeodeOuterJoinRDD[T, K, V] private[connector]
  ( left: RDD[T],
    func: T => K,
    val regionPath: String,
    val connConf: GeodeConnectionConf
  ) extends RDD[(T, Option[V])](left.context, left.dependencies) {

  // Explicit asJava decorator, so that the Scala -> Java collection conversion
  // needed by Region.getAll does not happen implicitly.
  import scala.collection.JavaConverters._

  /** validate region existence when GeodeRDD object is created */
  validate()

  /** Validate region, and make sure it exists. */
  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)

  /** The partitions are exactly those of `left`. */
  override protected def getPartitions: Array[Partition] = left.partitions

  /** Joins one partition of `left` with the region, with or without `func`. */
  override def compute(split: Partition, context: TaskContext): Iterator[(T, Option[V])] = {
    val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
    if (func == null) computeWithoutFunc(split, context, region)
    else computeWithFunc(split, context, region)
  }

  /** T is (K1, V1), and K1 and K are the same type since there's no map function `func` */
  private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
    // The whole partition is materialized because the keys are fetched in one batch.
    val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
    val leftKeys = leftPairs.map { case (k, _) => k }.toSet
    // Note: get all will return (key, null) for non-exist entry
    val rightPairs = region.getAll(leftKeys.asJava)
    // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option
    leftPairs.map { case (k, v) => ((k, v).asInstanceOf[T], Option(rightPairs.get(k))) }.iterator
  }

  /** The key of each element of `left` is computed with `func`. */
  private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
    // The whole partition is materialized because the keys are fetched in one batch.
    val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t)))
    val leftKeys = leftPairs.map { case (_, k) => k }.toSet
    // Note: get all will return (key, null) for non-exist entry
    val rightPairs = region.getAll(leftKeys.asJava)
    // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option
    leftPairs.map { case (t, k) => (t, Option(rightPairs.get(k))) }.iterator
  }
}