[ 
https://issues.apache.org/jira/browse/SPARK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26167.
----------------------------------
    Resolution: Cannot Reproduce

> No output created for aggregation query in append mode
> ------------------------------------------------------
>
>                 Key: SPARK-26167
>                 URL: https://issues.apache.org/jira/browse/SPARK-26167
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: dejan miljkovic
>            Priority: Major
>
> For an aggregation query in append mode, not all outputs are produced for
> inputs with an expired watermark. I have data in Kafka that needs to be
> reprocessed, with the results stored in S3. S3 works only with append mode.
> The problem is that only part of the data is written to S3. The code below
> illustrates my approach.
> String windowDuration = "24 hours";
> String slideDuration = "15 minutes";
> Dataset<Row> sliding24h = rowData
>     .withWatermark(eventTimeCol, slideDuration)
>     .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
>              col(nameCol))
>     .count();
>
> sliding24h.writeStream()
>     .format("console")
>     .option("truncate", false)
>     .option("numRows", 1000)
>     .outputMode(OutputMode.Append())
>     .start()
>     .awaitTermination();
>  
> Below is an example that shows the behavior. The code produces only an empty
> Batch 0 in Append mode. Data is aggregated in 24-hour windows with a 15-minute
> slide. The input data covers 84 hours. I think the code should produce all
> aggregated results except for the last 15-minute interval.
>  
> public static void main(String[] args) throws StreamingQueryException {
>     SparkSession spark =
>         SparkSession.builder().master("local[*]").getOrCreate();
>
>     ArrayList<String> rl = new ArrayList<>();
>     for (int i = 0; i < 1000; ++i) {
>         long t = 1512164314L + i * 5 * 60;
>         rl.add(t + ",qwer");
>     }
>
>     String nameCol = "name";
>     String eventTimeCol = "eventTime";
>     String eventTimestampCol = "eventTimestamp";
>
>     MemoryStream<String> input =
>         new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
>     input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
>     Dataset<Row> stream = input.toDF().selectExpr(
>         "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
>         "cast(split(value,'[,]')[1] as String) as " + nameCol);
>     System.out.println("isStreaming: " + stream.isStreaming());
>
>     Column eventTime = functions.to_timestamp(col(eventTimestampCol));
>     Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);
>
>     String windowDuration = "24 hours";
>     String slideDuration = "15 minutes";
>     Dataset<Row> sliding24h = rowData
>         .withWatermark(eventTimeCol, slideDuration)
>         .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
>                  col(nameCol))
>         .count();
>
>     sliding24h
>         .writeStream()
>         .format("console")
>         .option("truncate", false)
>         .option("numRows", 1000)
>         .outputMode(OutputMode.Append())
>         //.outputMode(OutputMode.Complete())
>         .start()
>         .awaitTermination();
> }
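
As a rough cross-check of the reporter's expectation, the event-time arithmetic of the reproduction can be worked out in plain Java without Spark. This is only a sketch under stated assumptions: the class name WatermarkSketch and its helpers are hypothetical, the watermark is taken as the maximum event time minus the 15-minute delay, windows are assumed to start on epoch-aligned multiples of the slide, and a window is treated as final once its end is at or before the watermark. It does not model when Spark actually advances the watermark or triggers a batch.

```java
import java.time.Duration;

/** Hypothetical helper: event-time arithmetic for the reproduction, no Spark involved. */
public class WatermarkSketch {
    // Values taken from the reproduction code in the report.
    static final long FIRST_EVENT = 1512164314L;                   // epoch seconds
    static final int EVENT_COUNT = 1000;
    static final long STEP = Duration.ofMinutes(5).getSeconds();   // gap between events
    static final long WINDOW = Duration.ofHours(24).getSeconds();  // window duration
    static final long SLIDE = Duration.ofMinutes(15).getSeconds(); // slide duration
    static final long DELAY = Duration.ofMinutes(15).getSeconds(); // watermark delay

    static long lastEvent() {
        return FIRST_EVENT + (EVENT_COUNT - 1) * STEP;
    }

    /** Assumption: watermark = max event time seen minus the delay. */
    static long finalWatermark() {
        return lastEvent() - DELAY;
    }

    static double spanHours() {
        return (lastEvent() - FIRST_EVENT) / 3600.0;
    }

    /** Earliest epoch-aligned window [start, start + WINDOW) that contains the first event. */
    static long firstWindowStart() {
        return Math.floorDiv(FIRST_EVENT - WINDOW, SLIDE) * SLIDE + SLIDE;
    }

    /** Latest epoch-aligned window start that is at or before the last event. */
    static long lastWindowStart() {
        return Math.floorDiv(lastEvent(), SLIDE) * SLIDE;
    }

    /** Number of sliding windows that contain at least one event. */
    static long windowsWithData() {
        return (lastWindowStart() - firstWindowStart()) / SLIDE + 1;
    }

    /** Assumption: a window is final once its end is at or before the watermark. */
    static long closedWindows() {
        long lastClosedStart = Math.floorDiv(finalWatermark() - WINDOW, SLIDE) * SLIDE;
        lastClosedStart = Math.min(lastClosedStart, lastWindowStart());
        return Math.max(0, (lastClosedStart - firstWindowStart()) / SLIDE + 1);
    }

    public static void main(String[] args) {
        System.out.println("span (hours):       " + spanHours());
        System.out.println("final watermark:    " + finalWatermark());
        System.out.println("windows with data:  " + windowsWithData());
        System.out.println("windows closed:     " + closedWindows());
    }
}
```

Under these assumptions the input spans 83.25 hours, and 332 of the 429 windows that contain data end at or before the final watermark, so an empty Batch 0 as the only output is not what the event-time arithmetic alone would predict.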



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

