Repository: spark
Updated Branches:
  refs/heads/master 89cd3845b -> 4913c92c2


[SPARK-19633][SS] FileSource read from FileSink

## What changes were proposed in this pull request?

Right now the file source always uses `InMemoryFileIndex` to scan files from a given path.

But when reading the outputs of another streaming query, the file source should use `MetadataLogFileIndex` to list files from the sink log. This patch adds that support (see the end-to-end sketch after the examples below).

## `MetadataLogFileIndex` or `InMemoryFileIndex`
```scala
spark
  .readStream
  .format(...)
  .load("/some/path") // for a non-glob path:
                      //   - use `MetadataFileIndex` when 
`/some/path/_spark_meta` exists
                      //   - fall back to `InMemoryFileIndex` otherwise
```
```scala
spark
  .readStream
  .format(...)
  .load("/some/path/*/*") // for a glob path: always use `InMemoryFileIndex`
```
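
For context, here is a minimal end-to-end sketch of the intended workflow, loosely modeled on the new tests in this patch. The paths, the `local[2]` master, and the use of `MemoryStream` as the upstream input are illustrative assumptions, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder()
  .master("local[2]")                                  // illustrative setting
  .appName("read-from-file-sink")
  .getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext             // required by MemoryStream

// q1 writes text files to /tmp/out and maintains the /tmp/out/_spark_metadata sink log
val input = MemoryStream[String]
val q1 = input.toDF()
  .writeStream
  .format("text")
  .option("checkpointLocation", "/tmp/q1-checkpoint")  // hypothetical path
  .start("/tmp/out")                                   // hypothetical path

input.addData("hello", "world")
q1.processAllAvailable()                               // commit batch 0 to the sink log

// q2 reads q1's output; since /tmp/out is a non-glob path and its _spark_metadata
// directory exists, files are listed via MetadataLogFileIndex, not InMemoryFileIndex
val q2 = spark.readStream
  .format("text")
  .load("/tmp/out")
  .writeStream
  .format("console")
  .start()
q2.processAllAvailable()

q1.stop(); q2.stop()
```

With a glob path such as `/tmp/out/*`, the same `load` call would instead fall back to `InMemoryFileIndex` and could also pick up files that were never recorded in the sink log.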

## How was this patch tested?

Two newly added tests.

Author: Liwei Lin <lwl...@gmail.com>

Closes #16987 from lw-lin/source-read-from-sink.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4913c92c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4913c92c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4913c92c

Branch: refs/heads/master
Commit: 4913c92c2fbfcc22b41afb8ce79687165392d7da
Parents: 89cd384
Author: Liwei Lin <lwl...@gmail.com>
Authored: Tue Feb 28 22:58:51 2017 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Feb 28 22:58:51 2017 -0800

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  |  26 +----
 .../execution/streaming/FileStreamSink.scala    |  27 ++++-
 .../execution/streaming/FileStreamSource.scala  |  63 +++++++++-
 .../sql/streaming/FileStreamSourceSuite.scala   | 117 +++++++++++++++++--
 .../apache/spark/sql/streaming/StreamTest.scala |   8 +-
 5 files changed, 200 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4913c92c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d510581..c1353d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -280,28 +280,6 @@ case class DataSource(
   }
 
   /**
-   * Returns true if there is a single path that has a metadata log indicating which files should
-   * be read.
-   */
-  def hasMetadata(path: Seq[String]): Boolean = {
-    path match {
-      case Seq(singlePath) =>
-        try {
-          val hdfsPath = new Path(singlePath)
-          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-          val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
-          val res = fs.exists(metadataPath)
-          res
-        } catch {
-          case NonFatal(e) =>
-            logWarning(s"Error while looking for metadata directory.")
-            false
-        }
-      case _ => false
-    }
-  }
-
-  /**
    * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
    * [[DataSource]]
    *
@@ -331,7 +309,9 @@ case class DataSource(
       // We are reading from the results of a streaming query. Load files from the metadata log
       // instead of listing them using HDFS APIs.
       case (format: FileFormat, _)
-          if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
+          if FileStreamSink.hasMetadata(
+            caseInsensitiveOptions.get("path").toSeq ++ paths,
+            sparkSession.sessionState.newHadoopConf()) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
         val dataSchema = userSpecifiedSchema.orElse {

http://git-wip-us.apache.org/repos/asf/spark/blob/4913c92c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 0dbe2a7..07ec4e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -25,9 +28,31 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter}
 
-object FileStreamSink {
+object FileStreamSink extends Logging {
   // The name of the subdirectory that is used to store metadata about which files are valid.
   val metadataDir = "_spark_metadata"
+
+  /**
+   * Returns true if there is a single path that has a metadata log indicating which files should
+   * be read.
+   */
+  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
+    path match {
+      case Seq(singlePath) =>
+        try {
+          val hdfsPath = new Path(singlePath)
+          val fs = hdfsPath.getFileSystem(hadoopConf)
+          val metadataPath = new Path(hdfsPath, metadataDir)
+          val res = fs.exists(metadataPath)
+          res
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error while looking for metadata directory.")
+            false
+        }
+      case _ => false
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4913c92c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 39c0b49..6a7263c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
@@ -43,8 +43,10 @@ class FileStreamSource(
 
   private val sourceOptions = new FileStreamOptions(options)
 
+  private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+
   private val qualifiedBasePath: Path = {
-    val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
+    val fs = new Path(path).getFileSystem(hadoopConf)
     fs.makeQualified(new Path(path))  // can contain glob patterns
   }
 
@@ -158,13 +160,64 @@ class FileStreamSource(
   }
 
   /**
+   * If the source has a metadata log indicating which files should be read, then we should use it.
+   * Only when the user gives a non-glob path will we figure out whether the source has some
+   * metadata log.
+   *
+   * None        means we don't know at the moment
+   * Some(true)  means we know for sure the source DOES have metadata
+   * Some(false) means we know for sure the source DOES NOT have metadata
+   */
+  @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
+    if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
+
+  private def allFilesUsingInMemoryFileIndex() = {
+    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+    val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
+    fileIndex.allFiles()
+  }
+
+  private def allFilesUsingMetadataLogFileIndex() = {
+    // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
+    // non-glob path
+    new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
+  }
+
+  /**
    * Returns a list of files found, sorted by their timestamp.
    */
   private def fetchAllFiles(): Seq[(String, Long)] = {
     val startTime = System.nanoTime
-    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
-    val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
-    val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status =>
+
+    var allFiles: Seq[FileStatus] = null
+    sourceHasMetadata match {
+      case None =>
+        if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+          sourceHasMetadata = Some(true)
+          allFiles = allFilesUsingMetadataLogFileIndex()
+        } else {
+          allFiles = allFilesUsingInMemoryFileIndex()
+          if (allFiles.isEmpty) {
+            // we still cannot decide
+          } else {
+            // decide what to use for future rounds
+            // double check whether the source has metadata, preventing the extreme corner case
+            // where the metadata log and data files are only generated after the previous
+            // `FileStreamSink.hasMetadata` check
+            if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+              sourceHasMetadata = Some(true)
+              allFiles = allFilesUsingMetadataLogFileIndex()
+            } else {
+              sourceHasMetadata = Some(false)
+              // `allFiles` has already been fetched using InMemoryFileIndex in this round
+            }
+          }
+        }
+      case Some(true) => allFiles = allFilesUsingMetadataLogFileIndex()
+      case Some(false) => allFiles = allFilesUsingInMemoryFileIndex()
+    }
+
+    val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
       (status.getPath.toUri.toString, status.getModificationTime)
     }
     val endTime = System.nanoTime

http://git-wip-us.apache.org/repos/asf/spark/blob/4913c92c/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 5110d89..1586850 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -52,10 +52,7 @@ abstract class FileStreamSourceTest
         query.nonEmpty,
         "Cannot add data when there is no query for finding the active file 
stream source")
 
-      val sources = query.get.logicalPlan.collect {
-        case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
-          source.asInstanceOf[FileStreamSource]
-      }
+      val sources = getSourcesFromStreamingQuery(query.get)
       if (sources.isEmpty) {
         throw new Exception(
           "Could not find file source in the StreamExecution logical plan to 
add data to")
@@ -134,6 +131,14 @@ abstract class FileStreamSourceTest
       }.head
   }
 
+  protected def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] = {
+    query.logicalPlan.collect {
+      case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
+        source.asInstanceOf[FileStreamSource]
+    }
+  }
+
+
   protected def withTempDirs(body: (File, File) => Unit) {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
@@ -388,9 +393,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         CheckAnswer("a", "b", "c", "d"),
 
         AssertOnQuery("seen files should contain only one entry") { 
streamExecution =>
-          val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation =>
-            e.source.asInstanceOf[FileStreamSource]
-          }.head
+          val source = getSourcesFromStreamingQuery(streamExecution).head
           assert(source.seenFiles.size == 1)
           true
         }
@@ -662,6 +665,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("read data from outputs of another streaming query") {
+    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+      withTempDirs { case (outputDir, checkpointDir) =>
+        // q1 is a streaming query that reads from memory and writes to text files
+        val q1Source = MemoryStream[String]
+        val q1 =
+          q1Source
+            .toDF()
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("text")
+            .start(outputDir.getCanonicalPath)
+
+        // q2 is a streaming query that reads q1's text outputs
+        val q2 =
+          createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")
+
+        def q1AddData(data: String*): StreamAction =
+          Execute { _ =>
+            q1Source.addData(data)
+            q1.processAllAvailable()
+          }
+        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
+
+        testStream(q2)(
+          // batch 0
+          q1AddData("drop1", "keep2"),
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2"),
+
+          // batch 1
+          Assert {
+            // create a text file that won't be on q1's sink log
+            // thus even if its content contains "keep", it should NOT appear in q2's answer
+            val shouldNotKeep = new File(outputDir, "should_not_keep.txt")
+            stringToFile(shouldNotKeep, "should_not_keep!!!")
+            shouldNotKeep.exists()
+          },
+          q1AddData("keep3"),
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2", "keep3"),
+
+          // batch 2: check that things work well when the sink log gets compacted
+          q1AddData("keep4"),
+          Assert {
+            // compact interval is 3, so file "2.compact" should exist
+            new File(outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
+          },
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2", "keep3", "keep4"),
+
+          Execute { _ => q1.stop() }
+        )
+      }
+    }
+  }
+
+  test("start before another streaming query, and read its output") {
+    withTempDirs { case (outputDir, checkpointDir) =>
+      // q1 is a streaming query that reads from memory and writes to text files
+      val q1Source = MemoryStream[String]
+      // define q1, but don't start it for now
+      val q1Write =
+        q1Source
+          .toDF()
+          .writeStream
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .format("text")
+      var q1: StreamingQuery = null
+
+      val q2 = createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")
+
+      testStream(q2)(
+        AssertOnQuery { q2 =>
+          val fileSource = getSourcesFromStreamingQuery(q2).head
+          // q1 has not started yet, verify that q2 doesn't know whether q1 has metadata
+          fileSource.sourceHasMetadata === None
+        },
+        Execute { _ =>
+          q1 = q1Write.start(outputDir.getCanonicalPath)
+          q1Source.addData("drop1", "keep2")
+          q1.processAllAvailable()
+        },
+        AssertOnQuery { q2 =>
+          q2.processAllAvailable()
+          val fileSource = getSourcesFromStreamingQuery(q2).head
+          // q1 has started, verify that q2 knows q1 has metadata by now
+          fileSource.sourceHasMetadata === Some(true)
+        },
+        CheckAnswer("keep2"),
+        Execute { _ => q1.stop() }
+      )
+    }
+  }
+
   test("when schema inference is turned on, should read partition data") {
     def createFile(content: String, src: File, tmp: File): Unit = {
       val tempFile = Utils.tempFileWith(new File(tmp, "text"))
@@ -755,10 +853,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         .streamingQuery
       q.processAllAvailable()
       val memorySink = q.sink.asInstanceOf[MemorySink]
-      val fileSource = q.logicalPlan.collect {
-        case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
-          source.asInstanceOf[FileStreamSource]
-      }.head
+      val fileSource = getSourcesFromStreamingQuery(q).head
 
       /** Check the data read in the last batch */
       def checkLastBatchData(data: Int*): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4913c92c/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index af2f31a..60e2375 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -208,6 +208,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     }
   }
 
+  /** Execute arbitrary code */
+  object Execute {
+    def apply(func: StreamExecution => Any): AssertOnQuery =
+      AssertOnQuery(query => { func(query); true })
+  }
+
  class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
     private var waitStartTime: Option[Long] = None
 
@@ -472,7 +478,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
           case a: AssertOnQuery =>
             verify(currentStream != null || lastStream != null,
-              "cannot assert when not stream has been started")
+              "cannot assert when no stream has been started")
             val streamToAssert = Option(currentStream).getOrElse(lastStream)
             verify(a.condition(streamToAssert), s"Assert on query failed: ${a.message}")
 

