[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17395#discussion_r107822893

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
     override def execute(iter: Iterator[InternalRow]): Set[String] = {
       var fileCounter = 0
       var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

Yes, I was thinking along those lines. I remember making several attempts at the time but failing to come up with a good fix, and I could not find time to work on it further.

Another problem is that this might be a datasource-specific issue because, for example, ORC does not write out an empty DataFrame at all:

```
scala> spark.range(100).filter("id > 100").write.orc("/tmp/abc1")

scala> spark.read.orc("/tmp/abc1").show()
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. It must be specified manually.;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:182)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:182)
```

This issue is described in https://issues.apache.org/jira/browse/SPARK-15474. FWIW, I happened to see https://issues.apache.org/jira/browse/SPARK-15693 around that time, and I felt we might be able to consolidate this issue with it, although that is only a rough idea.
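A hedged workaround for the ORC failure above (a sketch, not from the thread; it assumes the `/tmp/abc1` path from the snippet, and whether it succeeds may depend on the Spark version): supplying the schema explicitly lets the read go through even though no data files were written.

```scala
import org.apache.spark.sql.types.{LongType, StructType}

// Sketch: give the reader the schema up front so it does not try to infer
// it from the (nonexistent) ORC files. The result is an empty DataFrame.
val schema = new StructType().add("id", LongType)
spark.read.schema(schema).orc("/tmp/abc1").show()  // empty table, schema intact
```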
[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17395#discussion_r107821138

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
     override def execute(iter: Iterator[InternalRow]): Set[String] = {
       var fileCounter = 0
       var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

Thanks for the pointer. How about leaving just one empty file containing the metadata when the DataFrame has empty partitions? Furthermore, could we leave just a single metadata file?
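A rough sketch of that idea (hypothetical, not part of this PR): gate the writer on the partition id so that an all-empty write still leaves exactly one schema-bearing file. The names `iter`, `newOutputWriter`, and `fileCounter` come from the diff above, and `TaskContext.getPartitionId()` is assumed to be callable from the write task.

```scala
import org.apache.spark.TaskContext

// Hypothetical: open a writer for an empty iterator only in partition 0,
// producing a single footer/schema-only file per write instead of one per
// empty partition.
if (iter.hasNext || TaskContext.getPartitionId() == 0) {
  newOutputWriter(fileCounter)
}
```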
[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17395#discussion_r107652020

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
     override def execute(iter: Iterator[InternalRow]): Set[String] = {
       var fileCounter = 0
       var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

See https://github.com/apache/spark/pull/12855, https://issues.apache.org/jira/browse/SPARK-10216 and https://issues.apache.org/jira/browse/SPARK-15393
[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17395#discussion_r107650990

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
     override def execute(iter: Iterator[InternalRow]): Set[String] = {
       var fileCounter = 0
       var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

Reading empty data should be fine too; it should preserve the schema. I am pretty sure we want this case to keep working, because my earlier fix was reverted due to the case above.
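Concretely, the behaviour being defended looks like this (a sketch, with the expected output shown as comments, assuming the writer is not skipped for empty partitions):

```scala
// An empty Parquet write still produces file footers, so a later read
// recovers the schema and returns an empty result instead of failing.
spark.range(100).filter("id > 100").write.parquet("/tmp/abc")
spark.read.parquet("/tmp/abc").printSchema()
// root
//  |-- id: long (nullable = true)
spark.read.parquet("/tmp/abc").show()
// +---+
// | id|
// +---+
// +---+
```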
[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17395#discussion_r107650070

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
     override def execute(iter: Iterator[InternalRow]): Set[String] = {
       var fileCounter = 0
       var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

@HyukjinKwon IIUC, this case should fail as expected, as there is no output. Am I missing something?

```
spark.range(100).filter("id > 100").write.parquet("/tmp/abc")
spark.read.parquet("/tmp/abc").show()
```
[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17395#discussion_r107637615

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
     override def execute(iter: Iterator[InternalRow]): Set[String] = {
       var fileCounter = 0
       var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

Let me see how to cover this case.
[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/17395#discussion_r107611325

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
     override def execute(iter: Iterator[InternalRow]): Set[String] = {
       var fileCounter = 0
       var recordsInFile: Long = 0L
-      newOutputWriter(fileCounter)
+      // Skip the empty partition to avoid creating a mass of 'empty' files.
+      if (iter.hasNext) {
+        newOutputWriter(fileCounter)
--- End diff --

I proposed a similar PR before, but it got reverted. With this change, Parquet would not write out the footer and schema information for empty partitions. Namely, this will break the case below:

```scala
spark.range(100).filter("id > 100").write.parquet("/tmp/abc")
spark.read.parquet("/tmp/abc").show()
```

To my knowledge, we don't have test cases for this.
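A sketch of the kind of regression test the comment says is missing (hypothetical, not from the PR; `test` and `withTempPath` are assumed to come from Spark's SQL testing utilities, and `spark` from the test session):

```scala
import org.apache.spark.sql.types.LongType

// Hypothetical regression test: an empty DataFrame written to Parquet should
// round-trip with its schema intact and zero rows.
test("read back an empty Parquet write") {
  withTempPath { path =>
    spark.range(100).filter("id > 100").write.parquet(path.getCanonicalPath)
    val df = spark.read.parquet(path.getCanonicalPath)
    assert(df.schema.map(_.name) === Seq("id"))   // schema preserved
    assert(df.schema.head.dataType === LongType)
    assert(df.count() === 0)                      // no rows
  }
}
```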
[GitHub] spark pull request #17395: [SPARK-20065][SS] Avoid to output empty parquet f...
GitHub user uncleGen opened a pull request: https://github.com/apache/spark/pull/17395

[SPARK-20065][SS] Avoid to output empty parquet files

## Problem Description

Reported by Silvio Fiorito:

I've got a Kafka topic which I'm querying, running a windowed aggregation with a 30-second watermark and a 10-second trigger, writing out to Parquet with append output mode. Every 10-second trigger generates a file, regardless of whether there was any data for that trigger or whether any records were actually finalized by the watermark. Is this expected behavior, or should it not write out these empty files?

```scala
// The Kafka connection options were elided in the original report.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "...")
  .load()

val query = df
  .withWatermark("timestamp", "30 seconds")
  .groupBy(window($"timestamp", "10 seconds"))
  .count()
  .select(date_format($"window.start", "HH:mm:ss").as("time"), $"count")

query
  .writeStream
  .format("parquet")
  .option("checkpointLocation", aggChk)
  .trigger(ProcessingTime("10 seconds"))
  .outputMode("append")
  .start(aggPath)
```

As the query executes, do a file listing on `aggPath` and you'll see 339-byte files at a minimum until the first watermark arrives and the initial batch is finalized. Even after that, as empty batches occur, it keeps generating empty files on every trigger.

## What changes were proposed in this pull request?

Check whether a partition is empty, and skip empty partitions to avoid writing empty output files.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uncleGen/spark SPARK-20065

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17395.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17395

commit 86a7d2fa96e3134c1e64864eba81a3bebdedceea
Author: uncleGen
Date: 2017-03-23T08:10:31Z

    avoid to output empty parquet files
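For reference, the same per-partition empty-file behaviour can be reproduced in batch mode (a sketch, not from the PR; the output path is made up, and the listing describes pre-patch behaviour):

```scala
// All 8 partitions have empty iterators after the filter, so without this
// patch each write task emits a small footer-only Parquet part file.
spark.range(10).repartition(8).filter("id > 100").write.parquet("/tmp/empty-parts")
// Listing /tmp/empty-parts then shows 8 small part files plus _SUCCESS.
```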