http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireRunner.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireRunner.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireRunner.scala deleted file mode 100644 index b4ee572..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/GemFireRunner.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ittest.io.pivotal.gemfire.spark.connector.testkit - -import java.io.{IOException, File} -import java.net.InetAddress -import java.util.Properties -import org.apache.commons.httpclient.HttpClient -import org.apache.commons.io.FileUtils -import org.apache.commons.io.filefilter.IOFileFilter - -/** -* A class that manages GemFire locator and servers. Uses gfsh to -* start and stop the locator and servers. -*/ -class GemFireRunner(settings: Properties) { - val gfshCmd = new File(getCurrentDirectory, "../../geode-assembly/build/install/apache-geode/bin/gfsh").toString - val cacheXMLFile = settings.get("cache-xml-file") - val numServers: Int = settings.get("num-of-servers").asInstanceOf[String].toInt - val cwd = new File(".").getAbsolutePath - val gemfireFunctionsTargetDir = new File("../gemfire-functions/target") - val testroot = "target/testgemfire" - val classpath = new File(cwd, "target/scala-2.10/it-classes/") - val locatorPort = startGemFireCluster(numServers) - - def getLocatorPort: Int = locatorPort - - private def getCurrentDirectory = new File( "." ).getCanonicalPath - - private def startGemFireCluster(numServers: Int): Int = { - //ports(0) for GemFire locator, the other ports are for GemFire servers - val ports: Seq[Int] = IOUtils.getRandomAvailableTCPPorts(2 + numServers) - startGemFireLocator(ports(0), ports(1)) - startGemFireServers(ports(0), ports.drop(2)) - registerFunctions(ports(1)) - ports(0) - } - - private def startGemFireLocator(locatorPort: Int, jmxHttpPort:Int) { - println(s"=== GemFireRunner: starting locator on port $locatorPort") - val locatorDir = new File(cwd, s"$testroot/locator") - if (locatorDir.exists()) - FileUtils.deleteDirectory(locatorDir) - IOUtils.mkdir(locatorDir) - new ProcessBuilder() - .command(gfshCmd, "start", "locator", - "--name=locator", - s"--dir=$locatorDir", - s"--port=$locatorPort", - s"--J=-Dgemfire.jmx-manager-http-port=$jmxHttpPort") - .inheritIO() - .start() - - // Wait 30 seconds for locator to start - println(s"=== GemFireRunner: waiting for locator on port $locatorPort") - if (!IOUtils.waitForPortOpen(InetAddress.getByName("localhost"), locatorPort, 30000)) - throw new IOException("Failed to start GemFire locator.") - println(s"=== GemFireRunner: done waiting for locator on port $locatorPort") - } - - private def startGemFireServers(locatorPort: Int, serverPorts: Seq[Int]) { - val procs = for (i <- 0 until serverPorts.length) yield { - println(s"=== GemFireRunner: starting server${i+1} with clientPort ${serverPorts(i)}") - val serverDir = new File(cwd, s"$testroot/server${i+1}") - if (serverDir.exists()) - FileUtils.deleteDirectory(serverDir) - IOUtils.mkdir(serverDir) - new ProcessBuilder() - .command(gfshCmd, "start", "server", - s"--name=server${i+1}", - s"--locators=localhost[$locatorPort]", - s"--bind-address=localhost", - s"--server-port=${serverPorts(i)}", - s"--dir=$serverDir", - s"--cache-xml-file=$cacheXMLFile", - s"--classpath=$classpath") - .inheritIO() - .start() - } - procs.foreach(p => p.waitFor) - println(s"All $serverPorts.length servers have been started") - } - - private def registerFunctions(jmxHttpPort:Int) { - import scala.collection.JavaConversions._ - FileUtils.listFiles(gemfireFunctionsTargetDir, fileFilter, dirFilter).foreach{ f => registerFunction(jmxHttpPort, f)} - } - - def fileFilter = new IOFileFilter { - def accept (file: File) = file.getName.endsWith(".jar") && file.getName.startsWith("gemfire-functions") - def accept (dir: File, name: String) = name.endsWith(".jar") && name.startsWith("gemfire-functions") - } - - def dirFilter = new IOFileFilter { - def accept (file: File) = file.getName.startsWith("scala") - def accept (dir: File, name: String) = name.startsWith("scala") - } - - private def registerFunction(jmxHttpPort:Int, jar:File) { - println("Deploying:" + jar.getName) - import io.pivotal.gemfire.spark.connector.GemFireFunctionDeployer - val deployer = new GemFireFunctionDeployer(new HttpClient()) - deployer.deploy("localhost", jmxHttpPort, jar) - } - - def stopGemFireCluster(): Unit = { - stopGemFireServers(numServers) - stopGemFireLocator() - if (!IOUtils.waitForPortClose(InetAddress.getByName("localhost"), getLocatorPort, 30000)) - throw new IOException(s"Failed to stop GemFire locator at port $getLocatorPort.") - println(s"Successfully stop GemFire locator at port $getLocatorPort.") - } - - private def stopGemFireLocator() { - println(s"=== GemFireRunner: stop locator") - val p = new ProcessBuilder() - .inheritIO() - .command(gfshCmd, "stop", "locator", s"--dir=$testroot/locator") - .start() - p.waitFor() - } - - private def stopGemFireServers(numServers: Int) { - val procs = for (i <-1 to numServers) yield { - println(s"=== GemFireRunner: stop server $i.") - new ProcessBuilder() - .inheritIO() - .command(gfshCmd, "stop", "server", s"--dir=$testroot/server$i") - .start() - } - procs.foreach(p => p.waitFor()) - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/IOUtils.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/IOUtils.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/IOUtils.scala deleted file mode 100644 index 28134a8..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/ittest/io/pivotal/gemfire/spark/connector/testkit/IOUtils.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ittest.io.pivotal.gemfire.spark.connector.testkit - -import java.io.{File, IOException} -import java.net.{InetAddress, Socket} -import com.gemstone.gemfire.internal.AvailablePort -import scala.util.Try -import org.apache.log4j.PropertyConfigurator -import java.util.Properties - -object IOUtils { - - /** Makes a new directory or throws an `IOException` if it cannot be made */ - def mkdir(dir: File): File = { - if (!dir.mkdirs()) - throw new IOException(s"Could not create dir $dir") - dir - } - - private def socketPortProb(host: InetAddress, port: Int) = Iterator.continually { - Try { - Thread.sleep(100) - new Socket(host, port).close() - } - } - - /** - * Waits until a port at the given address is open or timeout passes. - * @return true if managed to connect to the port, false if timeout happened first - */ - def waitForPortOpen(host: InetAddress, port: Int, timeout: Long): Boolean = { - val startTime = System.currentTimeMillis() - socketPortProb(host, port) - .dropWhile(p => p.isFailure && System.currentTimeMillis() - startTime < timeout) - .next() - .isSuccess - } - - /** - * Waits until a port at the given address is close or timeout passes. - * @return true if host:port is un-connect-able, false if timeout happened first - */ - def waitForPortClose(host: InetAddress, port: Int, timeout: Long): Boolean = { - val startTime = System.currentTimeMillis() - socketPortProb(host, port) - .dropWhile(p => p.isSuccess && System.currentTimeMillis() - startTime < timeout) - .next() - .isFailure - } - - /** - * Returns array of unique randomly available tcp ports of specified count. - */ - def getRandomAvailableTCPPorts(count: Int): Seq[Int] = - (0 until count).map(x => AvailablePort.getRandomAvailablePortKeeper(AvailablePort.SOCKET)) - .map{x => x.release(); x.getPort}.toArray - - /** - * config a log4j properties used for integration tests - */ - def configTestLog4j(level: String, props: (String, String)*): Unit = { - val pro = new Properties() - props.foreach(p => pro.put(p._1, p._2)) - configTestLog4j(level, pro) - } - - def configTestLog4j(level: String, props: Properties): Unit = { - val pro = new Properties() - pro.put("log4j.rootLogger", s"$level, console") - pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender") - pro.put("log4j.appender.console.target", "System.err") - pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout") - pro.put("log4j.appender.console.layout.ConversionPattern", - "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n") - pro.putAll(props) - PropertyConfigurator.configure(pro) - - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala deleted file mode 100644 index 67f9e57..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/ManualClockHelper.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming - -import org.apache.spark.util.ManualClock - -object ManualClockHelper { - - def addToTime(ssc: StreamingContext, timeToAdd: Long): Unit = { - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - clock.advance(timeToAdd) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala b/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala deleted file mode 100644 index fce1e67..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/it/scala/org/apache/spark/streaming/TestInputDStream.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming - -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.dstream.InputDStream - -import scala.reflect.ClassTag - -class TestInputDStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int) - extends InputDStream[T](ssc_) { - - def start() {} - - def stop() {} - - def compute(validTime: Time): Option[RDD[T]] = { - logInfo("Computing RDD for time " + validTime) - val index = ((validTime - zeroTime) / slideDuration - 1).toInt - val selectedInput = if (index < input.size) input(index) else Seq[T]() - - // lets us test cases where RDDs are not created - if (selectedInput == null) - return None - - val rdd = ssc.sc.makeRDD(selectedInput, numPartitions) - logInfo("Created RDD " + rdd.id + " with " + selectedInput) - Some(rdd) - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java deleted file mode 100644 index 527b462..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaDStreamFunctions.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector.javaapi; - -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; -import io.pivotal.gemfire.spark.connector.streaming.GemFireDStreamFunctions; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.streaming.api.java.JavaDStream; -import java.util.Properties; - -import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*; - -/** - * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaDStream} - * to provide GemFire Spark Connector functionality. - * - * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p> - */ -public class GemFireJavaDStreamFunctions<T> { - - public final GemFireDStreamFunctions<T> dsf; - - public GemFireJavaDStreamFunctions(JavaDStream<T> ds) { - this.dsf = new GemFireDStreamFunctions<T>(ds.dstream()); - } - - /** - * Save the JavaDStream to GemFire key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param func the PairFunction that converts elements of JavaDStream to key/value pairs - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param opConf the optional parameters for this operation - */ - public <K, V> void saveToGemfire( - String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf, Properties opConf) { - dsf.saveToGemfire(regionPath, func, connConf, propertiesToScalaMap(opConf)); - } - - /** - * Save the JavaDStream to GemFire key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param func the PairFunction that converts elements of JavaDStream to key/value pairs - * @param opConf the optional parameters for this operation - */ - public <K, V> void saveToGemfire( - String regionPath, PairFunction<T, K, V> func, Properties opConf) { - dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf)); - } - - /** - * Save the JavaDStream to GemFire key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param func the PairFunction that converts elements of JavaDStream to key/value pairs - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - */ - public <K, V> void saveToGemfire( - String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) { - dsf.saveToGemfire(regionPath, func, connConf, emptyStrStrMap()); - } - - /** - * Save the JavaDStream to GemFire key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param func the PairFunction that converts elements of JavaDStream to key/value pairs - */ - public <K, V> void saveToGemfire( - String regionPath, PairFunction<T, K, V> func) { - dsf.saveToGemfire(regionPath, func, dsf.defaultConnectionConf(), emptyStrStrMap()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java deleted file mode 100644 index 6556462..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairDStreamFunctions.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector.javaapi; - -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; -import io.pivotal.gemfire.spark.connector.streaming.GemFirePairDStreamFunctions; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import java.util.Properties; - -import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*; - -/** - * A Java API wrapper over {@link org.apache.spark.streaming.api.java.JavaPairDStream} - * to provide GemFire Spark Connector functionality. - * - * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p> - */ -public class GemFireJavaPairDStreamFunctions<K, V> { - - public final GemFirePairDStreamFunctions<K, V> dsf; - - public GemFireJavaPairDStreamFunctions(JavaPairDStream<K, V> ds) { - this.dsf = new GemFirePairDStreamFunctions<K, V>(ds.dstream()); - } - - /** - * Save the JavaPairDStream to GemFire key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param opConf the optional parameters for this operation - */ - public void saveToGemfire(String regionPath, GemFireConnectionConf connConf, Properties opConf) { - dsf.saveToGemfire(regionPath, connConf, propertiesToScalaMap(opConf)); - } - - /** - * Save the JavaPairDStream to GemFire key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - */ - public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) { - dsf.saveToGemfire(regionPath, connConf, emptyStrStrMap()); - } - - /** - * Save the JavaPairDStream to GemFire key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param opConf the optional parameters for this operation - */ - public void saveToGemfire(String regionPath, Properties opConf) { - dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf(), propertiesToScalaMap(opConf)); - } - - /** - * Save the JavaPairDStream to GemFire key-value store. - * @param regionPath the full path of region that the DStream is stored - */ - public void saveToGemfire(String regionPath) { - dsf.saveToGemfire(regionPath, dsf.defaultConnectionConf(), emptyStrStrMap()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java deleted file mode 100644 index 72fa7a9..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaPairRDDFunctions.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector.javaapi; - -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; -import io.pivotal.gemfire.spark.connector.GemFirePairRDDFunctions; -import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireJoinRDD; -import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireOuterJoinRDD; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.Function; -import scala.Option; -import scala.Tuple2; -import scala.reflect.ClassTag; - -import java.util.Properties; - -import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*; - -/** - * A Java API wrapper over {@link org.apache.spark.api.java.JavaPairRDD} to provide GemFire Spark - * Connector functionality. - * - * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p> - */ -public class GemFireJavaPairRDDFunctions<K, V> { - - public final GemFirePairRDDFunctions<K, V> rddf; - - public GemFireJavaPairRDDFunctions(JavaPairRDD<K, V> rdd) { - this.rddf = new GemFirePairRDDFunctions<K, V>(rdd.rdd()); - } - - /** - * Save the pair RDD to GemFire key-value store. - * @param regionPath the full path of region that the RDD is stored - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param opConf the parameters for this operation - */ - public void saveToGemfire(String regionPath, GemFireConnectionConf connConf, Properties opConf) { - rddf.saveToGemfire(regionPath, connConf, propertiesToScalaMap(opConf)); - } - - /** - * Save the pair RDD to GemFire key-value store. - * @param regionPath the full path of region that the RDD is stored - * @param opConf the parameters for this operation - */ - public void saveToGemfire(String regionPath, Properties opConf) { - rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf)); - } - - /** - * Save the pair RDD to GemFire key-value store. - * @param regionPath the full path of region that the RDD is stored - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - */ - public void saveToGemfire(String regionPath, GemFireConnectionConf connConf) { - rddf.saveToGemfire(regionPath, connConf, emptyStrStrMap()); - } - - /** - * Save the pair RDD to GemFire key-value store with the default GemFireConnector. - * @param regionPath the full path of region that the RDD is stored - */ - public void saveToGemfire(String regionPath) { - rddf.saveToGemfire(regionPath, rddf.defaultConnectionConf(), emptyStrStrMap()); - } - - /** - * Return an JavaPairRDD containing all pairs of elements with matching keys in - * this RDD<K, V> and the GemFire `Region<K, V2>`. Each pair of elements - * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and - * (k, v2) is in the GemFire region. - * - * @param regionPath the region path of the GemFire region - * @param <V2> the value type of the GemFire region - * @return JavaPairRDD<<K, V>, V2> - */ - public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion(String regionPath) { - return joinGemfireRegion(regionPath, rddf.defaultConnectionConf()); - } - - /** - * Return an JavaPairRDD containing all pairs of elements with matching keys in - * this RDD<K, V> and the GemFire `Region<K, V2>`. Each pair of elements - * will be returned as a ((k, v), v2) tuple, where (k, v) is in this RDD and - * (k, v2) is in the GemFire region. - * - * @param regionPath the region path of the GemFire region - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param <V2> the value type of the GemFire region - * @return JavaPairRDD<<K, V>, V2> - */ - public <V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion( - String regionPath, GemFireConnectionConf connConf) { - GemFireJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.joinGemfireRegion(regionPath, connConf); - ClassTag<Tuple2<K, V>> kt = fakeClassTag(); - ClassTag<V2> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - - /** - * Return an RDD containing all pairs of elements with matching keys in this - * RDD<K, V> and the GemFire `Region<K2, V2>`. The join key from RDD - * element is generated by `func(K, V) => K2`, and the key from the GemFire - * region is just the key of the key/value pair. - * - * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple, - * where (k, v) is in this RDD and (k2, v2) is in the GemFire region. - * - * @param regionPath the region path of the GemFire region - * @param func the function that generates region key from RDD element (K, V) - * @param <K2> the key type of the GemFire region - * @param <V2> the value type of the GemFire region - * @return JavaPairRDD<Tuple2<K, V>, V2> - */ - public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion( - String regionPath, Function<Tuple2<K, V>, K2> func) { - return joinGemfireRegion(regionPath, func, rddf.defaultConnectionConf()); - } - - /** - * Return an RDD containing all pairs of elements with matching keys in this - * RDD<K, V> and the GemFire `Region<K2, V2>`. The join key from RDD - * element is generated by `func(K, V) => K2`, and the key from the GemFire - * region is just the key of the key/value pair. - * - * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple, - * where (k, v) is in this RDD and (k2, v2) is in the GemFire region. - * - * @param regionPath the region path of the GemFire region - * @param func the function that generates region key from RDD element (K, V) - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param <K2> the key type of the GemFire region - * @param <V2> the value type of the GemFire region - * @return JavaPairRDD<Tuple2<K, V>, V2> - */ - public <K2, V2> JavaPairRDD<Tuple2<K, V>, V2> joinGemfireRegion( - String regionPath, Function<Tuple2<K, V>, K2> func, GemFireConnectionConf connConf) { - GemFireJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.joinGemfireRegion(regionPath, func, connConf); - ClassTag<Tuple2<K, V>> kt = fakeClassTag(); - ClassTag<V2> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - - /** - * Perform a left outer join of this RDD<K, V> and the GemFire `Region<K, V2>`. - * For each element (k, v) in this RDD, the resulting RDD will either contain - * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair - * ((k, v), None)) if no element in the GemFire region have key k. - * - * @param regionPath the region path of the GemFire region - * @param <V2> the value type of the GemFire region - * @return JavaPairRDD<Tuple2<K, V>, Option<V>> - */ - public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion(String regionPath) { - return outerJoinGemfireRegion(regionPath, rddf.defaultConnectionConf()); - } - - /** - * Perform a left outer join of this RDD<K, V> and the GemFire `Region<K, V2>`. - * For each element (k, v) in this RDD, the resulting RDD will either contain - * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair - * ((k, v), None)) if no element in the GemFire region have key k. - * - * @param regionPath the region path of the GemFire region - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param <V2> the value type of the GemFire region - * @return JavaPairRDD<Tuple2<K, V>, Option<V>> - */ - public <V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion( - String regionPath, GemFireConnectionConf connConf) { - GemFireOuterJoinRDD<Tuple2<K, V>, K, V2> rdd = rddf.outerJoinGemfireRegion(regionPath, connConf); - ClassTag<Tuple2<K, V>> kt = fakeClassTag(); - ClassTag<Option<V2>> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - - /** - * Perform a left outer join of this RDD<K, V> and the GemFire `Region<K2, V2>`. - * The join key from RDD element is generated by `func(K, V) => K2`, and the - * key from region is just the key of the key/value pair. - * - * For each element (k, v) in `this` RDD, the resulting RDD will either contain - * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair - * ((k, v), None)) if no element in the GemFire region have key `func(k, v)`. - * - * @param regionPath the region path of the GemFire region - * @param func the function that generates region key from RDD element (K, V) - * @param <K2> the key type of the GemFire region - * @param <V2> the value type of the GemFire region - * @return JavaPairRDD<Tuple2<K, V>, Option<V>> - */ - public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion( - String regionPath, Function<Tuple2<K, V>, K2> func) { - return outerJoinGemfireRegion(regionPath, func, rddf.defaultConnectionConf()); - } - - /** - * Perform a left outer join of this RDD<K, V> and the GemFire `Region<K2, V2>`. - * The join key from RDD element is generated by `func(K, V) => K2`, and the - * key from region is just the key of the key/value pair. - * - * For each element (k, v) in `this` RDD, the resulting RDD will either contain - * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair - * ((k, v), None)) if no element in the GemFire region have key `func(k, v)`. - * - * @param regionPath the region path of the GemFire region - * @param func the function that generates region key from RDD element (K, V) - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param <K2> the key type of the GemFire region - * @param <V2> the value type of the GemFire region - * @return JavaPairRDD<Tuple2<K, V>, Option<V>> - */ - public <K2, V2> JavaPairRDD<Tuple2<K, V>, Option<V2>> outerJoinGemfireRegion( - String regionPath, Function<Tuple2<K, V>, K2> func, GemFireConnectionConf connConf) { - GemFireOuterJoinRDD<Tuple2<K, V>, K2, V2> rdd = rddf.outerJoinGemfireRegion(regionPath, func, connConf); - ClassTag<Tuple2<K, V>> kt = fakeClassTag(); - ClassTag<Option<V2>> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java deleted file mode 100644 index 519ba6e..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaRDDFunctions.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector.javaapi; - -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; -import io.pivotal.gemfire.spark.connector.GemFireRDDFunctions; -import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireJoinRDD; -import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireOuterJoinRDD; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; -import scala.Option; -import scala.reflect.ClassTag; - -import java.util.Properties; - -import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*; - -/** - * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide GemFire Spark - * Connector functionality. - * - * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p> - */ -public class GemFireJavaRDDFunctions<T> { - - public final GemFireRDDFunctions<T> rddf; - - public GemFireJavaRDDFunctions(JavaRDD<T> rdd) { - this.rddf = new GemFireRDDFunctions<T>(rdd.rdd()); - } - - /** - * Save the non-pair RDD to GemFire key-value store. - * @param regionPath the full path of region that the RDD is stored - * @param func the PairFunction that converts elements of JavaRDD to key/value pairs - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param opConf the parameters for this operation - */ - public <K, V> void saveToGemfire( - String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf, Properties opConf) { - rddf.saveToGemfire(regionPath, func, connConf, propertiesToScalaMap(opConf)); - } - - /** - * Save the non-pair RDD to GemFire key-value store. - * @param regionPath the full path of region that the RDD is stored - * @param func the PairFunction that converts elements of JavaRDD to key/value pairs - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - */ - public <K, V> void saveToGemfire( - String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) { - rddf.saveToGemfire(regionPath, func, connConf, emptyStrStrMap()); - } - - /** - * Save the non-pair RDD to GemFire key-value store. - * @param regionPath the full path of region that the RDD is stored - * @param func the PairFunction that converts elements of JavaRDD to key/value pairs - * @param opConf the parameters for this operation - */ - public <K, V> void saveToGemfire( - String regionPath, PairFunction<T, K, V> func, Properties opConf) { - rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf(), propertiesToScalaMap(opConf)); - } - - /** - * Save the non-pair RDD to GemFire key-value store with default GemFireConnector. - * @param regionPath the full path of region that the RDD is stored - * @param func the PairFunction that converts elements of JavaRDD to key/value pairs - */ - public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func) { - rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf(), emptyStrStrMap()); - } - - /** - * Return an RDD containing all pairs of elements with matching keys in this - * RDD<T> and the GemFire `Region<K, V>`. The join key from RDD - * element is generated by `func(T) => K`, and the key from the GemFire - * region is just the key of the key/value pair. - * - * Each pair of elements of result RDD will be returned as a (t, v2) tuple, - * where t is from this RDD and v is from the GemFire region. - * - * @param regionPath the region path of the GemFire region - * @param func the function that generates region key from RDD element T - * @param <K> the key type of the GemFire region - * @param <V> the value type of the GemFire region - * @return JavaPairRDD<T, V> - */ - public <K, V> JavaPairRDD<T, V> joinGemfireRegion(String regionPath, Function<T, K> func) { - return joinGemfireRegion(regionPath, func, rddf.defaultConnectionConf()); - } - - /** - * Return an RDD containing all pairs of elements with matching keys in this - * RDD<T> and the GemFire `Region<K, V>`. The join key from RDD - * element is generated by `func(T) => K`, and the key from the GemFire - * region is just the key of the key/value pair. - * - * Each pair of elements of result RDD will be returned as a (t, v2) tuple, - * where t is from this RDD and v is from the GemFire region. - * - * @param regionPath the region path of the GemFire region - * @param func the function that generates region key from RDD element T - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param <K> the key type of the GemFire region - * @param <V> the value type of the GemFire region - * @return JavaPairRDD<T, V> - */ - public <K, V> JavaPairRDD<T, V> joinGemfireRegion( - String regionPath, Function<T, K> func, GemFireConnectionConf connConf) { - GemFireJoinRDD<T, K, V> rdd = rddf.joinGemfireRegion(regionPath, func, connConf); - ClassTag<T> kt = fakeClassTag(); - ClassTag<V> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - - /** - * Perform a left outer join of this RDD<T> and the GemFire `Region<K, V>`. - * The join key from RDD element is generated by `func(T) => K`, and the - * key from region is just the key of the key/value pair. - * - * For each element (t) in this RDD, the resulting RDD will either contain - * all pairs (t, Some(v)) for v in the GemFire region, or the pair - * (t, None) if no element in the GemFire region have key `func(t)`. - * - * @param regionPath the region path of the GemFire region - * @param func the function that generates region key from RDD element T - * @param <K> the key type of the GemFire region - * @param <V> the value type of the GemFire region - * @return JavaPairRDD<T, Option<V>> - */ - public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(String regionPath, Function<T, K> func) { - return outerJoinGemfireRegion(regionPath, func, rddf.defaultConnectionConf()); - } - - /** - * Perform a left outer join of this RDD<T> and the GemFire `Region<K, V>`. - * The join key from RDD element is generated by `func(T) => K`, and the - * key from region is just the key of the key/value pair. - * - * For each element (t) in this RDD, the resulting RDD will either contain - * all pairs (t, Some(v)) for v in the GemFire region, or the pair - * (t, None) if no element in the GemFire region have key `func(t)`. - * - * @param regionPath the region path of the GemFire region - * @param func the function that generates region key from RDD element T - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param <K> the key type of the GemFire region - * @param <V> the value type of the GemFire region - * @return JavaPairRDD<T, Option<V>> - */ - public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion( - String regionPath, Function<T, K> func, GemFireConnectionConf connConf) { - GemFireOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGemfireRegion(regionPath, func, connConf); - ClassTag<T> kt = fakeClassTag(); - ClassTag<Option<V>> vt = fakeClassTag(); - return new JavaPairRDD<>(rdd, kt, vt); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java deleted file mode 100644 index 980c409..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSQLContextFunctions.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector.javaapi; - -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; -import io.pivotal.gemfire.spark.connector.GemFireSQLContextFunctions; -import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.SQLContext; - -/** - * Java API wrapper over {@link org.apache.spark.sql.SQLContext} to provide GemFire - * OQL functionality. - * - * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p> - */ -public class GemFireJavaSQLContextFunctions { - - public final GemFireSQLContextFunctions scf; - - public GemFireJavaSQLContextFunctions(SQLContext sqlContext) { - scf = new GemFireSQLContextFunctions(sqlContext); - } - - public <T> DataFrame gemfireOQL(String query) { - DataFrame df = scf.gemfireOQL(query, scf.defaultConnectionConf()); - return df; - } - - public <T> DataFrame gemfireOQL(String query, GemFireConnectionConf connConf) { - DataFrame df = scf.gemfireOQL(query, connConf); - return df; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java deleted file mode 100644 index f8b930c..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaSparkContextFunctions.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector.javaapi; - - -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; -import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD; -import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRegionRDD$; -import org.apache.spark.SparkContext; -import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*; - -import scala.reflect.ClassTag; -import java.util.Properties; - -/** - * Java API wrapper over {@link org.apache.spark.SparkContext} to provide GemFire - * Connector functionality. - * - * <p></p>To obtain an instance of this wrapper, use one of the factory methods in {@link - * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p> - */ -public class GemFireJavaSparkContextFunctions { - - public final SparkContext sc; - - public GemFireJavaSparkContextFunctions(SparkContext sc) { - this.sc = sc; - } - - /** - * Expose a GemFire region as a JavaPairRDD - * @param regionPath the full path of the region - * @param connConf the GemFireConnectionConf that can be used to access the region - * @param opConf the parameters for this operation, such as preferred partitioner. - */ - public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion( - String regionPath, GemFireConnectionConf connConf, Properties opConf) { - ClassTag<K> kt = fakeClassTag(); - ClassTag<V> vt = fakeClassTag(); - GemFireRegionRDD<K, V> rdd = GemFireRegionRDD$.MODULE$.apply( - sc, regionPath, connConf, propertiesToScalaMap(opConf), kt, vt); - return new GemFireJavaRegionRDD<>(rdd); - } - - /** - * Expose a GemFire region as a JavaPairRDD with default GemFireConnector and no preferred partitioner. - * @param regionPath the full path of the region - */ - public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath) { - GemFireConnectionConf connConf = GemFireConnectionConf.apply(sc.getConf()); - return gemfireRegion(regionPath, connConf, new Properties()); - } - - /** - * Expose a GemFire region as a JavaPairRDD with no preferred partitioner. - * @param regionPath the full path of the region - * @param connConf the GemFireConnectionConf that can be used to access the region - */ - public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath, GemFireConnectionConf connConf) { - return gemfireRegion(regionPath, connConf, new Properties()); - } - - /** - * Expose a GemFire region as a JavaPairRDD with default GemFireConnector. - * @param regionPath the full path of the region - * @param opConf the parameters for this operation, such as preferred partitioner. - */ - public <K, V> GemFireJavaRegionRDD<K, V> gemfireRegion(String regionPath, Properties opConf) { - GemFireConnectionConf connConf = GemFireConnectionConf.apply(sc.getConf()); - return gemfireRegion(regionPath, connConf, opConf); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java b/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java deleted file mode 100644 index 679f197..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/java/io/pivotal/gemfire/spark/connector/javaapi/GemFireJavaUtil.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector.javaapi; - -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.sql.SQLContext; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import scala.Tuple2; - -import io.pivotal.gemfire.spark.connector.package$; - -/** - * The main entry point to Spark GemFire Connector Java API. - * - * There are several helpful static factory methods which build useful wrappers - * around Spark Context, Streaming Context and RDD. There are also helper methods - * to convert JavaRDD<Tuple2<K, V>> to JavaPairRDD<K, V>. - */ -public final class GemFireJavaUtil { - - /** constants */ - public static String GemFireLocatorPropKey = package$.MODULE$.GemFireLocatorPropKey(); - // partitioner related keys and values - public static String PreferredPartitionerPropKey = package$.MODULE$.PreferredPartitionerPropKey(); - public static String NumberPartitionsPerServerPropKey = package$.MODULE$.NumberPartitionsPerServerPropKey(); - public static String OnePartitionPartitionerName = package$.MODULE$.OnePartitionPartitionerName(); - public static String ServerSplitsPartitionerName = package$.MODULE$.ServerSplitsPartitionerName(); - public static String RDDSaveBatchSizePropKey = package$.MODULE$.RDDSaveBatchSizePropKey(); - public static int RDDSaveBatchSizeDefault = package$.MODULE$.RDDSaveBatchSizeDefault(); - - /** The private constructor is used prevents user from creating instance of this class. */ - private GemFireJavaUtil() { } - - /** - * A static factory method to create a {@link GemFireJavaSparkContextFunctions} based - * on an existing {@link SparkContext} instance. - */ - public static GemFireJavaSparkContextFunctions javaFunctions(SparkContext sc) { - return new GemFireJavaSparkContextFunctions(sc); - } - - /** - * A static factory method to create a {@link GemFireJavaSparkContextFunctions} based - * on an existing {@link JavaSparkContext} instance. - */ - public static GemFireJavaSparkContextFunctions javaFunctions(JavaSparkContext jsc) { - return new GemFireJavaSparkContextFunctions(JavaSparkContext.toSparkContext(jsc)); - } - - /** - * A static factory method to create a {@link GemFireJavaPairRDDFunctions} based on an - * existing {@link org.apache.spark.api.java.JavaPairRDD} instance. - */ - public static <K, V> GemFireJavaPairRDDFunctions<K, V> javaFunctions(JavaPairRDD<K, V> rdd) { - return new GemFireJavaPairRDDFunctions<K, V>(rdd); - } - - /** - * A static factory method to create a {@link GemFireJavaRDDFunctions} based on an - * existing {@link org.apache.spark.api.java.JavaRDD} instance. - */ - public static <T> GemFireJavaRDDFunctions<T> javaFunctions(JavaRDD<T> rdd) { - return new GemFireJavaRDDFunctions<T>(rdd); - } - - /** - * A static factory method to create a {@link GemFireJavaPairDStreamFunctions} based on an - * existing {@link org.apache.spark.streaming.api.java.JavaPairDStream} instance. - */ - public static <K, V> GemFireJavaPairDStreamFunctions<K, V> javaFunctions(JavaPairDStream<K, V> ds) { - return new GemFireJavaPairDStreamFunctions<>(ds); - } - - /** - * A static factory method to create a {@link GemFireJavaDStreamFunctions} based on an - * existing {@link org.apache.spark.streaming.api.java.JavaDStream} instance. - */ - public static <T> GemFireJavaDStreamFunctions<T> javaFunctions(JavaDStream<T> ds) { - return new GemFireJavaDStreamFunctions<>(ds); - } - - /** Convert an instance of {@link org.apache.spark.api.java.JavaRDD}<<Tuple2<K, V>> - * to a {@link org.apache.spark.api.java.JavaPairRDD}<K, V>. - */ - public static <K, V> JavaPairRDD<K, V> toJavaPairRDD(JavaRDD<Tuple2<K, V>> rdd) { - return JavaAPIHelper.toJavaPairRDD(rdd); - } - - /** Convert an instance of {@link org.apache.spark.streaming.api.java.JavaDStream}<<Tuple2<K, V>> - * to a {@link org.apache.spark.streaming.api.java.JavaPairDStream}<K, V>. - */ - public static <K, V> JavaPairDStream<K, V> toJavaPairDStream(JavaDStream<Tuple2<K, V>> ds) { - return JavaAPIHelper.toJavaPairDStream(ds); - } - - /** - * A static factory method to create a {@link GemFireJavaSQLContextFunctions} based - * on an existing {@link SQLContext} instance. - */ - public static GemFireJavaSQLContextFunctions javaFunctions(SQLContext sqlContext) { - return new GemFireJavaSQLContextFunctions(sqlContext); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala deleted file mode 100644 index 39ec1c1..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnection.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector - -import com.gemstone.gemfire.cache.execute.ResultCollector -import com.gemstone.gemfire.cache.query.Query -import com.gemstone.gemfire.cache.Region -import io.pivotal.gemfire.spark.connector.internal.RegionMetadata -import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartition - - -trait GemFireConnection { - - /** - * Validate region existence and key/value type constraints, throw RuntimeException - * if region does not exist or key and/or value type do(es) not match. - * @param regionPath the full path of region - */ - def validateRegion[K, V](regionPath: String): Unit - - /** - * Get Region proxy for the given region - * @param regionPath the full path of region - */ - def getRegionProxy[K, V](regionPath: String): Region[K, V] - - /** - * Retrieve region meta data for the given region. - * @param regionPath: the full path of the region - * @return Some[RegionMetadata] if region exists, None otherwise - */ - def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] - - /** - * Retrieve region data for the given region and bucket set - * @param regionPath: the full path of the region - * @param whereClause: the set of bucket IDs - * @param split: GemFire RDD Partition instance - */ - def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GemFireRDDPartition): Iterator[(K, V)] - - def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object - /** - * Create a gemfire OQL query - * @param queryString GemFire OQL query string - */ - def getQuery(queryString: String): Query - - /** Close the connection */ - def close(): Unit -} - - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala deleted file mode 100644 index ea6d246..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionConf.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector - -import org.apache.spark.SparkConf -import io.pivotal.gemfire.spark.connector.internal.{DefaultGemFireConnectionManager, LocatorHelper} - -/** - * Stores configuration of a connection to GemFire cluster. It is serializable and can - * be safely sent over network. - * - * @param locators GemFire locator host:port pairs, the default is (localhost,10334) - * @param gemfireProps The initial gemfire properties to be used. - * @param connectionManager GemFireConnectionFactory instance - */ -class GemFireConnectionConf( - val locators: Seq[(String, Int)], - val gemfireProps: Map[String, String] = Map.empty, - connectionManager: GemFireConnectionManager = new DefaultGemFireConnectionManager - ) extends Serializable { - - /** require at least 1 pair of (host,port) */ - require(locators.nonEmpty) - - def getConnection: GemFireConnection = connectionManager.getConnection(this) - -} - -object GemFireConnectionConf { - - /** - * create GemFireConnectionConf object based on locator string and optional GemFireConnectionFactory - * @param locatorStr GemFire cluster locator string - * @param connectionManager GemFireConnection factory - */ - def apply(locatorStr: String, gemfireProps: Map[String, String] = Map.empty) - (implicit connectionManager: GemFireConnectionManager = new DefaultGemFireConnectionManager): GemFireConnectionConf = { - new GemFireConnectionConf(LocatorHelper.parseLocatorsString(locatorStr), gemfireProps, connectionManager) - } - - /** - * create GemFireConnectionConf object based on SparkConf. Note that implicit can - * be used to control what GemFireConnectionFactory instance to use if desired - * @param conf a SparkConf instance - */ - def apply(conf: SparkConf): GemFireConnectionConf = { - val locatorStr = conf.getOption(GemFireLocatorPropKey).getOrElse( - throw new RuntimeException(s"SparkConf does not contain property $GemFireLocatorPropKey")) - // SparkConf only holds properties whose key starts with "spark.", In order to - // put gemfire properties in SparkConf, all gemfire properties are prefixes with - // "spark.gemfire.". This prefix was removed before the properties were put in `gemfireProp` - val prefix = "spark.gemfire." - val gemfireProps = conf.getAll.filter { - case (k, v) => k.startsWith(prefix) && k != GemFireLocatorPropKey - }.map { case (k, v) => (k.substring(prefix.length), v) }.toMap - apply(locatorStr, gemfireProps) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala deleted file mode 100644 index 7386f5c..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireConnectionManager.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector - -/** - * GemFireConnectionFactory provide an common interface that manages GemFire - * connections, and it's serializable. Each factory instance will handle - * connection instance creation and connection pool management. - */ -trait GemFireConnectionManager extends Serializable { - - /** get connection for the given connector */ - def getConnection(connConf: GemFireConnectionConf): GemFireConnection - - /** close the connection */ - def closeConnection(connConf: GemFireConnectionConf): Unit -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala deleted file mode 100644 index 96a7e81..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployer.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector - -import java.io.File -import java.net.URL -import org.apache.commons.httpclient.methods.PostMethod -import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity} -import org.apache.commons.httpclient.HttpClient -import org.apache.spark.Logging - -object GemFireFunctionDeployer { - def main(args: Array[String]) { - new GemFireFunctionDeployer(new HttpClient()).commandLineRun(args) - } -} - -class GemFireFunctionDeployer(val httpClient:HttpClient) extends Logging { - - def deploy(host: String, port: Int, jarLocation: String): String = - deploy(host + ":" + port, jarLocation) - - def deploy(host: String, port: Int, jar:File): String = - deploy(host + ":" + port, jar) - - def deploy(jmxHostAndPort: String, jarLocation: String): String = - deploy(jmxHostAndPort, jarFileHandle(jarLocation)) - - def deploy(jmxHostAndPort: String, jar: File): String = { - val urlString = constructURLString(jmxHostAndPort) - val filePost: PostMethod = new PostMethod(urlString) - val parts: Array[Part] = new Array[Part](1) - parts(0) = new FilePart("resources", jar) - filePost.setRequestEntity(new MultipartRequestEntity(parts, filePost.getParams)) - val status: Int = httpClient.executeMethod(filePost) - "Deployed Jar with status:" + status - } - - private[connector] def constructURLString(jmxHostAndPort: String) = - "http://" + jmxHostAndPort + "/gemfire/v1/deployed" - - private[connector]def jarFileHandle(jarLocation: String) = { - val f: File = new File(jarLocation) - if (!f.exists()) { - val errorMessage: String = "Invalid jar file:" + f.getAbsolutePath - logInfo(errorMessage) - throw new RuntimeException(errorMessage) - } - f - } - - def commandLineRun(args: Array[String]):Unit = { - val (hostPort: String, jarFile: String) = - if (args.length < 2) { - logInfo("JMX Manager Host and Port (example: localhost:7070):") - val bufferedReader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in)) - val jmxHostAndPort = bufferedReader.readLine() - logInfo("Location of gemfire-functions.jar:") - val functionJarLocation = bufferedReader.readLine() - (jmxHostAndPort, functionJarLocation) - } else { - (args(0), args(1)) - } - val status = deploy(hostPort, jarFile) - logInfo(status) - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala deleted file mode 100644 index 196d991..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireKryoRegistrator.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector - -import com.esotericsoftware.kryo.Kryo -import io.pivotal.gemfire.spark.connector.internal.oql.UndefinedSerializer -import org.apache.spark.serializer.KryoRegistrator -import com.gemstone.gemfire.cache.query.internal.Undefined - -class GemFireKryoRegistrator extends KryoRegistrator{ - - override def registerClasses(kyro: Kryo): Unit = { - kyro.addDefaultSerializer(classOf[Undefined], classOf[UndefinedSerializer]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala deleted file mode 100644 index 63583da..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFirePairRDDFunctions.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector - -import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireOuterJoinRDD, GemFireJoinRDD, GemFirePairRDDWriter} -import org.apache.spark.Logging -import org.apache.spark.api.java.function.Function -import org.apache.spark.rdd.RDD - -/** - * Extra gemFire functions on RDDs of (key, value) pairs through an implicit conversion. - * Import `io.pivotal.gemfire.spark.connector._` at the top of your program to - * use these functions. - */ -class GemFirePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging { - - /** - * Save the RDD of pairs to GemFire key-value store without any conversion - * @param regionPath the full path of region that the RDD is stored - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param opConf the optional parameters for this operation - */ - def saveToGemfire( - regionPath: String, - connConf: GemFireConnectionConf = defaultConnectionConf, - opConf: Map[String, String] = Map.empty): Unit = { - connConf.getConnection.validateRegion[K, V](regionPath) - if (log.isDebugEnabled) - logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""") - else - logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""") - val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf, opConf) - rdd.sparkContext.runJob(rdd, writer.write _) - } - - /** - * Return an RDD containing all pairs of elements with matching keys in `this` - * RDD and the GemFire `Region[K, V2]`. Each pair of elements will be returned - * as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in the - * GemFire region. - * - *@param regionPath the region path of the GemFire region - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam K2 the key type of the GemFire region - * @tparam V2 the value type of the GemFire region - * @return RDD[T, V] - */ - def joinGemfireRegion[K2 <: K, V2]( - regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[(K, V), K, V2] = { - new GemFireJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf) - } - - /** - * Return an RDD containing all pairs of elements with matching keys in `this` RDD - * and the GemFire `Region[K2, V2]`. The join key from RDD element is generated by - * `func(K, V) => K2`, and the key from the GemFire region is jus the key of the - * key/value pair. - * - * Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple, - * where (k, v) is in `this` RDD and (k2, v2) is in the GemFire region. - * - * @param regionPath the region path of the GemFire region - * @param func the function that generates region key from RDD element (K, V) - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam K2 the key type of the GemFire region - * @tparam V2 the value type of the GemFire region - * @return RDD[(K, V), V2] - */ - def joinGemfireRegion[K2, V2]( - regionPath: String, func: ((K, V)) => K2, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[(K, V), K2, V2] = - new GemFireJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf) - - /** This version of joinGemfireRegion(...) is just for Java API. */ - private[connector] def joinGemfireRegion[K2, V2]( - regionPath: String, func: Function[(K, V), K2], connConf: GemFireConnectionConf): GemFireJoinRDD[(K, V), K2, V2] = { - new GemFireJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf) - } - - /** - * Perform a left outer join of `this` RDD and the GemFire `Region[K, V2]`. - * For each element (k, v) in `this` RDD, the resulting RDD will either contain - * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair - * ((k, v), None)) if no element in the GemFire region have key k. - * - * @param regionPath the region path of the GemFire region - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam K2 the key type of the GemFire region - * @tparam V2 the value type of the GemFire region - * @return RDD[ (K, V), Option[V] ] - */ - def outerJoinGemfireRegion[K2 <: K, V2]( - regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[(K, V), K, V2] = { - new GemFireOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf) - } - - /** - * Perform a left outer join of `this` RDD and the GemFire `Region[K2, V2]`. - * The join key from RDD element is generated by `func(K, V) => K2`, and the - * key from region is jus the key of the key/value pair. - * - * For each element (k, v) in `this` RDD, the resulting RDD will either contain - * all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair - * ((k, v), None)) if no element in the GemFire region have key `func(k, v)`. - * - *@param regionPath the region path of the GemFire region - * @param func the function that generates region key from RDD element (K, V) - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam K2 the key type of the GemFire region - * @tparam V2 the value type of the GemFire region - * @return RDD[ (K, V), Option[V] ] - */ - def outerJoinGemfireRegion[K2, V2]( - regionPath: String, func: ((K, V)) => K2, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[(K, V), K2, V2] = { - new GemFireOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf) - } - - /** This version of outerJoinGemfireRegion(...) is just for Java API. */ - private[connector] def outerJoinGemfireRegion[K2, V2]( - regionPath: String, func: Function[(K, V), K2], connConf: GemFireConnectionConf): GemFireOuterJoinRDD[(K, V), K2, V2] = { - new GemFireOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf) - } - - private[connector] def defaultConnectionConf: GemFireConnectionConf = - GemFireConnectionConf(rdd.sparkContext.getConf) - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala deleted file mode 100644 index 2039b7f..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/GemFireRDDFunctions.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector - -import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireOuterJoinRDD, GemFireJoinRDD, GemFireRDDWriter} -import org.apache.spark.Logging -import org.apache.spark.api.java.function.{PairFunction, Function} -import org.apache.spark.rdd.RDD - -/** - * Extra gemFire functions on non-Pair RDDs through an implicit conversion. - * Import `io.pivotal.gemfire.spark.connector._` at the top of your program to - * use these functions. - */ -class GemFireRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging { - - /** - * Save the non-pair RDD to GemFire key-value store. - * @param regionPath the full path of region that the RDD is stored - * @param func the function that converts elements of RDD to key/value pairs - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param opConf the optional parameters for this operation - */ - def saveToGemfire[K, V]( - regionPath: String, - func: T => (K, V), - connConf: GemFireConnectionConf = defaultConnectionConf, - opConf: Map[String, String] = Map.empty): Unit = { - connConf.getConnection.validateRegion[K, V](regionPath) - if (log.isDebugEnabled) - logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""") - else - logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""") - val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf, opConf) - rdd.sparkContext.runJob(rdd, writer.write(func) _) - } - - /** This version of saveToGemfire(...) is just for Java API. */ - private[connector] def saveToGemfire[K, V]( - regionPath: String, - func: PairFunction[T, K, V], - connConf: GemFireConnectionConf, - opConf: Map[String, String]): Unit = { - saveToGemfire[K, V](regionPath, func.call _, connConf, opConf) - } - - /** - * Return an RDD containing all pairs of elements with matching keys in `this` RDD - * and the GemFire `Region[K, V]`. The join key from RDD element is generated by - * `func(T) => K`, and the key from the GemFire region is just the key of the - * key/value pair. - * - * Each pair of elements of result RDD will be returned as a (t, v) tuple, - * where (t) is in `this` RDD and (k, v) is in the GemFire region. - * - * @param regionPath the region path of the GemFire region - * @param func the function that generate region key from RDD element T - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam K the key type of the GemFire region - * @tparam V the value type of the GemFire region - * @return RDD[T, V] - */ - def joinGemfireRegion[K, V](regionPath: String, func: T => K, - connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[T, K, V] = { - new GemFireJoinRDD[T, K, V](rdd, func, regionPath, connConf) - } - - /** This version of joinGemfireRegion(...) is just for Java API. */ - private[connector] def joinGemfireRegion[K, V]( - regionPath: String, func: Function[T, K], connConf: GemFireConnectionConf): GemFireJoinRDD[T, K, V] = { - joinGemfireRegion(regionPath, func.call _, connConf) - } - - /** - * Perform a left outer join of `this` RDD and the GemFire `Region[K, V]`. - * The join key from RDD element is generated by `func(T) => K`, and the - * key from region is just the key of the key/value pair. - * - * For each element (t) in `this` RDD, the resulting RDD will either contain - * all pairs (t, Some(v)) for v in the GemFire region, or the pair - * (t, None) if no element in the GemFire region have key `func(t)` - * - * @param regionPath the region path of the GemFire region - * @param func the function that generate region key from RDD element T - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @tparam K the key type of the GemFire region - * @tparam V the value type of the GemFire region - * @return RDD[ T, Option[V] ] - */ - def outerJoinGemfireRegion[K, V](regionPath: String, func: T => K, - connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[T, K, V] = { - new GemFireOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf) - } - - /** This version of outerJoinGemfireRegion(...) is just for Java API. */ - private[connector] def outerJoinGemfireRegion[K, V]( - regionPath: String, func: Function[T, K], connConf: GemFireConnectionConf): GemFireOuterJoinRDD[T, K, V] = { - outerJoinGemfireRegion(regionPath, func.call _, connConf) - } - - private[connector] def defaultConnectionConf: GemFireConnectionConf = - GemFireConnectionConf(rdd.sparkContext.getConf) - -} - -