This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 77b99af  [SPARK-26824][SS] Fix the checkpoint location and _spark_metadata when it contains special chars

77b99af is described below

commit 77b99af57330cf2e5016a6acc69642d54041b041
Author: Shixiong Zhu <zsxw...@gmail.com>
AuthorDate: Wed Feb 20 15:44:20 2019 -0800

[SPARK-26824][SS] Fix the checkpoint location and _spark_metadata when it contains special chars

## What changes were proposed in this pull request?

When a user specifies a checkpoint location or a file sink output path that contains special characters (characters that get escaped when the path is converted to a URI), the streaming query stores the checkpoint and the file sink metadata in the wrong place.

In this PR, I uploaded a checkpoint that was generated by the following code using Spark 2.4.0 to demonstrate the issue:

```
implicit val s = spark.sqlContext
val input = org.apache.spark.sql.execution.streaming.MemoryStream[Int]
input.addData(1, 2, 3)
val q = input.toDF.writeStream.format("parquet").option("checkpointLocation", ".../chk %#chk").start(".../output %#output")
q.stop()
```

Here is the structure of the directory:

```
sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0
├── chk%252520%252525%252523chk
│   ├── commits
│   │   └── 0
│   ├── metadata
│   └── offsets
│       └── 0
├── output %#output
│   └── part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet
└── output%20%25%23output
    └── _spark_metadata
        └── 0
```

In this checkpoint, the user-specified checkpoint location is `.../chk %#chk`, but the checkpoint is actually stored in `.../chk%252520%252525%252523chk` (the original path escaped three times). The user-specified output path is `.../output %#output`, but `_spark_metadata` is stored in `.../output%20%25%23output/_spark_metadata` (the original path escaped once). The data files are still in the correct path (such as `.../output %#ou [...] This checkpoint will be used in unit tests in this PR.
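To see why the checkpoint directory name grows with each extra escape, here is a minimal sketch. It assumes that, for a bare path, Hadoop's `new Path(p).toUri.toString` escapes the same way as `java.net.URI`'s multi-argument constructor, which percent-encodes characters that are illegal in a URI path (space, `#`) and always encodes `%` itself. The `EscapingDemo` object and its methods are hypothetical illustrations, not Spark code.

```scala
import java.net.URI

// Hypothetical demo object (not Spark code). Assumption: java.net.URI's
// multi-argument constructor escapes a bare path like one Hadoop
// `Path.toUri.toString` call does.
object EscapingDemo {
  // One round of escaping: ' ' -> %20, '%' -> %25, '#' -> %23 ('@' is legal in a path).
  def escapeOnce(path: String): String =
    new URI(null, null, path, null).toString

  // Apply the escaping `times` times; every round re-encodes the '%' signs
  // produced by the previous round.
  def escapeTimes(path: String, times: Int): String =
    (1 to times).foldLeft(path)((p, _) => escapeOnce(p))

  def main(args: Array[String]): Unit = {
    println(escapeTimes("output %#output", 1)) // output%20%25%23output
    println(escapeTimes("chk %#chk", 1))       // chk%20%25%23chk
    println(escapeTimes("chk %#chk", 3))       // chk%252520%252525%252523chk
  }
}
```

Because escaping is not idempotent, three stray `toUri.toString` round trips on the checkpoint path and a single round trip on the `_spark_metadata` path produce two different wrong directories, matching the tree above.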
The fix simply removes the improper `Path.toUri` calls. However, users may not read the release notes and may not be aware of this checkpoint location change. If they upgrade Spark without moving the checkpoint to the new location, their query will just start from scratch. To avoid surprising users, this PR also adds a check that **detects the impacted paths and throws an error** that includes the migration guide. This check can be turned off with the internal SQL conf `spark.sql.streaming.checkpoint.escapedPathCheck.enabled`. Here are ex [...]

- Streaming checkpoint error:

```
Error: we detected a possible problem with the location of your checkpoint and you
likely need to move it before restarting this query.

Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
structured streaming. While this was corrected in Spark 3.0, it appears that your
query was started using an earlier version that incorrectly handled the checkpoint
path.

Correct Checkpoint Directory: /.../chk %#chk
Incorrect Checkpoint Directory: /.../chk%252520%252525%252523chk

Please move the data from the incorrect directory to the correct one, delete the
incorrect directory, and then restart this query. If you believe you are receiving
this message in error, you can disable it with the SQL conf
spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
```

- File sink error (`_spark_metadata`):

```
Error: we detected a possible problem with the location of your "_spark_metadata"
directory and you likely need to move it before restarting this query.

Earlier version of Spark incorrectly escaped paths when writing out the
"_spark_metadata" directory for structured streaming. While this was corrected in
Spark 3.0, it appears that your query was started using an earlier version that
incorrectly handled the "_spark_metadata" path.

Correct "_spark_metadata" Directory: /.../output %#output/_spark_metadata
Incorrect "_spark_metadata" Directory: /.../output%20%25%23output/_spark_metadata

Please move the data from the incorrect directory to the correct one, delete the
incorrect directory, and then restart this query. If you believe you are receiving
this message in error, you can disable it with the SQL conf
spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
```

## How was this patch tested?

The new unit tests.

Closes #23733 from zsxwing/path-fix.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |   8 +
 .../sql/execution/datasources/DataSource.scala     |   3 +-
 .../sql/execution/streaming/FileStreamSink.scala   |  66 ++++++--
 .../sql/execution/streaming/FileStreamSource.scala |   4 +-
 .../execution/streaming/MetadataLogFileIndex.scala |  10 +-
 .../sql/execution/streaming/StreamExecution.scala  |  48 +++++-
 .../sql/streaming/StreamingQueryManager.scala      |   4 +-
 .../chk%252520%252525@%252523chk/commits/0         |   2 +
 .../chk%252520%252525@%252523chk/metadata          |   1 +
 .../chk%252520%252525@%252523chk/offsets/0         |   3 +
 ...bb82-4201-8245-05f3dae4c372-c000.snappy.parquet | Bin 0 -> 404 bytes
 .../output%20%25@%23output/_spark_metadata/0       |   2 +
 .../spark/sql/streaming/FileStreamSinkSuite.scala  |  24 +++
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 186 +++++++++++++++++++++
 .../test/DataStreamReaderWriterSuite.scala         |   8 +-
 15 files changed, 341 insertions(+), 28 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0b7b67e..7f87546 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1112,6 +1112,14 @@ object SQLConf {
       .internal()
       .stringConf
 
+  val STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED =
+    buildConf("spark.sql.streaming.checkpoint.escapedPathCheck.enabled")
+      .doc("Whether to detect a streaming query may pick up an incorrect checkpoint path due " +
+        "to SPARK-26824.")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION =
     buildConf("spark.sql.statistics.parallelFileListingInStatsComputation.enabled")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 10dae8a..516c56e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -346,7 +346,8 @@ case class DataSource(
       case (format: FileFormat, _)
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
-            sparkSession.sessionState.newHadoopConf()) =>
+            sparkSession.sessionState.newHadoopConf(),
+            sparkSession.sessionState.conf) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
         val dataSchema = userSpecifiedSchema.orElse {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index b3d12f6..b679f16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql.execution.streaming
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormat, FileFormatWriter}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SerializableConfiguration
 
 object FileStreamSink extends Logging {
@@ -37,23 +39,54 @@ object FileStreamSink extends Logging {
    * Returns true if there is a single path that has a metadata log indicating which files should
    * be read.
    */
-  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
+  def hasMetadata(path: Seq[String], hadoopConf: Configuration, sqlConf: SQLConf): Boolean = {
     path match {
       case Seq(singlePath) =>
+        val hdfsPath = new Path(singlePath)
+        val fs = hdfsPath.getFileSystem(hadoopConf)
+        if (fs.isDirectory(hdfsPath)) {
+          val metadataPath = new Path(hdfsPath, metadataDir)
+          checkEscapedMetadataPath(fs, metadataPath, sqlConf)
+          fs.exists(metadataPath)
+        } else {
+          false
+        }
+      case _ => false
+    }
+  }
+
+  def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = {
+    if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
+        && StreamExecution.containsSpecialCharsInPath(metadataPath)) {
+      val legacyMetadataPath = new Path(metadataPath.toUri.toString)
+      val legacyMetadataPathExists =
         try {
-          val hdfsPath = new Path(singlePath)
-          val fs = hdfsPath.getFileSystem(hadoopConf)
-          if (fs.isDirectory(hdfsPath)) {
-            fs.exists(new Path(hdfsPath, metadataDir))
-          } else {
-            false
-          }
+          fs.exists(legacyMetadataPath)
         } catch {
           case NonFatal(e) =>
-            logWarning(s"Error while looking for metadata directory.")
+            // We may not have access to this directory. Don't fail the query if that happens.
+            logWarning(e.getMessage, e)
             false
         }
-      case _ => false
+      if (legacyMetadataPathExists) {
+        throw new SparkException(
+          s"""Error: we detected a possible problem with the location of your "_spark_metadata"
+             |directory and you likely need to move it before restarting this query.
+             |
+             |Earlier version of Spark incorrectly escaped paths when writing out the
+             |"_spark_metadata" directory for structured streaming. While this was corrected in
+             |Spark 3.0, it appears that your query was started using an earlier version that
+             |incorrectly handled the "_spark_metadata" path.
+             |
+             |Correct "_spark_metadata" Directory: $metadataPath
+             |Incorrect "_spark_metadata" Directory: $legacyMetadataPath
+             |
+             |Please move the data from the incorrect directory to the correct one, delete the
+             |incorrect directory, and then restart this query. If you believe you are receiving
+             |this message in error, you can disable it with the SQL conf
+             |${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}."""
+            .stripMargin)
+      }
     }
   }
 
@@ -92,11 +125,16 @@ class FileStreamSink(
     partitionColumnNames: Seq[String],
     options: Map[String, String]) extends Sink with Logging {
 
+  private val hadoopConf = sparkSession.sessionState.newHadoopConf()
   private val basePath = new Path(path)
-  private val logPath = new Path(basePath, FileStreamSink.metadataDir)
+  private val logPath = {
+    val metadataDir = new Path(basePath, FileStreamSink.metadataDir)
+    val fs = metadataDir.getFileSystem(hadoopConf)
+    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
+    metadataDir
+  }
   private val fileLog =
-    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
-  private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
 
   private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
     val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
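The legacy-path check in `FileStreamSink.checkEscapedMetadataPath` above, together with the manual move that the error message asks for, can be sketched for a local POSIX filesystem as follows. `LegacyPathMigration` and its methods are hypothetical and not Spark API. The sketch assumes absolute paths, assumes `java.net.URI`'s multi-argument constructor escapes a bare path the way Hadoop's `Path.toUri.toString` does, and uses `java.nio.file` in place of Hadoop's `FileSystem`.

```scala
import java.net.URI
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper, not part of Spark. Assumptions: a local POSIX filesystem,
// absolute paths, and java.net.URI escaping a bare path the same way Hadoop's
// `Path.toUri.toString` does.
object LegacyPathMigration {
  // One round of escaping, standing in for one `Path.toUri.toString` call.
  def escapeOnce(path: String): String =
    new URI(null, null, path, null).toString

  def escapeTimes(path: String, times: Int): String =
    (1 to times).foldLeft(path)((p, _) => escapeOnce(p))

  // Same idea as StreamExecution.containsSpecialCharsInPath: a path can only have
  // a legacy twin if escaping it changes it.
  def containsSpecialChars(path: String): Boolean =
    escapeOnce(path) != path

  // Spark 2.4 escaped checkpoint paths 3 times and _spark_metadata paths once.
  def findLegacyDir(correct: Path, times: Int): Option[Path] =
    if (!containsSpecialChars(correct.toString)) None
    else {
      val legacy = Paths.get(escapeTimes(correct.toString, times))
      if (Files.exists(legacy)) Some(legacy) else None
    }

  // Moves the legacy directory to the correct location. Returns false (and moves
  // nothing) when there is no legacy directory or the correct one already exists,
  // so existing data is never overwritten.
  def migrate(correct: Path, times: Int): Boolean =
    findLegacyDir(correct, times) match {
      case Some(legacy) if !Files.exists(correct) =>
        Option(correct.getParent).foreach(p => Files.createDirectories(p))
        // A rename within one filesystem; Files.move can fail for a non-empty
        // directory whose entries would have to be copied across filesystems.
        Files.move(legacy, correct)
        true
      case _ => false
    }
}
```

Refusing to move when the correct directory already exists reflects the error message's order of operations: move the data first, then delete the incorrect directory. On HDFS or object stores, the same step would go through Hadoop's `FileSystem.rename`, which this sketch does not cover.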
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 103fa7c..43b70ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -208,7 +208,7 @@ class FileStreamSource(
     var allFiles: Seq[FileStatus] = null
     sourceHasMetadata match {
       case None =>
-        if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+        if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
           sourceHasMetadata = Some(true)
           allFiles = allFilesUsingMetadataLogFileIndex()
         } else {
@@ -220,7 +220,7 @@ class FileStreamSource(
           // double check whether source has metadata, preventing the extreme corner case that
           // metadata log and data files are only generated after the previous
           // `FileStreamSink.hasMetadata` check
-          if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+          if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
             sourceHasMetadata = Some(true)
             allFiles = allFilesUsingMetadataLogFileIndex()
           } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
index 5cacdd0..80eed7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
@@ -39,10 +39,16 @@ class MetadataLogFileIndex(
     userSpecifiedSchema: Option[StructType])
   extends PartitioningAwareFileIndex(sparkSession, Map.empty, userSpecifiedSchema) {
 
-  private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
+  private val metadataDirectory = {
+    val metadataDir = new Path(path, FileStreamSink.metadataDir)
+    val fs = metadataDir.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
+    metadataDir
+  }
+
   logInfo(s"Reading streaming file log from $metadataDirectory")
   private val metadataLog =
-    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString)
+    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toString)
   private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
 
   private var cachedPartitionSpec: PartitionSpec = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index dc9ed80..5c21dfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -90,8 +90,45 @@ abstract class StreamExecution(
   val resolvedCheckpointRoot = {
     val checkpointPath = new Path(checkpointRoot)
     val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-    fs.mkdirs(checkpointPath)
-    checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
+    if (sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
+        && StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
+      // In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
+      // calls). If this legacy checkpoint path exists, we will throw an error to tell the user how
+      // to migrate.
+      val legacyCheckpointDir =
+        new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
+      val legacyCheckpointDirExists =
+        try {
+          fs.exists(new Path(legacyCheckpointDir))
+        } catch {
+          case NonFatal(e) =>
+            // We may not have access to this directory. Don't fail the query if that happens.
+            logWarning(e.getMessage, e)
+            false
+        }
+      if (legacyCheckpointDirExists) {
+        throw new SparkException(
+          s"""Error: we detected a possible problem with the location of your checkpoint and you
+             |likely need to move it before restarting this query.
+             |
+             |Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
+             |structured streaming. While this was corrected in Spark 3.0, it appears that your
+             |query was started using an earlier version that incorrectly handled the checkpoint
+             |path.
+             |
+             |Correct Checkpoint Directory: $checkpointPath
+             |Incorrect Checkpoint Directory: $legacyCheckpointDir
+             |
+             |Please move the data from the incorrect directory to the correct one, delete the
+             |incorrect directory, and then restart this query. If you believe you are receiving
+             |this message in error, you can disable it with the SQL conf
+             |${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}."""
+            .stripMargin)
+      }
+    }
+    val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    fs.mkdirs(checkpointDir)
+    checkpointDir.toString
   }
 
   logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")
@@ -227,7 +264,7 @@ abstract class StreamExecution(
 
   /** Returns the path of a file with `name` in the checkpoint directory. */
   protected def checkpointFile(name: String): String =
-    new Path(new Path(resolvedCheckpointRoot), name).toUri.toString
+    new Path(new Path(resolvedCheckpointRoot), name).toString
 
   /**
    * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
@@ -573,6 +610,11 @@ object StreamExecution {
     case _ =>
       false
   }
+
+  /** Whether the path contains special chars that will be escaped when converting to a `URI`. */
+  def containsSpecialCharsInPath(path: Path): Boolean = {
+    path.toUri.getPath != new Path(path.toUri.toString).toUri.getPath
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index a7fa800..2e019ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -217,10 +217,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       triggerClock: Clock): StreamingQueryWrapper = {
     var deleteCheckpointOnStop = false
     val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
-      new Path(userSpecified).toUri.toString
+      new Path(userSpecified).toString
     }.orElse {
       df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
-        new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
+        new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
       }
     }.getOrElse {
       if (useTempCheckpointLocation) {
diff --git a/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/commits/0 b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/commits/0
new file mode 100644
index 0000000..9c1e302
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/metadata b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/metadata
new file mode 100644
index 0000000..3071b0d
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/metadata
@@ -0,0 +1 @@
+{"id":"09be7fb3-49d8-48a6-840d-e9c2ad92a898"}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/offsets/0 b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/offsets/0
new file mode 100644
index 0000000..a0a5676
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1549649384149,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
+0
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output %@#output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output %@#output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet
new file mode 100644
index 0000000..1b2919b
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output %@#output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet differ
diff --git a/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output%20%25@%23output/_spark_metadata/0 b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output%20%25@%23output/_spark_metadata/0
new file mode 100644
index 0000000..79768f8
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output%20%25@%23output/_spark_metadata/0
@@ -0,0 +1,2 @@
+v1
+{"path":"file://TEMPDIR/output%20%25@%23output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet","size":404,"isDir":false,"modificationTime":1549649385000,"blockReplication":1,"blockSize":33554432,"action":"add"}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index ed53def..619d118 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
+import java.io.File
 import java.util.Locale
 
 import org.apache.hadoop.fs.Path
@@ -454,4 +455,27 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("special characters in output path") {
+    withTempDir { tempDir =>
+      val checkpointDir = new File(tempDir, "chk")
+      val outputDir = new File(tempDir, "output @#output")
+      val inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3)
+      val q = inputData.toDF()
+        .writeStream
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .format("parquet")
+        .start(outputDir.getCanonicalPath)
+      try {
+        q.processAllAvailable()
+      } finally {
+        q.stop()
+      }
+      // The "_spark_metadata" directory should be in "outputDir"
+      assert(outputDir.listFiles.map(_.getName).contains(FileStreamSink.metadataDir))
+      val outputDf = spark.read.parquet(outputDir.getCanonicalPath).as[Int]
+      checkDatasetUnorderly(outputDf, 1, 2, 3)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index dc22e31..729173c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.streaming
 
+import java.io.File
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.mutable
 
+import org.apache.commons.io.{FileUtils, IOUtils}
 import org.apache.commons.lang3.RandomStringUtils
+import org.apache.hadoop.fs.Path
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -915,6 +918,189 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("special characters in checkpoint path") {
+    withTempDir { tempDir =>
+      val checkpointDir = new File(tempDir, "chk @#chk")
+      val inputData = MemoryStream[Int]
+      inputData.addData(1)
+      val q = inputData.toDF()
+        .writeStream
+        .format("noop")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start()
+      try {
+        q.processAllAvailable()
+        assert(checkpointDir.listFiles().toList.nonEmpty)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
+  /**
+   * Copy the checkpoint generated by Spark 2.4.0 from test resource to `dir` to set up a legacy
+   * streaming checkpoint.
+   */
+  private def setUp2dot4dot0Checkpoint(dir: File): Unit = {
+    val input = getClass.getResource("/structured-streaming/escaped-path-2.4.0")
+    assert(input != null, "cannot find test resource '/structured-streaming/escaped-path-2.4.0'")
+    val inputDir = new File(input.toURI)
+
+    // Copy test files to tempDir so that we won't modify the original data.
+    FileUtils.copyDirectory(inputDir, dir)
+
+    // Spark 2.4 and earlier escaped the _spark_metadata path once
+    val legacySparkMetadataDir = new File(
+      dir,
+      new Path("output %@#output/_spark_metadata").toUri.toString)
+
+    // Migrate from legacy _spark_metadata directory to the new _spark_metadata directory.
+    // Ideally we should copy "_spark_metadata" directly like what the user is supposed to do to
+    // migrate to new version. However, in our test, "tempDir" will be different in each run and
+    // we need to fix the absolute path in the metadata to match "tempDir".
+    val sparkMetadata = FileUtils.readFileToString(new File(legacySparkMetadataDir, "0"), "UTF-8")
+    FileUtils.write(
+      new File(legacySparkMetadataDir, "0"),
+      sparkMetadata.replaceAll("TEMPDIR", dir.getCanonicalPath),
+      "UTF-8")
+  }
+
+  test("detect escaped path and report the migration guide") {
+    // Assert that the error message contains the migration conf, path and the legacy path.
+    def assertMigrationError(errorMessage: String, path: File, legacyPath: File): Unit = {
+      Seq(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key,
+        path.getCanonicalPath,
+        legacyPath.getCanonicalPath).foreach { msg =>
+        assert(errorMessage.contains(msg))
+      }
+    }
+
+    withTempDir { tempDir =>
+      setUp2dot4dot0Checkpoint(tempDir)
+
+      // Here are the paths we will use to create the query
+      val outputDir = new File(tempDir, "output %@#output")
+      val checkpointDir = new File(tempDir, "chk %@#chk")
+      val sparkMetadataDir = new File(tempDir, "output %@#output/_spark_metadata")
+
+      // The escaped paths used by Spark 2.4 and earlier.
+      // Spark 2.4 and earlier escaped the checkpoint path three times
+      val legacyCheckpointDir = new File(
+        tempDir,
+        new Path(new Path(new Path("chk %@#chk").toUri.toString).toUri.toString).toUri.toString)
+      // Spark 2.4 and earlier escaped the _spark_metadata path once
+      val legacySparkMetadataDir = new File(
+        tempDir,
+        new Path("output %@#output/_spark_metadata").toUri.toString)
+
+      // Reading a file sink output in a batch query should detect the legacy _spark_metadata
+      // directory and throw an error
+      val e = intercept[SparkException] {
+        spark.read.load(outputDir.getCanonicalPath).as[Int]
+      }
+      assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir)
+
+      // Restarting the streaming query should detect the legacy _spark_metadata directory and throw
+      // an error
+      val inputData = MemoryStream[Int]
+      val e2 = intercept[SparkException] {
+        inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+      }
+      assertMigrationError(e2.getMessage, sparkMetadataDir, legacySparkMetadataDir)
+
+      // Move "_spark_metadata" to fix the file sink and test the checkpoint path.
+      FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir)
+
+      // Restarting the streaming query should detect the legacy checkpoint path and throw an error
+      val e3 = intercept[SparkException] {
+        inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+      }
+      assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir)
+
+      // Fix the checkpoint path and verify that the user can migrate the issue by moving files.
+      FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir)
+
+      val q = inputData.toDF()
+        .writeStream
+        .format("parquet")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start(outputDir.getCanonicalPath)
+      try {
+        q.processAllAvailable()
+        // Check the query id to make sure it did use checkpoint
+        assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+
+        // Verify that the batch query can read "_spark_metadata" correctly after migration.
+        val df = spark.read.load(outputDir.getCanonicalPath)
+        assert(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex")
+        checkDatasetUnorderly(df.as[Int], 1, 2, 3)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
+  test("ignore the escaped path check when the flag is off") {
+    withTempDir { tempDir =>
+      setUp2dot4dot0Checkpoint(tempDir)
+      val outputDir = new File(tempDir, "output %@#output")
+      val checkpointDir = new File(tempDir, "chk %@#chk")
+
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key -> "false") {
+        // Verify that the batch query ignores the legacy "_spark_metadata"
+        val df = spark.read.load(outputDir.getCanonicalPath)
+        assert(!(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex"))
+        checkDatasetUnorderly(df.as[Int], 1, 2, 3)
+
+        val inputData = MemoryStream[Int]
+        val q = inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+        try {
+          q.processAllAvailable()
+          // Check the query id to make sure it ignores the legacy checkpoint
+          assert(q.id.toString != "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
+
+  test("containsSpecialCharsInPath") {
+    Seq("foo/b ar",
+      "/foo/b ar",
+      "file:/foo/b ar",
+      "file://foo/b ar",
+      "file:///foo/b ar",
+      "file://foo:bar@bar/foo/b ar").foreach { p =>
+      assert(StreamExecution.containsSpecialCharsInPath(new Path(p)), s"failed to check $p")
+    }
+    Seq("foo/bar",
+      "/foo/bar",
+      "file:/foo/bar",
+      "file://foo/bar",
+      "file:///foo/bar",
+      "file://foo:bar@bar/foo/bar",
+      // Special chars not in a path should not be considered as such urls won't hit the escaped
+      // path issue.
+      "file://foo:b ar@bar/foo/bar",
+      "file://foo:bar@b ar/foo/bar",
+      "file://f oo:bar@bar/foo/bar").foreach { p =>
+      assert(!StreamExecution.containsSpecialCharsInPath(new Path(p)), s"failed to check $p")
+    }
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
     require(!triggerDF.isStreaming)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index c3c7dcb..99dc076 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -359,7 +359,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
   test("source metadataPath") {
     LastOptions.clear()
 
-    val checkpointLocationURI = new Path(newMetadataDir).toUri
+    val checkpointLocation = new Path(newMetadataDir)
 
     val df1 = spark.readStream
       .format("org.apache.spark.sql.streaming.test")
@@ -371,7 +371,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     val q = df1.union(df2).writeStream
       .format("org.apache.spark.sql.streaming.test")
-      .option("checkpointLocation", checkpointLocationURI.toString)
+      .option("checkpointLocation", checkpointLocation.toString)
       .trigger(ProcessingTime(10.seconds))
       .start()
     q.processAllAvailable()
@@ -379,14 +379,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/0"),
+      meq(s"${new Path(makeQualifiedPath(checkpointLocation.toString)).toString}/sources/0"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/1"),
+      meq(s"${new Path(makeQualifiedPath(checkpointLocation.toString)).toString}/sources/1"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org