[ https://issues.apache.org/jira/browse/SPARK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-26167.
----------------------------------
    Resolution: Cannot Reproduce

> No output created for aggregation query in append mode
> ------------------------------------------------------
>
>                 Key: SPARK-26167
>                 URL: https://issues.apache.org/jira/browse/SPARK-26167
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: dejan miljkovic
>            Priority: Major
>
> For an aggregation query in append mode, not all outputs are produced for inputs whose watermark has expired. I have data in Kafka that needs to be reprocessed and the results stored in S3. S3 works only with append mode. The problem is that only part of the data is written to S3. The code below illustrates my approach.
>
> String windowDuration = "24 hours";
> String slideDuration = "15 minutes";
> Dataset<Row> sliding24h = rowData
>     .withWatermark(eventTimeCol, slideDuration)
>     .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration), col(nameCol))
>     .count();
>
> sliding24h.writeStream()
>     .format("console")
>     .option("truncate", false)
>     .option("numRows", 1000)
>     .outputMode(OutputMode.Append())
>     .start()
>     .awaitTermination();
>
> Below is an example that shows the behavior. The code produces only an empty Batch 0 in Append mode. Data is aggregated in 24-hour windows with a 15-minute slide. The input data covers 84 hours. I think the code should produce all aggregated results except for the last 15-minute interval.
>
> public static void main(String[] args) throws StreamingQueryException {
>     SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
>
>     ArrayList<String> rl = new ArrayList<>();
>     for (int i = 0; i < 1000; ++i) {
>         long t = 1512164314L + i * 5 * 60;
>         rl.add(t + ",qwer");
>     }
>
>     String nameCol = "name";
>     String eventTimeCol = "eventTime";
>     String eventTimestampCol = "eventTimestamp";
>
>     MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
>     input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
>
>     Dataset<Row> stream = input.toDF().selectExpr(
>         "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
>         "cast(split(value,'[,]')[1] as String) as " + nameCol);
>
>     System.out.println("isStreaming: " + stream.isStreaming());
>
>     Column eventTime = functions.to_timestamp(col(eventTimestampCol));
>     Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);
>
>     String windowDuration = "24 hours";
>     String slideDuration = "15 minutes";
>     Dataset<Row> sliding24h = rowData
>         .withWatermark(eventTimeCol, slideDuration)
>         .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration), col(nameCol))
>         .count();
>
>     sliding24h
>         .writeStream()
>         .format("console")
>         .option("truncate", false)
>         .option("numRows", 1000)
>         .outputMode(OutputMode.Append())
>         //.outputMode(OutputMode.Complete())
>         .start()
>         .awaitTermination();
> }
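This is most likely the documented watermark semantics rather than a bug: in micro-batch execution, the watermark for a batch is computed from the maximum event time seen in previous batches, so when the entire MemoryStream input is added in a single addData() call, batch 0 runs with the initial watermark and no 24-hour window is considered expired, which is why only an empty Batch 0 appears in Append mode. (Spark 2.4 also added no-data micro-batches in SPARK-24156, which can flush expired windows without further input.) Under that assumption, a minimal sketch that feeds the same data across several micro-batches so the watermark can advance; it reuses the report's rl, input, and sliding24h, and the chunk size of 100 is hypothetical:

// Sketch only: assumes the rl, input, and sliding24h definitions from the
// report above, plus: import java.util.List;
// import org.apache.spark.sql.streaming.StreamingQuery;
StreamingQuery query = sliding24h
    .writeStream()
    .format("console")
    .option("truncate", false)
    .outputMode(OutputMode.Append())
    .start();

int batchSize = 100; // hypothetical chunk size, not taken from the report
for (int start = 0; start < rl.size(); start += batchSize) {
    List<String> chunk = rl.subList(start, Math.min(start + batchSize, rl.size()));
    input.addData(JavaConversions.asScalaBuffer(chunk).toSeq());
    // Block until the micro-batch for this chunk finishes; the watermark is
    // then recomputed from the max event time seen so far, allowing earlier
    // 24-hour windows to expire and be emitted in Append mode.
    query.processAllAvailable();
}
query.stop();

With the input split this way, each processAllAvailable() call completes a micro-batch and updates the watermark, so windows that end more than 15 minutes (the watermark delay) before the observed maximum event time are finalized and emitted in the following batches.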
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org