[ https://issues.apache.org/jira/browse/KAFKA-13290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-13290.
-------------------------------------
    Resolution: Not A Problem

> My timeWindows last aggregated message never emit until a new message coming
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-13290
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13290
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.2
>         Environment: Development
>            Reporter: Steve Zhou
>            Priority: Major
>
> I have Kafka Streams event-processing code that aggregates one minute of data.
> It works as expected when data arrives continuously.
> If we stop the producer, the last aggregated message is not emitted
> until a new message arrives.
>
> The following is my sample code:
>
> @Bean
> public KStream<String, AggregateMetricsFields> kStream(StreamsBuilder streamBuilder) {
>     KStream<String, AggregateMetricsFields> aggregatedData = streamBuilder
>         .stream(dataTopic, dataConsumed)
>         .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
>         .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
>         .aggregate(this::initialize, this::aggregateFields,
>             materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
>         // Hold results back until the window (plus grace period) has closed.
>         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
>             .withName(windowedSuppressNodeName))
>         .toStream()
>         // Unwrap the windowed key back to the plain record key.
>         .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
>     aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
>     return aggregatedData;
> }
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
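
The "Not A Problem" resolution is consistent with how window-close suppression is driven: a window closes when stream time, which is the highest record timestamp seen so far, passes the window end plus the grace period. Wall-clock time plays no part, so when the producer stops, stream time stops too and the last window stays buffered. The sketch below is a plain-Java simulation of that rule, not the Kafka Streams API; the class name StreamTimeSketch and its methods are hypothetical, and the 60000 ms window is an assumed stand-in for windowDuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Simplified simulation (not Kafka Streams itself) of stream-time-driven window closing. */
public class StreamTimeSketch {
    private final long windowSizeMs;
    private final long graceMs;
    // Stream time only moves forward when a record with a larger timestamp arrives.
    private long streamTimeMs = Long.MIN_VALUE;
    // Record counts per window start, held back until the window closes.
    private final TreeMap<Long, Integer> buffered = new TreeMap<>();

    public StreamTimeSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Processes one record; returns the start times of windows that closed as a result. */
    public List<Long> process(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        // Records for an already-closed window are dropped as late.
        if (windowStart + windowSizeMs + graceMs > streamTimeMs) {
            buffered.merge(windowStart, 1, Integer::sum);
        }
        List<Long> closed = new ArrayList<>();
        // Emit every buffered window whose end + grace is at or before stream time.
        while (!buffered.isEmpty()
                && buffered.firstKey() + windowSizeMs + graceMs <= streamTimeMs) {
            closed.add(buffered.pollFirstEntry().getKey());
        }
        return closed;
    }

    /** Number of windows still waiting for stream time to advance. */
    public int pendingWindows() {
        return buffered.size();
    }

    public static void main(String[] args) {
        StreamTimeSketch s = new StreamTimeSketch(60_000L, 10L);
        s.process(5_000L);
        s.process(59_000L);
        // Producer stops: the window starting at 0 stays pending however long we wait.
        System.out.println("pending after producer stops: " + s.pendingWindows());
        // A later record moves stream time past 60000 + 10 and releases that window.
        System.out.println("closed by next record: " + s.process(61_000L));
    }
}
```

In this model the only way to flush the final window is to advance stream time, for example with a later record on the same partition; that is an observation about the simulation and the rule it encodes, not a statement of what the resolver recommended.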