[ https://issues.apache.org/jira/browse/SPARK-19965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927333#comment-15927333 ]
Shixiong Zhu commented on SPARK-19965: -------------------------------------- This happens because partition inference does not ignore the "_spark_metadata" folder: the "/*" glob in the reproducer below also matches that folder, so it is reported as a directory structure that conflicts with the partition directories (see the suspicious paths in the stack trace). > DataFrame batch reader may fail to infer partitions when reading > FileStreamSink's output > ---------------------------------------------------------------------------------------- > > Key: SPARK-19965 > URL: https://issues.apache.org/jira/browse/SPARK-19965 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.1.0 > Reporter: Shixiong Zhu > > Reproducer > {code} > test("partitioned writing and batch reading with 'basePath'") { > val inputData = MemoryStream[Int] > val ds = inputData.toDS() > val outputDir = Utils.createTempDir(namePrefix = > "stream.output").getCanonicalPath > val checkpointDir = Utils.createTempDir(namePrefix = > "stream.checkpoint").getCanonicalPath > var query: StreamingQuery = null > try { > query = > ds.map(i => (i, i * 1000)) > .toDF("id", "value") > .writeStream > .partitionBy("id") > .option("checkpointLocation", checkpointDir) > .format("parquet") > .start(outputDir) > inputData.addData(1, 2, 3) > failAfter(streamingTimeout) { > query.processAllAvailable() > } > spark.read.option("basePath", outputDir).parquet(outputDir + > "/*").show() > } finally { > if (query != null) { > query.stop() > } > } > } > {code} > Stack trace > {code} > [info] - partitioned writing and batch reading with 'basePath' *** FAILED *** > (3 seconds, 928 milliseconds) > [info] java.lang.AssertionError: assertion failed: Conflicting directory > structures detected. Suspicious paths: > [info] ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637 > [info] > ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata > [info] > [info] If provided paths are partition directories, please set "basePath" in > the options of the data source to specify the root directory of the table. 
If > there are multiple root directories, please load them separately and then > union them. > [info] at scala.Predef$.assert(Predef.scala:170) > [info] at > org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133) > [info] at > org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98) > [info] at > org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156) > [info] at > org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54) > [info] at > org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55) > [info] at > org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133) > [info] at > org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361) > [info] at > org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160) > [info] at > org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536) > [info] at > org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520) > [info] at > org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292) > [info] at > org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268) > [info] at > org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org