This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f510761  [SPARK-28089][SQL] File source v2: support reading output of file streaming Sink
f510761 is described below

commit f5107614d6e38d758d9d17f2dc5d57bc9c8918a1
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Thu Jun 20 12:57:13 2019 +0800

    [SPARK-28089][SQL] File source v2: support reading output of file streaming Sink

    ## What changes were proposed in this pull request?

    File source V1 supports reading the output of FileStreamSink as a batch query
    (https://github.com/apache/spark/pull/11897). We should support this in file
    source V2 as well: when reading with paths, we first check whether a
    FileStreamSink metadata log is present. If it is, we use `MetadataLogFileIndex`
    to list files; otherwise, we fall back to `InMemoryFileIndex`.

    ## How was this patch tested?

    Unit tests.

    Closes #24900 from gengliangwang/FileStreamV2.

    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
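[Editor's note] To make the new behavior concrete before the diff: a minimal sketch of the round trip this patch enables — a streaming query writes through the file sink, and the same directory is then read back as a batch DataFrame. The `rate` source, session setup, and all paths are illustrative placeholders, not part of the patch.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("file-sink-batch-read")
  .getOrCreate()

// The file sink records every committed file in <outputDir>/_spark_metadata.
val query = spark.readStream
  .format("rate")  // toy source: one (timestamp, value) row per second
  .load()
  .select("value")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/sink-demo/checkpoint")  // placeholder path
  .start("/tmp/sink-demo/output")                             // placeholder path

query.processAllAvailable()
query.stop()

// With this patch, the V2 file source also consults _spark_metadata here, so
// the batch read sees exactly the files the sink committed.
val batchDf = spark.read.parquet("/tmp/sink-demo/output")
batchDf.show()
```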
---
 .../sql/execution/datasources/v2/FileTable.scala   |  47 +++--
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 189 +++++++++++++--------
 .../apache/spark/sql/streaming/StreamSuite.scala   |  11 +-
 .../streaming/StreamingDeduplicationSuite.scala    |  50 +++---
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  87 +++++-----
 5 files changed, 223 insertions(+), 161 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 3b0cde5..4483f5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -20,11 +20,12 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
 import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -44,23 +45,37 @@ abstract class FileTable(
     val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
     // Hadoop Configurations are case sensitive.
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
-    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
-      checkEmptyGlobPath = true, checkFilesExist = true)
-    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-    new InMemoryFileIndex(
-      sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
+    if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
+      // We are reading from the results of a streaming query. We will load files from
+      // the metadata log instead of listing them using HDFS APIs.
+      new MetadataLogFileIndex(sparkSession, new Path(paths.head),
+        options.asScala.toMap, userSpecifiedSchema)
+    } else {
+      // This is a non-streaming file based datasource.
+      val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
+        checkEmptyGlobPath = true, checkFilesExist = true)
+      val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+      new InMemoryFileIndex(
+        sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
+    }
   }
 
-  lazy val dataSchema: StructType = userSpecifiedSchema.map { schema =>
-    val partitionSchema = fileIndex.partitionSchema
-    val resolver = sparkSession.sessionState.conf.resolver
-    StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
-  }.orElse {
-    inferSchema(fileIndex.allFiles())
-  }.getOrElse {
-    throw new AnalysisException(
-      s"Unable to infer schema for $formatName. It must be specified manually.")
-  }.asNullable
+  lazy val dataSchema: StructType = {
+    val schema = userSpecifiedSchema.map { schema =>
+      val partitionSchema = fileIndex.partitionSchema
+      val resolver = sparkSession.sessionState.conf.resolver
+      StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
+    }.orElse {
+      inferSchema(fileIndex.allFiles())
+    }.getOrElse {
+      throw new AnalysisException(
+        s"Unable to infer schema for $formatName. It must be specified manually.")
+    }
+    fileIndex match {
+      case _: MetadataLogFileIndex => schema
+      case _ => schema.asNullable
+    }
+  }
 
   override lazy val schema: StructType = {
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
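[Editor's note] A side note on the `dataSchema` change above: when the table is backed by a `MetadataLogFileIndex`, the schema the streaming query recorded is kept verbatim, while every other read path still relaxes all fields to nullable. A small self-contained illustration of the distinction — the field names mirror the test below, and since `asNullable` is not part of Spark's public API, its effect is spelled out by hand:

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Schema as a file streaming sink would record it: "value" is non-nullable.
val sinkSchema = new StructType()
  .add(StructField("value", IntegerType, nullable = false))
  .add(StructField("id", IntegerType))

// What asNullable effectively does for plain batch reads: relax every field.
val relaxed = StructType(sinkSchema.map(_.copy(nullable = true)))

assert(relaxed.forall(_.nullable))   // non-sink reads end up fully nullable
assert(!sinkSchema.head.nullable)    // sink output keeps the recorded nullability
```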
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 5b50f1d..d89caaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -25,17 +25,19 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
-class FileStreamSinkSuite extends StreamTest {
+abstract class FileStreamSinkSuite extends StreamTest {
   import testImplicits._
 
   override def beforeAll(): Unit = {
@@ -51,6 +53,8 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  protected def checkQueryExecution(df: DataFrame): Unit
+
   test("unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
@@ -121,78 +125,36 @@ class FileStreamSinkSuite extends StreamTest {
 
     var query: StreamingQuery = null
 
-    // TODO: test file source V2 as well.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      try {
-        query =
-          ds.map(i => (i, i * 1000))
-            .toDF("id", "value")
-            .writeStream
-            .partitionBy("id")
-            .option("checkpointLocation", checkpointDir)
-            .format("parquet")
-            .start(outputDir)
-
-        inputData.addData(1, 2, 3)
-        failAfter(streamingTimeout) {
-          query.processAllAvailable()
-        }
+    try {
+      query =
+        ds.map(i => (i, i * 1000))
+          .toDF("id", "value")
+          .writeStream
+          .partitionBy("id")
+          .option("checkpointLocation", checkpointDir)
+          .format("parquet")
+          .start(outputDir)
 
-        val outputDf = spark.read.parquet(outputDir)
-        val expectedSchema = new StructType()
-          .add(StructField("value", IntegerType, nullable = false))
-          .add(StructField("id", IntegerType))
-        assert(outputDf.schema === expectedSchema)
-
-        // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
-        // been inferred
-        val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
-          case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
-        }
-        assert(hadoopdFsRelations.size === 1)
-        assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
-        assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
-        assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
-
-        // Verify the data is correctly read
-        checkDatasetUnorderly(
-          outputDf.as[(Int, Int)],
-          (1000, 1), (2000, 2), (3000, 3))
-
-        /** Check some condition on the partitions of the FileScanRDD generated by a DF */
-        def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
-          val getFileScanRDD = df.queryExecution.executedPlan.collect {
-            case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
-              scan.inputRDDs().head.asInstanceOf[FileScanRDD]
-          }.headOption.getOrElse {
-            fail(s"No FileScan in query\n${df.queryExecution}")
-          }
-          func(getFileScanRDD.filePartitions)
-        }
+      inputData.addData(1, 2, 3)
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
 
-        // Read without pruning
-        checkFileScanPartitions(outputDf) { partitions =>
-          // There should be as many distinct partition values as there are distinct ids
-          assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
-        }
+      val outputDf = spark.read.parquet(outputDir)
+      val expectedSchema = new StructType()
+        .add(StructField("value", IntegerType, nullable = false))
+        .add(StructField("id", IntegerType))
+      assert(outputDf.schema === expectedSchema)
 
-        // Read with pruning, should read only files in partition dir id=1
-        checkFileScanPartitions(outputDf.filter("id = 1")) { partitions =>
-          val filesToBeRead = partitions.flatMap(_.files)
-          assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
-          assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
-        }
+      // Verify the data is correctly read
+      checkDatasetUnorderly(
+        outputDf.as[(Int, Int)],
+        (1000, 1), (2000, 2), (3000, 3))
 
-        // Read with pruning, should read only files in partition dir id=1 and id=2
-        checkFileScanPartitions(outputDf.filter("id in (1,2)")) { partitions =>
-          val filesToBeRead = partitions.flatMap(_.files)
-          assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
-          assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
-        }
-      } finally {
-        if (query != null) {
-          query.stop()
-        }
+      checkQueryExecution(outputDf)
+    } finally {
+      if (query != null) {
+        query.stop()
       }
     }
   }
@@ -512,3 +474,92 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 }
+
+class FileStreamSinkV1Suite extends FileStreamSinkSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "csv,json,orc,text,parquet")
+      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "csv,json,orc,text,parquet")
+
+  override def checkQueryExecution(df: DataFrame): Unit = {
+    // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
+    // been inferred
+    val hadoopdFsRelations = df.queryExecution.analyzed.collect {
+      case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
+    }
+    assert(hadoopdFsRelations.size === 1)
+    assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
+    assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
+    assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
+
+    /** Check some condition on the partitions of the FileScanRDD generated by a DF */
+    def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
+      val getFileScanRDD = df.queryExecution.executedPlan.collect {
+        case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
+          scan.inputRDDs().head.asInstanceOf[FileScanRDD]
+      }.headOption.getOrElse {
+        fail(s"No FileScan in query\n${df.queryExecution}")
+      }
+      func(getFileScanRDD.filePartitions)
+    }
+
+    // Read without pruning
+    checkFileScanPartitions(df) { partitions =>
+      // There should be as many distinct partition values as there are distinct ids
+      assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
+    }
+
+    // Read with pruning, should read only files in partition dir id=1
+    checkFileScanPartitions(df.filter("id = 1")) { partitions =>
+      val filesToBeRead = partitions.flatMap(_.files)
+      assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
+      assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
+    }
+
+    // Read with pruning, should read only files in partition dir id=1 and id=2
+    checkFileScanPartitions(df.filter("id in (1,2)")) { partitions =>
+      val filesToBeRead = partitions.flatMap(_.files)
+      assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
+      assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
+    }
+  }
+}
+
+class FileStreamSinkV2Suite extends FileStreamSinkSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")
+
+  override def checkQueryExecution(df: DataFrame): Unit = {
+    // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
+    // been inferred
+    val table = df.queryExecution.analyzed.collect {
+      case DataSourceV2Relation(table: FileTable, _, _) => table
+    }
+    assert(table.size === 1)
+    assert(table.head.fileIndex.isInstanceOf[MetadataLogFileIndex])
+    assert(table.head.fileIndex.partitionSchema.exists(_.name == "id"))
+    assert(table.head.dataSchema.exists(_.name == "value"))
+
+    /** Check some condition on the partitions of the FileScan generated by a DF */
+    def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
+      val fileScan = df.queryExecution.executedPlan.collect {
+        case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
+          batch.scan.asInstanceOf[FileScan]
+      }.headOption.getOrElse {
+        fail(s"No FileScan in query\n${df.queryExecution}")
+      }
+      func(fileScan.planInputPartitions().map(_.asInstanceOf[FilePartition]))
+    }
+
+    // Read without pruning
+    checkFileScanPartitions(df) { partitions =>
+      // There should be as many distinct partition values as there are distinct ids
+      assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
+    }
+    // TODO: test partition pruning when file source V2 supports it.
+  }
+}
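[Editor's note] The two concrete suites above differ only in which code path serves the read: pinning the built-in formats via `USE_V1_SOURCE_READER_LIST`/`USE_V1_SOURCE_WRITER_LIST` keeps them on V1, while empty lists route through file source V2. Outside a test suite the same toggle can be set on a session; a hedged sketch, assuming the config key strings match the `SQLConf` entries of this era:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: empty lists mean no format is pinned to V1, so parquet reads
// go through FileTable/BatchScanExec (V2) and still honor _spark_metadata.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("v1-v2-toggle")
  .config("spark.sql.sources.useV1SourceReaderList", "")
  .config("spark.sql.sources.useV1SourceWriterList", "")
  .getOrCreate()

// "/tmp/sink-demo/output" is the placeholder sink directory from the earlier sketch.
val df = spark.read.parquet("/tmp/sink-demo/output")
```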
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 293bacd..f2f5fad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -218,11 +218,12 @@ class StreamSuite extends StreamTest {
       }
     }
 
-    // TODO: fix file source V2 as well.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
-      assertDF(df)
-      assertDF(df)
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
+    Seq("", "parquet").foreach { useV1SourceReader =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReader) {
+        assertDF(df)
+        assertDF(df)
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index 8176bcc..cfd7204 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -197,32 +197,30 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   }
 
   test("deduplicate with file sink") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      withTempDir { output =>
-        withTempDir { checkpointDir =>
-          val outputPath = output.getAbsolutePath
-          val inputData = MemoryStream[String]
-          val result = inputData.toDS().dropDuplicates()
-          val q = result.writeStream
-            .format("parquet")
-            .outputMode(Append)
-            .option("checkpointLocation", checkpointDir.getPath)
-            .start(outputPath)
-          try {
-            inputData.addData("a")
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-            inputData.addData("a") // Dropped
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-            inputData.addData("b")
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
-          } finally {
-            q.stop()
-          }
+    withTempDir { output =>
+      withTempDir { checkpointDir =>
+        val outputPath = output.getAbsolutePath
+        val inputData = MemoryStream[String]
+        val result = inputData.toDS().dropDuplicates()
+        val q = result.writeStream
+          .format("parquet")
+          .outputMode(Append)
+          .option("checkpointLocation", checkpointDir.getPath)
+          .start(outputPath)
+        try {
+          inputData.addData("a")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("a") // Dropped
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("b")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
+        } finally {
+          q.stop()
         }
       }
     }
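[Editor's note] Both test changes above simply drop the V1 pin, so these reads now exercise the detection added in `FileTable`. That detection amounts to checking for a `_spark_metadata` directory under a single, non-glob root path. A simplified restatement follows — not the exact `FileStreamSink.hasMetadata` implementation, which is more defensive about edge cases:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Simplified sketch: a directory "looks like" file sink output when it has a
// _spark_metadata child. The real check also consults the SQL configuration.
def looksLikeSinkOutput(paths: Seq[String], hadoopConf: Configuration): Boolean =
  paths match {
    case Seq(singlePath) if !singlePath.contains("*") =>
      val metadataPath = new Path(singlePath, "_spark_metadata")
      metadataPath.getFileSystem(hadoopConf).exists(metadataPath)
    case _ => false
  }
```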
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index dd4efb2..a5cb25c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -995,59 +995,56 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
     // Reading a file sink output in a batch query should detect the legacy _spark_metadata
     // directory and throw an error
-    // TODO: test file source V2 as well.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      val e = intercept[SparkException] {
-        spark.read.load(outputDir.getCanonicalPath).as[Int]
-      }
-      assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir)
+    val e = intercept[SparkException] {
+      spark.read.load(outputDir.getCanonicalPath).as[Int]
+    }
+    assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir)
 
-      // Restarting the streaming query should detect the legacy _spark_metadata directory and
-      // throw an error
-      val inputData = MemoryStream[Int]
-      val e2 = intercept[SparkException] {
-        inputData.toDF()
-          .writeStream
-          .format("parquet")
-          .option("checkpointLocation", checkpointDir.getCanonicalPath)
-          .start(outputDir.getCanonicalPath)
-      }
-      assertMigrationError(e2.getMessage, sparkMetadataDir, legacySparkMetadataDir)
-
-      // Move "_spark_metadata" to fix the file sink and test the checkpoint path.
-      FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir)
-
-      // Restarting the streaming query should detect the legacy
-      // checkpoint path and throw an error.
-      val e3 = intercept[SparkException] {
-        inputData.toDF()
-          .writeStream
-          .format("parquet")
-          .option("checkpointLocation", checkpointDir.getCanonicalPath)
-          .start(outputDir.getCanonicalPath)
-      }
-      assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir)
+    // Restarting the streaming query should detect the legacy _spark_metadata directory and
+    // throw an error
+    val inputData = MemoryStream[Int]
+    val e2 = intercept[SparkException] {
+      inputData.toDF()
+        .writeStream
+        .format("parquet")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start(outputDir.getCanonicalPath)
+    }
+    assertMigrationError(e2.getMessage, sparkMetadataDir, legacySparkMetadataDir)
 
-      // Fix the checkpoint path and verify that the user can migrate the issue by moving files.
-      FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir)
+    // Move "_spark_metadata" to fix the file sink and test the checkpoint path.
+    FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir)
 
-      val q = inputData.toDF()
+    // Restarting the streaming query should detect the legacy
+    // checkpoint path and throw an error.
+    val e3 = intercept[SparkException] {
+      inputData.toDF()
         .writeStream
         .format("parquet")
        .option("checkpointLocation", checkpointDir.getCanonicalPath)
         .start(outputDir.getCanonicalPath)
-      try {
-        q.processAllAvailable()
-        // Check the query id to make sure it did use checkpoint
-        assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+    }
+    assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir)
 
-        // Verify that the batch query can read "_spark_metadata" correctly after migration.
-        val df = spark.read.load(outputDir.getCanonicalPath)
-        assert(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex")
-        checkDatasetUnorderly(df.as[Int], 1, 2, 3)
-      } finally {
-        q.stop()
-      }
+    // Fix the checkpoint path and verify that the user can migrate the issue by moving files.
+    FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir)
+
+    val q = inputData.toDF()
+      .writeStream
+      .format("parquet")
+      .option("checkpointLocation", checkpointDir.getCanonicalPath)
+      .start(outputDir.getCanonicalPath)
+    try {
+      q.processAllAvailable()
+      // Check the query id to make sure it did use checkpoint
+      assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+
+      // Verify that the batch query can read "_spark_metadata" correctly after migration.
+      val df = spark.read.load(outputDir.getCanonicalPath)
+      assert(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex")
+      checkDatasetUnorderly(df.as[Int], 1, 2, 3)
+    } finally {
+      q.stop()
    }
   }
 }
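[Editor's note] Condensing the migration flow exercised by the last test: when a read or restart fails because the sink metadata or checkpoint lives under a legacy path, the user-visible fix is to move the legacy directory into place and retry. A hedged sketch with placeholder paths — the test derives the real ones from temp directories, and the error message names the actual legacy/expected pair:

```scala
import java.io.File
import org.apache.commons.io.FileUtils

// Placeholder paths; substitute the pair reported by the migration error.
val legacySparkMetadataDir = new File("/path/reported/as/legacy/_spark_metadata")
val sparkMetadataDir = new File("/tmp/sink-demo/output/_spark_metadata")

// Move the sink metadata into place, which unblocks batch reads of the output;
// a legacy checkpoint directory is migrated the same way before restarting the
// query (the query-id assertion in the test confirms the checkpoint is reused).
FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir)
```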
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org