Repository: spark
Updated Branches:
  refs/heads/master 5e64dab86 -> 59236e5c5


[SPARK-14288][SQL] Memory Sink for streaming

This PR exposes the internal testing `MemorySink` through the data source API.
This will allow users to easily test streaming applications in the Spark shell
or in other local tests.

Usage:
```scala
inputStream.write
  .format("memory")
  .queryName("memStream")
  .startStream()

// Now you can query the result of the stream here.
sqlContext.table("memStream")
```
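
Keeping the `ContinuousQuery` handle returned by `startStream()` lets you stop the
query when the test is done. A minimal sketch building on the usage above (the
`inputStream` and `sqlContext` names are assumed from that example):

```scala
// Keep the handle so the query can be stopped later.
val query = inputStream.write
  .format("memory")
  .queryName("memStream")
  .startStream()

// The sink registers its output as a temp table, so plain SQL works as well.
sqlContext.sql("SELECT * FROM memStream").show()

// Stop the query when finished; the buffered rows live only in driver memory.
query.stop()
```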

The most complicated part of the logic is choosing the checkpoint directory.
There are a few requirements we are attempting to satisfy here:
 - when working in the shell locally, it should just work with no extra
configuration.
 - when working on a cluster, it should be easy to have the checkpoint created
on a distributed file system so you can test aggregation (state checkpoints are
also stored in this directory and must be accessible from workers).
 - it should be clear that you can't resume, since the data is just in memory.

The chosen algorithm proceeds as follows (a condensed sketch follows this list):
 - if the user gives a checkpoint directory, use it
 - otherwise, if the conf has a checkpoint location, use `$location/$queryName`
 - if neither is set, create a local temporary directory
 - always check to make sure there are no offsets already written to the directory
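
A condensed sketch of that resolution logic, simplified from the
`DataFrameWriter.startStream` change below (imports such as
`org.apache.hadoop.fs.Path`, `SQLConf`, and `Utils` are assumed to be in scope):

```scala
// 1. An explicit "checkpointLocation" option wins.
// 2. Otherwise fall back to $spark.sql.streaming.checkpointLocation/$queryName.
// 3. Otherwise use a fresh local temp dir, so resuming is never possible.
val checkpointLocation = extraOptions.get("checkpointLocation")
  .map(userSpecified => new Path(userSpecified).toUri.toString)
  .orElse {
    df.sqlContext.conf.getConf(SQLConf.CHECKPOINT_LOCATION, None)
      .map(location => new Path(location, queryName).toUri.toString)
  }
  .getOrElse {
    Utils.createTempDir(namePrefix = "memory.stream").getCanonicalPath
  }

// Refuse to start if offsets from a previous run already exist at this location.
val offsetPath = new Path(checkpointLocation, "offsets")
val fs = offsetPath.getFileSystem(df.sqlContext.sparkContext.hadoopConfiguration)
if (fs.exists(offsetPath)) {
  throw new AnalysisException(
    s"Unable to resume query written to memory sink. Delete $offsetPath to start over.")
}
```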

Author: Michael Armbrust <mich...@databricks.com>

Closes #12119 from marmbrus/memorySink.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59236e5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59236e5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59236e5c

Branch: refs/heads/master
Commit: 59236e5c5b9d24f90fcf8d09b23ae8b06355657e
Parents: 5e64dab
Author: Michael Armbrust <mich...@databricks.com>
Authored: Wed Apr 6 10:05:02 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Apr 6 10:05:02 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 79 ++++++++++++++-----
 .../spark/sql/execution/SparkStrategies.scala   |  6 ++
 .../spark/sql/execution/streaming/memory.scala  |  8 ++
 .../scala/org/apache/spark/sql/QueryTest.scala  |  2 +
 .../spark/sql/streaming/MemorySinkSuite.scala   | 82 ++++++++++++++++++++
 5 files changed, 159 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/59236e5c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3332a99..54d2508 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -29,8 +29,10 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
 import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
-import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.HadoopFsRelation
+import org.apache.spark.util.Utils
 
 /**
  * :: Experimental ::
@@ -275,23 +277,64 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * @since 2.0.0
    */
   def startStream(): ContinuousQuery = {
-    val dataSource =
-      DataSource(
-        df.sqlContext,
-        className = source,
-        options = extraOptions.toMap,
-        partitionColumns = normalizedParCols.getOrElse(Nil))
-
-    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
-    val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
-      new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
-    })
-    df.sqlContext.sessionState.continuousQueryManager.startQuery(
-      queryName,
-      checkpointLocation,
-      df,
-      dataSource.createSink(),
-      trigger)
+    if (source == "memory") {
+      val queryName =
+        extraOptions.getOrElse(
+          "queryName", throw new AnalysisException("queryName must be 
specified for memory sink"))
+      val checkpointLocation = extraOptions.get("checkpointLocation").map { userSpecified =>
+        new Path(userSpecified).toUri.toString
+      }.orElse {
+        val checkpointConfig: Option[String] =
+          df.sqlContext.conf.getConf(
+            SQLConf.CHECKPOINT_LOCATION,
+            None)
+
+        checkpointConfig.map { location =>
+          new Path(location, queryName).toUri.toString
+        }
+      }.getOrElse {
+        Utils.createTempDir(namePrefix = "memory.stream").getCanonicalPath
+      }
+
+      // If offsets have already been created, we are trying to resume a query.
+      val checkpointPath = new Path(checkpointLocation, "offsets")
+      val fs = checkpointPath.getFileSystem(df.sqlContext.sparkContext.hadoopConfiguration)
+      if (fs.exists(checkpointPath)) {
+        throw new AnalysisException(
+          s"Unable to resume query written to memory sink. Delete 
$checkpointPath to start over.")
+      } else {
+        checkpointPath.toUri.toString
+      }
+
+      val sink = new MemorySink(df.schema)
+      val resultDf = Dataset.ofRows(df.sqlContext, new MemoryPlan(sink))
+      resultDf.registerTempTable(queryName)
+      val continuousQuery = df.sqlContext.sessionState.continuousQueryManager.startQuery(
+        queryName,
+        checkpointLocation,
+        df,
+        sink,
+        trigger)
+      continuousQuery
+    } else {
+      val dataSource =
+        DataSource(
+          df.sqlContext,
+          className = source,
+          options = extraOptions.toMap,
+          partitionColumns = normalizedParCols.getOrElse(Nil))
+
+      val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
+      val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
+        new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
+      })
+      df.sqlContext.sessionState.continuousQueryManager.startQuery(
+        queryName,
+        checkpointLocation,
+        df,
+        dataSource.createSink(),
+        trigger)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/59236e5c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5f3128d..d77aba7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
@@ -30,6 +31,7 @@ import org.apache.spark.sql.execution.command.{DescribeCommand => RunnableDescri
 import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, _}
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
+import org.apache.spark.sql.execution.streaming.MemoryPlan
 import org.apache.spark.sql.internal.SQLConf
 
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
@@ -332,6 +334,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case r: RunnableCommand => ExecutedCommand(r) :: Nil
 
+      case MemoryPlan(sink, output) =>
+        val encoder = RowEncoder(sink.schema)
+        LocalTableScan(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil
+
       case logical.Distinct(child) =>
         throw new IllegalStateException(
           "logical distinct operator should have been replaced by aggregate in 
the optimizer")

http://git-wip-us.apache.org/repos/asf/spark/blob/59236e5c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index b652530..351ef40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -25,6 +25,8 @@ import scala.util.control.NonFatal
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SQLContext}
 import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.types.StructType
 
 object MemoryStream {
@@ -136,3 +138,9 @@ class MemorySink(val schema: StructType) extends Sink with Logging {
   }
 }
 
+/**
+ * Used to query the data that has been written into a [[MemorySink]].
+ */
+case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode {
+  def this(sink: MemorySink) = this(sink, sink.schema.toAttributes)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59236e5c/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 4e62fac..48a077d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.streaming.MemoryPlan
 
 abstract class QueryTest extends PlanTest {
 
@@ -200,6 +201,7 @@ abstract class QueryTest extends PlanTest {
     logicalPlan.transform {
       case _: ObjectOperator => return
       case _: LogicalRelation => return
+      case _: MemoryPlan => return
     }.transformAllExpressions {
       case a: ImperativeAggregate => return
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/59236e5c/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
new file mode 100644
index 0000000..5249aa2
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{AnalysisException, Row, StreamTest}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+class MemorySinkSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
+
+  test("registering as a table") {
+    val input = MemoryStream[Int]
+    val query = input.toDF().write
+      .format("memory")
+      .queryName("memStream")
+      .startStream()
+    input.addData(1, 2, 3)
+    query.processAllAvailable()
+
+    checkDataset(
+      sqlContext.table("memStream").as[Int],
+      1, 2, 3)
+
+    input.addData(4, 5, 6)
+    query.processAllAvailable()
+    checkDataset(
+      sqlContext.table("memStream").as[Int],
+      1, 2, 3, 4, 5, 6)
+
+    query.stop()
+  }
+
+  test("error when no name is specified") {
+    val error = intercept[AnalysisException] {
+      val input = MemoryStream[Int]
+      val query = input.toDF().write
+          .format("memory")
+          .startStream()
+    }
+
+    assert(error.message contains "queryName must be specified")
+  }
+
+  test("error if attempting to resume specific checkpoint") {
+    val location = Utils.createTempDir("streaming.checkpoint").getCanonicalPath
+
+    val input = MemoryStream[Int]
+    val query = input.toDF().write
+        .format("memory")
+        .queryName("memStream")
+        .option("checkpointLocation", location)
+        .startStream()
+    input.addData(1, 2, 3)
+    query.processAllAvailable()
+    query.stop()
+
+    intercept[AnalysisException] {
+      input.toDF().write
+        .format("memory")
+        .queryName("memStream")
+        .option("checkpointLocation", location)
+        .startStream()
+    }
+  }
+}

