Repository: spark

Updated Branches:
  refs/heads/branch-2.4 a2991d233 -> 48e2e6fcc
[SPARK-25644][SS][FOLLOWUP][BUILD] Fix Scala 2.12 build error due to foreachBatch

## What changes were proposed in this pull request?

This PR fixes the Scala 2.12 build error caused by an ambiguous reference to the overloaded `foreachBatch` in test cases.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/428/console

```scala
[error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala:102: ambiguous reference to overloaded definition,
[error] both method foreachBatch in class DataStreamWriter of type (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[Int],Long])org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] and  method foreachBatch in class DataStreamWriter of type (function: (org.apache.spark.sql.Dataset[Int], Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] match argument types ((org.apache.spark.sql.Dataset[Int], Any) => Unit)
[error]     ds.writeStream.foreachBatch((_, _) => {}).trigger(Trigger.Continuous("1 second")).start()
[error]                    ^
[error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala:106: ambiguous reference to overloaded definition,
[error] both method foreachBatch in class DataStreamWriter of type (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[Int],Long])org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] and  method foreachBatch in class DataStreamWriter of type (function: (org.apache.spark.sql.Dataset[Int], Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] match argument types ((org.apache.spark.sql.Dataset[Int], Any) => Unit)
[error]     ds.writeStream.foreachBatch((_, _) => {}).partitionBy("value").start()
[error]                    ^
```

## How was this patch tested?

Manual. Since this failure occurs only in the Scala 2.12 profile and in test cases, Jenkins will not catch it; we need to build with Scala 2.12 and run the tests.

Closes #22649 from dongjoon-hyun/SPARK-SCALA212.
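For context, the ambiguity comes from Scala 2.12's SAM conversion: `DataStreamWriter.foreachBatch` is overloaded on a Scala function type and on the Java `VoidFunction2` interface, and an untyped lambda such as `(_, _) => {}` can convert to either. The following is a minimal, Spark-free sketch of the same situation; the `Writer` and `VoidFunction2` definitions here are illustrative stand-ins, not Spark's actual classes.

```scala
// Stand-in for org.apache.spark.api.java.function.VoidFunction2: a single
// abstract method (SAM) type, eligible for lambda conversion in Scala 2.12.
trait VoidFunction2[T1, T2] {
  def call(v1: T1, v2: T2): Unit
}

// Stand-in for DataStreamWriter, with the same pair of overloads:
// a Scala-friendly function type and a Java-friendly SAM type.
class Writer[T] {
  def foreachBatch(function: (T, Long) => Unit): Writer[T] = this
  def foreachBatch(function: VoidFunction2[T, java.lang.Long]): Writer[T] = this
}

object Repro {
  def main(args: Array[String]): Unit = {
    val w = new Writer[Int]

    // Fails on Scala 2.12 with "ambiguous reference to overloaded definition":
    // without parameter types, the lambda SAM-converts to VoidFunction2 just
    // as well as it matches the Scala function type.
    // w.foreachBatch((_, _) => {})

    // Compiles: explicit parameter types (scala.Long, not java.lang.Long)
    // match only the Scala-function overload -- the same fix the patch applies.
    w.foreachBatch((_: Int, _: Long) => {})
    println("resolved")
  }
}
```

On Scala 2.11 the untyped lambda compiled unchanged, because 2.11 does not apply SAM conversion to lambdas by default and only the function-type overload was applicable; that is why the breakage shows up only in the Scala 2.12 build.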
Authored-by: Dongjoon Hyun <dongj...@apache.org>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
(cherry picked from commit 9cbf105ab1256d65f027115ba5505842ce8fffe3)
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48e2e6fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48e2e6fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48e2e6fc

Branch: refs/heads/branch-2.4
Commit: 48e2e6fcc3617f021b55c2e2be0cda39cad89711
Parents: a2991d2
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Sat Oct 6 09:40:42 2018 -0700
Committer: Dongjoon Hyun <dongj...@apache.org>
Committed: Sat Oct 6 09:40:54 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 4 ++--
 .../sql/execution/streaming/sources/ForeachBatchSinkSuite.scala | 5 +++--
 2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/48e2e6fc/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index e0b6d8c..d89e45e 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.sql.{ForeachWriter, SparkSession}
+import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -879,7 +879,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
 
     testUtils.waitUntilOffsetAppears(topicPartition, 5)
-    val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+    val q = ds.writeStream.foreachBatch { (ds: Dataset[String], epochId: Long) =>
       if (epochId == 0) {
         // Send more message before the tasks of the current batch start reading the current batch
         // data, so that the executors will prefetch messages in the next batch and drop them.
http://git-wip-us.apache.org/repos/asf/spark/blob/48e2e6fc/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
index 71dff44..3e9ccb0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
@@ -99,11 +99,12 @@ class ForeachBatchSinkSuite extends StreamTest {
     }
     assert(ex1.getMessage.contains("foreachBatch function cannot be null"))
     val ex2 = intercept[AnalysisException] {
-      ds.writeStream.foreachBatch((_, _) => {}).trigger(Trigger.Continuous("1 second")).start()
+      ds.writeStream.foreachBatch((_: Dataset[Int], _: Long) => {})
+        .trigger(Trigger.Continuous("1 second")).start()
     }
     assert(ex2.getMessage.contains("'foreachBatch' is not supported with continuous trigger"))
     val ex3 = intercept[AnalysisException] {
-      ds.writeStream.foreachBatch((_, _) => {}).partitionBy("value").start()
+      ds.writeStream.foreachBatch((_: Dataset[Int], _: Long) => {}).partitionBy("value").start()
     }
     assert(ex3.getMessage.contains("'foreachBatch' does not support partitioning"))
   }