IGNITE-3232 - Inline transformers for IgniteRDD.savePairs and IgniteRDD.saveValues
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e4ef3b2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e4ef3b2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e4ef3b2 Branch: refs/heads/ignite-3216 Commit: 0e4ef3b2e842015ef27aa107a5ecdcc10a73a327 Parents: 50a8ec2 Author: Valentin Kulichenko <valentin.luliche...@gmail.com> Authored: Thu Jun 2 16:03:12 2016 +0300 Committer: Valentin Kulichenko <valentin.luliche...@gmail.com> Committed: Fri Jun 3 19:47:00 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/spark/IgniteRDD.scala | 75 +++++++++++++++++++ .../org/apache/ignite/spark/JavaIgniteRDD.scala | 9 +++ .../org/apache/ignite/spark/IgniteRDDSpec.scala | 77 +++++++++++++++++++- 3 files changed, 160 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0e4ef3b2/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 0d8e730..036dfe6 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -180,6 +180,39 @@ class IgniteRDD[K, V] ( } /** + * Saves values from given RDD into Ignite. A unique key will be generated for each value of the given RDD. + * + * @param rdd RDD instance to save values from. + * @param f Transformation function. 
+ */ + def saveValues[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) ⇒ V) = { + rdd.foreachPartition(it ⇒ { + val ig = ic.ignite() + + ensureCache() + + val locNode = ig.cluster().localNode() + + val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode)) + + val streamer = ig.dataStreamer[Object, V](cacheName) + + try { + it.foreach(t ⇒ { + val value = f(t, ic) + + val key = affinityKeyFunc(value, node.orNull) + + streamer.addData(key, value) + }) + } + finally { + streamer.close() + } + }) + } + + /** * Saves values from the given key-value RDD into Ignite. * * @param rdd RDD instance to save values from. @@ -209,6 +242,48 @@ class IgniteRDD[K, V] ( } /** + * Saves values from the given RDD into Ignite. + * + * @param rdd RDD instance to save values from. + * @param f Transformation function. + * @param overwrite Boolean flag indicating whether the call on this method should overwrite existing + * values in Ignite cache. + */ + def savePairs[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) ⇒ (K, V), overwrite: Boolean) = { + rdd.foreachPartition(it ⇒ { + val ig = ic.ignite() + + // Make sure to deploy the cache + ensureCache() + + val streamer = ig.dataStreamer[K, V](cacheName) + + try { + streamer.allowOverwrite(overwrite) + + it.foreach(t ⇒ { + val tup = f(t, ic) + + streamer.addData(tup._1, tup._2) + }) + } + finally { + streamer.close() + } + }) + } + + /** + * Saves values from the given RDD into Ignite. + * + * @param rdd RDD instance to save values from. + * @param f Transformation function. + */ + def savePairs[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) ⇒ (K, V)): Unit = { + savePairs(rdd, f, overwrite = false) + } + + /** * Removes all values from the underlying Ignite cache. 
*/ def clear(): Unit = { http://git-wip-us.apache.org/repos/asf/ignite/blob/0e4ef3b2/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala index 2e8702e..cac0e15 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala @@ -80,12 +80,21 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V]) def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd)) + def saveValues[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) ⇒ V) = rdd.saveValues(JavaRDD.toRDD(jrdd), f) + def savePairs(jrdd: JavaPairRDD[K, V]) = { val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd) rdd.savePairs(rrdd) } + def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) ⇒ (K, V), overwrite: Boolean = false) = { + rdd.savePairs(JavaRDD.toRDD(jrdd), f, overwrite) + } + + def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) ⇒ (K, V)): Unit = + savePairs(jrdd, f, overwrite = false) + def clear(): Unit = rdd.clear() } http://git-wip-us.apache.org/repos/asf/ignite/blob/0e4ef3b2/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala index 4e90bc8..61040d9 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala @@ -26,6 +26,7 @@ import org.apache.spark.SparkContext import org.junit.runner.RunWith import org.scalatest._ import org.scalatest.junit.JUnitRunner +import 
scala.collection.JavaConversions._ import IgniteRDDSpec._ @@ -34,7 +35,7 @@ import scala.annotation.meta.field @RunWith(classOf[JUnitRunner]) class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEach { describe("IgniteRDD") { - it("should successfully store data to ignite") { + it("should successfully store data to ignite using savePairs") { val sc = new SparkContext("local[*]", "test") try { @@ -59,6 +60,80 @@ class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be } } + it("should successfully store data to ignite using savePairs with inline transformation") { + val sc = new SparkContext("local[*]", "test") + + try { + val ic = new IgniteContext[String, String](sc, + () ⇒ configuration("client", client = true)) + + // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache. + ic.fromCache(PARTITIONED_CACHE_NAME).savePairs( + sc.parallelize(0 to 10000, 2), (i: Int, ic) ⇒ (String.valueOf(i), "val" + i)) + + // Check cache contents. + val ignite = Ignition.ignite("grid-0") + + for (i ← 0 to 10000) { + val res = ignite.cache[String, String](PARTITIONED_CACHE_NAME).get(String.valueOf(i)) + + assert(res != null, "Value was not put to cache for key: " + i) + assert("val" + i == res, "Invalid value stored for key: " + i) + } + } + finally { + sc.stop() + } + } + + it("should successfully store data to ignite using saveValues") { + val sc = new SparkContext("local[*]", "test") + + try { + val ic = new IgniteContext[String, String](sc, + () ⇒ configuration("client", client = true)) + + // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache. + ic.fromCache(PARTITIONED_CACHE_NAME).saveValues( + sc.parallelize(0 to 10000, 2).map(i ⇒ "val" + i)) + + // Check cache contents. 
+ val ignite = Ignition.ignite("grid-0") + + val values = ignite.cache[String, String](PARTITIONED_CACHE_NAME).toList.map(e ⇒ e.getValue) + + for (i ← 0 to 10000) + assert(values.contains("val" + i), "Value not found for index: " + i) + } + finally { + sc.stop() + } + } + + it("should successfully store data to ignite using saveValues with inline transformation") { + val sc = new SparkContext("local[*]", "test") + + try { + val ic = new IgniteContext[String, String](sc, + () ⇒ configuration("client", client = true)) + + // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache. + ic.fromCache(PARTITIONED_CACHE_NAME).saveValues( + sc.parallelize(0 to 10000, 2), (i: Int, ic) ⇒ "val" + i) + + // Check cache contents. + val ignite = Ignition.ignite("grid-0") + + val values = ignite.cache[String, String](PARTITIONED_CACHE_NAME).toList.map(e ⇒ e.getValue) + + for (i ← 0 to 10000) + assert(values.contains("val" + i), "Value not found for index: " + i) + } + finally { + sc.stop() + } + } + it("should successfully read data from ignite") { val sc = new SparkContext("local[*]", "test")