[jira] [Commented] (SPARK-18227) Parquet file stream sink creates a hidden directory "_spark_metadata", causing DataFrame reads from the directory to fail
[ https://issues.apache.org/jira/browse/SPARK-18227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656317#comment-15656317 ]

Lantao Jin commented on SPARK-18227:
------------------------------------

[~marmbrus], oh, yeah. That's the root cause, thanks.

> Parquet file stream sink creates a hidden directory "_spark_metadata", causing DataFrame reads from the directory to fail
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18227
>                 URL: https://issues.apache.org/jira/browse/SPARK-18227
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.0.1
>            Reporter: Lantao Jin
>
> When we set an output directory as a streaming sink with parquet format in Structured Streaming, all output parquet files are written to that directory while the streaming job runs. However, the sink also creates a hidden directory called "_spark_metadata" inside the output directory. If we then load the parquet files from the output directory with "load", it throws a RuntimeException and the task fails.
> {code:java}
> val stream = modifiedData.writeStream.format("parquet")
>   .option("checkpointLocation", "/path/ck/")
>   .start("/path/out/")
> val df1 = spark.read.format("parquet").load("/path/out/*")
> {code}
> {panel}
> 16/11/02 03:49:40 WARN TaskSetManager: Lost task 1.0 in stage 110.0 (TID 3131, cupid044.stratus.phx.ebay.com): java.lang.RuntimeException: hdfs:///path/out/_spark_metadata/0 is not a Parquet file (too small)
>         at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412)
>         at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
>         at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:107)
>         at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
>         at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> {panel}
> That's because ParquetFileReader tries to read the metadata file as parquet.
> I thought the smoothest way to fix it would be to move the metadata directory to another path, but judging from DataSource.scala there is little path information available beyond the output directory to store it in. So maybe skipping hidden files and paths would be a better way. But the stack trace above shows the failure in initialize() in SpecificParquetRecordReaderBase, which means the metadata files under the hidden directory were already traversed by the caller (FileScanRDD), and at that point no format information is available to skip a hidden directory (or doing so would exceed that layer's authority).
> So, what is the best way to fix it?
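To see why the glob pulls the log files into the scan: a plain filesystem glob matches every child of the output directory, the hidden metadata directory included. A minimal sketch, not part of the original report; it reuses the hypothetical /path/out/ path from above and assumes fs.defaultFS in the loaded configuration points at the HDFS cluster:

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: list everything the glob "/path/out/*" matches.
// The filesystem resolved here depends on fs.defaultFS in the config.
val fs = FileSystem.get(new Configuration())
fs.globStatus(new Path("/path/out/*"))
  .map(_.getPath.getName)
  .foreach(println)
// Expected to print the part-*.parquet data files *and* "_spark_metadata",
// which is why the parquet footer reader later chokes on _spark_metadata/0.
{code}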
[jira] [Commented] (SPARK-18227) Parquet file stream sink creates a hidden directory "_spark_metadata", causing DataFrame reads from the directory to fail
[ https://issues.apache.org/jira/browse/SPARK-18227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651778#comment-15651778 ]

Michael Armbrust commented on SPARK-18227:
------------------------------------------

The {{_spark_metadata}} directory holds the transaction log, which is used to ensure that the FileSink produces exactly-once results, even in the case of failures. The issue here is that you are using a glob (i.e. the {{*}}), which causes Spark SQL to read the individual files rather than loading the metadata. Remove the {{*}} and the problem should go away.
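A minimal sketch of that suggestion, reusing the hypothetical paths from the report: point {{load()}} at the sink's output directory itself, with no trailing {{*}}.

{code:java}
// Without the glob, Spark SQL detects the _spark_metadata transaction log
// in /path/out/ and builds the file list from committed batches only,
// so the log files are never handed to the parquet footer reader.
val df1 = spark.read.format("parquet").load("/path/out/")
{code}

The read then also reflects only batches the sink actually committed, matching the exactly-once guarantee described above.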
[jira] [Commented] (SPARK-18227) Parquet file stream sink creates a hidden directory "_spark_metadata", causing DataFrame reads from the directory to fail
[ https://issues.apache.org/jira/browse/SPARK-18227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15631760#comment-15631760 ]

Liwei Lin commented on SPARK-18227:
-----------------------------------

Thanks for reporting this. It used to be a problem but has been fixed, at least in master (please see https://github.com/apache/spark/blob/4ef39c2f4436fa22d0b957fe7ad477e4c4a16452/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L406-L413).
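The fix at that link works at listing time: hidden names are dropped while the catalog walks the directory. The sketch below is a paraphrase of that filter, not the verbatim source; the {{contains("=")}} escape keeps partition directories such as {{year=2016}} visible.

{code:java}
// Paraphrase of the name filter in PartitioningAwareFileCatalog (see the
// link above; not the verbatim source). "_spark_metadata" starts with "_"
// and contains no "=", so it is skipped during listing.
def shouldFilterOut(pathName: String): Boolean = {
  pathName.toLowerCase == "_temporary" ||
    pathName.startsWith(".") ||
    (pathName.startsWith("_") && !pathName.contains("="))
}
{code}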