I would recommend double-checking the following:

 - Can you confirm that the filter does not remove all data for those
time periods?
 - I would also check the input to your AggregatorFunction() -- does it
receive everything?
 - Same for .mapValues().

This would help narrow down in which part of the program the data gets
lost.
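
A cheap way to check all three is to log what flows through each step of
the topology. A sketch (peek() only exists in newer Streams versions; on
older ones, print() or a pass-through mapValues() with a log statement
does the same job):

stream
    .filter(...)
    // does the filter drop whole windows' worth of records?
    .peek((key, value) -> System.out.println("after filter: " + key + " = " + value))
    .map((key, value) -> new KeyValue<>(transform(key), value))
    // this is exactly what AggregatorFunction() will receive
    .peek((key, value) -> System.out.println("into aggregate: " + key + " = " + value))
    .groupByKey()
    ...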


-Matthias


On 5/2/17 11:09 PM, Mahendra Kariya wrote:
> Hi Garrett,
> 
> Thanks for these insights. But we are not consuming old data. We want the
> Streams app to run in near real time. And that is how it is actually
> running. The lag never increases beyond a certain limit. So I don't think
> that's an issue.
> 
> The configs you mention are all set to whatever Kafka offers by default.
> So I guess that should be fine.
> 
> 
> 
> On Tue, May 2, 2017 at 7:52 PM, Garrett Barton <garrett.bar...@gmail.com>
> wrote:
> 
>> Mahendra,
>>
>>  One possible cause I have seen that exhibits the same behavior of
>> missing windows of data is the retention configuration of the topics
>> (internal and your own).  I was loading data that was fairly old (weeks)
>> and using event time semantics as the record timestamp (custom timestamp
>> extractor), and the cleanup process was deleting segments nearly right
>> after they were written.  In my case the default cleanup run was every 5
>> minutes and the default retention was 7 days, so every 5 minutes I lost
>> data.  In my logs I saw a ton of warnings about 'offset not found' and
>> Kafka skipping ahead to whatever the next available offset was.  The end
>> result was gaps all over my data.  I don't have a good fix yet; I set the
>> retention to something massive, which I think is causing me other
>> problems.
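>>
>> If the retention you need to raise is on the Streams-managed changelog
>> topics, one knob on the Streams side is the extra retention Streams
>> grants its windowed changelogs. A sketch (double-check the property name
>> against your version; retention.ms on your own topics is a separate,
>> broker-side setting):
>>
>> import java.util.Properties;
>> import java.util.concurrent.TimeUnit;
>> import org.apache.kafka.streams.StreamsConfig;
>>
>> Properties props = new Properties();
>> // extra time, beyond the window maintain time, before windowed
>> // changelog data becomes eligible for deletion
>> props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
>>           String.valueOf(TimeUnit.DAYS.toMillis(30)));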
>>
>> Maybe that helps?
>>
>> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya
>> <mahendra.kar...@go-jek.com> wrote:
>>
>>> Hi Matthias,
>>>
>>> What we did was read the data from the sink topic and print it to the
>>> console. And here is the raw data from that topic (the counts are
>>> randomized). As we can see, the data is certainly missing for some time
>>> windows. For instance, after 1493693760, the next timestamp for which
>>> data is present is 1493694300. That's around 9 minutes of missing data.
>>>
>>> And this is just one instance. There are a lot of such instances in this
>>> file.
>>>
>>>
>>>
>>> On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya
>>> <mahendra.kar...@go-jek.com> wrote:
>>>
>>>> Thanks for the update, Matthias! And sorry for the delayed response.
>>>>
>>>> The reason we use .aggregate() is that we want to count the number of
>>>> unique values of a particular field in the message. So we just add that
>>>> field's value to a HashSet and then take the size of the HashSet.
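>>>>
>>>> The aggregator is essentially this (the key/value types and the field
>>>> getter are renamed here, like the counts):
>>>>
>>>> import org.apache.kafka.streams.kstream.Aggregator;
>>>>
>>>> // initializer is HashSet::new; the aggregator just collects the field
>>>> Aggregator<String, Message, HashSet<String>> aggregatorFunction =
>>>>     (key, message, uniques) -> {
>>>>         uniques.add(message.getField());
>>>>         return uniques;
>>>>     };
>>>> // .mapValues(HashSet::size) then turns each set into a unique count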
>>>>
>>>> On our side, we are also investigating, and it looks like there might
>>>> be a bug somewhere in our codebase. If that's the case, then it's quite
>>>> possible that there is no bug in Kafka Streams, except the metric one.
>>>>
>>>> We will get back to you once we have confirmed.
>>>>
>>>> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax
>>>> <matth...@confluent.io> wrote:
>>>>
>>>>> Just a follow-up: we identified a bug in the "skipped records" metric.
>>>>> The reported value is not correct.
>>>>>
>>>>>
>>>>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
>>>>>> Ok. That makes sense.
>>>>>>
>>>>>> Question: why do you use .aggregate() instead of .count()?
>>>>>>
>>>>>> Also, can you share the code of your AggregatorFunction()? Did you
>>>>>> change any default settings of StreamsConfig?
>>>>>>
>>>>>> I still have no idea what could go wrong. Maybe you can run with log
>>>>>> level TRACE? Maybe we can get some insight from those logs.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 4/27/17 11:41 PM, Mahendra Kariya wrote:
>>>>>>> Oh good point!
>>>>>>>
>>>>>>> The reason there is only one row per time window is that the table
>>>>>>> only contains the latest value for each window. What we did was dump
>>>>>>> the data present in the sink topic to a DB using an upsert query,
>>>>>>> with the time window as the primary key of the table. The file that
>>>>>>> I attached is actually the data present in the DB. And we know that
>>>>>>> there is no bug in our DB dump code because we have been using it
>>>>>>> for a long time in production without any issues.
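>>>>>>>
>>>>>>> Roughly, the upsert looks like this (sketched with Postgres-style
>>>>>>> ON CONFLICT; table and column names are placeholders):
>>>>>>>
>>>>>>> String sql = "INSERT INTO window_counts (window_start, record_count) "
>>>>>>>            + "VALUES (?, ?) ON CONFLICT (window_start) "
>>>>>>>            + "DO UPDATE SET record_count = EXCLUDED.record_count";
>>>>>>> try (PreparedStatement statement = connection.prepareStatement(sql)) {
>>>>>>>     statement.setLong(1, windowStart);  // primary key: the time window
>>>>>>>     statement.setLong(2, count);        // latest value for the window wins
>>>>>>>     statement.executeUpdate();
>>>>>>> }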
>>>>>>>
>>>>>>> The reason the count is zero for some time windows is that I
>>>>>>> subtracted a random number from the actual values and rounded it off
>>>>>>> to zero, for privacy reasons. The actual data doesn't have any zero
>>>>>>> values. I should have mentioned this earlier. My bad!
>>>>>>>
>>>>>>> The stream topology code looks something like this.
>>>>>>>
>>>>>>> stream
>>>>>>>     .filter()
>>>>>>>     .map((key, value) -> new KeyValue<>(transform(key), value))
>>>>>>>     .groupByKey()
>>>>>>>     .aggregate(HashSet::new, AggregatorFunction(),
>>>>>>>                TimeWindows.of(60000).until(3600000))
>>>>>>>     .mapValues(HashSet::size)
>>>>>>>     .toStream()
>>>>>>>     .map((key, value) -> convertToProtobufObject(key, value))
>>>>>>>     .to()
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax
>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Thanks for the details (sorry, I forgot that you had already shared
>>>>>>>> the output).
>>>>>>>>
>>>>>>>> Might be a dumb question, but what is the count for the missing
>>>>>>>> windows in your second implementation?
>>>>>>>>
>>>>>>>> If there is no data for a window, Streams should not emit a window
>>>>>>>> with count zero; it should emit nothing at all.
>>>>>>>>
>>>>>>>> Thus, looking at your output, I am wondering how it could contain a
>>>>>>>> line like:
>>>>>>>>
>>>>>>>>> 2017-04-27T04:53:00 0
>>>>>>>>
>>>>>>>> I am also wondering why your output only contains a single value
>>>>>>>> per window. As Streams outputs multiple updates per window while the
>>>>>>>> count is increasing, you should actually see multiple records per
>>>>>>>> window.
>>>>>>>>
>>>>>>>> Your code is like this:
>>>>>>>>
>>>>>>>> stream.filter().groupByKey().count(TimeWindows.of(60000)).to();
>>>>>>>>
>>>>>>>> Or do you have something more complex?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>>>>>>>>>> Can you somehow verify your output?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Do you mean the Kafka Streams output? In the Kafka Streams output,
>>>>>>>>> we do see some missing values. I have attached the Kafka Streams
>>>>>>>>> output (for a few hours) in the very first email of this thread for
>>>>>>>>> reference.
>>>>>>>>>
>>>>>>>>> Let me also summarise what we have done so far.
>>>>>>>>>
>>>>>>>>> We took a dump of the raw data present in the source topic. We
>>>>>>>>> wrote a script to read this data and do the exact same aggregations
>>>>>>>>> that we do using Kafka Streams. And then we compared the output
>>>>>>>>> from Kafka Streams and our script.
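>>>>>>>>>
>>>>>>>>> The script is essentially this (a sketch; Record and field() are
>>>>>>>>> placeholders for how we actually read the dump):
>>>>>>>>>
>>>>>>>>> Map<Long, Set<String>> uniques = new HashMap<>();
>>>>>>>>> for (Record r : dump) {
>>>>>>>>>     long window = (r.timestamp() / 60000) * 60000;  // 1-minute buckets
>>>>>>>>>     uniques.computeIfAbsent(window, w -> new HashSet<>())
>>>>>>>>>            .add(r.field());
>>>>>>>>> }
>>>>>>>>> uniques.forEach((w, set) -> System.out.println(w + " " + set.size()));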
>>>>>>>>>
>>>>>>>>> The difference that we observed between the two outputs is that a
>>>>>>>>> few rows (corresponding to some time windows) were missing from the
>>>>>>>>> Streams output. For the time windows for which data was present,
>>>>>>>>> the aggregated numbers matched exactly.
>>>>>>>>>
>>>>>>>>> This means either all the records for a particular time window are
>>>>>>>>> being skipped, or none are. Now, this is highly unlikely to happen.
>>>>>>>>> Maybe there is a bug somewhere in the RocksDB state stores? Just a
>>>>>>>>> speculation, I'm not sure. And there could even be a bug in the
>>>>>>>>> reported metric.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
> 
