Is your time usage correct?  It sounds like you want event time, not
load/process time, which is the default unless you have a TimestampExtractor
defined somewhere upstream.  Otherwise I could see far fewer events coming
out, as Streams is just aggregating whatever showed up in that 10-second
window.
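
For reference, a custom extractor is wired in through the streams config; in 0.10.2.x the key is `StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG` (later releases renamed it to `DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG`, so check your version's docs).  A minimal sketch, assuming the `Metric` payload from your topology carries its own event time in a hypothetical `getEventTime()` accessor:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch only: pull event time out of the payload instead of using the
// record's produce/ingest timestamp.  `Metric.getEventTime()` is an assumed
// accessor on your class, returning epoch millis.
public class MetricTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        final Object value = record.value();
        if (value instanceof Metric) {
            return ((Metric) value).getEventTime();
        }
        return record.timestamp();  // fall back to the record's own timestamp
    }
}
```

Registered once alongside your other streams properties, e.g.
`streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
MetricTimestampExtractor.class.getName());` -- then the 10-second windows are
bucketed by event time regardless of how fast the topic is replayed.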

On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai> wrote:

> Disabling the cache with:
>
> ```
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> ```
>
> Results in:
> - Emitting many more intermediate calculations.
> - Still losing data.
>
> In my test case it output 342476 intermediate calculations for 3414
> distinct windows, where 14400 distinct windows were expected.
>
> Regards,
>   Caleb
>
> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > This seems to be related to internal KTable caches. You can disable them
> > by setting cache size to zero.
> >
> > http://docs.confluent.io/current/streams/developer-guide.html#memory-management
> >
> > -Matthias
> >
> >
> >
> > On 6/14/17 4:08 PM, Caleb Welton wrote:
> > > Update, if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
> > > problem does not manifest; at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
> > > or higher the problem shows up.
> > >
> > > When the number of threads is 1, the speed of data through the first part
> > > of the topology (before the KTable) slows down considerably, but it seems
> > > to slow down to the speed of the output, which may be the key.
> > >
> > > That said... changing the number of stream threads should not impact data
> > > correctness.  Seems like a bug someplace in Kafka.
> > >
> > >
> > >
> > > On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
> > wrote:
> > >
> > >> I have a topology of
> > >>     KStream -> KTable -> KStream
> > >>
> > >> ```
> > >> final KStreamBuilder builder = new KStreamBuilder();
> > >> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
> > >> final KTable<Windowed<String>, MyThing> myTable = metricStream
> > >>         .groupByKey(stringSerde, mySerde)
> > >>         .reduce(MyThing::merge,
> > >>                 TimeWindows.of(10000).advanceBy(10000)
> > >>                            .until(Duration.ofDays(retentionDays).toMillis()),
> > >>                 tableTopic);
> > >>
> > >> myTable.toStream()
> > >>         .map((key, value) -> KeyValue.pair(key.key(), value.finalize(key.window())))
> > >>         .to(stringSerde, mySerde, sinkTopic);
> > >> ```
> > >>
> > >>
> > >> Normally, when sent data at 10 events per second, I expect ~1 output
> > >> metric for every 100 metrics it receives, based on the 10-second window
> > >> width.
> > >>
> > >> When fed data in real time at that rate, it seems to do just that.
> > >>
> > >> However, when I either reprocess an input topic with a large amount of
> > >> data or feed data in significantly faster, I see very different behavior.
> > >>
> > >> Over the course of 20 seconds I can see 1,440,000 messages being ingested
> > >> into the KTable, but only 633 emitted from it (expected 14400).
> > >>
> > >> Over the next minute the output record count creeps to 1796, but then
> > >> holds steady and does not keep going up to the expected total of 14400.
> > >>
> > >> A consumer reading from the sinkTopic ends up finding about 1264, which
> > >> is lower than the 1796 records I would have anticipated from the number
> > >> of calls into the final KStream map function.
> > >>
> > >> Precise number of emitted records will vary from one run to the next.
> > >>
> > >> Where are the extra metrics going?  Is there some commit issue that is
> > >> causing dropped messages if the KTable producer isn't able to keep up?
> > >>
> > >> Any recommendations on where to focus the investigation of the issue?
> > >>
> > >> Running Kafka 0.10.2.1.
> > >>
> > >> Thanks,
> > >>   Caleb
> > >>
> > >
> >
> >
>
