Repository: spark
Updated Branches:
  refs/heads/branch-2.0 72b3cff33 -> ea205e376


[SPARK-16963][STREAMING][SQL] Changes to Source trait and related 
implementation classes

## What changes were proposed in this pull request?

This PR contains changes to the Source trait such that the scheduler can notify 
data sources when it is safe to discard buffered data. Summary of changes:
* Added a method `commit(end: Offset)` that tells the Source that it is OK to 
discard all offsets up to `end`, inclusive.
* Changed the semantics of a `None` value for the `getBatch` method to mean 
"from the very beginning of the stream"; as opposed to "all data present in the 
Source's buffer".
* Added notes that the upper layers of the system will never call `getBatch` 
with a start value less than the last value passed to `commit`.
* Added a `lastCommittedOffset` method to allow the scheduler to query the 
status of each Source on restart. This addition is not strictly necessary, but 
it seemed like a good idea -- Sources will be maintaining their own persistent 
state, and there may be bugs in the checkpointing code.
* The scheduler in `StreamExecution.scala` now calls `commit` on its stream 
sources after marking each batch as complete in its checkpoint.
* `MemoryStream` now cleans committed batches out of its internal buffer.
* `TextSocketSource` now cleans committed batches from its internal buffer.

## How was this patch tested?
Existing regression tests already exercise the new code.

Author: frreiss <frre...@us.ibm.com>

Closes #14553 from frreiss/fred-16963.

(cherry picked from commit 5b27598ff50cb08e7570fade458da0a3d4d4eabc)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea205e37
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea205e37
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea205e37

Branch: refs/heads/branch-2.0
Commit: ea205e376d555869519ee59186f53ed573ccee39
Parents: 72b3cff
Author: frreiss <frre...@us.ibm.com>
Authored: Wed Oct 26 17:33:08 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Oct 26 17:33:16 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/FileStreamSource.scala  |  9 +++
 .../spark/sql/execution/streaming/Source.scala  | 22 ++++--
 .../execution/streaming/StreamExecution.scala   | 32 ++++++---
 .../spark/sql/execution/streaming/memory.scala  | 47 +++++++++++--
 .../spark/sql/execution/streaming/socket.scala  | 72 ++++++++++++++++----
 .../sql/streaming/StreamingQuerySuite.scala     |  8 +--
 6 files changed, 154 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ea205e37/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 5ada238..c47033a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -176,6 +176,15 @@ class FileStreamSource(
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 
+  /**
+   * Informs the source that Spark has completed processing all data for 
offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the 
future.
+   */
+  override def commit(end: Offset): Unit = {
+    // No-op for now; FileStreamSource currently garbage-collects files based 
on timestamp
+    // and the value of the maxFileAge parameter.
+  }
+
   override def stop() {}
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ea205e37/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 9711478..f3bd5bf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -30,16 +30,30 @@ trait Source  {
   /** Returns the schema of the data from this source */
   def schema: StructType
 
-  /** Returns the maximum available offset for this source. */
+  /**
+   * Returns the maximum available offset for this source.
+   * Returns `None` if this source has never received any data.
+   */
   def getOffset: Option[Offset]
 
   /**
-   * Returns the data that is between the offsets (`start`, `end`]. When 
`start` is `None` then
-   * the batch should begin with the first available record. This method must 
always return the
-   * same data for a particular `start` and `end` pair.
+   * Returns the data that is between the offsets (`start`, `end`]. When 
`start` is `None`,
+   * then the batch should begin with the first record. This method must 
always return the
+   * same data for a particular `start` and `end` pair; even after the Source 
has been restarted
+   * on a different node.
+   *
+   * Higher layers will always call this method with a value of `start` 
greater than or equal
+   * to the last value passed to `commit` and a value of `end` less than or 
equal to the
+   * last value returned by `getOffset`
    */
   def getBatch(start: Option[Offset], end: Offset): DataFrame
 
+  /**
+   * Informs the source that Spark has completed processing all data for 
offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the 
future.
+   */
+  def commit(end: Offset) : Unit = {}
+
   /** Stop this source and free any resources it has allocated. */
   def stop(): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ea205e37/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 627b87b..4707bfb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -73,6 +73,9 @@ class StreamExecution(
   /**
    * Tracks how much data we have processed and committed to the sink or state 
store from each
    * input source.
+   * Only the scheduler thread should modify this field, and only in atomic 
steps.
+   * Other threads should make a shallow copy if they are going to access this 
field more than
+   * once, since the field's value may change at any time.
    */
   @volatile
   var committedOffsets = new StreamProgress
@@ -80,6 +83,9 @@ class StreamExecution(
   /**
    * Tracks the offsets that are available to be processed, but have not yet 
be committed to the
    * sink.
+   * Only the scheduler thread should modify this field, and only in atomic 
steps.
+   * Other threads should make a shallow copy if they are going to access this 
field more than
+   * once, since the field's value may change at any time.
    */
   @volatile
   private var availableOffsets = new StreamProgress
@@ -337,17 +343,27 @@ class StreamExecution(
     }
     if (hasNewData) {
       reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-        assert(
-          offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
+        assert(offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
           s"Concurrent update to the log. Multiple streaming jobs detected for 
$currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId.")
 
+        // NOTE: The following code is correct because runBatches() processes 
exactly one
+        // batch at a time. If we add pipeline parallelism (multiple batches 
in flight at
+        // the same time), this cleanup logic will need to change.
+
+        // Now that we've updated the scheduler's persistent checkpoint, it is 
safe for the
+        // sources to discard data from the previous batch.
+        val prevBatchOff = offsetLog.get(currentBatchId - 1)
+        if (prevBatchOff.isDefined) {
+          prevBatchOff.get.toStreamProgress(sources).foreach {
+            case (src, off) => src.commit(off)
+          }
+        }
+
         // Now that we have logged the new batch, no further processing will 
happen for
-        // the previous batch, and it is safe to discard the old metadata.
-        // Note that purge is exclusive, i.e. it purges everything before 
currentBatchId.
-        // NOTE: If StreamExecution implements pipeline parallelism (multiple 
batches in
-        // flight at the same time), this cleanup logic will need to change.
-        offsetLog.purge(currentBatchId)
+        // the batch before the previous batch, and it is safe to discard the 
old metadata.
+        // Note that purge is exclusive, i.e. it purges everything before the 
target ID.
+        offsetLog.purge(currentBatchId - 1)
       }
     } else {
       awaitBatchLock.lock()
@@ -455,7 +471,7 @@ class StreamExecution(
 
   /**
    * Blocks the current thread until processing for data from the given 
`source` has reached at
-   * least the given `Offset`. This method is indented for use primarily when 
writing tests.
+   * least the given `Offset`. This method is intended for use primarily when 
writing tests.
    */
   private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
     def notDone = {

http://git-wip-us.apache.org/repos/asf/spark/blob/ea205e37/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 53eebae..66dc204 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
@@ -51,12 +51,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: 
SQLContext)
   protected val logicalPlan = StreamingExecutionRelation(this)
   protected val output = logicalPlan.output
 
+  /**
+   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
+   * Stored in a ListBuffer to facilitate removing committed batches.
+   */
   @GuardedBy("this")
-  protected val batches = new ArrayBuffer[Dataset[A]]
+  protected val batches = new ListBuffer[Dataset[A]]
 
   @GuardedBy("this")
   protected var currentOffset: LongOffset = new LongOffset(-1)
 
+  /**
+   * Last offset that was discarded, or -1 if no commits have occurred. Note 
that the value
+   * -1 is used in calculations below and isn't just an arbitrary constant.
+   */
+  @GuardedBy("this")
+  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+
   def schema: StructType = encoder.schema
 
   def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
@@ -85,21 +96,25 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: 
SQLContext)
   override def toString: String = 
s"MemoryStream[${Utils.truncatedString(output, ",")}]"
 
   override def getOffset: Option[Offset] = synchronized {
-    if (batches.isEmpty) {
+    if (currentOffset.offset == -1) {
       None
     } else {
       Some(currentOffset)
     }
   }
 
-  /**
-   * Returns the data that is between the offsets (`start`, `end`].
-   */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
     val startOrdinal =
       
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
     val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val newBlocks = synchronized { batches.slice(startOrdinal, endOrdinal) }
+
+    // Internal buffer only holds the batches after lastCommittedOffset.
+    val newBlocks = synchronized {
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      batches.slice(sliceStart, sliceEnd)
+    }
 
     logDebug(
       s"MemoryBatch [$startOrdinal, $endOrdinal]: 
${newBlocks.flatMap(_.collect()).mkString(", ")}")
@@ -111,11 +126,29 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: 
SQLContext)
       }
   }
 
+  override def commit(end: Offset): Unit = synchronized {
+    end match {
+      case newOffset: LongOffset =>
+        val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+        if (offsetDiff < 0) {
+          sys.error(s"Offsets committed out of order: $lastOffsetCommitted 
followed by $end")
+        }
+
+        batches.trimStart(offsetDiff)
+        lastOffsetCommitted = newOffset
+      case _ =>
+        sys.error(s"MemoryStream.commit() received an offset ($end) that did 
not originate with " +
+          "an instance of this class")
+    }
+  }
+
   override def stop() {}
 
   def reset(): Unit = synchronized {
     batches.clear()
     currentOffset = new LongOffset(-1)
+    lastOffsetCommitted = new LongOffset(-1)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ea205e37/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index fb15239..c662e7c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -24,14 +24,15 @@ import java.text.SimpleDateFormat
 import java.util.Calendar
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
+import org.apache.spark.sql._
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
 
+
 object TextSocketSource {
   val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
   val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
@@ -53,8 +54,18 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
   @GuardedBy("this")
   private var readThread: Thread = null
 
+  /**
+   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
+   * Stored in a ListBuffer to facilitate removing committed batches.
+   */
+  @GuardedBy("this")
+  protected val batches = new ListBuffer[(String, Timestamp)]
+
+  @GuardedBy("this")
+  protected var currentOffset: LongOffset = new LongOffset(-1)
+
   @GuardedBy("this")
-  private var lines = new ArrayBuffer[(String, Timestamp)]
+  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
 
   initialize()
 
@@ -74,10 +85,12 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
               return
             }
             TextSocketSource.this.synchronized {
-              lines += ((line,
+              val newData = (line,
                 Timestamp.valueOf(
                   
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-                ))
+                )
+              currentOffset = currentOffset + 1
+              batches.append(newData)
             }
           }
         } catch {
@@ -92,21 +105,54 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
   override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
   else TextSocketSource.SCHEMA_REGULAR
 
-  /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = synchronized {
-    if (lines.isEmpty) None else Some(LongOffset(lines.size - 1))
+    if (currentOffset.offset == -1) {
+      None
+    } else {
+      Some(currentOffset)
+    }
   }
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-    val startIdx = start.map(_.asInstanceOf[LongOffset].offset.toInt + 
1).getOrElse(0)
-    val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val data = synchronized { lines.slice(startIdx, endIdx) }
+    val startOrdinal =
+      
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+
+    // Internal buffer only holds the batches after lastOffsetCommitted
+    val rawList = synchronized {
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      batches.slice(sliceStart, sliceEnd)
+    }
+
     import sqlContext.implicits._
+    val rawBatch = sqlContext.createDataset(rawList)
+
+    // Underlying MemoryStream has schema (String, Timestamp); strip out the 
timestamp
+    // if requested.
     if (includeTimestamp) {
-      data.toDF("value", "timestamp")
+      rawBatch.toDF("value", "timestamp")
+    } else {
+      // Strip out timestamp
+      rawBatch.select("_1").toDF("value")
+    }
+  }
+
+  override def commit(end: Offset): Unit = synchronized {
+    if (end.isInstanceOf[LongOffset]) {
+      val newOffset = end.asInstanceOf[LongOffset]
+      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+      if (offsetDiff < 0) {
+        sys.error(s"Offsets committed out of order: $lastOffsetCommitted 
followed by $end")
+      }
+
+      batches.trimStart(offsetDiff)
+      lastOffsetCommitted = newOffset
     } else {
-      data.map(_._1).toDF("value")
+      sys.error(s"TextSocketStream.commit() received an offset ($end) that did 
not " +
+        s"originate with an instance of this class")
     }
   }
 
@@ -141,7 +187,7 @@ class TextSocketSourceProvider extends StreamSourceProvider 
with DataSourceRegis
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = {
     logWarning("The socket source should not be used for production 
applications! " +
-      "It does not support recovery and stores state indefinitely.")
+      "It does not support recovery.")
     if (!parameters.contains("host")) {
       throw new AnalysisException("Set a host to read from with 
option(\"host\", ...).")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ea205e37/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 92020be..dad4104 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -252,8 +252,8 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)
 
-    // Run 3 batches, and then assert that only 1 metadata file is left at the 
end
-    // since the first 2 should have been purged.
+    // Run 3 batches, and then assert that only 2 metadata files are left at the 
end
+    // since the first should have been purged.
     testStream(mapped)(
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3),
@@ -262,11 +262,11 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
       AddData(inputData, 4, 6),
       CheckAnswer(6, 3, 6, 3, 1, 1),
 
-      AssertOnQuery("metadata log should contain only one file") { q =>
+      AssertOnQuery("metadata log should contain only two files") { q =>
         val metadataLogDir = new 
java.io.File(q.offsetLog.metadataPath.toString)
         val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
         val toTest = logFileNames.filter(! _.endsWith(".crc"))  // Workaround 
for SPARK-17475
-        assert(toTest.size == 1 && toTest.head == "2")
+        assert(toTest.size == 2 && toTest.head == "1")
         true
       }
     )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to