I followed [1] to write a simple example that tries out the suppress operator.
Here is the code:

    final Serde<String> stringSerde = Serdes.String();
    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String()))
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word,
            Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
        .count(Materialized.with(Serdes.String(), Serdes.Long()))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach((key, value) -> {
            System.out.printf("key: %s, value: %d\n", key, value);
        });

I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I
send a single text line "hello", nothing is printed, even if I wait
longer than 3 seconds (the window size). Since more time than the
window size has elapsed, I expected the key and value to be printed.
However, as soon as I send another text line "hello", the key and
value are printed.
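
The two settings mentioned above could be applied roughly as follows. This is
a minimal sketch using plain java.util.Properties with the literal config
keys; the class name SuppressDemoConfig, the application id and the bootstrap
server address are placeholders for illustration, not part of the original
setup.

```java
import java.util.Properties;

class SuppressDemoConfig {

    // Builds a Streams configuration with the two settings described above.
    static Properties build() {
        Properties props = new Properties();
        // Placeholder values (assumptions, adjust to the real environment).
        props.setProperty("application.id", "suppress-demo");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Commit as often as possible.
        props.setProperty("commit.interval.ms", "1");
        // Disable record caching so updates are not held back by the cache.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        // Print the effective settings for a quick check.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object would be passed to the KafkaStreams
constructor together with builder.build().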
Can anyone explain this behavior? I have browsed the Kafka
documentation but could not find an explanation.

[1] http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results

--
Jingguo