Hi,

The offsets could belong to different partitions of the same topic. Each
partition has its own offset.

Try logging *record.partition()*.

Regards,
Daniel.


On Thu, Mar 17, 2016 at 1:58 PM, Erik Pettersson <erik.petters...@tink.se>
wrote:

> Hi,
>
> I have the 3 consumers subscribing from a single topic with the following
> settings.
>
> auto.commit.interval.ms = 1000
> enable.auto.commit      = true
> request.timeout.ms      = 60000
> session.timeout.ms      = 59000
> heartbeat.interval.ms   = 4000
>
> Version 0.9.0.
>
> The consumers looks like this:
>
> while (isRunning()) {
>     ConsumerRecords<String, byte[]> records = consumer.poll(3000);
>
>     for (final ConsumerRecord<String, byte[]> record : records) {
>         doWork(...);
>     }
>
>     if (!records.isEmpty()) {
>         log.info(String.format("Committed(Records = '%d', Offset = '%d',
> Time = '%dms')", records.count(),
>                 Iterables.getLast(records).offset(),
>                 TimeUnit.MILLISECONDS.convert(elapsedTime,
> TimeUnit.NANOSECONDS)));
>     }
> }
>
> My problem is that the offset sometimes is moved back a lot during a
> rebalance. This is the log from the consumers during a rebalance.
>
> Consumer 1:
> [2016-03-17 04:06:55,823] Committed(Records = '4', Offset = '427432', Time
> = '3090ms')
> [2016-03-17 04:06:55,823]
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Error
> UNKNOWN_MEMBER_ID occurred while committing offsets for group default
> [2016-03-17 04:06:55,823]
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Auto
> offset commit failed: Commit cannot be completed due to group rebalance
> [2016-03-17 04:06:55,824]
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Error
> UNKNOWN_MEMBER_ID occurred while committing offsets for group default
> [2016-03-17 04:06:55,824]
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Auto
> offset commit failed: Commit cannot be completed due to group rebalance
> [2016-03-17 04:06:55,825]
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Error
> UNKNOWN_MEMBER_ID occurred while committing offsets for group default
> [2016-03-17 04:06:55,825]
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Auto
> offset commit failed:
> [2016-03-17 04:06:55,825]
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Attempt to
> join group default failed due to unknown member id, resetting and retrying.
> [2016-03-17 04:07:11,463] Committed(Records = '4', Offset = '427412', Time
> = '1ms')
>
> Offset before rebalance: 427432
> Offset after rebalance: 427412
> Diff: 20
>
> Consumer 2:
> INFO  [2016-03-17 04:06:52,299] Committed(Records = '1', Offset = '427444',
> Time = '0ms')
> INFO  [2016-03-17 04:06:52,573]
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Attempt to
> heart beat failed since the group is rebalancing, try to re-join group.
> INFO  [2016-03-17 04:07:11,464] Committed(Records = '4', Offset = '427430',
> Time = '0ms')
>
> Offset before rebalance: 427444
> Offset after rebalance: 427430
> Diff: 14
>
> Consumer 3:
> [2016-03-17 04:07:07,254] Committed(Records = '4', Offset = '427408', Time
> = '1674ms')
> [2016-03-17 04:07:07,254]
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Attempt to
> heart beat failed since the group is rebalancing, try to re-join group.
> [2016-03-17 04:07:13,811] Committed(Records = '4', Offset = '412777', Time
> = '2345ms')
>
> Offset before rebalance: 427410
> Offset after rebalance: 412777
> Diff: 14 633. This is causing trouble for me.
>
>
> Any idea on what I'm doing wrong?
>

Reply via email to