This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new abf759a  [SPARK-29876][SS] Delete/archive file source completed files in separate thread
abf759a is described below

commit abf759a91e01497586b8bb6b7a314dd28fd6cff1
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
AuthorDate: Fri Jan 17 10:45:36 2020 -0800

    [SPARK-29876][SS] Delete/archive file source completed files in separate thread
    
    ### What changes were proposed in this pull request?
    [SPARK-20568](https://issues.apache.org/jira/browse/SPARK-20568) added the ability to clean up completed files in a streaming query. Deleting/archiving runs on the main thread, which can slow down processing. This PR adds a thread pool to handle file deletion/archival. The number of threads can be configured with `spark.sql.streaming.fileSource.cleaner.numThreads`.
    
    ### Why are the changes needed?
    To perform file deletion/archival in a separate thread instead of on the main thread.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #26502 from gaborgsomogyi/SPARK-29876.
    
    Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 docs/structured-streaming-programming-guide.md     |  5 +--
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 ++++
 .../sql/execution/streaming/FileStreamSource.scala | 40 +++++++++++++++++++---
 .../sql/streaming/FileStreamSourceSuite.scala      |  9 +++--
 4 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 306d688..429d456 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -551,9 +551,10 @@ Here are the details of all the sources in Spark.
         When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.<br/>
         For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.<br/>
         Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
-        NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
+        NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down, even if it's happening in separate thread) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
+        Number of threads used in completed file cleaner can be configured with<code>spark.sql.streaming.fileSource.cleaner.numThreads</code> (default: 1).<br/>
         NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match to any files in output directory of file stream sink.<br/>
-        NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
+        NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query. Spark may not clean up some source files in some circumstances - e.g. the application doesn't shut down gracefully, too many files are queued to clean up.
         <br/><br/>
         For file-format-specific options, see the related methods in <code>DataStreamReader</code>
         (<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>/<a
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 98a6551..279c79f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1301,6 +1301,12 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val FILE_SOURCE_CLEANER_NUM_THREADS =
+    buildConf("spark.sql.streaming.fileSource.cleaner.numThreads")
+      .doc("Number of threads used in the file source completed file cleaner.")
+      .intConf
+      .createWithDefault(1)
+
   val STREAMING_SCHEMA_INFERENCE =
     buildConf("spark.sql.streaming.schemaInference")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 39fb7f8..36f7002 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.net.URI
+import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.TimeUnit._
 
 import scala.util.control.NonFatal
@@ -30,7 +31,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
 
 /**
  * A very simple source that reads files from the given directory as they appear.
@@ -285,7 +288,7 @@ class FileStreamSource(
     }
   }
 
-  override def stop(): Unit = {}
+  override def stop(): Unit = sourceCleaner.foreach(_.stop())
 }
 
 
@@ -353,8 +356,35 @@ object FileStreamSource {
     def size: Int = map.size()
   }
 
-  private[sql] trait FileStreamSourceCleaner {
-    def clean(entry: FileEntry): Unit
+  private[sql] abstract class FileStreamSourceCleaner extends Logging {
+    private val cleanThreadPool: Option[ThreadPoolExecutor] = {
+      val numThreads = SQLConf.get.getConf(SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS)
+      if (numThreads > 0) {
+        logDebug(s"Cleaning file source on $numThreads separate thread(s)")
+        Some(ThreadUtils.newDaemonCachedThreadPool("file-source-cleaner-threadpool", numThreads))
+      } else {
+        logDebug("Cleaning file source on main thread")
+        None
+      }
+    }
+
+    def stop(): Unit = cleanThreadPool.foreach(ThreadUtils.shutdown(_))
+
+    def clean(entry: FileEntry): Unit = {
+      cleanThreadPool match {
+        case Some(p) =>
+          p.submit(new Runnable {
+            override def run(): Unit = {
+              cleanTask(entry)
+            }
+          })
+
+        case None =>
+          cleanTask(entry)
+      }
+    }
+
+    protected def cleanTask(entry: FileEntry): Unit
   }
 
   private[sql] object FileStreamSourceCleaner {
@@ -448,7 +478,7 @@ object FileStreamSource {
       filters.toList
     }
 
-    override def clean(entry: FileEntry): Unit = {
+    override protected def cleanTask(entry: FileEntry): Unit = {
       val curPath = new Path(new URI(entry.path))
       val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath)
 
@@ -472,7 +502,7 @@ object FileStreamSource {
   private[sql] class SourceFileRemover(fileSystem: FileSystem)
     extends FileStreamSourceCleaner with Logging {
 
-    override def clean(entry: FileEntry): Unit = {
+    override protected def cleanTask(entry: FileEntry): Unit = {
       val curPath = new Path(new URI(entry.path))
       try {
         logDebug(s"Removing completed file $curPath")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 2eb875c..632e007 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1636,7 +1636,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       withSQLConf(
         SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
         // Force deleting the old logs
-        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
+        SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
       ) {
         val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
           "cleanSource" -> "delete")
@@ -1680,7 +1681,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       withSQLConf(
         SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
         // Force deleting the old logs
-        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
+        SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
       ) {
         val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
           "cleanSource" -> "archive", "sourceArchiveDir" -> archiveDir.getAbsolutePath)
@@ -1749,7 +1751,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         withSQLConf(
           SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
           // Force deleting the old logs
-          SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+          SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
+          SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
         ) {
           val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
             "cleanSource" -> cleanOption, "sourceArchiveDir" -> archiveDir.getAbsolutePath)
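
The dispatch pattern this commit introduces in `FileStreamSourceCleaner` can be illustrated outside Spark with a minimal sketch. It is not the code from the patch: `Entry`, `Cleaner`, and `RecordingCleaner` are hypothetical names, and `Executors.newFixedThreadPool` from the JDK is swapped in for Spark's internal `ThreadUtils.newDaemonCachedThreadPool`. The idea it shows is the same as in the diff above: with a positive thread count, `clean` submits the work to a pool; with zero, it runs `cleanTask` on the calling thread, which is what the updated tests rely on when they set the thread count to "0".

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutorService, Executors, TimeUnit}

// Hypothetical stand-in for Spark's FileEntry; only the path matters here.
final case class Entry(path: String)

// Sketch of the dispatch logic: pool when numThreads > 0, inline otherwise.
abstract class Cleaner(numThreads: Int) {
  // Assumption: a plain JDK fixed pool replaces Spark's daemon cached pool.
  private val pool: Option[ExecutorService] =
    if (numThreads > 0) Some(Executors.newFixedThreadPool(numThreads)) else None

  def clean(entry: Entry): Unit = pool match {
    case Some(p) =>
      // Hand the work to the pool so the caller is not blocked.
      p.submit(new Runnable {
        override def run(): Unit = cleanTask(entry)
      })
    case None =>
      // No pool configured: run on the calling thread.
      cleanTask(entry)
  }

  // Stop accepting work and wait briefly for queued tasks to finish.
  def stop(): Unit = pool.foreach { p =>
    p.shutdown()
    p.awaitTermination(10, TimeUnit.SECONDS)
  }

  protected def cleanTask(entry: Entry): Unit
}

// Records (path, thread name) pairs instead of touching a file system.
final class RecordingCleaner(numThreads: Int) extends Cleaner(numThreads) {
  val handled = new ConcurrentLinkedQueue[(String, String)]()

  override protected def cleanTask(entry: Entry): Unit =
    handled.add((entry.path, Thread.currentThread().getName))
}
```

Because the sketch uses non-daemon pool threads, `stop()` should be called before the program exits. In the patch itself, `FileStreamSource.stop()` plays that role by stopping the cleaner, and the documentation change notes that files still queued at an ungraceful shutdown may not be cleaned up.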


