This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f510761  [SPARK-28089][SQL] File source v2: support reading output of file streaming Sink
f510761 is described below

commit f5107614d6e38d758d9d17f2dc5d57bc9c8918a1
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Thu Jun 20 12:57:13 2019 +0800

    [SPARK-28089][SQL] File source v2: support reading output of file streaming Sink
    
    ## What changes were proposed in this pull request?
    
    File source V1 supports reading the output of FileStreamSink as a batch query (https://github.com/apache/spark/pull/11897).
    We should support this in file source V2 as well. When reading with paths, we first check whether there is a metadata log of FileStreamSink. If there is, we use `MetadataLogFileIndex` for listing files; otherwise, we use `InMemoryFileIndex`.
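    
    As a rough illustration, here is a minimal sketch of the user-facing behavior this enables, using Spark's internal `MemoryStream` test source; the paths and the local master setting are hypothetical and only for this example:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.streaming.MemoryStream
    
    val spark = SparkSession.builder().master("local[2]").appName("sink-readback").getOrCreate()
    import spark.implicits._
    implicit val sqlContext = spark.sqlContext
    
    // Write a few rows through the parquet file streaming sink.
    val input = MemoryStream[Int]
    val query = input.toDF()
      .writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/readback-checkpoint") // hypothetical path
      .start("/tmp/readback-output")                            // hypothetical path
    input.addData(1, 2, 3)
    query.processAllAvailable()
    query.stop()
    
    // Batch read of the sink output. With this change, the V2 file source also
    // detects the sink's _spark_metadata log and lists files through
    // MetadataLogFileIndex instead of InMemoryFileIndex.
    spark.read.parquet("/tmp/readback-output").show()
    ```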
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #24900 from gengliangwang/FileStreamV2.
    
    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/execution/datasources/v2/FileTable.scala   |  47 +++--
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 189 +++++++++++++--------
 .../apache/spark/sql/streaming/StreamSuite.scala   |  11 +-
 .../streaming/StreamingDeduplicationSuite.scala    |  50 +++---
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  87 +++++-----
 5 files changed, 223 insertions(+), 161 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 3b0cde5..4483f5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -20,11 +20,12 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
 import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -44,23 +45,37 @@ abstract class FileTable(
     val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
     // Hadoop Configurations are case sensitive.
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
-    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
-      checkEmptyGlobPath = true, checkFilesExist = true)
-    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-    new InMemoryFileIndex(
-      sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
+    if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
+      // We are reading from the results of a streaming query. We will load files from
+      // the metadata log instead of listing them using HDFS APIs.
+      new MetadataLogFileIndex(sparkSession, new Path(paths.head),
+        options.asScala.toMap, userSpecifiedSchema)
+    } else {
+      // This is a non-streaming file based datasource.
+      val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
+        checkEmptyGlobPath = true, checkFilesExist = true)
+      val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+      new InMemoryFileIndex(
+        sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
+    }
   }
 
-  lazy val dataSchema: StructType = userSpecifiedSchema.map { schema =>
-    val partitionSchema = fileIndex.partitionSchema
-    val resolver = sparkSession.sessionState.conf.resolver
-    StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
-  }.orElse {
-    inferSchema(fileIndex.allFiles())
-  }.getOrElse {
-    throw new AnalysisException(
-      s"Unable to infer schema for $formatName. It must be specified 
manually.")
-  }.asNullable
+  lazy val dataSchema: StructType = {
+    val schema = userSpecifiedSchema.map { schema =>
+      val partitionSchema = fileIndex.partitionSchema
+      val resolver = sparkSession.sessionState.conf.resolver
+      StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
+    }.orElse {
+      inferSchema(fileIndex.allFiles())
+    }.getOrElse {
+      throw new AnalysisException(
+        s"Unable to infer schema for $formatName. It must be specified 
manually.")
+    }
+    fileIndex match {
+      case _: MetadataLogFileIndex => schema
+      case _ => schema.asNullable
+    }
+  }
 
   override lazy val schema: StructType = {
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 5b50f1d..d89caaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -25,17 +25,19 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
-class FileStreamSinkSuite extends StreamTest {
+abstract class FileStreamSinkSuite extends StreamTest {
   import testImplicits._
 
   override def beforeAll(): Unit = {
@@ -51,6 +53,8 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  protected def checkQueryExecution(df: DataFrame): Unit
+
   test("unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
@@ -121,78 +125,36 @@ class FileStreamSinkSuite extends StreamTest {
 
     var query: StreamingQuery = null
 
-    // TODO: test file source V2 as well.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      try {
-        query =
-          ds.map(i => (i, i * 1000))
-            .toDF("id", "value")
-            .writeStream
-            .partitionBy("id")
-            .option("checkpointLocation", checkpointDir)
-            .format("parquet")
-            .start(outputDir)
-
-        inputData.addData(1, 2, 3)
-        failAfter(streamingTimeout) {
-          query.processAllAvailable()
-        }
+    try {
+      query =
+        ds.map(i => (i, i * 1000))
+          .toDF("id", "value")
+          .writeStream
+          .partitionBy("id")
+          .option("checkpointLocation", checkpointDir)
+          .format("parquet")
+          .start(outputDir)
 
-        val outputDf = spark.read.parquet(outputDir)
-        val expectedSchema = new StructType()
-          .add(StructField("value", IntegerType, nullable = false))
-          .add(StructField("id", IntegerType))
-        assert(outputDf.schema === expectedSchema)
-
-        // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
-        // been inferred
-        val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
-          case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
-        }
-        assert(hadoopdFsRelations.size === 1)
-        assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
-        assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
-        assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
-
-        // Verify the data is correctly read
-        checkDatasetUnorderly(
-          outputDf.as[(Int, Int)],
-          (1000, 1), (2000, 2), (3000, 3))
-
-        /** Check some condition on the partitions of the FileScanRDD generated by a DF */
-        def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
-          val getFileScanRDD = df.queryExecution.executedPlan.collect {
-            case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
-              scan.inputRDDs().head.asInstanceOf[FileScanRDD]
-          }.headOption.getOrElse {
-            fail(s"No FileScan in query\n${df.queryExecution}")
-          }
-          func(getFileScanRDD.filePartitions)
-        }
+      inputData.addData(1, 2, 3)
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
 
-        // Read without pruning
-        checkFileScanPartitions(outputDf) { partitions =>
-          // There should be as many distinct partition values as there are distinct ids
-          assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
-        }
+      val outputDf = spark.read.parquet(outputDir)
+      val expectedSchema = new StructType()
+        .add(StructField("value", IntegerType, nullable = false))
+        .add(StructField("id", IntegerType))
+      assert(outputDf.schema === expectedSchema)
 
-        // Read with pruning, should read only files in partition dir id=1
-        checkFileScanPartitions(outputDf.filter("id = 1")) { partitions =>
-          val filesToBeRead = partitions.flatMap(_.files)
-          assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
-          assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
-        }
+      // Verify the data is correctly read
+      checkDatasetUnorderly(
+        outputDf.as[(Int, Int)],
+        (1000, 1), (2000, 2), (3000, 3))
 
-        // Read with pruning, should read only files in partition dir id=1 and id=2
-        checkFileScanPartitions(outputDf.filter("id in (1,2)")) { partitions =>
-          val filesToBeRead = partitions.flatMap(_.files)
-          assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
-          assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
-        }
-      } finally {
-        if (query != null) {
-          query.stop()
-        }
+      checkQueryExecution(outputDf)
+    } finally {
+      if (query != null) {
+        query.stop()
       }
     }
   }
@@ -512,3 +474,92 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 }
+
+class FileStreamSinkV1Suite extends FileStreamSinkSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "csv,json,orc,text,parquet")
+      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "csv,json,orc,text,parquet")
+
+  override def checkQueryExecution(df: DataFrame): Unit = {
+    // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
+    // been inferred
+    val hadoopdFsRelations = df.queryExecution.analyzed.collect {
+      case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
+    }
+    assert(hadoopdFsRelations.size === 1)
+    assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
+    assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
+    assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
+
+    /** Check some condition on the partitions of the FileScanRDD generated by a DF */
+    def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
+      val getFileScanRDD = df.queryExecution.executedPlan.collect {
+        case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
+          scan.inputRDDs().head.asInstanceOf[FileScanRDD]
+      }.headOption.getOrElse {
+        fail(s"No FileScan in query\n${df.queryExecution}")
+      }
+      func(getFileScanRDD.filePartitions)
+    }
+
+    // Read without pruning
+    checkFileScanPartitions(df) { partitions =>
+      // There should be as many distinct partition values as there are distinct ids
+      assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
+    }
+
+    // Read with pruning, should read only files in partition dir id=1
+    checkFileScanPartitions(df.filter("id = 1")) { partitions =>
+      val filesToBeRead = partitions.flatMap(_.files)
+      assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
+      assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
+    }
+
+    // Read with pruning, should read only files in partition dir id=1 and id=2
+    checkFileScanPartitions(df.filter("id in (1,2)")) { partitions =>
+      val filesToBeRead = partitions.flatMap(_.files)
+      assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
+      assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
+    }
+  }
+}
+
+class FileStreamSinkV2Suite extends FileStreamSinkSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")
+
+  override def checkQueryExecution(df: DataFrame): Unit = {
+    // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
+    // been inferred
+    val table = df.queryExecution.analyzed.collect {
+      case DataSourceV2Relation(table: FileTable, _, _) => table
+    }
+    assert(table.size === 1)
+    assert(table.head.fileIndex.isInstanceOf[MetadataLogFileIndex])
+    assert(table.head.fileIndex.partitionSchema.exists(_.name == "id"))
+    assert(table.head.dataSchema.exists(_.name == "value"))
+
+    /** Check some condition on the partitions of the FileScanRDD generated by a DF */
+    def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
+      val fileScan = df.queryExecution.executedPlan.collect {
+        case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
+          batch.scan.asInstanceOf[FileScan]
+      }.headOption.getOrElse {
+        fail(s"No FileScan in query\n${df.queryExecution}")
+      }
+      func(fileScan.planInputPartitions().map(_.asInstanceOf[FilePartition]))
+    }
+
+    // Read without pruning
+    checkFileScanPartitions(df) { partitions =>
+      // There should be as many distinct partition values as there are distinct ids
+      assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
+    }
+    // TODO: test partition pruning when file source V2 supports it.
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 293bacd..f2f5fad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -218,11 +218,12 @@ class StreamSuite extends StreamTest {
       }
     }
 
-    // TODO: fix file source V2 as well.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
-      assertDF(df)
-      assertDF(df)
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
+    Seq("", "parquet").foreach { useV1SourceReader =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReader) {
+        assertDF(df)
+        assertDF(df)
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index 8176bcc..cfd7204 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -197,32 +197,30 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   }
 
   test("deduplicate with file sink") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      withTempDir { output =>
-        withTempDir { checkpointDir =>
-          val outputPath = output.getAbsolutePath
-          val inputData = MemoryStream[String]
-          val result = inputData.toDS().dropDuplicates()
-          val q = result.writeStream
-            .format("parquet")
-            .outputMode(Append)
-            .option("checkpointLocation", checkpointDir.getPath)
-            .start(outputPath)
-          try {
-            inputData.addData("a")
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-            inputData.addData("a") // Dropped
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-            inputData.addData("b")
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
-          } finally {
-            q.stop()
-          }
+    withTempDir { output =>
+      withTempDir { checkpointDir =>
+        val outputPath = output.getAbsolutePath
+        val inputData = MemoryStream[String]
+        val result = inputData.toDS().dropDuplicates()
+        val q = result.writeStream
+          .format("parquet")
+          .outputMode(Append)
+          .option("checkpointLocation", checkpointDir.getPath)
+          .start(outputPath)
+        try {
+          inputData.addData("a")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("a") // Dropped
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("b")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
+        } finally {
+          q.stop()
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index dd4efb2..a5cb25c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -995,59 +995,56 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
       // Reading a file sink output in a batch query should detect the legacy _spark_metadata
       // directory and throw an error
-      // TODO: test file source V2 as well.
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-        val e = intercept[SparkException] {
-          spark.read.load(outputDir.getCanonicalPath).as[Int]
-        }
-        assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir)
+      val e = intercept[SparkException] {
+        spark.read.load(outputDir.getCanonicalPath).as[Int]
+      }
+      assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir)
 
-        // Restarting the streaming query should detect the legacy _spark_metadata directory and
-        // throw an error
-        val inputData = MemoryStream[Int]
-        val e2 = intercept[SparkException] {
-          inputData.toDF()
-            .writeStream
-            .format("parquet")
-            .option("checkpointLocation", checkpointDir.getCanonicalPath)
-            .start(outputDir.getCanonicalPath)
-        }
-        assertMigrationError(e2.getMessage, sparkMetadataDir, legacySparkMetadataDir)
-
-        // Move "_spark_metadata" to fix the file sink and test the checkpoint 
path.
-        FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir)
-
-        // Restarting the streaming query should detect the legacy
-        // checkpoint path and throw an error.
-        val e3 = intercept[SparkException] {
-          inputData.toDF()
-            .writeStream
-            .format("parquet")
-            .option("checkpointLocation", checkpointDir.getCanonicalPath)
-            .start(outputDir.getCanonicalPath)
-        }
-        assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir)
+      // Restarting the streaming query should detect the legacy _spark_metadata directory and
+      // throw an error
+      val inputData = MemoryStream[Int]
+      val e2 = intercept[SparkException] {
+        inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+      }
+      assertMigrationError(e2.getMessage, sparkMetadataDir, legacySparkMetadataDir)
 
-        // Fix the checkpoint path and verify that the user can migrate the issue by moving files.
-        FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir)
+      // Move "_spark_metadata" to fix the file sink and test the checkpoint 
path.
+      FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir)
 
-        val q = inputData.toDF()
+      // Restarting the streaming query should detect the legacy
+      // checkpoint path and throw an error.
+      val e3 = intercept[SparkException] {
+        inputData.toDF()
           .writeStream
           .format("parquet")
           .option("checkpointLocation", checkpointDir.getCanonicalPath)
           .start(outputDir.getCanonicalPath)
-        try {
-          q.processAllAvailable()
-          // Check the query id to make sure it did use checkpoint
-          assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+      }
+      assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir)
 
-          // Verify that the batch query can read "_spark_metadata" correctly after migration.
-          val df = spark.read.load(outputDir.getCanonicalPath)
-          assert(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex")
-          checkDatasetUnorderly(df.as[Int], 1, 2, 3)
-        } finally {
-          q.stop()
-        }
+      // Fix the checkpoint path and verify that the user can migrate the issue by moving files.
+      FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir)
+
+      val q = inputData.toDF()
+        .writeStream
+        .format("parquet")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start(outputDir.getCanonicalPath)
+      try {
+        q.processAllAvailable()
+        // Check the query id to make sure it did use checkpoint
+        assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+
+        // Verify that the batch query can read "_spark_metadata" correctly after migration.
+        val df = spark.read.load(outputDir.getCanonicalPath)
+        assert(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex")
+        checkDatasetUnorderly(df.as[Int], 1, 2, 3)
+      } finally {
+        q.stop()
       }
     }
   }

