Repository: spark
Updated Branches:
  refs/heads/master 9b0593d5e -> c1a0c66bd


[SPARK-18261][STRUCTURED STREAMING] Add statistics to MemorySink for joining

## What changes were proposed in this pull request?

Right now, there is no way to join the output of a memory sink with any table:

> UnsupportedOperationException: LeafNode MemoryPlan must implement statistics

This patch adds statistics to MemorySink, making joining snapshots of memory 
streams with tables possible.

## How was this patch tested?

Added a test case.

Author: Liwei Lin <lwl...@gmail.com>

Closes #15786 from lw-lin/memory-sink-stat.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1a0c66b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1a0c66b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1a0c66b

Branch: refs/heads/master
Commit: c1a0c66bd2662bc40f312da474c3b95229fe92d0
Parents: 9b0593d
Author: Liwei Lin <lwl...@gmail.com>
Authored: Mon Nov 7 17:49:24 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Nov 7 17:49:24 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/memory.scala      |  6 +++++-
 .../spark/sql/streaming/MemorySinkSuite.scala       | 16 ++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c1a0c66b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 48d9791..613c7cc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -212,4 +212,8 @@ class MemorySink(val schema: StructType, outputMode: 
OutputMode) extends Sink wi
  */
 case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends 
LeafNode {
   def this(sink: MemorySink) = this(sink, sink.schema.toAttributes)
+
+  private val sizePerRow = 
sink.schema.toAttributes.map(_.dataType.defaultSize).sum
+
+  override def statistics: Statistics = Statistics(sizePerRow * 
sink.allData.size)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c1a0c66b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 310d756..4e9fba9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -187,6 +187,22 @@ class MemorySinkSuite extends StreamTest with 
BeforeAndAfter {
     query.stop()
   }
 
+  test("MemoryPlan statistics") {
+    implicit val schema = new StructType().add(new StructField("value", 
IntegerType))
+    val sink = new MemorySink(schema, InternalOutputModes.Append)
+    val plan = new MemoryPlan(sink)
+
+    // Before adding data, check output
+    checkAnswer(sink.allData, Seq.empty)
+    assert(plan.statistics.sizeInBytes === 0)
+
+    sink.addBatch(0, 1 to 3)
+    assert(plan.statistics.sizeInBytes === 12)
+
+    sink.addBatch(1, 4 to 6)
+    assert(plan.statistics.sizeInBytes === 24)
+  }
+
   ignore("stress test") {
     // Ignore the stress test as it takes several minutes to run
     (0 until 1000).foreach { _ =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to