Repository: spark
Updated Branches:
  refs/heads/master 9b0593d5e -> c1a0c66bd
[SPARK-18261][STRUCTURED STREAMING] Add statistics to MemorySink for joining

## What changes were proposed in this pull request?

Right now, there is no way to join the output of a memory sink with any table:

> UnsupportedOperationException: LeafNode MemoryPlan must implement statistics

This patch adds statistics to MemorySink, making it possible to join snapshots of memory streams with tables.

## How was this patch tested?

Added a test case.

Author: Liwei Lin <lwl...@gmail.com>

Closes #15786 from lw-lin/memory-sink-stat.
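For context, here is a minimal sketch of the kind of query this patch unblocks. Everything in it is illustrative rather than taken from the patch: the query name "snapshot", the "labels" table, and the use of MemoryStream (a test-oriented source from Spark's internal org.apache.spark.sql.execution.streaming package) as the input.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.execution.streaming.MemoryStream

  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("memory-sink-join")
    .getOrCreate()
  import spark.implicits._
  implicit val sqlContext = spark.sqlContext  // required by MemoryStream.apply

  val input = MemoryStream[Int]

  // Sink the stream into memory; its contents become queryable as the
  // temp table "snapshot", which is backed by a MemoryPlan leaf node.
  val query = input.toDF().writeStream
    .format("memory")
    .queryName("snapshot")
    .outputMode("append")
    .start()

  input.addData(1, 2, 3)
  query.processAllAvailable()

  // Before this patch, the join below failed with
  // "UnsupportedOperationException: LeafNode MemoryPlan must implement statistics".
  val labels = Seq((1, "a"), (2, "b"), (3, "c")).toDF("value", "label")
  spark.table("snapshot").join(labels, "value").show()

  query.stop()
  spark.stop()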
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1a0c66b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1a0c66b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1a0c66b

Branch: refs/heads/master
Commit: c1a0c66bd2662bc40f312da474c3b95229fe92d0
Parents: 9b0593d
Author: Liwei Lin <lwl...@gmail.com>
Authored: Mon Nov 7 17:49:24 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Nov 7 17:49:24 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/memory.scala |  6 +++++-
 .../spark/sql/streaming/MemorySinkSuite.scala  | 16 ++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/c1a0c66b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 48d9791..613c7cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -212,4 +212,8 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
  */
 case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode {
   def this(sink: MemorySink) = this(sink, sink.schema.toAttributes)
+
+  private val sizePerRow = sink.schema.toAttributes.map(_.dataType.defaultSize).sum
+
+  override def statistics: Statistics = Statistics(sizePerRow * sink.allData.size)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c1a0c66b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 310d756..4e9fba9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -187,6 +187,22 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
+  test("MemoryPlan statistics") {
+    implicit val schema = new StructType().add(new StructField("value", IntegerType))
+    val sink = new MemorySink(schema, InternalOutputModes.Append)
+    val plan = new MemoryPlan(sink)
+
+    // Before adding data, check output
+    checkAnswer(sink.allData, Seq.empty)
+    assert(plan.statistics.sizeInBytes === 0)
+
+    sink.addBatch(0, 1 to 3)
+    assert(plan.statistics.sizeInBytes === 12)
+
+    sink.addBatch(1, 4 to 6)
+    assert(plan.statistics.sizeInBytes === 24)
+  }
+
   ignore("stress test") {
     // Ignore the stress test as it takes several minutes to run
     (0 until 1000).foreach { _ =>
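For reference, the byte counts asserted in the new test follow from the schema's single IntegerType column: sizePerRow sums each column's defaultSize, and IntegerType's defaultSize is 4 bytes. A quick sanity check of the arithmetic:

  import org.apache.spark.sql.types.IntegerType

  // One IntegerType column => 4 bytes per row under defaultSize-based stats.
  val sizePerRow = IntegerType.defaultSize
  assert(sizePerRow * 3 == 12)  // 3 rows after the first batch
  assert(sizePerRow * 6 == 24)  // 6 rows after the second batch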