Re: creating a controlled lag for a consumer group
Hey Ido. I haven't tried to do something like this, but we've worked out some future plans to do something similar, so I have a bit of interest in what you're saying. I feel like there are some details left out of your post. Are your consumers able to keep up with the throughput without the sleep strategy in place? How are messages being published across partitions? Evenly? I assume the timestamps on the messages are always increasing? Where are the timestamps generated? By the client publishers? Are the clocks of all the publishers synced?

Stephen

On Sun, Sep 3, 2017 at 8:01 PM, Ido Barkan wrote:
> Hey all.
> We are trying to create a controlled lag (a lag of 30 secs). We are doing
> that by inspecting a timestamp field for each msg on a specific topic and
> doing an actual Thread.sleep for a computed time in the consumer thread
> (until the msg is 30 secs old). We were hoping to see that eventually most
> of the messages are 30 seconds old and the created lag stays at 30 secs
> at the current processing rate.
> This works for us at low throughput, but at high rates we are starting to
> witness strange behavior: lag on partitions starts to grow in an
> uncontrollable manner and becomes uneven per partition.
> We have tried multiple variations of this: sleeping per bulk of msgs,
> sleeping per msg, capping the sleep time, etc., but failed to resolve this.
>
> Has anyone tried to do this? Any ideas?
>
> Thanks,
> Ido
Time based data retrieval from Kafka Topic
Hi All,

I need a solution to the below problem statement:

How can I retrieve only the last 1 hour of data from an existing Kafka topic, on the first and on every consecutive run (at 15-minute intervals) of the client application?

Note: the existing topic has been accumulating data for the last 6 months.

Regards,
Kaustuv Bhattacharya
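For reference, since Kafka 0.10.1 (KIP-79) the consumer exposes KafkaConsumer.offsetsForTimes, which maps a search timestamp per partition to the earliest offset whose record timestamp is at or after it; "the last 1 hour" then becomes a lookup for "now minus one hour" followed by a seek. The sketch below only builds that per-partition search-timestamp map; it uses plain partition numbers in place of TopicPartition keys (and a hypothetical class name) so it compiles without the kafka-clients dependency:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Sketch of the input one would hand to KafkaConsumer.offsetsForTimes:
// one search timestamp per partition, here "one hour before nowMs".
// Integer keys stand in for TopicPartition so the snippet is self-contained.
public class LastHourLookup {
    static Map<Integer, Long> searchTimestamps(int partitionCount, long nowMs) {
        long oneHourAgoMs = nowMs - Duration.ofHours(1).toMillis();
        Map<Integer, Long> search = new HashMap<>();
        for (int p = 0; p < partitionCount; p++) {
            search.put(p, oneHourAgoMs);
        }
        return search;
    }

    public static void main(String[] args) {
        // With the real client, each partition would then be seek()-ed to the
        // offset offsetsForTimes returns, and the lookup repeated on each
        // 15-minute run of the application.
        System.out.println(LastHourLookup.searchTimestamps(3, 7_200_000L));
    }
}
```

This only works if the topic's records carry usable timestamps (the default CreateTime/LogAppendTime timestamps from 0.10+ brokers qualify); on older message formats there is no per-record timestamp to search on.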
Re: Committing an invalid offset with KafkaConsumer.commitSync
Thanks for the answer, I won't need a try-catch around commitSync then. Also thanks for updating the docs.

2017-09-03 19:47 GMT+02:00 Mickael Maison:
> I believe the Javadoc is slightly incorrect/misleading.
> When it says "offset metadata is too large", it is about the metadata
> you can commit along with the offset, not the offset itself. See
> OffsetAndMetadata:
> http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>
> Regarding the offset value, we only check whether it's negative, and that
> check is only performed client side (presumably 3rd-party clients could
> commit a negative offset). Apart from that, no check is made on whether
> the offset is "in range" or not.
> We had a look a while back at checking whether the offset is "in range"
> when committing, but it's complicated; see the comments on
> https://issues.apache.org/jira/browse/KAFKA-4081
>
> I opened a PR to update the Javadoc:
> https://github.com/apache/kafka/pull/3780
>
> HTH
Re: Committing an invalid offset with KafkaConsumer.commitSync
I believe the Javadoc is slightly incorrect/misleading. When it says "offset metadata is too large", it is about the metadata you can commit along with the offset, not the offset itself. See OffsetAndMetadata:
http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Regarding the offset value, we only check whether it's negative, and that check is only performed client side (presumably 3rd-party clients could commit a negative offset). Apart from that, no check is made on whether the offset is "in range" or not. We had a look a while back at checking whether the offset is "in range" when committing, but it's complicated; see the comments on
https://issues.apache.org/jira/browse/KAFKA-4081

I opened a PR to update the Javadoc:
https://github.com/apache/kafka/pull/3780

HTH

On Sun, Sep 3, 2017 at 4:57 PM, Stig Døssing wrote:
> The broker and consumer are version 0.11.0.0.
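To make the behavior described above concrete: the only validation is a client-side rejection of negative offsets, with no "in range" check, which is why committing offset 4,000,000 on a partition whose highest offset is ~6000 succeeds. The sketch below is a stdlib-only mimic of that check for illustration, not the actual kafka-clients code:

```java
// Hypothetical stand-in for the Java client's only offset validation:
// negative offsets are rejected at commit time; any non-negative value
// is accepted, whether or not it exists in the partition's log.
public class OffsetValidationSketch {
    static long validate(long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("Invalid negative offset: " + offset);
        }
        return offset; // no range check against the log's end or start offset
    }

    public static void main(String[] args) {
        System.out.println(validate(4_000_000L)); // accepted despite being out of range
        try {
            validate(-1L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```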
Re: Committing an invalid offset with KafkaConsumer.commitSync
The broker and consumer are version 0.11.0.0.

2017-09-03 17:38 GMT+02:00 Jeff Widman:
> What broker version are you testing with?
Re: Committing an invalid offset with KafkaConsumer.commitSync
What broker version are you testing with?

On Sep 3, 2017 4:14 AM, "Stig Døssing" wrote:
> Hi,
>
> The documentation for KafkaConsumer.commitSync(Map) states that a
> KafkaException will be thrown if the committed offset is invalid. I can't
> seem to provoke this behavior, so I'd like clarification on whether this
> is something the consumer is intended to do.
>
> Here's the snippet I'd expect would error out (running in a loop):
>
> public long getEarliestOffset() {
>     LOG.info("Current offset " + consumer.committed(new TopicPartition(TOPIC, 0)));
>     ConsumerRecords records = consumer.poll(2000);
>     consumer.seekToBeginning(consumer.assignment());
>     consumer.commitSync(Collections.singletonMap(new TopicPartition(TOPIC, 0),
>             new OffsetAndMetadata(4_000_000L)));
>     return consumer.position(new TopicPartition(TOPIC, 0));
> }
>
> The committed offset appears to be updated to 4,000,000 even though the
> highest offset in that partition is ~6000. It also seems to work the other
> way round: if I set the log retention such that offsets before 2000 are
> deleted, I can still commit offset 0.
>
> I'm trying to clarify what the consumer is expected to do, because I'm
> trying to figure out whether a consumer loop that is committing offsets
> with commitSync(Map) for a partition where log deletion is enabled needs
> to put a try-catch guard around the commit call in case the committed
> offset has been deleted.
>
> Thanks,
> Stig Rohde Døssing
creating a controlled lag for a consumer group
Hey all.

We are trying to create a controlled lag (a lag of 30 secs). We are doing that by inspecting a timestamp field for each msg on a specific topic and doing an actual Thread.sleep for a computed time in the consumer thread (until the msg is 30 secs old). We were hoping to see that eventually most of the messages are 30 seconds old and the created lag stays at 30 secs at the current processing rate.

This works for us at low throughput, but at high rates we are starting to witness strange behavior: lag on partitions starts to grow in an uncontrollable manner and becomes uneven per partition.

We have tried multiple variations of this: sleeping per bulk of msgs, sleeping per msg, capping the sleep time, etc., but failed to resolve this.

Has anyone tried to do this? Any ideas?

Thanks,
Ido
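The per-message delay described above can be sketched as follows. This is an illustrative reconstruction with a hypothetical helper name, not the poster's actual code, and it assumes record timestamps are producer-assigned epoch milliseconds with reasonably synced clocks:

```java
// Sketch of the "sleep until the record is 30s old" computation described
// in the message above. Hypothetical helper; assumes epoch-millis timestamps.
public class ControlledLagSketch {
    static final long TARGET_LAG_MS = 30_000;

    // How long the consumer thread should sleep so the record is at least
    // TARGET_LAG_MS old when processed. Never negative: records that are
    // already old enough are processed immediately.
    static long computeSleepMs(long recordTimestampMs, long nowMs) {
        return Math.max(0L, TARGET_LAG_MS - (nowMs - recordTimestampMs));
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        // A record produced 10s ago still needs a 20s pause.
        System.out.println(computeSleepMs(now - 10_000, now));
        // A record already 40s old needs no pause at all.
        System.out.println(computeSleepMs(now - 40_000, now));
    }
}
```

One possible factor in the runaway lag at high rates: sleeping inside the poll loop delays the next poll(), and if the accumulated per-batch sleep exceeds max.poll.interval.ms the consumer is kicked from the group and rebalanced, which would produce exactly the uneven, growing per-partition lag described. KafkaConsumer.pause()/resume() lets a consumer keep polling (and heartbeating) without consuming, which may be a safer way to hold records back.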
Committing an invalid offset with KafkaConsumer.commitSync
Hi,

The documentation for KafkaConsumer.commitSync(Map) states that a KafkaException will be thrown if the committed offset is invalid. I can't seem to provoke this behavior, so I'd like clarification on whether this is something the consumer is intended to do.

Here's the snippet I'd expect would error out (running in a loop):

public long getEarliestOffset() {
    LOG.info("Current offset " + consumer.committed(new TopicPartition(TOPIC, 0)));
    ConsumerRecords records = consumer.poll(2000);
    consumer.seekToBeginning(consumer.assignment());
    consumer.commitSync(Collections.singletonMap(new TopicPartition(TOPIC, 0),
            new OffsetAndMetadata(4_000_000L)));
    return consumer.position(new TopicPartition(TOPIC, 0));
}

The committed offset appears to be updated to 4,000,000 even though the highest offset in that partition is ~6000. It also seems to work the other way round: if I set the log retention such that offsets before 2000 are deleted, I can still commit offset 0.

I'm trying to clarify what the consumer is expected to do, because I'm trying to figure out whether a consumer loop that is committing offsets with commitSync(Map) for a partition where log deletion is enabled needs to put a try-catch guard around the commit call in case the committed offset has been deleted.

Thanks,
Stig Rohde Døssing