Repository: spark
Updated Branches:
  refs/heads/master 4f7292c87 -> 68a6dc974


[SPARK-18826][SS] Add 'latestFirst' option to FileStreamSource

## What changes were proposed in this pull request?

When starting a stream over a large backlog of files with `maxFilesPerTrigger`
set, users often want to process the most recent files first. This keeps
latency low for recent data while historical data is backfilled gradually.

This PR adds a new option `latestFirst` (default `false`) to control this
behavior. When it is true, `FileStreamSource` sorts the files by modification
time from latest to oldest and takes the first `maxFilesPerTrigger` files as a
new batch.

## How was this patch tested?

A new test, `FileStreamSource - latestFirst`, was added to `FileStreamSourceSuite`.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #16251 from zsxwing/newest-first.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68a6dc97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68a6dc97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68a6dc97

Branch: refs/heads/master
Commit: 68a6dc974b25e6eddef109f6fd23ae4e9775ceca
Parents: 4f7292c
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 15 13:17:51 2016 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Dec 15 13:17:51 2016 -0800

----------------------------------------------------------------------
 .../execution/streaming/FileStreamOptions.scala | 14 ++++++
 .../execution/streaming/FileStreamSource.scala  | 11 ++++-
 .../sql/streaming/FileStreamSourceSuite.scala   | 47 ++++++++++++++++++++
 3 files changed, 71 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/68a6dc97/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index fdea65c..25ebe17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -53,4 +53,18 @@ class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging {
   /** Options as specified by the user, in a case-insensitive map, without "path" set. */
   val optionMapWithoutPath: Map[String, String] =
     parameters.filterKeys(_ != "path")
+
+  /**
+   * Whether to scan latest files first. If it's true, when the source finds unprocessed files in a
+   * trigger, it will first process the latest files.
+   */
+  val latestFirst: Boolean = parameters.get("latestFirst").map { str =>
+    try {
+      str.toBoolean
+    } catch {
+      case _: IllegalArgumentException =>
+        throw new IllegalArgumentException(
+          s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'")
+    }
+  }.getOrElse(false)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/68a6dc97/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 20e0dce..39c0b49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -62,6 +62,15 @@ class FileStreamSource(
   /** Maximum number of new files to be considered in each batch */
   private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
 
+  private val fileSortOrder = if (sourceOptions.latestFirst) {
+      logWarning(
+        """'latestFirst' is true. New files will be processed first.
+          |It may affect the watermark value""".stripMargin)
+      implicitly[Ordering[Long]].reverse
+    } else {
+      implicitly[Ordering[Long]]
+    }
+
   /** A mapping from a file that we have processed to some timestamp it was last modified. */
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
@@ -155,7 +164,7 @@ class FileStreamSource(
     val startTime = System.nanoTime
     val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
     val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
-    val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
+    val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status =>
       (status.getPath.toUri.toString, status.getModificationTime)
     }
     val endTime = System.nanoTime

http://git-wip-us.apache.org/repos/asf/spark/blob/68a6dc97/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index bcb6852..b96ccb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming
 import java.io.File
 
 import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql._
@@ -1059,6 +1060,52 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
     SerializedOffset(str.trim)
   }
+
+  test("FileStreamSource - latestFirst") {
+    withTempDir { src =>
+      // Prepare two files: 1.txt, 2.txt, and make sure they have different modified time.
+      val f1 = stringToFile(new File(src, "1.txt"), "1")
+      val f2 = stringToFile(new File(src, "2.txt"), "2")
+      f2.setLastModified(f1.lastModified + 1000)
+
+      def runTwoBatchesAndVerifyResults(
+          latestFirst: Boolean,
+          firstBatch: String,
+          secondBatch: String): Unit = {
+        val fileStream = createFileStream(
+          "text",
+          src.getCanonicalPath,
+          options = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1"))
+        val clock = new StreamManualClock()
+        testStream(fileStream)(
+          StartStream(trigger = ProcessingTime(10), triggerClock = clock),
+          AssertOnQuery { _ =>
+            // Block until the first batch finishes.
+            eventually(timeout(streamingTimeout)) {
+              assert(clock.isStreamWaitingAt(0))
+            }
+            true
+          },
+          CheckLastBatch(firstBatch),
+          AdvanceManualClock(10),
+          AssertOnQuery { _ =>
+            // Block until the second batch finishes.
+            eventually(timeout(streamingTimeout)) {
+              assert(clock.isStreamWaitingAt(10))
+            }
+            true
+          },
+          CheckLastBatch(secondBatch)
+        )
+      }
+
+      // Read oldest files first, so the first batch is "1", and the second batch is "2".
+      runTwoBatchesAndVerifyResults(latestFirst = false, firstBatch = "1", secondBatch = "2")
+
+      // Read latest files first, so the first batch is "2", and the second batch is "1".
+      runTwoBatchesAndVerifyResults(latestFirst = true, firstBatch = "2", secondBatch = "1")
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to