Re: creating a controlled lag for a consumer group

2017-09-03 Thread Stephen Powis
Hey Ido.

I haven't tried to do something like this, but we've worked out some future
plans to do something similar, so I have a bit of interest in what you're
saying.

I feel like there are some details left out of your post.  Are your
consumers able to keep up w/ the throughput without the sleep strategy in
place?  How are messages being published across partitions?  Evenly?  I
assume the timestamps on the messages are always increasing?  Where are the
timestamps generated? By the client publishers?  Are the clocks of all the
publishers sync'd?

Stephen

On Sun, Sep 3, 2017 at 8:01 PM, Ido Barkan  wrote:

> Hey all.
> We are trying to create a controlled lag (a lag of 30 secs). We are doing
> that by inspecting a timestamp field for each msg on a specific topic and
> doing an actual Thread.sleep for a computed time in the consumer thread
> (until the msg is 30 secs old). We were hoping to see that eventually most
> of the messages are 30 seconds old, so that the created lag corresponds to
> 30 secs' worth of processing.
> This works for us at low throughput, but at high rates we are starting to
> see strange behavior: lag on the partitions starts to grow uncontrollably
> and becomes uneven across partitions.
> We have tried multiple variations of this: sleeping per bulk of msgs,
> sleeping per msg, capping the sleep time, etc., but failed to resolve this.
>
> Has anyone tried to do this? any ideas?
>
> Thanks,
> Ido
> --
> Thanks,
> Ido
>


Time-based data retrieval from a Kafka topic

2017-09-03 Thread Kaustuv Bhattacharya
Hi All,

I need a solution to the problem statement below:

How can I retrieve only the last 1 hour of data from an existing Kafka topic,
on the first run and every subsequent run (at 15-minute intervals) of the
client application?
Note: the existing topic has been accumulating data for the last 6 months.
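
One way to approach this (not from the original mail; it assumes brokers and
clients on 0.10.1 or later, where KafkaConsumer.offsetsForTimes is available)
is to translate "one hour ago" into per-partition offsets and seek there on
every run. A minimal, hypothetical sketch:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class LastHourSeeker {

    // Hypothetical helper: position 'consumer' at the offsets matching "one hour ago".
    static void seekToLastHour(KafkaConsumer<String, String> consumer, String topic) {
        long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;

        // Ask, for every partition of the topic, for the offset of the first
        // record whose timestamp is >= oneHourAgo.
        Map<TopicPartition, Long> query = new HashMap<>();
        for (PartitionInfo p : consumer.partitionsFor(topic)) {
            query.put(new TopicPartition(topic, p.partition()), oneHourAgo);
        }
        consumer.assign(query.keySet());

        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : offsets.entrySet()) {
            // A null value means no record in that partition is that recent.
            if (e.getValue() != null) {
                consumer.seek(e.getKey(), e.getValue().offset());
            }
        }
        // Subsequent poll() calls now start roughly one hour back; the same
        // helper can be run again on each 15-minute invocation.
    }
}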

Regards,
Kaustuv Bhattacharya


Re: Committing an invalid offset with KafkaConsumer.commitSync

2017-09-03 Thread Stig Døssing
Thanks for the answer; I won't need a try-catch around commitSync then. Also,
thanks for updating the docs.

2017-09-03 19:47 GMT+02:00 Mickael Maison :

> I believe the Javadoc is slightly incorrect/misleading.
> When it says "offset metadata is too large", it is about the metadata
> you can commit along with the offset, not the offset. See
> OffsetAndMetadata:
> http://kafka.apache.org/0110/javadoc/index.html?org/apache/
> kafka/clients/consumer/KafkaConsumer.html
>
> Regarding the offset value, we only check if it's negative and that's
> only performed client side (presumably 3rd party clients could commit
> a negative offset). Apart from that, no checks are made if the offset
> is "in range" or not.
> We had a look a while back to check if the offset is "in range" when
> committing but it's complicated, see the comments on
> https://issues.apache.org/jira/browse/KAFKA-4081
>
> I opened a PR to update the Javadoc: https://github.com/apache/
> kafka/pull/3780
>
> HTH
>
> On Sun, Sep 3, 2017 at 4:57 PM, Stig Døssing 
> wrote:
> > The broker and consumer are version 0.11.0.0.
> >
> > 2017-09-03 17:38 GMT+02:00 Jeff Widman :
> >
> >> What broker version are you testing with?
> >>
> >> On Sep 3, 2017 4:14 AM, "Stig Døssing" 
> wrote:
> >>
> >> > Hi,
> >> >
> >> > The documentation for KafkaConsumer.commitSync(Map) states that a
> >> > KafkaException will be thrown if the committed offset is invalid. I
> can't
> >> > seem to provoke this behavior, so I'd like clarification on whether
> this
> >> is
> >> > something the consumer is intended to do.
> >> >
> >> > Here's the snippet I'd expect would error out (running in a loop):
> >> > public long getEarliestOffset() {
> >> > LOG.info("Current offset " + consumer.committed(new
> >> > TopicPartition(TOPIC, 0)));
> >> > ConsumerRecords records =
> >> consumer.poll(2000);
> >> > consumer.seekToBeginning(consumer.assignment());
> >> > consumer.commitSync(Collections.singletonMap(new
> >> > TopicPartition(TOPIC, 0), new OffsetAndMetadata(4_000_000L)));
> >> > return consumer.position(new TopicPartition(TOPIC, 0));
> >> > }
> >> >
> >> > The committed offset appears to be updated to 4.000.000 even though
> the
> >> > highest offset in that partition is ~6000. It also seems to work the
> >> other
> >> > way round, if I set the log retention such that offsets before 2000
> are
> >> > deleted, I can still commit offset 0.
> >> >
> >> > I'm trying to clarify what the consumer is expected to do, because I'm
> >> > trying to figure out whether a consumer loop that is committing
> offsets
> >> > with commitSync(Map) for a partition where log deletion is enabled
> needs
> >> to
> >> > put a try-catch guard around the commit call in case the committed
> offset
> >> > has been deleted.
> >> >
> >> > Thanks,
> >> > Stig Rohde Døssing
> >> >
> >>
>


Re: Committing an invalid offset with KafkaConsumer.commitSync

2017-09-03 Thread Mickael Maison
I believe the Javadoc is slightly incorrect/misleading.
When it says "offset metadata is too large", it is about the metadata
you can commit along with the offset, not the offset. See
OffsetAndMetadata:
http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Regarding the offset value, we only check that it's not negative, and that
check is only performed client-side (presumably 3rd-party clients could
commit a negative offset). Apart from that, no check is made on whether the
offset is "in range" or not.
We had a look a while back at validating that the offset is "in range" when
committing, but it's complicated; see the comments on
https://issues.apache.org/jira/browse/KAFKA-4081
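
To illustrate (a hypothetical sketch, not code from this thread; 'consumer'
is assumed to be an already-configured KafkaConsumer assigned to 'tp'): the
negative-offset check happens on the client, while an out-of-range offset is
committed without complaint.

import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetCommitChecks {

    static void illustrate(KafkaConsumer<String, String> consumer, TopicPartition tp) {
        // Client-side check: a negative offset is rejected before any
        // request is sent to the broker.
        try {
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(-1L)));
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected client-side: " + e.getMessage());
        }

        // No "in range" check: an offset far beyond the partition's log end
        // offset is accepted by both client and broker (see KAFKA-4081).
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(4_000_000L)));
    }
}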

I opened a PR to update the Javadoc: https://github.com/apache/kafka/pull/3780

HTH

On Sun, Sep 3, 2017 at 4:57 PM, Stig Døssing  wrote:
> The broker and consumer are version 0.11.0.0.
>
> 2017-09-03 17:38 GMT+02:00 Jeff Widman :
>
>> What broker version are you testing with?
>>
>> On Sep 3, 2017 4:14 AM, "Stig Døssing"  wrote:
>>
>> > Hi,
>> >
>> > The documentation for KafkaConsumer.commitSync(Map) states that a
>> > KafkaException will be thrown if the committed offset is invalid. I can't
>> > seem to provoke this behavior, so I'd like clarification on whether this
>> is
>> > something the consumer is intended to do.
>> >
>> > Here's the snippet I'd expect would error out (running in a loop):
>> > public long getEarliestOffset() {
>> > LOG.info("Current offset " + consumer.committed(new
>> > TopicPartition(TOPIC, 0)));
>> > ConsumerRecords records =
>> consumer.poll(2000);
>> > consumer.seekToBeginning(consumer.assignment());
>> > consumer.commitSync(Collections.singletonMap(new
>> > TopicPartition(TOPIC, 0), new OffsetAndMetadata(4_000_000L)));
>> > return consumer.position(new TopicPartition(TOPIC, 0));
>> > }
>> >
>> > The committed offset appears to be updated to 4.000.000 even though the
>> > highest offset in that partition is ~6000. It also seems to work the
>> other
>> > way round, if I set the log retention such that offsets before 2000 are
>> > deleted, I can still commit offset 0.
>> >
>> > I'm trying to clarify what the consumer is expected to do, because I'm
>> > trying to figure out whether a consumer loop that is committing offsets
>> > with commitSync(Map) for a partition where log deletion is enabled needs
>> to
>> > put a try-catch guard around the commit call in case the committed offset
>> > has been deleted.
>> >
>> > Thanks,
>> > Stig Rohde Døssing
>> >
>>


Re: Committing an invalid offset with KafkaConsumer.commitSync

2017-09-03 Thread Stig Døssing
The broker and consumer are version 0.11.0.0.

2017-09-03 17:38 GMT+02:00 Jeff Widman :

> What broker version are you testing with?
>
> On Sep 3, 2017 4:14 AM, "Stig Døssing"  wrote:
>
> > Hi,
> >
> > The documentation for KafkaConsumer.commitSync(Map) states that a
> > KafkaException will be thrown if the committed offset is invalid. I can't
> > seem to provoke this behavior, so I'd like clarification on whether this
> is
> > something the consumer is intended to do.
> >
> > Here's the snippet I'd expect would error out (running in a loop):
> > public long getEarliestOffset() {
> > LOG.info("Current offset " + consumer.committed(new
> > TopicPartition(TOPIC, 0)));
> > ConsumerRecords records =
> consumer.poll(2000);
> > consumer.seekToBeginning(consumer.assignment());
> > consumer.commitSync(Collections.singletonMap(new
> > TopicPartition(TOPIC, 0), new OffsetAndMetadata(4_000_000L)));
> > return consumer.position(new TopicPartition(TOPIC, 0));
> > }
> >
> > The committed offset appears to be updated to 4.000.000 even though the
> > highest offset in that partition is ~6000. It also seems to work the
> other
> > way round, if I set the log retention such that offsets before 2000 are
> > deleted, I can still commit offset 0.
> >
> > I'm trying to clarify what the consumer is expected to do, because I'm
> > trying to figure out whether a consumer loop that is committing offsets
> > with commitSync(Map) for a partition where log deletion is enabled needs
> to
> > put a try-catch guard around the commit call in case the committed offset
> > has been deleted.
> >
> > Thanks,
> > Stig Rohde Døssing
> >
>


Re: Committing an invalid offset with KafkaConsumer.commitSync

2017-09-03 Thread Jeff Widman
What broker version are you testing with?

On Sep 3, 2017 4:14 AM, "Stig Døssing"  wrote:

> Hi,
>
> The documentation for KafkaConsumer.commitSync(Map) states that a
> KafkaException will be thrown if the committed offset is invalid. I can't
> seem to provoke this behavior, so I'd like clarification on whether this is
> something the consumer is intended to do.
>
> Here's the snippet I'd expect would error out (running in a loop):
> public long getEarliestOffset() {
> LOG.info("Current offset " + consumer.committed(new
> TopicPartition(TOPIC, 0)));
> ConsumerRecords records = consumer.poll(2000);
> consumer.seekToBeginning(consumer.assignment());
> consumer.commitSync(Collections.singletonMap(new
> TopicPartition(TOPIC, 0), new OffsetAndMetadata(4_000_000L)));
> return consumer.position(new TopicPartition(TOPIC, 0));
> }
>
> The committed offset appears to be updated to 4.000.000 even though the
> highest offset in that partition is ~6000. It also seems to work the other
> way round, if I set the log retention such that offsets before 2000 are
> deleted, I can still commit offset 0.
>
> I'm trying to clarify what the consumer is expected to do, because I'm
> trying to figure out whether a consumer loop that is committing offsets
> with commitSync(Map) for a partition where log deletion is enabled needs to
> put a try-catch guard around the commit call in case the committed offset
> has been deleted.
>
> Thanks,
> Stig Rohde Døssing
>


creating a controlled lag for a consumer group

2017-09-03 Thread Ido Barkan
Hey all.
We are trying to create a controlled lag (a lag of 30 secs). We are doing
that by inspecting a timestamp field for each msg on a specific topic and
doing an actual Thread.sleep for a computed time in the consumer thread
(until the msg is 30 secs old). We were hoping to see that eventually most
of the messages are 30 seconds old, so that the created lag corresponds to
30 secs' worth of processing.
This works for us at low throughput, but at high rates we are starting to
see strange behavior: lag on the partitions starts to grow uncontrollably
and becomes uneven across partitions.
We have tried multiple variations of this: sleeping per bulk of msgs,
sleeping per msg, capping the sleep time, etc., but failed to resolve this.
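
For reference, a minimal sketch of the per-message sleep variant described
above (hypothetical code, not the poster's; it assumes the record's built-in
Kafka timestamp stands in for the timestamp field being inspected):

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class LaggedProcessor {

    // Process a record only once it is at least lagMs old.
    static void processWithLag(ConsumerRecord<String, String> record, long lagMs)
            throws InterruptedException {
        long age = System.currentTimeMillis() - record.timestamp();
        long remaining = lagMs - age;
        if (remaining > 0) {
            // Block the consumer thread until the record is lagMs old.
            // If this sleep regularly pushes the time between poll() calls past
            // max.poll.interval.ms, the consumer is considered dead and the group
            // rebalances, which by itself can produce growing, uneven lag.
            Thread.sleep(remaining);
        }
        // ... actual processing of the record goes here ...
    }
}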

Has anyone tried to do this? any ideas?

Thanks,
Ido
-- 
Thanks,
Ido


Committing an invalid offset with KafkaConsumer.commitSync

2017-09-03 Thread Stig Døssing
Hi,

The documentation for KafkaConsumer.commitSync(Map) states that a
KafkaException will be thrown if the committed offset is invalid. I can't
seem to provoke this behavior, so I'd like clarification on whether this is
something the consumer is intended to do.

Here's the snippet I'd expect would error out (running in a loop):
public long getEarliestOffset() {
    LOG.info("Current offset " + consumer.committed(new TopicPartition(TOPIC, 0)));
    ConsumerRecords records = consumer.poll(2000);
    consumer.seekToBeginning(consumer.assignment());
    consumer.commitSync(Collections.singletonMap(new TopicPartition(TOPIC, 0),
            new OffsetAndMetadata(4_000_000L)));
    return consumer.position(new TopicPartition(TOPIC, 0));
}

The committed offset appears to be updated to 4,000,000 even though the
highest offset in that partition is ~6000. It also seems to work the other
way round: if I set the log retention such that offsets before 2000 are
deleted, I can still commit offset 0.

I'm trying to clarify what the consumer is expected to do, because I'm
trying to figure out whether a consumer loop that is committing offsets
with commitSync(Map) for a partition where log deletion is enabled needs to
put a try-catch guard around the commit call in case the committed offset
has been deleted.

Thanks,
Stig Rohde Døssing