Ok. That makes sense. Question: why do you use .aggregate() instead of .count()?
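For context, here is roughly what the two options look like, written in the same abbreviated style as the topology further down the thread. This is a sketch, not code from the thread: the aggregator lambda is an assumption inferred from the HashSet::new initializer and the .mapValues(HashSet::size) step below, and serdes and store names are omitted just as they are there.

    // count(): one running count of *records* per key per window
    stream
        .groupByKey()
        .count(TimeWindows.of(60000));

    // aggregate() into a HashSet, then take its size: a count of *distinct*
    // values per key per window, which a plain count() cannot express
    stream
        .groupByKey()
        .aggregate(HashSet::new,
                   (key, value, set) -> { set.add(value); return set; },
                   TimeWindows.of(60000));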
Also, can you share the code of your AggregatorFunction()? Did you change
any default setting of StreamsConfig?

I still have no idea what could go wrong. Maybe you can run with log
level TRACE? Maybe we can get some insight from those logs.

-Matthias

On 4/27/17 11:41 PM, Mahendra Kariya wrote:
> Oh, good point!
>
> The reason there is only one row corresponding to each time window is
> that it only contains the latest value for that window. What we did was
> dump the data present in the sink topic to a DB using an upsert query,
> with the time window as the primary key of the table. The file that I
> attached is actually the data present in the DB. And we know there is
> no bug in our DB dump code because we have been using it in production
> for a long time without any issues.
>
> The reason the count is zero for some time windows is that I subtracted
> a random number from the actual values and rounded the result off to
> zero, for privacy reasons. The actual data doesn't have any zero
> values. I should have mentioned this earlier. My bad!
>
> The stream topology code looks something like this:
>
> stream
>     .filter()
>     .map((key, value) -> new KeyValue<>(transform(key), value))
>     .groupByKey()
>     .aggregate(HashSet::new, AggregatorFunction(),
>                TimeWindows.of(60000).until(3600000))
>     .mapValues(HashSet::size)
>     .toStream()
>     .map((key, value) -> convertToProtobufObject(key, value))
>     .to()
>
> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Thanks for the details (sorry that I forgot you had already shared the
>> output).
>>
>> Might be a dumb question, but what is the count for the missing
>> windows in your second implementation?
>>
>> If there is no data for a window, Streams should not emit a window
>> with count zero; it should emit nothing at all.
>>
>> Thus, looking at your output, I am wondering how it could contain a
>> line like:
>>
>>> 2017-04-27T04:53:00 0
>>
>> I am also wondering why your output only contains a single value per
>> window. As Streams outputs multiple updates per window while the count
>> is increasing, you should actually see multiple records per window.
>>
>> Your code is like this:
>>
>> stream.filter().groupByKey().count(TimeWindows.of(60000)).to();
>>
>> Or do you have something more complex?
>>
>> -Matthias
>>
>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>>>> Can you somehow verify your output?
>>>
>>> Do you mean the Kafka Streams output? In the Kafka Streams output, we
>>> do see some missing values. I have attached the Kafka Streams output
>>> (for a few hours) in the very first email of this thread for
>>> reference.
>>>
>>> Let me also summarise what we have done so far.
>>>
>>> We took a dump of the raw data present in the source topic. We wrote
>>> a script to read this data and do the exact same aggregations that we
>>> do using Kafka Streams. Then we compared the output of Kafka Streams
>>> with that of our script.
>>>
>>> The difference we observed between the two outputs is that a few rows
>>> (corresponding to some time windows) were missing from the Streams
>>> output. For the time windows for which data was present, the
>>> aggregated numbers matched exactly.
>>>
>>> This means that either all the records for a particular time window
>>> are being skipped, or none are. That is highly unlikely to happen.
>>> Maybe there is a bug somewhere in the RocksDB state stores? Just
>>> speculation, I'm not sure. There could even be a bug in the reported
>>> metric.
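A note for readers of the archive: AggregatorFunction() itself never appears in the thread. Judging from the HashSet::new initializer and the .mapValues(HashSet::size) step in the topology above, it presumably just adds each incoming value to the window's set. A minimal sketch of what that typically looks like; KeyType and ValueType are placeholder assumptions, as the real types are not shown in the thread:

    import java.util.HashSet;
    import org.apache.kafka.streams.kstream.Aggregator;

    // Hypothetical reconstruction: collect each value into the window's
    // HashSet so that the later mapValues(HashSet::size) yields a
    // distinct-value count per window.
    static Aggregator<KeyType, ValueType, HashSet<ValueType>> AggregatorFunction() {
        return (key, value, set) -> {
            set.add(value);  // HashSet dedupes, so size() counts distinct values
            return set;
        };
    }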
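On the TRACE suggestion above: Kafka Streams logs via SLF4J, so with a log4j binding the level can be raised for the Streams packages alone rather than for the whole application. A sketch, assuming a standard log4j.properties is in use:

    # Raise only Kafka Streams internals to TRACE; everything else keeps
    # its configured level
    log4j.logger.org.apache.kafka.streams=TRACE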