No, my application id doesn't change

пн, 29 окт. 2018 г. в 19:11, Patrik Kleindl <pklei...@gmail.com>:

> Hi
> Does your applicationId change?
> Best regards
> Patrik
> > Am 29.10.2018 um 13:28 schrieb Pavel Koroliov <afgmeis...@gmail.com>:
> >
> > Hi everyone! I use kafka-streams, and i have a problem when i use
> > windowedBy. Everything works well until I restart the application. After
> > restarting my aggregation starts from beginning.
> > Code bellow:
> >>
> >>    StreamsBuilder builder = new StreamsBuilder()
> >>    KStream stream = builder.stream(topic,
> Consumed.with(Serdes.String(), Serdes.String()))
> >>
> >>    KTable table =
> stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
> >>            .aggregate(
> >>            { new AggregatorModel() },
> >>            { key, value, aggregate ->
> >>                return aggregate.add(value)
> >>            }
> >>    )
> >>            .toStream()
> >>            .map({ k, v ->
> >>        new KeyValue<>(k.window().end(), v)
> >>    })
> >>            .to('output')
> >>
> >>    def config = new Properties()
> >>    config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> >>    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
> >>    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> TimeUnit.SECONDS.toMillis(60))
> >>
> >>    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
> >>    kafkaStreams.start()
> >>
> >>
> > I've tried add to config ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to
> > 'latest' and 'earliest' but it didn't help.
> > Can you help me understand what I'm doing wrong?
> > Thank you.

Reply via email to