Repository: spark Updated Branches: refs/heads/branch-2.0 8d8e2332c -> 726f05716
[SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata ## What changes were proposed in this pull request? This PR modifies StreamExecution such that it discards metadata for batches that have already been fully processed. I used the purge method that was added as part of SPARK-17235. This is a resubmission of 15126, which was based on work by frreiss in #15067, but fixed the test case along with some typos. ## How was this patch tested? A new test case in StreamingQuerySuite. The test case would fail without the changes in this pull request. Author: petermaxlee <petermax...@gmail.com> Closes #15166 from petermaxlee/SPARK-17513-2. (cherry picked from commit 976f3b1227c1a9e0b878e010531285fdba57b6a7) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/726f0571 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/726f0571 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/726f0571 Branch: refs/heads/branch-2.0 Commit: 726f05716b6c1c5021460483eedb0c8ca55d9276 Parents: 8d8e233 Author: petermaxlee <petermax...@gmail.com> Authored: Tue Sep 20 19:08:07 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Tue Sep 20 19:08:15 2016 -0700 ---------------------------------------------------------------------- .../sql/execution/streaming/MetadataLog.scala | 1 + .../execution/streaming/StreamExecution.scala | 7 ++++++ .../sql/streaming/StreamingQuerySuite.scala | 24 ++++++++++++++++++++ 3 files changed, 32 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/726f0571/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala index 78d6be1..9e2604c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala @@ -24,6 +24,7 @@ package org.apache.spark.sql.execution.streaming * - Allow the user to query the latest batch id. * - Allow the user to query the metadata object of a specified batch id. * - Allow the user to query metadata objects in a range of batch ids. + * - Allow the user to remove obsolete metadata */ trait MetadataLog[T] { http://git-wip-us.apache.org/repos/asf/spark/blob/726f0571/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5e1e5ee..b7587f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -290,6 +290,13 @@ class StreamExecution( assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId.") + + // Now that we have logged the new batch, no further processing will happen for + // the previous batch, and it is safe to discard the old metadata. + // Note that purge is exclusive, i.e. it purges everything before currentBatchId. + // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in + // flight at the same time), this cleanup logic will need to change. + offsetLog.purge(currentBatchId) } else { awaitBatchLock.lock() try { http://git-wip-us.apache.org/repos/asf/spark/blob/726f0571/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 9d58315..88f1f18 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbage collection") { + val inputData = MemoryStream[Int] + val mapped = inputData.toDS().map(6 / _) + + // Run 3 batches, and then assert that only 1 metadata file is left at the end + // since the first 2 should have been purged. + testStream(mapped)( + AddData(inputData, 1, 2), + CheckAnswer(6, 3), + AddData(inputData, 1, 2), + CheckAnswer(6, 3, 6, 3), + AddData(inputData, 4, 6), + CheckAnswer(6, 3, 6, 3, 1, 1), + + AssertOnQuery("metadata log should contain only one file") { q => + val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) + val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) + val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475 + assert(toTest.size == 1 && toTest.head == "2") + true + } + ) + } + /** * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`. * --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org