Ok. That makes sense.

Question: why do you use .aggregate() instead of .count()?
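
For comparison, a plain windowed count (0.10.x API; the store name is
just a placeholder, and I'm assuming String keys here) would be
something like:

  KTable<Windowed<String>, Long> counts = stream
      .groupByKey()
      .count(TimeWindows.of(60000), "counts-store");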

Also, can you share the code of your AggregatorFunction()? Did you
change any default settings of StreamsConfig?

I still have no idea what could go wrong. Maybe you can run with log
level TRACE? We might get some insight from those logs.
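
Assuming the default log4j setup, something like this in
log4j.properties should enable it:

  log4j.logger.org.apache.kafka.streams=TRACE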


-Matthias

On 4/27/17 11:41 PM, Mahendra Kariya wrote:
> Oh good point!
> 
> The reason there is only one row per time window is that it contains
> only the latest value for that window. What we did was dump the data
> present in the sink topic to a DB using an upsert query, with the time
> window as the table's primary key. The file that I attached is actually
> the data present in the DB. And we know there is no bug in our DB dump
> code because we have been using it in production for a long time without
> any issues.
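> 
> For illustration, the upsert is shaped roughly like this (a sketch only;
> the table and column names are made up, and the ON CONFLICT syntax
> assumes Postgres):
> 
>     import java.sql.*;
> 
>     // Hypothetical schema: window_counts(window_start TIMESTAMP PRIMARY KEY, cnt BIGINT)
>     static void upsertWindowCount(Connection conn, long windowStartMs, long count)
>             throws SQLException {
>         String sql = "INSERT INTO window_counts (window_start, cnt) VALUES (?, ?) "
>                    + "ON CONFLICT (window_start) DO UPDATE SET cnt = EXCLUDED.cnt";
>         try (PreparedStatement ps = conn.prepareStatement(sql)) {
>             ps.setTimestamp(1, new Timestamp(windowStartMs));
>             ps.setLong(2, count);
>             ps.executeUpdate(); // later updates for the same window overwrite earlier ones
>         }
>     }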
> 
> The reason the count is zero for some time windows is because I subtracted
> a random number from the actual values and rounded it off to zero, for
> privacy reasons. The actual data doesn't have any zero values. I should
> have mentioned this earlier. My bad!
> 
> The stream topology code looks something like this.
> 
> stream
>     .filter()
>     .map((key, value) -> new KeyValue<>(transform(key), value))
>     .groupByKey()
>     .aggregate(HashSet::new, AggregatorFunction(),
>                TimeWindows.of(60000).until(3600000))
>     .mapValues(HashSet::size)
>     .toStream()
>     .map((key, value) -> convertToProtobufObject(key, value))
>     .to()
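> 
> The aggregator itself is roughly of this shape (simplified here with
> String types as placeholders; the real types differ):
> 
>     static Aggregator<String, String, HashSet<String>> AggregatorFunction() {
>         // add each value to the set; .mapValues(HashSet::size) above
>         // then yields the number of distinct values per window
>         return (key, value, agg) -> {
>             agg.add(value);
>             return agg;
>         };
>     }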
> 
> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Thanks for the details (sorry, I forgot that you had already shared
>> the output).
>>
>> Might be a dumb question, but what is the count for missing windows in
>> your second implementation?
>>
>> If there is no data for a window, Streams should not emit a window with
>> a count of zero; it should emit nothing at all.
>>
>> Thus, looking at your output, I am wondering how it could contain a
>> line like:
>>
>>> 2017-04-27T04:53:00 0
>>
>> I am also wondering why your output contains only a single value per
>> window. Since Streams outputs multiple updates per window as the count
>> increases, you should actually see multiple records per window.
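>>
>> Note: assuming you are on 0.10.1 or newer with the default record
>> cache, consecutive updates per window can be coalesced; to see every
>> single update, you could disable caching:
>>
>>   props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);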
>>
>> Your code is like this:
>>
>> stream.filter().groupByKey().count(TimeWindows.of(60000)).to();
>>
>> Or do you have something more complex?
>>
>>
>> -Matthias
>>
>>
>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>>>> Can you somehow verify your output?
>>>
>>>
>>> Do you mean the Kafka Streams output? In the Kafka Streams output, we do
>>> see some missing values. I have attached the Kafka Streams output (for a
>>> few hours) in the very first email of this thread for reference.
>>>
>>> Let me also summarise what we have done so far.
>>>
>>> We took a dump of the raw data present in the source topic. We wrote a
>>> script to read this data and do the exact same aggregations that we do
>>> using Kafka Streams. And then we compared the output from Kafka Streams
>>> and our script.
>>>
>>> The difference that we observed in the two outputs is that there were a
>>> few rows (corresponding to some time windows) missing in the Streams
>>> output. For the time windows for which the data was present, the
>>> aggregated numbers matched exactly.
>>>
>>> This means that either all the records for a particular time window are
>>> being skipped, or none are. Now this is highly unlikely to happen. Maybe
>>> there is a bug somewhere in the RocksDB state stores? Just speculation,
>>> I'm not sure though. And there could even be a bug in the reported metric.
>>>
>>
>>
> 
