Repository: spark
Updated Branches:
  refs/heads/branch-1.6 899106cc6 -> c130b8626


[SPARK-4557][STREAMING] Spark Streaming foreachRDD Java API method should accept a VoidFunction<...>

Currently, the streaming foreachRDD Java API uses a function prototype that
requires returning null. This PR deprecates the old methods and adds overloads
that accept a VoidFunction, allowing a more concise declaration. It also adds
VoidFunction2 to the Java API for use in the streaming method that passes a
batch Time. A unit test is added for foreachRDD with VoidFunction, and the
changes have been tested with Java 7 anonymous classes and Java 8 lambdas.
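
As a minimal before/after sketch (the JavaDStream<String> named "lines" is
illustrative, not part of this patch):

    // Before: the Java-friendly overload took a Function returning Void,
    // so every callback had to end with an explicit "return null;".
    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) {
        System.out.println("records in batch: " + rdd.count());
        return null;
      }
    });

    // After: the new overload takes a VoidFunction, so no return value is
    // needed and a Java 8 lambda stays concise.
    lines.foreachRDD(rdd -> System.out.println("records in batch: " + rdd.count()));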

Author: Bryan Cutler <bjcut...@us.ibm.com>

Closes #9488 from BryanCutler/foreachRDD-VoidFunction-SPARK-4557.

(cherry picked from commit 31921e0f0bd559d042148d1ea32f865fb3068f38)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c130b862
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c130b862
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c130b862

Branch: refs/heads/branch-1.6
Commit: c130b86261152f526a8b36a82acf9d892bc6e258
Parents: 899106c
Author: Bryan Cutler <bjcut...@us.ibm.com>
Authored: Wed Nov 18 12:09:54 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Nov 18 12:10:03 2015 -0800

----------------------------------------------------------------------
 .../spark/api/java/function/VoidFunction2.java  | 27 +++++++++++++
 .../apache/spark/streaming/Java8APISuite.java   | 26 +++++++++++++
 project/MimaExcludes.scala                      |  4 ++
 .../streaming/api/java/JavaDStreamLike.scala    | 24 +++++++++++-
 .../apache/spark/streaming/JavaAPISuite.java    | 41 +++++++++++++++++++-
 5 files changed, 120 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c130b862/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
new file mode 100644
index 0000000..6c576ab
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A two-argument function that takes arguments of type T1 and T2 with no return value.
+ */
+public interface VoidFunction2<T1, T2> extends Serializable {
+  public void call(T1 v1, T2 v2) throws Exception;
+}
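
A short usage sketch for the new two-argument variant (hedged; the
JavaDStream<String> named "words" is illustrative, not part of this patch):
together with the foreachRDD(VoidFunction2) overload added to JavaDStreamLike
below, it lets a callback receive both the RDD and the batch Time without a
dummy return value.

    // Illustrative only: log how many records arrived in each batch interval.
    words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
      @Override
      public void call(JavaRDD<String> rdd, Time time) {
        System.out.println("batch " + time + " had " + rdd.count() + " records");
      }
    });

    // Equivalent Java 8 lambda:
    words.foreachRDD((rdd, time) ->
        System.out.println("batch " + time + " had " + rdd.count() + " records"));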

http://git-wip-us.apache.org/repos/asf/spark/blob/c130b862/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 163ae92..4eee97b 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.spark.Accumulator;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -361,6 +362,31 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   }
 
   @Test
+  public void testForeachRDD() {
+    final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
+    final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+    List<List<Integer>> inputData = Arrays.asList(
+        Arrays.asList(1,1,1),
+        Arrays.asList(1,1,1));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
+
+    stream.foreachRDD(rdd -> {
+      accumRdd.add(1);
+      rdd.foreach(x -> accumEle.add(1));
+    });
+
+    // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
+    stream.foreachRDD((rdd, time) -> null);
+
+    JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(2, accumRdd.value().intValue());
+    Assert.assertEquals(6, accumEle.value().intValue());
+  }
+
+  @Test
   public void testPairFlatMap() {
     List<List<String>> inputData = Arrays.asList(
       Arrays.asList("giants"),

http://git-wip-us.apache.org/repos/asf/spark/blob/c130b862/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index eb70d27..bb45d1b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -142,6 +142,10 @@ object MimaExcludes {
           "org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream"),
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createRDD")
+      ) ++ Seq(
+        // SPARK-4557 Changed foreachRDD to use VoidFunction
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.streaming.api.java.JavaDStreamLike.foreachRDD")
       )
     case v if v.startsWith("1.5") =>
       Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/c130b862/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index edfa474..84acec7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaRDDLike}
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, _}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, VoidFunction => JVoidFunction, VoidFunction2 => JVoidFunction2, _}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.api.java.JavaDStream._
@@ -308,7 +308,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
+   *
+   * @deprecated  As of release 1.6.0, replaced by foreachRDD(JVoidFunction)
    */
+  @deprecated("Use foreachRDD(foreachFunc: JVoidFunction[R])", "1.6.0")
   def foreachRDD(foreachFunc: JFunction[R, Void]) {
     dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
   }
@@ -316,12 +319,31 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
+   *
+   * @deprecated  As of release 1.6.0, replaced by foreachRDD(JVoidFunction2)
    */
+  @deprecated("Use foreachRDD(foreachFunc: JVoidFunction2[R, Time])", "1.6.0")
   def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) {
     dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
   }
 
   /**
+   * Apply a function to each RDD in this DStream. This is an output operator, so
+   * 'this' DStream will be registered as an output stream and therefore materialized.
+   */
+  def foreachRDD(foreachFunc: JVoidFunction[R]) {
+    dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
+  }
+
+  /**
+   * Apply a function to each RDD in this DStream. This is an output operator, so
+   * 'this' DStream will be registered as an output stream and therefore materialized.
+   */
+  def foreachRDD(foreachFunc: JVoidFunction2[R, Time]) {
+    dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
+  }
+
+  /**
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/c130b862/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c521714..609bb44 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -37,7 +37,9 @@ import com.google.common.base.Optional;
 import com.google.common.io.Files;
 import com.google.common.collect.Sets;
 
+import org.apache.spark.Accumulator;
 import org.apache.spark.HashPartitioner;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -45,7 +47,6 @@ import org.apache.spark.api.java.function.*;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.*;
 import org.apache.spark.util.Utils;
-import org.apache.spark.SparkConf;
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -770,6 +771,44 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
   @SuppressWarnings("unchecked")
   @Test
+  public void testForeachRDD() {
+    final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
+    final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+    List<List<Integer>> inputData = Arrays.asList(
+        Arrays.asList(1,1,1),
+        Arrays.asList(1,1,1));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
+
+    stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
+      @Override
+      public void call(JavaRDD<Integer> rdd) {
+        accumRdd.add(1);
+        rdd.foreach(new VoidFunction<Integer>() {
+          @Override
+          public void call(Integer i) {
+            accumEle.add(1);
+          }
+        });
+      }
+    });
+
+    // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
+    stream.foreachRDD(new VoidFunction2<JavaRDD<Integer>, Time>() {
+      @Override
+      public void call(JavaRDD<Integer> rdd, Time time) {
+      }
+    });
+
+    JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(2, accumRdd.value().intValue());
+    Assert.assertEquals(6, accumEle.value().intValue());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
   public void testPairFlatMap() {
     List<List<String>> inputData = Arrays.asList(
         Arrays.asList("giants"),

