Silvio Fiorito created SPARK-20065:
--------------------------------------

             Summary: Empty output files created for aggregation query in 
append mode
                 Key: SPARK-20065
                 URL: https://issues.apache.org/jira/browse/SPARK-20065
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.1.0
            Reporter: Silvio Fiorito


I'm querying a Kafka topic, running a windowed aggregation with a 30-second 
watermark and a 10-second trigger, and writing out to Parquet in append 
output mode.

Every 10-second trigger generates a file, regardless of whether there was any 
data for that trigger or whether any records were actually finalized by the 
watermark.

Is this expected behavior or should it not write out these empty files?

{code}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime
import spark.implicits._

val df = spark.readStream.format("kafka")....

val query = df
  .withWatermark("timestamp", "30 seconds")
  .groupBy(window($"timestamp", "10 seconds"))
  .count()
  .select(date_format($"window.start", "HH:mm:ss").as("time"), $"count")

query
  .writeStream
  .format("parquet")
  .option("checkpointLocation", aggChk)
  .trigger(ProcessingTime("10 seconds"))
  .outputMode("append")
  .start(aggPath)
{code}

As the query executes, list the files in "aggPath": you'll see files of 339 
bytes at a minimum until the first watermark passes and the initial batch is 
finalized. Even after that, though, whenever a batch is empty it keeps 
writing an empty file on every trigger.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
