Is your time usage correct? It sounds like you want event time, not load/process time, which is what you'll get by default unless you have a TimestampExtractor defined somewhere upstream. Otherwise I could see far fewer events coming out, since Streams is just aggregating whatever showed up in that 10-second window.
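To make the difference concrete, here's a tiny self-contained sketch (plain Java, not the Streams API; the 10-second tumbling-window bucketing mirrors `TimeWindows.of(10000)`, and the fixed "wall clock" value is just an illustrative choice). With event-time stamps, a fast replay of old data still spreads across many windows; with processing-time stamps, the whole replay collapses into whatever window is "now":

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class WindowAssignment {
    static final long WINDOW_MS = 10_000L;

    // A tumbling window is identified by its start: floor(timestamp / size) * size.
    static long windowStart(final long timestampMs) {
        return (timestampMs / WINDOW_MS) * WINDOW_MS;
    }

    public static void main(final String[] args) {
        final Set<Long> eventTimeWindows = new LinkedHashSet<>();
        final Set<Long> processTimeWindows = new LinkedHashSet<>();
        final long now = 1_497_500_000_000L;   // fixed "wall clock" for the sketch
        // 100 records whose *event* timestamps span ~17 minutes of history,
        // all replayed "instantly" at time `now`.
        for (int i = 0; i < 100; i++) {
            final long eventTs = now - 1_000_000L + i * 10_000L; // one per 10s bucket
            eventTimeWindows.add(windowStart(eventTs));
            processTimeWindows.add(windowStart(now));            // replay finishes at once
        }
        System.out.println("event-time windows:   " + eventTimeWindows.size());   // 100
        System.out.println("process-time windows: " + processTimeWindows.size()); // 1
    }
}
```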
On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai> wrote:
> Disabling the cache with:
>
> ```
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0)
> ```
>
> Results in:
> - Emitting many more intermediate calculations.
> - Still losing data.
>
> In my test case it output 342476 intermediate calculations for 3414
> distinct windows; 14400 distinct windows were expected.
>
> Regards,
> Caleb
>
> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> > This seems to be related to the internal KTable caches. You can disable them
> > by setting the cache size to zero.
> >
> > http://docs.confluent.io/current/streams/developer-guide.html#memory-management
> >
> > -Matthias
> >
> > On 6/14/17 4:08 PM, Caleb Welton wrote:
> > > Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
> > > problem does not manifest; at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
> > > or higher the problem shows up.
> > >
> > > When the number of threads is 1, the speed of data through the first part of
> > > the topology (before the KTable) slows down considerably, but it seems to
> > > slow down to the speed of the output, which may be the key.
> > >
> > > That said... changing the number of stream threads should not impact data
> > > correctness. Seems like a bug someplace in Kafka.
> > >
> > > On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai> wrote:
> > >
> > >> I have a topology of
> > >> KStream -> KTable -> KStream
> > >>
> > >> ```
> > >> final KStreamBuilder builder = new KStreamBuilder();
> > >> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
> > >> final KTable<Windowed<String>, MyThing> myTable = metricStream
> > >>     .groupByKey(stringSerde, mySerde)
> > >>     .reduce(MyThing::merge,
> > >>             TimeWindows.of(10000).advanceBy(10000).until(Duration.ofDays(retentionDays).toMillis()),
> > >>             tableTopic);
> > >>
> > >> myTable.toStream()
> > >>     .map((key, value) -> KeyValue.pair(key.key(), value.finalize(key.window())))
> > >>     .to(stringSerde, mySerde, sinkTopic);
> > >> ```
> > >>
> > >> Normally, when sent data at 10x a second, I expect ~1 output metric for
> > >> every 100 metrics it receives, based on the 10-second window width.
> > >>
> > >> When fed data in real time at that rate, it seems to do just that.
> > >>
> > >> However, when I either reprocess an input topic with a large amount of
> > >> data or feed data in significantly faster, I see a very different behavior.
> > >>
> > >> Over the course of 20 seconds I can see 1,440,000 messages being ingested
> > >> into the KTable, but only 633 emitted from it (14400 expected).
> > >>
> > >> Over the next minute the record output creeps up to 1796, but then holds
> > >> steady and does not keep going up to the expected total of 14400.
> > >>
> > >> A consumer reading from the sinkTopic ends up finding about 1264, which is
> > >> lower than the 1796 records I would have anticipated from the number of
> > >> calls into the final KStream map function.
> > >>
> > >> The precise number of emitted records will vary from one run to the next.
> > >>
> > >> Where are the extra metrics going?
> > >> Is there some commit issue that is
> > >> causing dropped messages if the KTable producer isn't able to keep up?
> > >>
> > >> Any recommendations on where to focus the investigation of the issue?
> > >>
> > >> Running Kafka 0.10.2.1.
> > >>
> > >> Thanks,
> > >> Caleb
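One more knob worth noting for anyone following the cache discussion above: the record cache is also flushed whenever Streams commits, so an alternative to disabling it outright is shortening the commit interval. A minimal sketch (both keys are standard `StreamsConfig` settings in 0.10.2; the 1000 ms value is just an illustrative choice, not a recommendation):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final class CacheTuning {
    // A sketch: two ways to make windowed results show up sooner downstream.
    static Properties config() {
        final Properties p = new Properties();
        // Option 1: disable the record cache entirely, so every update is forwarded:
        p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Option 2: keep the cache, but flush it more often by committing more often:
        p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        return p;
    }
}
```

Either way, note that more frequent emission only changes how often intermediate results appear; it should not change which windows ultimately exist, which is why the missing-window behavior reported above still looks like a separate problem.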