This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 701b06a  [SPARK-26389][SS] Add force delete temp checkpoint configuration
701b06a is described below

commit 701b06a7e2e76e5d9ed020c62e0ed3464fa2818b
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
AuthorDate: Fri Feb 8 10:22:51 2019 -0800

    [SPARK-26389][SS] Add force delete temp checkpoint configuration

    ## What changes were proposed in this pull request?

    Not all users want to keep temporary checkpoint directories, and it is
    hard to restore from them. In this PR I've added a force delete flag
    which defaults to `false`. It is also not clear to users when a
    temporary checkpoint directory is deleted, so I've added log messages
    to explain this a bit more.

    ## How was this patch tested?

    Existing + additional unit tests.

    Closes #23732 from gaborgsomogyi/SPARK-26389.

    Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 ++++
 .../sql/execution/streaming/StreamExecution.scala  | 11 ++++++--
 .../sql/streaming/StreamingQueryManager.scala      |  8 ++++--
 .../test/DataStreamReaderWriterSuite.scala         | 32 +++++++++++++++++++++-
 4 files changed, 51 insertions(+), 6 deletions(-)
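Before the patch itself, a minimal usage sketch of the new flag: the configuration key is taken from this commit, while the master, app name, rate source, and console sink are illustrative choices rather than part of the patch.

    import org.apache.spark.sql.SparkSession

    // Enable forced deletion of temporary checkpoint locations for this
    // session. The key matches FORCE_DELETE_TEMP_CHECKPOINT_LOCATION below.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("force-delete-temp-checkpoint")
      .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
      .getOrCreate()

    // No checkpointLocation is given, so Spark creates a temporary
    // checkpoint directory. With the flag set, it is deleted on stop even
    // if the query failed (best effort, per the warning added below).
    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("console")
      .start()

    query.stop()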
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 11e1a5e..d285e00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -907,6 +907,12 @@ object SQLConf {
     .stringConf
     .createOptional
 
+  val FORCE_DELETE_TEMP_CHECKPOINT_LOCATION =
+    buildConf("spark.sql.streaming.forceDeleteTempCheckpointLocation")
+      .doc("When true, enable temporary checkpoint locations force delete.")
+      .booleanConf
+      .createWithDefault(false)
+
   val MIN_BATCHES_TO_RETAIN = buildConf("spark.sql.streaming.minBatchesToRetain")
     .internal()
     .doc("The minimum number of batches that must be retained and made recoverable.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 90f7b47..dc9ed80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -55,7 +55,8 @@ case object RECONFIGURING extends State
 * and the results are committed transactionally to the given [[Sink]].
 *
 * @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
- *                               errors
+ *                               errors. Checkpoint deletion can be forced with the appropriate
+ *                               Spark configuration.
 */
 abstract class StreamExecution(
     override val sparkSession: SparkSession,
@@ -92,6 +93,7 @@ abstract class StreamExecution(
     fs.mkdirs(checkpointPath)
     checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
   }
+  logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")
 
   def logicalPlan: LogicalPlan
 
@@ -335,10 +337,13 @@ abstract class StreamExecution(
       postEvent(
         new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
 
-      // Delete the temp checkpoint only when the query didn't fail
-      if (deleteCheckpointOnStop && exception.isEmpty) {
+      // Delete the temp checkpoint when either force delete enabled or the query didn't fail
+      if (deleteCheckpointOnStop &&
+          (sparkSession.sessionState.conf
+            .getConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION) || exception.isEmpty)) {
         val checkpointPath = new Path(resolvedCheckpointRoot)
         try {
+          logInfo(s"Deleting checkpoint $checkpointPath.")
           val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
           fs.delete(checkpointPath, true)
         } catch {
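Distilled from the StreamExecution hunk above, the stop-time decision reduces to the following predicate; this is a readability sketch with simplified parameter names, not the literal patch code.

    // Paraphrase of the new cleanup condition in StreamExecution:
    //   deleteCheckpointOnStop - true only for auto-created temp checkpoints
    //   forceDelete            - value of
    //                            spark.sql.streaming.forceDeleteTempCheckpointLocation
    //   queryFailed            - whether the query terminated with an exception
    def shouldDeleteCheckpoint(
        deleteCheckpointOnStop: Boolean,
        forceDelete: Boolean,
        queryFailed: Boolean): Boolean = {
      deleteCheckpointOnStop && (forceDelete || !queryFailed)
    }

In other words, an explicitly configured checkpoint location is never deleted, and a temporary one now survives a failed query only when the force-delete flag is left at its default of false.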
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 881cd96..cb9ca4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -221,9 +221,13 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       }
     }.getOrElse {
       if (useTempCheckpointLocation) {
-        // Delete the temp checkpoint when a query is being stopped without errors.
         deleteCheckpointOnStop = true
-        Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+        val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+        logWarning("Temporary checkpoint location created which is deleted normally when" +
+          s" the query didn't fail: $tempDir. If it's required to delete it under any" +
+          s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
+          s" true. Important to know deleting temp checkpoint folder is best effort.")
+        tempDir
       } else {
         throw new AnalysisException(
           "checkpointLocation must be specified either " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 74ea0bf..c3c7dcb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -614,6 +614,21 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
+  test("configured checkpoint dir should not be deleted if a query is stopped without errors and" +
+    " force temp checkpoint deletion enabled") {
+    import testImplicits._
+    withTempDir { checkpointPath =>
+      withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath,
+        SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
+        val ds = MemoryStream[Int].toDS
+        val query = ds.writeStream.format("console").start()
+        assert(checkpointPath.exists())
+        query.stop()
+        assert(checkpointPath.exists())
+      }
+    }
+  }
+
   test("temp checkpoint dir should be deleted if a query is stopped without errors") {
     import testImplicits._
     val query = MemoryStream[Int].toDS.writeStream.format("console").start()
@@ -627,6 +642,17 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
   }
 
   testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
+    testTempCheckpointWithFailedQuery(false)
+  }
+
+  testQuietly("temp checkpoint should be deleted if a query is stopped with an error and force" +
+    " temp checkpoint deletion enabled") {
+    withSQLConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
+      testTempCheckpointWithFailedQuery(true)
+    }
+  }
+
+  private def testTempCheckpointWithFailedQuery(checkpointMustBeDeleted: Boolean): Unit = {
     import testImplicits._
     val input = MemoryStream[Int]
     val query = input.toDS.map(_ / 0).writeStream.format("console").start()
@@ -638,7 +664,11 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     intercept[StreamingQueryException] {
       query.awaitTermination()
     }
-    assert(fs.exists(checkpointDir))
+    if (!checkpointMustBeDeleted) {
+      assert(fs.exists(checkpointDir))
+    } else {
+      assert(!fs.exists(checkpointDir))
+    }
   }
 
   test("SPARK-20431: Specify a schema by using a DDL-formatted string") {
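Reusing the session from the first sketch, the failure path that the new testTempCheckpointWithFailedQuery covers can be reproduced by hand. Again a sketch: MemoryStream and the implicits mirror the test suite above, and `spark` is the session built earlier with the flag enabled.

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.streaming.StreamingQueryException

    implicit val sqlContext: SQLContext = spark.sqlContext
    import spark.implicits._

    // A query that fails on the first batch (division by zero), started
    // without an explicit checkpointLocation.
    val input = MemoryStream[Int]
    val failing = input.toDS.map(_ / 0).writeStream.format("console").start()
    input.addData(1)

    try failing.awaitTermination()
    catch {
      // With forceDeleteTempCheckpointLocation=true the temporary checkpoint
      // directory is removed despite the failure (best effort); with the
      // default of false it is kept so it can be inspected.
      case _: StreamingQueryException => ()
    }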