Repository: spark
Updated Branches:
  refs/heads/branch-2.0 52feb3fbf -> dfdfc3092


[SPARK-17235][SQL] Support purging of old logs in MetadataLog

## What changes were proposed in this pull request?
This patch adds a purge interface to MetadataLog, and an implementation in 
HDFSMetadataLog. The purge function is currently unused, but I will use it to 
purge old execution and file source logs in follow-up patches. Purging is 
required for production structured streaming jobs that run for a long 
period of time.

## How was this patch tested?
Added a unit test case in HDFSMetadataLogSuite.

Author: petermaxlee <petermax...@gmail.com>

Closes #14802 from petermaxlee/SPARK-17235.

(cherry picked from commit f64a1ddd09a34d5d867ccbaba46204d75fad038d)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dfdfc309
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dfdfc309
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dfdfc309

Branch: refs/heads/branch-2.0
Commit: dfdfc3092d1b6942eb9092e28e15fa4efb6ac084
Parents: 52feb3f
Author: petermaxlee <petermax...@gmail.com>
Authored: Fri Aug 26 16:05:34 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Aug 26 16:05:40 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/HDFSMetadataLog.scala   | 14 ++++++++++
 .../sql/execution/streaming/MetadataLog.scala   |  6 +++++
 .../streaming/HDFSMetadataLogSuite.scala        | 27 +++++++++++++++++---
 3 files changed, 43 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dfdfc309/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2b6f76c..127ece9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -227,6 +227,20 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: 
SparkSession, path: String)
     None
   }
 
+  /**
+   * Removes all log entries whose batch id is lower than thresholdBatchId (exclusive).
+   */
+  override def purge(thresholdBatchId: Long): Unit = {
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .map(f => pathToBatchId(f.getPath))
+
+    for (batchId <- batchIds if batchId < thresholdBatchId) {
+      val path = batchIdToPath(batchId)
+      fileManager.delete(path)
+      logTrace(s"Removed metadata log file: $path")
+    }
+  }
+
   private def createFileManager(): FileManager = {
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/dfdfc309/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
index cc70e1d..78d6be1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -48,4 +48,10 @@ trait MetadataLog[T] {
    * Return the latest batch Id and its metadata if exist.
    */
   def getLatest(): Option[(Long, T)]
+
+  /**
+   * Removes all log entries whose batch id is lower than thresholdBatchId (exclusive).
+   * This operation should be idempotent.
+   */
+  def purge(thresholdBatchId: Long): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dfdfc309/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index ab5a2d2..4259384 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -46,14 +46,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
   test("FileManager: FileContextManager") {
     withTempDir { temp =>
       val path = new Path(temp.getAbsolutePath)
-      testManager(path, new FileContextManager(path, new Configuration))
+      testFileManager(path, new FileContextManager(path, new Configuration))
     }
   }
 
   test("FileManager: FileSystemManager") {
     withTempDir { temp =>
       val path = new Path(temp.getAbsolutePath)
-      testManager(path, new FileSystemManager(path, new Configuration))
+      testFileManager(path, new FileSystemManager(path, new Configuration))
     }
   }
 
@@ -103,6 +103,25 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
     }
   }
 
+  testWithUninterruptibleThread("HDFSMetadataLog: purge") {
+    withTempDir { temp =>
+      val metadataLog = new HDFSMetadataLog[String](spark, 
temp.getAbsolutePath)
+      assert(metadataLog.add(0, "batch0"))
+      assert(metadataLog.add(1, "batch1"))
+      assert(metadataLog.add(2, "batch2"))
+      assert(metadataLog.get(0).isDefined)
+      assert(metadataLog.get(1).isDefined)
+      assert(metadataLog.get(2).isDefined)
+      assert(metadataLog.getLatest().get._1 == 2)
+
+      metadataLog.purge(2) // threshold is exclusive: batches 0 and 1 go, batch 2 stays
+      assert(metadataLog.get(0).isEmpty)
+      assert(metadataLog.get(1).isEmpty)
+      assert(metadataLog.get(2).isDefined)
+      assert(metadataLog.getLatest().get._1 == 2)
+    }
+  }
+
   testWithUninterruptibleThread("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, 
temp.getAbsolutePath)
@@ -155,8 +174,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
     }
   }
 
-
-  def testManager(basePath: Path, fm: FileManager): Unit = {
+  /** Basic test case for [[FileManager]] implementation. */
+  private def testFileManager(basePath: Path, fm: FileManager): Unit = {
     // Mkdirs
     val dir = new Path(s"$basePath/dir/subdir/subsubdir")
     assert(!fm.exists(dir))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to