Is the watermarking configured per partition in Kafka, or per source? If it is configured per partition, then a trailing (late) or leading (early) partition would not distort the watermark as a whole. There would not be any dropping of late data, simply a delay in the results until the lagging partition (watermark-wise) has caught up.
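For illustration, per-partition assignment on the Kafka source looks roughly like the sketch below; the broker address, topic name, schema, and the extractEventTime() helper are placeholders, not your actual job:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder brokers

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("topic-b", new SimpleStringSchema(), props);

// Assigning the extractor on the consumer (rather than on the resulting
// DataStream) makes Flink track one watermark per Kafka partition and emit
// the minimum across partitions, so a single trailing partition only delays
// the window results instead of causing late-record drops.
consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(String record) {
                return extractEventTime(record);  // placeholder: parse the event time from the record
            }
        });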
Best,
Stephan

On Wed, Jul 31, 2019 at 8:00 AM Ramya Ramamurthy <hair...@gmail.com> wrote:

> Hi Jiangjie,
>
> Thanks for your response. I was able to figure out the issue.
> We have multiple endpoints from which we receive data. One of these
> endpoints did not have NTP set up (or rather, was not syncing to NTP
> properly), so those VMs were sending packets timestamped 2 minutes ahead
> of time. As a result, all the packets from the other sources arriving at
> that Kafka cluster were dropped. Increasing the watermark bound to 3 mins
> [from 1 min] stopped the "laterecordsdropped".
>
> But this is kind of worrisome: if anything like this happens again, it
> would affect our systems badly, as we cannot afford to lose data. Is
> there any better way to approach this? The Flink tables do not have side
> outputs to collect these lost packets either, which is also a concern.
> Is this a feature in the making?
> I can see that Flink Tables aren't yet as evolved as Streams. Let me
> know what you think.
>
> Regards,
> ~Ramya.
>
> On Mon, Jul 29, 2019 at 6:17 PM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Ramya,
> >
> > I am a little confused about the situation here. Is the following what
> > you saw:
> >
> > 1. The Kafka topic A has a traffic drop from 22:00 UTC to 00:30 UTC.
> > 2. Your Flink job (say job 1) reads from topic A, but it has a high
> > fetch rate with abnormally high CPU consumption during the period
> > mentioned in (1).
> > 3. Your Flink job (say job 2) reads from topic B, and it sees a lot of
> > messages dropped as late arrivals, i.e. the timestamps of those
> > messages are already behind the watermark by more than the max allowed
> > lateness.
> >
> > What is the relationship between job 1 and job 2? Are they the same
> > job? Is there any producer sending "old" messages to the Kafka cluster
> > which may cause those messages to be dropped by Flink due to their old
> > timestamps?
> >
> > Unfortunately the image does not work on the Apache mailing list. Can
> > you post the image somewhere and send the link instead?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Jul 18, 2019 at 9:36 AM Ramya Ramamurthy <hair...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > We are facing a serious production issue with Flink. Any help would
> > > be appreciated.
> > >
> > > We receive packets from a Kafka cluster. This cluster has a sudden
> > > drop in packets from 22:00 UTC till 00:30 UTC every day [on a
> > > specific topic, say "topic A"]. Though our job reads from a different
> > > topic [say "topic B"], we see that we drop a lot of packets there
> > > [per the "laterecordsDropped" metric]. At the same time, we see that
> > > the job which reads from "topic A" has a high fetch rate. We also
> > > observed that one of the brokers of this cluster had an abnormal CPU
> > > rise [which I attribute to the high fetch rates].
> > >
> > > We have a tumbling window of 1 min [with 10 seconds of
> > > watermarksPeriodicBounded]. This is based on the packets' event time.
> > > Is there any reason why my job reading from "topic B" would see more
> > > records dropped?
> > >
> > > The picture below is a screenshot where:
> > > "Laterecords dropped" corresponds to the job reading from "topic B";
> > > fetch and consume rates relate to the job reading from "topic A"
> > > [which has the downward trend in traffic at the mentioned times].
> > >
> > > [image: image.png]
> > >
> > > All these graphs are correlated and we are unable to isolate this
> > > problem. There are other modules which consume from this topic, and
> > > we have no slow records logged there, which is why we are not sure
> > > whether this issue lies with Flink alone.
> > >
> > > Thanks.
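P.S. On the side-output concern raised in the quoted thread: the Table API does not expose a late-data side output today, but if that part of the pipeline can be expressed with the DataStream API, a sketch along these lines collects the records that would otherwise only show up in the "laterecordsDropped" counter. The Event and Result types, the key selector, and MyAggregateFunction are placeholders, not your actual job:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// Tag identifying records that arrive after the watermark has passed their window.
final OutputTag<Event> lateTag = new OutputTag<Event>("late-records") {};

SingleOutputStreamOperator<Result> results = events            // events: DataStream<Event>, placeholder
        .keyBy(e -> e.getKey())                                // placeholder key selector
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .sideOutputLateData(lateTag)                           // route late records to the side output
        .aggregate(new MyAggregateFunction());                 // placeholder aggregation

// Late records can then be inspected or written out instead of disappearing.
DataStream<Event> lateRecords = results.getSideOutput(lateTag);

The collected late records can be written to a dead-letter Kafka topic or logged for later reconciliation, so nothing is silently lost even when the watermark bound is misjudged.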