I would recommend double-checking the following:

- Can you confirm that the filter does not remove all data for those
  time periods?
- I would also check the input to your AggregatorFunction() -- does it
  receive everything?
- Same for .mapValues()
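One simple way to check all three points is to log the records flowing
into and out of each step. A minimal sketch, mirroring the schematic
topology further down in this thread -- keep(), transform(), and
AggregatorFunction() are placeholders for your actual code, and serdes
and store names are omitted exactly as in your pseudocode; KStream#peek()
needs a newer Streams release, so on 0.10.x a map() that logs and returns
the record unchanged does the same job:

stream
    // log everything entering the filter
    .peek((key, value) -> System.out.println("pre-filter:  " + key + " = " + value))
    .filter((key, value) -> keep(key, value))   // keep() = your predicate
    // log everything that survived the filter
    .peek((key, value) -> System.out.println("post-filter: " + key + " = " + value))
    .map((key, value) -> new KeyValue<>(transform(key), value))
    .groupByKey()
    .aggregate(HashSet::new, AggregatorFunction(),
               TimeWindows.of(60000).until(3600000))
    // log what .mapValues() receives per window update
    .mapValues(set -> {
        System.out.println("mapValues input size: " + set.size());
        return set.size();
    });

If the pre-filter log shows records for the missing windows but the
post-filter log does not, the filter is the culprit; if they never show
up at all, the problem is upstream of the application.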
This would help to understand in which part of the program the data
gets lost.

-Matthias

On 5/2/17 11:09 PM, Mahendra Kariya wrote:
> Hi Garrett,
>
> Thanks for these insights. But we are not consuming old data. We want
> the Streams app to run in near real time, and that is how it is
> actually running. The lag never increases beyond a certain limit. So
> I don't think that's an issue.
>
> The values of the configs you mention are whatever Kafka offers by
> default. So I guess that should be fine.
>
> On Tue, May 2, 2017 at 7:52 PM, Garrett Barton
> <garrett.bar...@gmail.com> wrote:
>
>> Mahendra,
>>
>> One possible thing I have seen that exhibits the same behavior of
>> missing windows of data is the configuration of the retention
>> policies of the topics (internal and your own). I was loading data
>> that was fairly old (weeks) and using event-time semantics as the
>> record timestamp (custom timestamp extractor), and the cleanup was
>> deleting segments nearly right after they were written. In my case
>> the default cleanup run was every 5 minutes and the default
>> retention was 7 days, so every 5 minutes I lost data. In my logs I
>> saw a ton of warnings about 'offset not found' and Kafka skipping
>> ahead to whatever the next available offset was. The end result was
>> gaps all over my data. I don't have a good fix yet; I set the
>> retention to something massive, which I think is causing me other
>> problems.
>>
>> Maybe that helps?
>>
>> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya
>> <mahendra.kar...@go-jek.com> wrote:
>>
>>> Hi Matthias,
>>>
>>> What we did was read the data from the sink topic and print it to
>>> the console. And here's the raw data from that topic (the counts
>>> are randomized). As we can see, the data is certainly missing for
>>> some time windows. For instance, after 1493693760, the next
>>> timestamp for which data is present is 1493694300. That's around 9
>>> minutes of data missing.
>>>
>>> And this is just one instance. There are a lot of such instances in
>>> this file.
>>>
>>> On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya
>>> <mahendra.kar...@go-jek.com> wrote:
>>>
>>>> Thanks for the update Matthias! And sorry for the delayed
>>>> response.
>>>>
>>>> The reason we use .aggregate() is that we want to count the number
>>>> of unique values for a particular field in the message. So we just
>>>> add that particular field's value to a HashSet and then take the
>>>> size of the HashSet.
>>>>
>>>> On our side, we are also investigating, and it looks like there
>>>> might be a bug somewhere in our codebase. If that's the case, then
>>>> it's quite possible that there is no bug in Kafka Streams, except
>>>> the metric one.
>>>>
>>>> We will revert after confirming.
>>>>
>>>> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax
>>>> <matth...@confluent.io> wrote:
>>>>
>>>>> Just a follow-up: we identified a bug in the "skipped records"
>>>>> metric. The reported value is not correct.
>>>>>
>>>>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
>>>>>> Ok. That makes sense.
>>>>>>
>>>>>> Question: why do you use .aggregate() instead of .count()?
>>>>>>
>>>>>> Also, can you share the code of your AggregatorFunction()? Did
>>>>>> you change any default settings of StreamsConfig?
>>>>>>
>>>>>> I still have no idea what could go wrong. Maybe you can run with
>>>>>> log level TRACE? Maybe we can get some insight from those logs.
>>>>>>
>>>>>> -Matthias
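For reference, given the HashSet-based unique count described above, the
AggregatorFunction() in question presumably looks something like the
following minimal sketch; Message and getField() are placeholder names
for the actual record type and the field being de-duplicated:

import org.apache.kafka.streams.kstream.Aggregator;

// Collects one field's values in a HashSet so that a downstream
// .mapValues(HashSet::size) yields the number of unique values per
// window. Message and getField() are hypothetical.
Aggregator<String, Message, HashSet<String>> aggregatorFunction =
    (key, message, uniques) -> {
        uniques.add(message.getField());
        return uniques;
    };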
>>>>>>
>>>>>> On 4/27/17 11:41 PM, Mahendra Kariya wrote:
>>>>>>> Oh good point!
>>>>>>>
>>>>>>> The reason why there is only one row corresponding to each time
>>>>>>> window is that it only contains the latest value for the time
>>>>>>> window. What we did was dump the data present in the sink topic
>>>>>>> to a DB using an upsert query. The primary key of the table was
>>>>>>> the time window. The file that I attached is actually the data
>>>>>>> present in the DB. And we know that there is no bug in our DB
>>>>>>> dump code because we have been using it for a long time in
>>>>>>> production without any issues.
>>>>>>>
>>>>>>> The reason the count is zero for some time windows is that I
>>>>>>> subtracted a random number from the actual values and rounded
>>>>>>> it off to zero, for privacy reasons. The actual data doesn't
>>>>>>> have any zero values. I should have mentioned this earlier. My
>>>>>>> bad!
>>>>>>>
>>>>>>> The stream topology code looks something like this:
>>>>>>>
>>>>>>> stream
>>>>>>>     .filter()
>>>>>>>     .map((key, value) -> new KeyValue<>(transform(key), value))
>>>>>>>     .groupByKey()
>>>>>>>     .aggregate(HashSet::new, AggregatorFunction(),
>>>>>>>                TimeWindows.of(60000).until(3600000))
>>>>>>>     .mapValues(HashSet::size)
>>>>>>>     .toStream()
>>>>>>>     .map((key, value) -> convertToProtobufObject(key, value))
>>>>>>>     .to()
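This upsert approach is also why only the latest value per window
survives: a windowed aggregation emits repeated updates keyed by
Windowed<K>, and upserting on the window keeps only the last one. A
minimal sketch of deriving such a primary key, where windowedCounts
stands for the KTable produced by the .aggregate() call above:

// Each update is keyed by Windowed<K>; the window start timestamp
// makes a natural upsert key, so later updates for the same window
// overwrite earlier ones in the DB.
KStream<Long, Integer> perWindow = windowedCounts
    .toStream()
    .map((windowedKey, count) ->
        new KeyValue<>(windowedKey.window().start(), count));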
>>>>>>>
>>>>>>> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax
>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Thanks for the details (sorry that I forgot that you did share
>>>>>>>> the output already).
>>>>>>>>
>>>>>>>> Might be a dumb question, but what is the count for missing
>>>>>>>> windows in your second implementation?
>>>>>>>>
>>>>>>>> If there is no data for a window, it should not emit a window
>>>>>>>> with count zero, but nothing at all.
>>>>>>>>
>>>>>>>> Thus, looking at your output, I am wondering how it could
>>>>>>>> contain a line like:
>>>>>>>>
>>>>>>>>> 2017-04-27T04:53:00    0
>>>>>>>>
>>>>>>>> I am also wondering why your output only contains a single
>>>>>>>> value per window. As Streams outputs multiple updates per
>>>>>>>> window while the count is increasing, you should actually see
>>>>>>>> multiple records per window.
>>>>>>>>
>>>>>>>> Your code is like this:
>>>>>>>>
>>>>>>>> stream.filter().groupByKey().count(TimeWindows.of(60000)).to();
>>>>>>>>
>>>>>>>> Or do you have something more complex?
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
>>>>>>>>>> Can you somehow verify your output?
>>>>>>>>>
>>>>>>>>> Do you mean the Kafka Streams output? In the Kafka Streams
>>>>>>>>> output, we do see some missing values. I have attached the
>>>>>>>>> Kafka Streams output (for a few hours) in the very first
>>>>>>>>> email of this thread for reference.
>>>>>>>>>
>>>>>>>>> Let me also summarise what we have done so far.
>>>>>>>>>
>>>>>>>>> We took a dump of the raw data present in the source topic.
>>>>>>>>> We wrote a script to read this data and do the exact same
>>>>>>>>> aggregations that we do using Kafka Streams. And then we
>>>>>>>>> compared the output from Kafka Streams and our script.
>>>>>>>>>
>>>>>>>>> The difference that we observed in the two outputs is that
>>>>>>>>> there were a few rows (corresponding to some time windows)
>>>>>>>>> missing in the Streams output. For the time windows for which
>>>>>>>>> the data was present, the aggregated numbers matched exactly.
>>>>>>>>>
>>>>>>>>> This means that either all the records for a particular time
>>>>>>>>> window are being skipped, or none are. Now, this is highly
>>>>>>>>> unlikely to happen. Maybe there is a bug somewhere in the
>>>>>>>>> RocksDB state stores? Just a speculation, not sure though.
>>>>>>>>> And there could even be a bug in the reported metric.
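P.S. One more concrete thing worth verifying, related to Garrett's
retention observation above: the window retention set via until() and
the retention.ms of the involved topics (source, changelog, sink)
should all cover the event-time range being processed; otherwise
windows can be dropped while records for them are still arriving. A
sketch using the values from the topology in this thread (treat them as
placeholders):

// 1-minute windows, window state kept for 1 hour, as in the topology
// above. Records whose event timestamps fall outside this retention
// may be silently dropped, which shows up as missing windows.
TimeWindows windows = TimeWindows.of(60000).until(3600000);

// The matching topic-level knob is the topic config retention.ms --
// the changelog and sink topics need to retain data at least as long.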