This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b99af  [SPARK-26824][SS] Fix the checkpoint location and _spark_metadata when it contains special chars
77b99af is described below

commit 77b99af57330cf2e5016a6acc69642d54041b041
Author: Shixiong Zhu <zsxw...@gmail.com>
AuthorDate: Wed Feb 20 15:44:20 2019 -0800

    [SPARK-26824][SS] Fix the checkpoint location and _spark_metadata when it contains special chars
    
    ## What changes were proposed in this pull request?
    
    When a user specifies a checkpoint location or a file sink output using a
    path containing special chars that need to be escaped, the streaming query
    stores the checkpoint and the file sink metadata in the wrong place. To show
    the issue, this PR includes a checkpoint generated by the following code
    using Spark 2.4.0:
    
    ```
    implicit val s = spark.sqlContext
    val input = org.apache.spark.sql.execution.streaming.MemoryStream[Int]
    input.addData(1, 2, 3)
    val q = input.toDF.writeStream
      .format("parquet")
      .option("checkpointLocation", ".../chk %#chk")
      .start(".../output %#output")
    q.stop()
    ```
    Here is the structure of the directory:
    ```
    sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0
    ├── chk%252520%252525%252523chk
    │   ├── commits
    │   │   └── 0
    │   ├── metadata
    │   └── offsets
    │       └── 0
    ├── output %#output
    │   └── part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet
    └── output%20%25%23output
        └── _spark_metadata
            └── 0
    ```
    
    In this checkpoint, the user-specified checkpoint location is
    `.../chk %#chk`, but the checkpoint is actually stored in
    `.../chk%252520%252525%252523chk` (the original path escaped three times).
    The user-specified output path is `.../output %#output`, but
    `_spark_metadata` is stored in `.../output%20%25%23output/_spark_metadata`
    (the original path escaped once). The data files are still in the correct
    path (such as `.../output %#ou [...]
    
    This checkpoint will be used in unit tests in this PR.
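    
    The escaped directory names above can be reproduced with a small sketch.
    It is not the code used by Spark: `EscapedPathDemo`, `escapeOnce` and
    `escape` are hypothetical names, and it assumes that, for a path without a
    scheme, one `Path.toUri.toString` round trip behaves like the
    multi-argument `java.net.URI` constructor, which percent-encodes the
    space, `%` and `#` characters.
    
    ```scala
    import java.net.URI
    
    object EscapedPathDemo {
      // Assumption: approximates one Hadoop `Path.toUri.toString` call for a
      // scheme-less path by letting java.net.URI quote the illegal characters.
      def escapeOnce(path: String): String =
        new URI(null, null, path, null, null).toString
    
      // Applies the escaping `times` times in a row.
      def escape(path: String, times: Int): String =
        Iterator.iterate(path)(escapeOnce).drop(times).next()
    
      def main(args: Array[String]): Unit = {
        // The checkpoint location was escaped three times.
        println(escape("/tmp/chk %#chk", 3))
        // The parent of `_spark_metadata` was escaped once.
        println(escape("/tmp/output %#output", 1))
      }
    }
    ```
    
    Under that assumption, the first call yields a name ending in
    `chk%252520%252525%252523chk` and the second one ending in
    `output%20%25%23output`, matching the directory listing. Each extra round
    only re-encodes the `%` signs produced by the previous round.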
    
    The fix simply removes the improper `Path.toUri` calls.
    
    However, a user who has not read the release notes may not be aware of this
    checkpoint location change. If they upgrade Spark without moving the
    checkpoint to the new location, their query will just start from scratch.
    To avoid surprising users, this PR also adds a check that **detects the
    impacted paths and throws an error** containing the migration guide. This
    check can be turned off with the internal SQL conf
    `spark.sql.streaming.checkpoint.escapedPathCheck.enabled`. Here are ex [...]
    
    - Streaming checkpoint error:
    ```
    Error: we detected a possible problem with the location of your checkpoint and you
    likely need to move it before restarting this query.
    
    Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
    structured streaming. While this was corrected in Spark 3.0, it appears that your
    query was started using an earlier version that incorrectly handled the checkpoint
    path.
    
    Correct Checkpoint Directory: /.../chk %#chk
    Incorrect Checkpoint Directory: /.../chk%252520%252525%252523chk
    
    Please move the data from the incorrect directory to the correct one, delete the
    incorrect directory, and then restart this query. If you believe you are receiving
    this message in error, you can disable it with the SQL conf
    spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
    ```
    
    - File sink error (`_spark_metadata`):
    ```
    Error: we detected a possible problem with the location of your "_spark_metadata"
    directory and you likely need to move it before restarting this query.
    
    Earlier version of Spark incorrectly escaped paths when writing out the
    "_spark_metadata" directory for structured streaming. While this was corrected in
    Spark 3.0, it appears that your query was started using an earlier version that
    incorrectly handled the "_spark_metadata" path.
    
    Correct "_spark_metadata" Directory: /.../output %#output/_spark_metadata
    Incorrect "_spark_metadata" Directory: /.../output%20%25%23output/_spark_metadata
    
    Please move the data from the incorrect directory to the correct one, delete the
    incorrect directory, and then restart this query. If you believe you are receiving
    this message in error, you can disable it with the SQL conf
    spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
    ```
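    
    The move that both messages ask for could be sketched as follows. This is
    only an illustration and not part of this PR: `LegacyCheckpointMigration`
    and `migrate` are hypothetical names, and the sketch assumes both
    directories live on a local file system reachable through `java.nio`
    (a checkpoint on HDFS or an object store would need that store's own
    tooling instead).
    
    ```scala
    import java.nio.file.{Files, Path, Paths}
    
    object LegacyCheckpointMigration {
      /**
       * Moves `incorrect` to `correct` when only the incorrect directory exists.
       * Returns true if a move happened, false if there was nothing to migrate.
       */
      def migrate(correct: Path, incorrect: Path): Boolean = {
        if (Files.isDirectory(incorrect) && !Files.exists(correct)) {
          // A plain rename when both paths are on the same file system.
          Files.move(incorrect, correct)
          true
        } else {
          false
        }
      }
    
      def main(args: Array[String]): Unit = {
        // Hypothetical invocation: pass the "Correct" and "Incorrect"
        // directories printed in the error message, in that order.
        require(args.length == 2, "usage: <correct-dir> <incorrect-dir>")
        println(migrate(Paths.get(args(0)), Paths.get(args(1))))
      }
    }
    ```
    
    Because the move removes the incorrect directory, a restarted query
    should no longer find the legacy path. The sketch deliberately refuses to
    move anything when the correct directory already exists, so that it never
    overwrites a checkpoint written by a newer Spark version.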
    
    ## How was this patch tested?
    
    The new unit tests.
    
    Closes #23733 from zsxwing/path-fix.
    
    Authored-by: Shixiong Zhu <zsxw...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |   8 +
 .../sql/execution/datasources/DataSource.scala     |   3 +-
 .../sql/execution/streaming/FileStreamSink.scala   |  66 ++++++--
 .../sql/execution/streaming/FileStreamSource.scala |   4 +-
 .../execution/streaming/MetadataLogFileIndex.scala |  10 +-
 .../sql/execution/streaming/StreamExecution.scala  |  48 +++++-
 .../sql/streaming/StreamingQueryManager.scala      |   4 +-
 .../chk%252520%252525@%252523chk/commits/0         |   2 +
 .../chk%252520%252525@%252523chk/metadata          |   1 +
 .../chk%252520%252525@%252523chk/offsets/0         |   3 +
 ...bb82-4201-8245-05f3dae4c372-c000.snappy.parquet | Bin 0 -> 404 bytes
 .../output%20%25@%23output/_spark_metadata/0       |   2 +
 .../spark/sql/streaming/FileStreamSinkSuite.scala  |  24 +++
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 186 +++++++++++++++++++++
 .../test/DataStreamReaderWriterSuite.scala         |   8 +-
 15 files changed, 341 insertions(+), 28 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0b7b67e..7f87546 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1112,6 +1112,14 @@ object SQLConf {
       .internal()
       .stringConf
 
+  val STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED =
+    buildConf("spark.sql.streaming.checkpoint.escapedPathCheck.enabled")
+      .doc("Whether to detect a streaming query may pick up an incorrect 
checkpoint path due " +
+        "to SPARK-26824.")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION =
     
buildConf("spark.sql.statistics.parallelFileListingInStatsComputation.enabled")
       .internal()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 10dae8a..516c56e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -346,7 +346,8 @@ case class DataSource(
       case (format: FileFormat, _)
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
-            sparkSession.sessionState.newHadoopConf()) =>
+            sparkSession.sessionState.newHadoopConf(),
+            sparkSession.sessionState.conf) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ 
paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, 
userSpecifiedSchema)
         val dataSchema = userSpecifiedSchema.orElse {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index b3d12f6..b679f16 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql.execution.streaming
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, 
FileFormat, FileFormatWriter}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SerializableConfiguration
 
 object FileStreamSink extends Logging {
@@ -37,23 +39,54 @@ object FileStreamSink extends Logging {
    * Returns true if there is a single path that has a metadata log indicating 
which files should
    * be read.
    */
-  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
+  def hasMetadata(path: Seq[String], hadoopConf: Configuration, sqlConf: 
SQLConf): Boolean = {
     path match {
       case Seq(singlePath) =>
+        val hdfsPath = new Path(singlePath)
+        val fs = hdfsPath.getFileSystem(hadoopConf)
+        if (fs.isDirectory(hdfsPath)) {
+          val metadataPath = new Path(hdfsPath, metadataDir)
+          checkEscapedMetadataPath(fs, metadataPath, sqlConf)
+          fs.exists(metadataPath)
+        } else {
+          false
+        }
+      case _ => false
+    }
+  }
+
+  def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: 
SQLConf): Unit = {
+    if 
(sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
+        && StreamExecution.containsSpecialCharsInPath(metadataPath)) {
+      val legacyMetadataPath = new Path(metadataPath.toUri.toString)
+      val legacyMetadataPathExists =
         try {
-          val hdfsPath = new Path(singlePath)
-          val fs = hdfsPath.getFileSystem(hadoopConf)
-          if (fs.isDirectory(hdfsPath)) {
-            fs.exists(new Path(hdfsPath, metadataDir))
-          } else {
-            false
-          }
+          fs.exists(legacyMetadataPath)
         } catch {
           case NonFatal(e) =>
-            logWarning(s"Error while looking for metadata directory.")
+            // We may not have access to this directory. Don't fail the query 
if that happens.
+            logWarning(e.getMessage, e)
             false
         }
-      case _ => false
+      if (legacyMetadataPathExists) {
+        throw new SparkException(
+          s"""Error: we detected a possible problem with the location of your 
"_spark_metadata"
+             |directory and you likely need to move it before restarting this 
query.
+             |
+             |Earlier version of Spark incorrectly escaped paths when writing 
out the
+             |"_spark_metadata" directory for structured streaming. While this 
was corrected in
+             |Spark 3.0, it appears that your query was started using an 
earlier version that
+             |incorrectly handled the "_spark_metadata" path.
+             |
+             |Correct "_spark_metadata" Directory: $metadataPath
+             |Incorrect "_spark_metadata" Directory: $legacyMetadataPath
+             |
+             |Please move the data from the incorrect directory to the correct 
one, delete the
+             |incorrect directory, and then restart this query. If you believe 
you are receiving
+             |this message in error, you can disable it with the SQL conf
+             
|${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}."""
+            .stripMargin)
+      }
     }
   }
 
@@ -92,11 +125,16 @@ class FileStreamSink(
     partitionColumnNames: Seq[String],
     options: Map[String, String]) extends Sink with Logging {
 
+  private val hadoopConf = sparkSession.sessionState.newHadoopConf()
   private val basePath = new Path(path)
-  private val logPath = new Path(basePath, FileStreamSink.metadataDir)
+  private val logPath = {
+    val metadataDir = new Path(basePath, FileStreamSink.metadataDir)
+    val fs = metadataDir.getFileSystem(hadoopConf)
+    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, 
sparkSession.sessionState.conf)
+    metadataDir
+  }
   private val fileLog =
-    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, 
logPath.toUri.toString)
-  private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, 
logPath.toString)
 
   private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
     val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 103fa7c..43b70ae 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -208,7 +208,7 @@ class FileStreamSource(
     var allFiles: Seq[FileStatus] = null
     sourceHasMetadata match {
       case None =>
-        if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+        if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, 
sparkSession.sessionState.conf)) {
           sourceHasMetadata = Some(true)
           allFiles = allFilesUsingMetadataLogFileIndex()
         } else {
@@ -220,7 +220,7 @@ class FileStreamSource(
             // double check whether source has metadata, preventing the 
extreme corner case that
             // metadata log and data files are only generated after the 
previous
             // `FileStreamSink.hasMetadata` check
-            if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+            if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, 
sparkSession.sessionState.conf)) {
               sourceHasMetadata = Some(true)
               allFiles = allFilesUsingMetadataLogFileIndex()
             } else {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
index 5cacdd0..80eed7b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
@@ -39,10 +39,16 @@ class MetadataLogFileIndex(
     userSpecifiedSchema: Option[StructType])
   extends PartitioningAwareFileIndex(sparkSession, Map.empty, 
userSpecifiedSchema) {
 
-  private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
+  private val metadataDirectory = {
+    val metadataDir = new Path(path, FileStreamSink.metadataDir)
+    val fs = 
metadataDir.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, 
sparkSession.sessionState.conf)
+    metadataDir
+  }
+
   logInfo(s"Reading streaming file log from $metadataDirectory")
   private val metadataLog =
-    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, 
metadataDirectory.toUri.toString)
+    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, 
metadataDirectory.toString)
   private val allFilesFromLog = 
metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
   private var cachedPartitionSpec: PartitionSpec = _
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index dc9ed80..5c21dfe 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -90,8 +90,45 @@ abstract class StreamExecution(
   val resolvedCheckpointRoot = {
     val checkpointPath = new Path(checkpointRoot)
     val fs = 
checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-    fs.mkdirs(checkpointPath)
-    checkpointPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory).toUri.toString
+    if 
(sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
+        && StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
+      // In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 
`Path.toUri.toString`
+      // calls). If this legacy checkpoint path exists, we will throw an error 
to tell the user how
+      // to migrate.
+      val legacyCheckpointDir =
+        new Path(new 
Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
+      val legacyCheckpointDirExists =
+        try {
+          fs.exists(new Path(legacyCheckpointDir))
+        } catch {
+          case NonFatal(e) =>
+            // We may not have access to this directory. Don't fail the query 
if that happens.
+            logWarning(e.getMessage, e)
+            false
+        }
+      if (legacyCheckpointDirExists) {
+        throw new SparkException(
+          s"""Error: we detected a possible problem with the location of your 
checkpoint and you
+             |likely need to move it before restarting this query.
+             |
+             |Earlier version of Spark incorrectly escaped paths when writing 
out checkpoints for
+             |structured streaming. While this was corrected in Spark 3.0, it 
appears that your
+             |query was started using an earlier version that incorrectly 
handled the checkpoint
+             |path.
+             |
+             |Correct Checkpoint Directory: $checkpointPath
+             |Incorrect Checkpoint Directory: $legacyCheckpointDir
+             |
+             |Please move the data from the incorrect directory to the correct 
one, delete the
+             |incorrect directory, and then restart this query. If you believe 
you are receiving
+             |this message in error, you can disable it with the SQL conf
+             
|${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}."""
+            .stripMargin)
+      }
+    }
+    val checkpointDir = checkpointPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)
+    fs.mkdirs(checkpointDir)
+    checkpointDir.toString
   }
   logInfo(s"Checkpoint root $checkpointRoot resolved to 
$resolvedCheckpointRoot.")
 
@@ -227,7 +264,7 @@ abstract class StreamExecution(
 
   /** Returns the path of a file with `name` in the checkpoint directory. */
   protected def checkpointFile(name: String): String =
-    new Path(new Path(resolvedCheckpointRoot), name).toUri.toString
+    new Path(new Path(resolvedCheckpointRoot), name).toString
 
   /**
    * Starts the execution. This returns only after the thread has started and 
[[QueryStartedEvent]]
@@ -573,6 +610,11 @@ object StreamExecution {
     case _ =>
       false
   }
+
+  /** Whether the path contains special chars that will be escaped when 
converting to a `URI`. */
+  def containsSpecialCharsInPath(path: Path): Boolean = {
+    path.toUri.getPath != new Path(path.toUri.toString).toUri.getPath
+  }
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index a7fa800..2e019ba 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -217,10 +217,10 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
       triggerClock: Clock): StreamingQueryWrapper = {
     var deleteCheckpointOnStop = false
     val checkpointLocation = userSpecifiedCheckpointLocation.map { 
userSpecified =>
-      new Path(userSpecified).toUri.toString
+      new Path(userSpecified).toString
     }.orElse {
       df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
-        new Path(location, 
userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
+        new Path(location, 
userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
       }
     }.getOrElse {
       if (useTempCheckpointLocation) {
diff --git 
a/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/commits/0
 
b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/commits/0
new file mode 100644
index 0000000..9c1e302
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/metadata
 
b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/metadata
new file mode 100644
index 0000000..3071b0d
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/metadata
@@ -0,0 +1 @@
+{"id":"09be7fb3-49d8-48a6-840d-e9c2ad92a898"}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/offsets/0
 
b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/offsets/0
new file mode 100644
index 0000000..a0a5676
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/chk%252520%252525@%252523chk/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1549649384149,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
+0
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output 
%@#output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet 
b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output 
%@#output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet
new file mode 100644
index 0000000..1b2919b
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output 
%@#output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet 
differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output%20%25@%23output/_spark_metadata/0
 
b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output%20%25@%23output/_spark_metadata/0
new file mode 100644
index 0000000..79768f8
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0/output%20%25@%23output/_spark_metadata/0
@@ -0,0 +1,2 @@
+v1
+{"path":"file://TEMPDIR/output%20%25@%23output/part-00000-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet","size":404,"isDir":false,"modificationTime":1549649385000,"blockReplication":1,"blockSize":33554432,"action":"add"}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index ed53def..619d118 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
+import java.io.File
 import java.util.Locale
 
 import org.apache.hadoop.fs.Path
@@ -454,4 +455,27 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("special characters in output path") {
+    withTempDir { tempDir =>
+      val checkpointDir = new File(tempDir, "chk")
+      val outputDir = new File(tempDir, "output @#output")
+      val inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3)
+      val q = inputData.toDF()
+        .writeStream
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .format("parquet")
+        .start(outputDir.getCanonicalPath)
+      try {
+        q.processAllAvailable()
+      } finally {
+        q.stop()
+      }
+      // The "_spark_metadata" directory should be in "outputDir"
+      
assert(outputDir.listFiles.map(_.getName).contains(FileStreamSink.metadataDir))
+      val outputDf = spark.read.parquet(outputDir.getCanonicalPath).as[Int]
+      checkDatasetUnorderly(outputDf, 1, 2, 3)
+    }
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index dc22e31..729173c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.streaming
 
+import java.io.File
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.mutable
 
+import org.apache.commons.io.{FileUtils, IOUtils}
 import org.apache.commons.lang3.RandomStringUtils
+import org.apache.hadoop.fs.Path
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -915,6 +918,189 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
     )
   }
 
+  test("special characters in checkpoint path") {
+    withTempDir { tempDir =>
+      val checkpointDir = new File(tempDir, "chk @#chk")
+      val inputData = MemoryStream[Int]
+      inputData.addData(1)
+      val q = inputData.toDF()
+        .writeStream
+        .format("noop")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start()
+      try {
+        q.processAllAvailable()
+        assert(checkpointDir.listFiles().toList.nonEmpty)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
+  /**
+   * Copy the checkpoint generated by Spark 2.4.0 from test resource to `dir` 
to set up a legacy
+   * streaming checkpoint.
+   */
+  private def setUp2dot4dot0Checkpoint(dir: File): Unit = {
+    val input = 
getClass.getResource("/structured-streaming/escaped-path-2.4.0")
+    assert(input != null, "cannot find test resource 
'/structured-streaming/escaped-path-2.4.0'")
+    val inputDir = new File(input.toURI)
+
+    // Copy test files to tempDir so that we won't modify the original data.
+    FileUtils.copyDirectory(inputDir, dir)
+
+    // Spark 2.4 and earlier escaped the _spark_metadata path once
+    val legacySparkMetadataDir = new File(
+      dir,
+      new Path("output %@#output/_spark_metadata").toUri.toString)
+
+    // Migrate from legacy _spark_metadata directory to the new 
_spark_metadata directory.
+    // Ideally we should copy "_spark_metadata" directly like what the user is 
supposed to do to
+    // migrate to new version. However, in our test, "tempDir" will be 
different in each run and
+    // we need to fix the absolute path in the metadata to match "tempDir".
+    val sparkMetadata = FileUtils.readFileToString(new 
File(legacySparkMetadataDir, "0"), "UTF-8")
+    FileUtils.write(
+      new File(legacySparkMetadataDir, "0"),
+      sparkMetadata.replaceAll("TEMPDIR", dir.getCanonicalPath),
+      "UTF-8")
+  }
+
+  test("detect escaped path and report the migration guide") {
+    // Assert that the error message contains the migration conf, path and the 
legacy path.
+    def assertMigrationError(errorMessage: String, path: File, legacyPath: 
File): Unit = {
+      Seq(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key,
+          path.getCanonicalPath,
+          legacyPath.getCanonicalPath).foreach { msg =>
+        assert(errorMessage.contains(msg))
+      }
+    }
+
+    withTempDir { tempDir =>
+      setUp2dot4dot0Checkpoint(tempDir)
+
+      // Here are the paths we will use to create the query
+      val outputDir = new File(tempDir, "output %@#output")
+      val checkpointDir = new File(tempDir, "chk %@#chk")
+      val sparkMetadataDir = new File(tempDir, "output 
%@#output/_spark_metadata")
+
+      // The escaped paths used by Spark 2.4 and earlier.
+      // Spark 2.4 and earlier escaped the checkpoint path three times
+      val legacyCheckpointDir = new File(
+        tempDir,
+        new Path(new Path(new Path("chk 
%@#chk").toUri.toString).toUri.toString).toUri.toString)
+      // Spark 2.4 and earlier escaped the _spark_metadata path once
+      val legacySparkMetadataDir = new File(
+        tempDir,
+        new Path("output %@#output/_spark_metadata").toUri.toString)
+
+      // Reading a file sink output in a batch query should detect the legacy 
_spark_metadata
+      // directory and throw an error
+      val e = intercept[SparkException] {
+        spark.read.load(outputDir.getCanonicalPath).as[Int]
+      }
+      assertMigrationError(e.getMessage, sparkMetadataDir, 
legacySparkMetadataDir)
+
+      // Restarting the streaming query should detect the legacy 
_spark_metadata directory and throw
+      // an error
+      val inputData = MemoryStream[Int]
+      val e2 = intercept[SparkException] {
+        inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+      }
+      assertMigrationError(e2.getMessage, sparkMetadataDir, 
legacySparkMetadataDir)
+
+      // Move "_spark_metadata" to fix the file sink and test the checkpoint 
path.
+      FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir)
+
+      // Restarting the streaming query should detect the legacy checkpoint 
path and throw an error
+      val e3 = intercept[SparkException] {
+        inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+      }
+      assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir)
+
+      // Fix the checkpoint path and verify that the user can migrate the 
issue by moving files.
+      FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir)
+
+      val q = inputData.toDF()
+        .writeStream
+        .format("parquet")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start(outputDir.getCanonicalPath)
+      try {
+        q.processAllAvailable()
+        // Check the query id to make sure it did use checkpoint
+        assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+
+        // Verify that the batch query can read "_spark_metadata" correctly 
after migration.
+        val df = spark.read.load(outputDir.getCanonicalPath)
+        assert(df.queryExecution.executedPlan.toString contains 
"MetadataLogFileIndex")
+        checkDatasetUnorderly(df.as[Int], 1, 2, 3)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
+  test("ignore the escaped path check when the flag is off") {
+    withTempDir { tempDir =>
+      setUp2dot4dot0Checkpoint(tempDir)
+      val outputDir = new File(tempDir, "output %@#output")
+      val checkpointDir = new File(tempDir, "chk %@#chk")
+
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key -> "false") {
+        // Verify that the batch query ignores the legacy "_spark_metadata"
+        val df = spark.read.load(outputDir.getCanonicalPath)
+        assert(!(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex"))
+        checkDatasetUnorderly(df.as[Int], 1, 2, 3)
+
+        val inputData = MemoryStream[Int]
+        val q = inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+        try {
+          q.processAllAvailable()
+          // Check the query id to make sure it ignores the legacy checkpoint
+          assert(q.id.toString != "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
+
+  test("containsSpecialCharsInPath") {
+    Seq("foo/b ar",
+        "/foo/b ar",
+        "file:/foo/b ar",
+        "file://foo/b ar",
+        "file:///foo/b ar",
+        "file://foo:bar@bar/foo/b ar").foreach { p =>
+      assert(StreamExecution.containsSpecialCharsInPath(new Path(p)), s"failed to check $p")
+    }
+    Seq("foo/bar",
+        "/foo/bar",
+        "file:/foo/bar",
+        "file://foo/bar",
+        "file:///foo/bar",
+        "file://foo:bar@bar/foo/bar",
+        // Special chars not in a path should not be considered as such urls won't hit the escaped
+        // path issue.
+        "file://foo:b ar@bar/foo/bar",
+        "file://foo:bar@b ar/foo/bar",
+        "file://f oo:bar@bar/foo/bar").foreach { p =>
+      assert(!StreamExecution.containsSpecialCharsInPath(new Path(p)), s"failed to check $p")
+    }
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
     require(!triggerDF.isStreaming)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index c3c7dcb..99dc076 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -359,7 +359,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
   test("source metadataPath") {
     LastOptions.clear()
 
-    val checkpointLocationURI = new Path(newMetadataDir).toUri
+    val checkpointLocation = new Path(newMetadataDir)
 
     val df1 = spark.readStream
       .format("org.apache.spark.sql.streaming.test")
@@ -371,7 +371,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     val q = df1.union(df2).writeStream
       .format("org.apache.spark.sql.streaming.test")
-      .option("checkpointLocation", checkpointLocationURI.toString)
+      .option("checkpointLocation", checkpointLocation.toString)
       .trigger(ProcessingTime(10.seconds))
       .start()
     q.processAllAvailable()
@@ -379,14 +379,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/0"),
+      meq(s"${new Path(makeQualifiedPath(checkpointLocation.toString)).toString}/sources/0"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/1"),
+      meq(s"${new Path(makeQualifiedPath(checkpointLocation.toString)).toString}/sources/1"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))


