Repository: spark
Updated Branches:
  refs/heads/master 8c198e246 -> d0ac0e6f4


[SPARK-16020][SQL] Fix complete mode aggregation with console sink

## What changes were proposed in this pull request?

We cannot use `limit` on DataFrame in ConsoleSink because it will use a wrong 
planner. This PR just collects `DataFrame` and calls `show` on a batch 
DataFrame based on the result. This is fine since ConsoleSink is only for 
debugging.

## How was this patch tested?

Manually confirmed ConsoleSink now works with complete mode aggregation.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #13740 from zsxwing/complete-console.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0ac0e6f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0ac0e6f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0ac0e6f

Branch: refs/heads/master
Commit: d0ac0e6f433bfccf4ced3743a2526f67fdb5c38e
Parents: 8c198e2
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Fri Jun 17 21:58:10 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Jun 17 21:58:10 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/Sink.scala    |  3 +
 .../spark/sql/execution/streaming/console.scala |  4 +-
 .../execution/streaming/ConsoleSinkSuite.scala  | 99 ++++++++++++++++++++
 3 files changed, 105 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d0ac0e6f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
index e641e09..2571b59 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
@@ -30,6 +30,9 @@ trait Sink {
    * Adds a batch of data to this sink. The data for a given `batchId` is 
deterministic and if
    * this method is called more than once with the same batchId (which will 
happen in the case of
    * failures), then `data` should only be added once.
+   *
+   * Note: You cannot apply any operators on `data` except consuming it (e.g., 
`collect/foreach`).
+   * Otherwise, you may get a wrong result.
    */
   def addBatch(batchId: Long, data: DataFrame): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d0ac0e6f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 2ec2a3c..e8b9712 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -45,7 +45,9 @@ class ConsoleSink(options: Map[String, String]) extends Sink 
with Logging {
     println(batchIdStr)
     println("-------------------------------------------")
     // scalastyle:off println
-    data.show(numRowsToShow, isTruncated)
+    data.sparkSession.createDataFrame(
+      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
+      .show(numRowsToShow, isTruncated)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d0ac0e6f/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
new file mode 100644
index 0000000..e853d8c
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.streaming.StreamTest
+
+class ConsoleSinkSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("SPARK-16020 Complete mode aggregation with console sink") {
+    withTempDir { checkpointLocation =>
+      val origOut = System.out
+      val stdout = new ByteArrayOutputStream()
+      try {
+        // Hook Java System.out.println
+        System.setOut(new PrintStream(stdout))
+        // Hook Scala println
+        Console.withOut(stdout) {
+          val input = MemoryStream[String]
+          val df = input.toDF().groupBy("value").count()
+          val query = df.writeStream
+            .format("console")
+            .outputMode("complete")
+            .option("checkpointLocation", checkpointLocation.getAbsolutePath)
+            .start()
+          input.addData("a")
+          query.processAllAvailable()
+          input.addData("a", "b")
+          query.processAllAvailable()
+          input.addData("a", "b", "c")
+          query.processAllAvailable()
+          query.stop()
+        }
+        System.out.flush()
+      } finally {
+        System.setOut(origOut)
+      }
+
+      val expected = """-------------------------------------------
+        |Batch: 0
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    1|
+        |+-----+-----+
+        |
+        |-------------------------------------------
+        |Batch: 1
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    2|
+        ||    b|    1|
+        |+-----+-----+
+        |
+        |-------------------------------------------
+        |Batch: 2
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    3|
+        ||    b|    2|
+        ||    c|    1|
+        |+-----+-----+
+        |
+        |""".stripMargin
+      assert(expected === new String(stdout.toByteArray, UTF_8))
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to