Repository: spark
Updated Branches:
  refs/heads/master 261c55dd8 -> 9812f7d53


[SPARK-17165][SQL] FileStreamSource should not track the list of seen files 
indefinitely

## What changes were proposed in this pull request?
Before this change, FileStreamSource uses an in-memory hash set to track the 
list of files processed by the engine. The list can grow indefinitely, leading 
to OOM or overflow of the hash set.

This patch introduces a new user-defined option called "maxFileAge", which 
defaults to 7 days. If a file is older than this age (measured relative to the 
timestamp of the latest file seen, not the current system time), 
FileStreamSource will purge it from the in-memory map that tracks the list of 
files that have been processed.

## How was this patch tested?
Added unit tests for the underlying utility, and also added an end-to-end test 
to validate the purge in FileStreamSourceSuite. Also verified the new test 
cases would fail when the timeout was set to a very large number.

Author: petermaxlee <petermax...@gmail.com>

Closes #14728 from petermaxlee/SPARK-17165.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9812f7d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9812f7d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9812f7d5

Branch: refs/heads/master
Commit: 9812f7d5381f7cd8112fd30c7e45ae4f0eab6e88
Parents: 261c55d
Author: petermaxlee <petermax...@gmail.com>
Authored: Fri Aug 26 11:30:23 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Aug 26 11:30:23 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/FileStreamOptions.scala |  54 +++++++
 .../execution/streaming/FileStreamSource.scala  | 149 +++++++++++++++----
 .../execution/streaming/HDFSMetadataLog.scala   |   2 +-
 .../streaming/FileStreamSourceSuite.scala       |  76 ++++++++++
 .../sql/streaming/FileStreamSourceSuite.scala   |  40 ++++-
 5 files changed, 285 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9812f7d5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
new file mode 100644
index 0000000..3efc20c
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.Try
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
+import org.apache.spark.util.Utils
+
+/**
+ * User specified options for file streams.
+ *
+ * Option names are looked up case-insensitively, matching how FileStreamSource read
+ * "maxFilesPerTrigger" before these options were factored out into this class.
+ *
+ * @param parameters the raw options as supplied by the user
+ */
+class FileStreamOptions(parameters: Map[String, String]) extends Logging {
+
+  /** Case-insensitive view of the user options; every lookup below goes through it. */
+  private val caseInsensitiveParameters = new CaseInsensitiveMap(parameters)
+
+  /**
+   * Maximum number of new files to be considered in each trigger, if the user set one.
+   *
+   * Throws IllegalArgumentException if the value is present but is not a positive integer.
+   */
+  val maxFilesPerTrigger: Option[Int] =
+    caseInsensitiveParameters.get("maxFilesPerTrigger").map { str =>
+      Try(str.toInt).toOption.filter(_ > 0).getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
+      }
+    }
+
+  /**
+   * Maximum age of a file that can be found in this directory, before it is ignored and
+   * dropped from the in-memory list of seen files. The file itself is never deleted.
+   *
+   * The max age is specified with respect to the timestamp of the latest file, and not the
+   * timestamp of the current system. This means that if the last file has timestamp 1000, the
+   * current system time is 2000, and max age is 200, the system will purge files older than
+   * 800 (rather than 1800) from the internal state.
+   *
+   * Default to a week.
+   */
+  val maxFileAgeMs: Long =
+    Utils.timeStringAsMs(caseInsensitiveParameters.getOrElse("maxFileAge", "7d"))
+
+  /** Options as specified by the user, in a case-insensitive map, without "path" set. */
+  val optionMapWithoutPath: Map[String, String] =
+    caseInsensitiveParameters.filterKeys(_ != "path")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9812f7d5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0cfad65..e8b969b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,21 +17,20 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import scala.util.Try
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, 
DataSource, ListingFileCatalog, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{DataSource, 
ListingFileCatalog, LogicalRelation}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.collection.OpenHashSet
 
 /**
- * A very simple source that reads text files from the given directory as they 
appear.
+ * A very simple source that reads files from the given directory as they 
appear.
  *
- * TODO Clean up the metadata files periodically
+ * TODO: Clean up the metadata log files periodically.
  */
 class FileStreamSource(
     sparkSession: SparkSession,
@@ -41,19 +40,34 @@ class FileStreamSource(
     metadataPath: String,
     options: Map[String, String]) extends Source with Logging {
 
-  private val fs = new 
Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
-  private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can 
contains glob patterns
-  private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, 
metadataPath)
+  import FileStreamSource._
+
+  private val sourceOptions = new FileStreamOptions(options)
+
+  private val qualifiedBasePath: Path = {
+    val fs = new 
Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
+    fs.makeQualified(new Path(path))  // can contains glob patterns
+  }
+
+  private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, 
metadataPath)
+
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
   /** Maximum number of new files to be considered in each batch */
-  private val maxFilesPerBatch = getMaxFilesPerBatch()
+  private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
+
+  /** A mapping from a file that we have processed to some timestamp it was 
last modified. */
+  // Visible for testing and debugging in production.
+  val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
 
-  private val seenFiles = new OpenHashSet[String]
-  metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
-    files.foreach(seenFiles.add)
+  metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) =>
+    entry.foreach(seenFiles.add)
+    // TODO: move purge call out of the loop once we truncate logs.
+    seenFiles.purge()
   }
 
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = 
${sourceOptions.maxFileAgeMs}")
+
   /**
    * Returns the maximum offset that can be retrieved from the source.
    *
@@ -61,16 +75,27 @@ class FileStreamSource(
    * there is no race here, so the cost of `synchronized` should be rare.
    */
   private def fetchMaxOffset(): LongOffset = synchronized {
-    val newFiles = fetchAllFiles().filter(!seenFiles.contains(_))
+    // All the new files found - ignore aged files and files that we have seen.
+    val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)
+
+    // Obey user's setting to limit the number of files in this batch trigger.
     val batchFiles =
       if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else 
newFiles
+
     batchFiles.foreach { file =>
       seenFiles.add(file)
       logDebug(s"New file: $file")
     }
-    logTrace(s"Number of new files = ${newFiles.size})")
-    logTrace(s"Number of files selected for batch = ${batchFiles.size}")
-    logTrace(s"Number of seen files = ${seenFiles.size}")
+    val numPurged = seenFiles.purge()
+
+    logTrace(
+      s"""
+         |Number of new files = ${newFiles.size}
+         |Number of files selected for batch = ${batchFiles.size}
+         |Number of seen files = ${seenFiles.size}
+         |Number of files purged from tracking map = $numPurged
+       """.stripMargin)
+
     if (batchFiles.nonEmpty) {
       maxBatchId += 1
       metadataLog.add(maxBatchId, batchFiles)
@@ -104,22 +129,26 @@ class FileStreamSource(
     val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
     logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
     logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
-    val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
     val newDataSource =
       DataSource(
         sparkSession,
-        paths = files,
+        paths = files.map(_.path),
         userSpecifiedSchema = Some(schema),
         className = fileFormatClassName,
-        options = newOptions)
+        options = sourceOptions.optionMapWithoutPath)
     Dataset.ofRows(sparkSession, 
LogicalRelation(newDataSource.resolveRelation()))
   }
 
-  private def fetchAllFiles(): Seq[String] = {
+  /**
+   * Returns a list of files found, sorted by their timestamp.
+   */
+  private def fetchAllFiles(): Seq[FileEntry] = {
     val startTime = System.nanoTime
     val globbedPaths = 
SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
     val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, 
Some(new StructType))
-    val files = 
catalog.allFiles().sortBy(_.getModificationTime).map(_.getPath.toUri.toString)
+    val files = catalog.allFiles().sortBy(_.getModificationTime).map { status 
=>
+      FileEntry(status.getPath.toUri.toString, status.getModificationTime)
+    }
     val endTime = System.nanoTime
     val listingTimeMs = (endTime.toDouble - startTime) / 1000000
     if (listingTimeMs > 2000) {
@@ -132,20 +161,76 @@ class FileStreamSource(
     files
   }
 
-  private def getMaxFilesPerBatch(): Option[Int] = {
-    new CaseInsensitiveMap(options)
-      .get("maxFilesPerTrigger")
-      .map { str =>
-        Try(str.toInt).toOption.filter(_ > 0).getOrElse {
-          throw new IllegalArgumentException(
-            s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a 
positive integer")
-        }
-      }
-  }
-
   override def getOffset: Option[Offset] = 
Some(fetchMaxOffset()).filterNot(_.offset == -1)
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 
   override def stop() {}
 }
+
+
+object FileStreamSource {
+
+  /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
+  type Timestamp = Long
+
+  case class FileEntry(path: String, timestamp: Timestamp) extends Serializable
+
+  /**
+   * A custom hash map used to track the list of files seen, keyed by file path and storing
+   * each file's timestamp. This map is not thread-safe.
+   *
+   * To prevent the hash map from growing indefinitely, a purge function is available to
+   * remove files more than "maxAgeMs" older than the latest file added. Age is always measured
+   * against the largest timestamp added so far, never against the system clock.
+   *
+   * @param maxAgeMs maximum age, relative to the latest file, that an entry may have and
+   *                 still survive a purge; must be non-negative
+   */
+  class SeenFilesMap(maxAgeMs: Long) {
+    require(maxAgeMs >= 0)
+
+    /** Mapping from file path to the timestamp recorded for that file. */
+    private val map = new java.util.HashMap[String, Timestamp]
+
+    /** Largest timestamp among all files added so far; stays 0 until a file is added. */
+    private var latestTimestamp: Timestamp = 0L
+
+    /**
+     * Cutoff computed by the most recent purge, i.e. (latestTimestamp - maxAgeMs) at that
+     * time; stays 0 until the first purge. Files with a smaller timestamp are not "new".
+     */
+    private var lastPurgeTimestamp: Timestamp = 0L
+
+    /**
+     * Add a new file to the map, replacing the timestamp stored for the same path if there is
+     * one, and advance the latest timestamp when this file is newer than anything seen so far.
+     */
+    def add(file: FileEntry): Unit = {
+      map.put(file.path, file.timestamp)
+      if (file.timestamp > latestTimestamp) {
+        latestTimestamp = file.timestamp
+      }
+    }
+
+    /**
+     * Returns true if we should consider this file a new file. The file is only considered
+     * "new" if its timestamp is not below the cutoff of the last purge (so it is still within
+     * the range we are tracking), and its path is not already in the map.
+     */
+    def isNewFile(file: FileEntry): Boolean = {
+      // Note that we are testing against lastPurgeTimestamp here so we'd 
never miss a file that
+      // is older than (latestTimestamp - maxAgeMs) but has not been purged 
yet.
+      file.timestamp >= lastPurgeTimestamp && !map.containsKey(file.path)
+    }
+
+    /**
+     * Recomputes the cutoff as (latestTimestamp - maxAgeMs), removes every entry whose
+     * timestamp is strictly smaller than that cutoff, and returns the number of entries
+     * removed. An entry exactly maxAgeMs older than the latest file is kept.
+     */
+    def purge(): Int = {
+      lastPurgeTimestamp = latestTimestamp - maxAgeMs
+      val iter = map.entrySet().iterator()
+      var count = 0
+      while (iter.hasNext) {
+        val entry = iter.next()
+        if (entry.getValue < lastPurgeTimestamp) {
+          count += 1
+          iter.remove()
+        }
+      }
+      count
+    }
+
+    def size: Int = map.size()
+
+    def allEntries: Seq[FileEntry] = {
+      map.entrySet().asScala.map(entry => FileEntry(entry.getKey, 
entry.getValue)).toSeq
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9812f7d5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 698f07b..2b6f76c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -180,7 +180,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: 
SparkSession, path: String)
   private def isFileAlreadyExistsException(e: IOException): Boolean = {
     e.isInstanceOf[FileAlreadyExistsException] ||
       // Old Hadoop versions don't throw FileAlreadyExistsException. Although 
it's fixed in
-      // HADOOP-9361, we still need to support old Hadoop versions.
+      // HADOOP-9361 in Hadoop 2.5, we still need to support old Hadoop 
versions.
       (e.getMessage != null && e.getMessage.startsWith("File already exists: 
"))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9812f7d5/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
new file mode 100644
index 0000000..c6db2fd
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.SparkFunSuite
+
+class FileStreamSourceSuite extends SparkFunSuite {
+
+  import FileStreamSource._
+
+  test("SeenFilesMap") {
+    val map = new SeenFilesMap(maxAgeMs = 10)
+
+    map.add(FileEntry("a", 5))
+    assert(map.size == 1)
+    map.purge()
+    assert(map.size == 1)
+
+    // Add a new entry and purge should be no-op, since the gap is exactly 10 
ms.
+    map.add(FileEntry("b", 15))
+    assert(map.size == 2)
+    map.purge()
+    assert(map.size == 2)
+
+    // Add a new entry that's more than 10 ms newer than the first entry. We should 
be able to purge now.
+    map.add(FileEntry("c", 16))
+    assert(map.size == 3)
+    map.purge()
+    assert(map.size == 2)
+
+    // Overriding an existing entry shouldn't change the size
+    map.add(FileEntry("c", 25))
+    assert(map.size == 2)
+
+    // Not a new file because we have seen c before
+    assert(!map.isNewFile(FileEntry("c", 20)))
+
+    // Not a new file because timestamp is older than the last purge cutoff (16 - 10 = 6)
+    assert(!map.isNewFile(FileEntry("d", 5)))
+
+    // Finally a new file: never seen and not too old
+    assert(map.isNewFile(FileEntry("e", 20)))
+  }
+
+  test("SeenFilesMap should only consider a file old if it is earlier than 
last purge time") {
+    val map = new SeenFilesMap(maxAgeMs = 10)
+
+    map.add(FileEntry("a", 20))
+    assert(map.size == 1)
+
+    // Timestamps 9 and 10 should still be considered new files because purge time 
should be 0
+    assert(map.isNewFile(FileEntry("b", 9)))
+    assert(map.isNewFile(FileEntry("b", 10)))
+
+    // Once purged, purge time should be 10 and then b would be an old file if 
it is less than 10.
+    map.purge()
+    assert(!map.isNewFile(FileEntry("b", 9)))
+    assert(map.isNewFile(FileEntry("b", 10)))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9812f7d5/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 47260a2..03222b4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -104,12 +104,13 @@ class FileStreamSourceTest extends StreamTest with 
SharedSQLContext {
   def createFileStream(
       format: String,
       path: String,
-      schema: Option[StructType] = None): DataFrame = {
+      schema: Option[StructType] = None,
+      options: Map[String, String] = Map.empty): DataFrame = {
     val reader =
       if (schema.isDefined) {
-        spark.readStream.format(format).schema(schema.get)
+        spark.readStream.format(format).schema(schema.get).options(options)
       } else {
-        spark.readStream.format(format)
+        spark.readStream.format(format).options(options)
       }
     reader.load(path)
   }
@@ -331,6 +332,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-17165 should not track the list of seen files indefinitely") {
+    // This test works by:
+    // 1. Create a file
+    // 2. Get it processed
+    // 3. Sleep for a very short amount of time (larger than maxFileAge)
+    // 4. Add another file; at this point the original file should have been 
purged
+    // 5. Test the size of the seenFiles internal data structure
+
+    // Note that if we change maxFileAge to a very large number, the last step 
should fail.
+    withTempDirs { case (src, tmp) =>
+      val textStream: DataFrame =
+        createFileStream("text", src.getCanonicalPath, options = 
Map("maxFileAge" -> "5ms"))
+
+      testStream(textStream)(
+        AddTextFileData("a\nb", src, tmp),
+        CheckAnswer("a", "b"),
+
+        // Sleep longer than 5ms (maxFileAge)
+        AssertOnQuery { _ => Thread.sleep(10); true },
+
+        AddTextFileData("c\nd", src, tmp),
+        CheckAnswer("a", "b", "c", "d"),
+
+        AssertOnQuery("seen files should contain only one entry") { 
streamExecution =>
+          val source = streamExecution.logicalPlan.collect { case e: 
StreamingExecutionRelation =>
+            e.source.asInstanceOf[FileStreamSource]
+          }.head
+          source.seenFiles.size == 1
+        }
+      )
+    }
+  }
+
   // =============== JSON file stream tests ================
 
   test("read from json files") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to