[jira] [Resolved] (KAFKA-6099) Seeking consumer to evicted offset resets the offset
[ https://issues.apache.org/jira/browse/KAFKA-6099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-6099.
------------------------------------
    Resolution: Not A Problem

As pointed out by [~tomas.kralik], you should use auto.offset.reset=none. This will cause an {{InvalidOffsetException}} to be raised from {{poll()}} or {{position()}} after you have seeked to an offset which is no longer valid. Closing this issue for now, but feel free to reopen if you think there is still an issue.

> Seeking consumer to evicted offset resets the offset
> ----------------------------------------------------
>
>                 Key: KAFKA-6099
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6099
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.11.0.1
>         Environment: Windows
>            Reporter: Viliam Durina
>            Priority: Major
>
> We use manual partition assignment and save the offsets to our storage. The
> topic also has a short "retention.ms" property. When we try to restart
> consumption from an already evicted offset, the offset is reset according to
> the "auto.offset.reset" property. That is:
> - if "latest" is configured, it only returns records that were inserted after
>   the `seek` call
> - if "earliest" is configured, it tries to restart at 0, which fails for the
>   same reason, because offset=0 is also evicted.
> Expected behavior is to report the situation with an exception, thrown from
> either the `seek()` or `poll()` call. The user will then be expected to
> `seekToBeginning`, `seekToEnd`, or seek to any other position.
> Another option is to restart at the earliest available record, regardless of
> the value of the "auto.offset.reset" property. However, this way the consumer
> has no way of knowing that it missed some records it expected.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6099) Seeking consumer to evicted offset resets the offset
[ https://issues.apache.org/jira/browse/KAFKA-6099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368464#comment-16368464 ]

Jason Gustafson edited comment on KAFKA-6099 at 2/18/18 7:59 AM:
-----------------------------------------------------------------

As pointed out by [~tomas.kralik], you should use auto.offset.reset=none. This will cause an {{InvalidOffsetException}} to be raised from {{poll()}} or {{position()}} after you have seeked to an offset which is no longer valid. Closing this issue for now, but feel free to reopen if you think there is still an issue.

was (Author: hachikuji):
As pointed out by [~tomas.kralik], you should use auto.reset.policy=none. This will cause an {{InvalidOffsetException}} to be raised from {{poll()}} or {{position()}} after you have seeked to an offset which is no longer valid. Closing this issue for now, but feel free to reopen if you think there is still an issue.
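The resolution above can be sketched in client code as follows. This is an illustrative example, not code from the ticket: the broker address, group id, topic name, and stored offset are all hypothetical, and it assumes the 0.11-era Java consumer (`poll(long)` signature). With `auto.offset.reset=none`, seeking to an evicted offset surfaces as an {{InvalidOffsetException}} from {{poll()}} rather than a silent reset:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekWithNoReset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "offset-store-demo");       // hypothetical group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // "none" disables automatic resets, so an evicted offset raises an exception
        props.put("auto.offset.reset", "none");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 42L); // offset restored from external storage
            try {
                ConsumerRecords<String, String> records = consumer.poll(100);
                // process records...
            } catch (InvalidOffsetException e) {
                // the saved offset was evicted by retention;
                // the application now chooses a recovery point explicitly
                consumer.seekToBeginning(e.partitions());
            }
        }
    }
}
```

Running this requires a reachable broker, so it is a sketch of the control flow rather than a self-verifying test.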
[jira] [Commented] (KAFKA-6553) Consumer consumed committed messages
[ https://issues.apache.org/jira/browse/KAFKA-6553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368467#comment-16368467 ]

Jason Gustafson commented on KAFKA-6553:
----------------------------------------

It can happen sometimes that there are fetches in flight when a consumer group rebalances. When the rebalance completes, the consumer will be assigned a new set of partitions and will reset the positions of those partitions to the latest committed offsets. After it does so, the in-flight fetches may return and either no longer be needed (if the partitions are no longer assigned) or no longer be at the right fetch offsets. In either case, we discard the data and send a new fetch at the current position. Typically this does not indicate a problem, but I'd have to see the full logs to be sure.

> Consumer consumed committed messages
> ------------------------------------
>
>                 Key: KAFKA-6553
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6553
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Orel Shai
>            Priority: Critical
>
> Hi,
> We're using consumer kafka client 0.10.2.0 (working against Kafka broker
> 0.10.0) with the following configuration:
> {code:java}
> props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
> props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
> props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
> props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3");
> props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "4");
> props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1");
> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
> {code}
> So as you can see, we're using autocommit.
> The consumer API version that we're using has a dedicated thread for doing
> autocommit, so every second we have an autocommit, which means that we have a
> heartbeat every second.
> For some reason we're getting the same message many times.
> While looking at our logs I can see the following:
> {code:java}
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-15 to the committed offset 352878
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-3 to the committed offset 352458
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-19 to the committed offset 353775
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 to the committed offset 352171
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-7 to the committed offset 352995
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-27 to the committed offset 352531
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 to the committed offset 351893
> 2018-02-11 10:56:24,656 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 at offset 352171 since the current position is 352205
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 at offset 351893 since the current position is 351929
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-26 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-17 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-29 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-5 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-8 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched rec
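For readers hitting duplicates around rebalances like the reporter above, one common mitigation (a general suggestion on my part, not something prescribed in this thread) is to commit synchronously when partitions are revoked, so a lagging auto-commit cannot rewind the partition's new owner. A minimal sketch, with the topic name taken from the logs above and the consumer assumed to exist already:

```java
import java.util.Collection;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevoke {
    // `consumer` is the application's existing KafkaConsumer instance
    static void subscribeWithCommitOnRevoke(final KafkaConsumer<String, String> consumer) {
        consumer.subscribe(
            Collections.singletonList("misc.ha.UpdateNodeGroup.UpdateNodeTopic"),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // flush progress before ownership moves, so the new owner
                    // resumes from the last processed record rather than an
                    // older auto-committed offset
                    consumer.commitSync();
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // nothing to do; positions are reset to committed offsets
                }
            });
    }
}
```

This does not eliminate the "Ignoring fetched records" lines, which are normal, but it narrows the redelivery window that auto-commit leaves open across a rebalance.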
[jira] [Assigned] (KAFKA-6554) Broker doesn't reject Produce request with inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-6554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson reassigned KAFKA-6554:
--------------------------------------
    Assignee: Jason Gustafson

> Broker doesn't reject Produce request with inconsistent state
> -------------------------------------------------------------
>
>                 Key: KAFKA-6554
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6554
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 1.0.0
>            Reporter: Simon Fell
>            Assignee: Jason Gustafson
>            Priority: Minor
>         Attachments: produce_v3.txt
>
> Produce messages of type v3 have offset deltas in each record along with a
> LastOffsetDelta for the topic/partition set. In investigating an issue with
> missing offsets, I found a bug in a producer library where it would send
> multiple records but leave LastOffsetDelta at 0. This causes various
> problems, including holes in the offsets fetched by the consumer.
> As LastOffsetDelta can be computed by looking at the records, it seems like
> the broker should at least validate the LastOffsetDelta field against the
> contained records to stop this bad data getting in.
> I've attached a decoded v3 produce message that was causing the problems and
> was accepted by the broker.
> Here's a link to the issue in the Kafka library we were using, which has more
> context if you need it:
> https://github.com/Shopify/sarama/issues/1032
[jira] [Commented] (KAFKA-6554) Broker doesn't reject Produce request with inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-6554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368484#comment-16368484 ]

Jason Gustafson commented on KAFKA-6554:
----------------------------------------

Thanks for the report. It does look like we're missing some validation here. I will submit a patch.
[jira] [Commented] (KAFKA-6554) Broker doesn't reject Produce request with inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-6554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368487#comment-16368487 ]

ASF GitHub Bot commented on KAFKA-6554:
---------------------------------------

hachikuji opened a new pull request #4585: KAFKA-6554; Missing lastOffsetDelta validation before log append
URL: https://github.com/apache/kafka/pull/4585

Add validation checks that the offset range is valid and aligned with the batch count prior to appending to the log. I've added several unit tests to verify the various invalid cases.
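The kind of check the pull request describes could look roughly like this. This is a hypothetical helper operating on already-decoded per-record offset deltas, not the actual broker code: for a well-formed batch the header's lastOffsetDelta must equal the largest record-level delta (recordCount - 1 when deltas are dense from 0), so the sarama batch from the report, with multiple records but lastOffsetDelta stuck at 0, would be rejected:

```java
import java.util.List;

class BatchValidation {
    /**
     * Reject a produce batch whose header-level lastOffsetDelta disagrees with
     * the offset deltas carried by its records. Hypothetical helper, shown only
     * to illustrate the validation the PR adds before the log append.
     */
    static void validateLastOffsetDelta(int lastOffsetDelta, List<Integer> recordOffsetDeltas) {
        if (recordOffsetDeltas.isEmpty()) {
            throw new IllegalArgumentException("batch contains no records");
        }
        // the largest per-record delta is what lastOffsetDelta must advertise
        int maxDelta = recordOffsetDeltas.stream().mapToInt(Integer::intValue).max().getAsInt();
        if (lastOffsetDelta != maxDelta) {
            throw new IllegalArgumentException("lastOffsetDelta " + lastOffsetDelta
                + " does not match largest record offset delta " + maxDelta);
        }
    }
}
```

A two-record batch with deltas {0, 1} and lastOffsetDelta 0, as produced by the buggy library, fails this check instead of creating offset holes downstream.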
[jira] [Commented] (KAFKA-6024) Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()
[ https://issues.apache.org/jira/browse/KAFKA-6024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368651#comment-16368651 ]

Jason Gustafson commented on KAFKA-6024:
----------------------------------------

[~yuzhih...@gmail.com] [~sssanthalingam] I left a comment on the PR which has not been addressed.

> Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6024
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6024
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Ted Yu
>            Assignee: siva santhalingam
>            Priority: Minor
>
> In several methods, parameter validation is done after calling acquireAndEnsureOpen():
> {code}
>     public void seek(TopicPartition partition, long offset) {
>         acquireAndEnsureOpen();
>         try {
>             if (offset < 0)
>                 throw new IllegalArgumentException("seek offset must not be a negative number");
> {code}
> Since the parameter values do not change per invocation, it seems that
> performing validation ahead of the acquireAndEnsureOpen() call would be better.
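The reordering the ticket proposes can be illustrated with a stubbed stand-in for the consumer's light lock. The AtomicInteger here is only a stand-in for illustration; the real KafkaConsumer's acquire/release uses a thread-id check plus refcount, and this class is not Kafka code:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the suggested reordering: validate arguments *before*
// acquiring the lock, so invalid calls never touch the acquire/release path.
class SeekValidationSketch {
    private final AtomicInteger refCount = new AtomicInteger();

    private void acquireAndEnsureOpen() { refCount.incrementAndGet(); }
    private void release() { refCount.decrementAndGet(); }

    void seek(String partition, long offset) {
        // cheap, side-effect-free validation happens first...
        if (offset < 0)
            throw new IllegalArgumentException("seek offset must not be a negative number");
        // ...so the lock is only taken for calls that can actually proceed
        acquireAndEnsureOpen();
        try {
            // update the fetch position here
        } finally {
            release();
        }
    }

    int refCount() { return refCount.get(); }
}
```

With validation first, a caller passing a negative offset gets the IllegalArgumentException without the lock ever being acquired, which is the whole point of the proposed change.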
[jira] [Created] (KAFKA-6570) AbstractPartitionAssignor should provide more flexibility to its implementor to see cluster meta data.
Vijendra Kulhade created KAFKA-6570:
---------------------------------------

             Summary: AbstractPartitionAssignor should provide more flexibility to its implementor to see cluster meta data.
                 Key: KAFKA-6570
                 URL: https://issues.apache.org/jira/browse/KAFKA-6570
             Project: Kafka
          Issue Type: Wish
          Components: clients, consumer
    Affects Versions: 0.10.2.0
            Reporter: Vijendra Kulhade

At present `AbstractPartitionAssignor` only passes `partitionsPerTopic` to its subclasses, i.e. `RangeAssignor` and `RoundRobinAssignor`. I want to write an assignor which only assigns partitions which are not replicas. Since I don't have the cluster details, I cannot check the leader for a partition and assign it.
[jira] [Updated] (KAFKA-6570) AbstractPartitionAssignor should provide more flexibility to its sub class to see cluster meta data.
[ https://issues.apache.org/jira/browse/KAFKA-6570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vijendra Kulhade updated KAFKA-6570:
------------------------------------
    Summary: AbstractPartitionAssignor should provide more flexibility to its sub class to see cluster meta data.  (was: AbstractPartitionAssignor should provide more flexibility to its implementor to see cluster meta data.)
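One possible workaround for the wish above, sketched under the assumption that depending on the internals package is acceptable (`AbstractPartitionAssignor` lives in `org.apache.kafka.clients.consumer.internals` and is not a supported public API, so this can break between releases): the group-protocol entry point `assign(Cluster, Map<String, Subscription>)` is public on `AbstractPartitionAssignor`, so a subclass can capture the `Cluster` before delegating and then consult leader information inside the abstract `assign`. The class name and strategy name below are hypothetical:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

public class LeaderAwareAssignor extends AbstractPartitionAssignor {
    private volatile Cluster metadata;

    @Override
    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        this.metadata = metadata; // keep the cluster view for use below
        return super.assign(metadata, subscriptions);
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // this.metadata.leaderFor(tp) etc. is now available here;
        // real assignment logic would go in place of this empty result
        return new HashMap<>();
    }

    @Override
    public String name() {
        return "leader-aware"; // hypothetical strategy name
    }
}
```

Because the override intercepts the same call the group coordinator path uses, no change to `AbstractPartitionAssignor` itself is required, though an official hook for cluster metadata would clearly be cleaner.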
[jira] [Commented] (KAFKA-6569) Reflection in OffsetIndex and TimeIndex construction
[ https://issues.apache.org/jira/browse/KAFKA-6569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368692#comment-16368692 ]

ASF GitHub Bot commented on KAFKA-6569:
---------------------------------------

ambroff opened a new pull request #4586: KAFKA-6569 Reflection in OffsetIndex and TimeIndex ctrs
URL: https://github.com/apache/kafka/pull/4586

We identified that we spend a lot of time in reflection when creating OffsetIndex, TimeIndex, or other implementations of AbstractIndex[K, V], because of the Logging mixin. When the broker is bootstrapping, it's just doing this in a tight loop, so this time adds up. This patch moves the logging to a companion object, statically initializing the logger.

> Reflection in OffsetIndex and TimeIndex construction
> ----------------------------------------------------
>
>                 Key: KAFKA-6569
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6569
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Kyle Ambroff-Kao
>            Assignee: Kyle Ambroff-Kao
>            Priority: Major
>         Attachments: after.png, before.png
>
> kafka.log.AbstractIndex uses the Logging mixin to lazily initialize loggers
> for any concrete type that inherits from it. This works great, except that
> the LazyLogging trait uses reflection to compute the logger name.
> When you have hundreds of thousands of log segments to load on startup, the
> extra cost adds up.
> I've attached flame graphs from broker startup on a broker that has about
> 12TB of log segments to load, and a second flame graph after changing
> AbstractIndex to statically initialize a logger.
[jira] [Commented] (KAFKA-6569) Reflection in OffsetIndex and TimeIndex construction
[ https://issues.apache.org/jira/browse/KAFKA-6569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368693#comment-16368693 ]

Kyle Ambroff-Kao commented on KAFKA-6569:
-----------------------------------------

Submitted a PR for the patch we deployed internally. It's not perfect, though, since the logger name does change. This wasn't important for us, but I'm not sure whether this matters to you guys.
[jira] [Commented] (KAFKA-6569) Reflection in OffsetIndex and TimeIndex construction
[ https://issues.apache.org/jira/browse/KAFKA-6569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368701#comment-16368701 ]

Kyle Ambroff-Kao commented on KAFKA-6569:
-----------------------------------------

The logger name doesn't change for anything written from kafka.log.OffsetIndex, since the companion object overrides loggerName. However, messages logged from methods in kafka.log.AbstractIndex[K, V] now have the logger name "kafka.log.AbstractIndex$", where before it would be the class name of the object (kafka.log.OffsetIndex).
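The trade-off discussed in the comments above can be shown with a language-neutral sketch. The actual patch is Scala using a companion object; this is plain Java with java.util.logging, and the class names are hypothetical stand-ins for AbstractIndex and OffsetIndex. Hoisting the logger into a static field of the base class means the name is computed once per class load instead of reflectively per instance, but every subclass then logs under the base class's name, which is exactly the caveat noted above:

```java
import java.util.logging.Logger;

// Sketch of the fix: a logger initialized a single time, when the base class
// is loaded, rather than lazily via reflection in every constructor call.
abstract class AbstractIndexSketch {
    private static final Logger LOG = Logger.getLogger(AbstractIndexSketch.class.getName());

    String loggerName() {
        return LOG.getName();
    }

    void logLoaded(String file) {
        LOG.fine("Loaded index " + file);
    }
}

// A concrete index type inherits the *base class's* logger name,
// mirroring the "kafka.log.AbstractIndex$" behavior described above.
class OffsetIndexSketch extends AbstractIndexSketch {
}
```

So the startup cost goes away at the price of coarser logger names, which is the trade the PR author says was acceptable internally.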
[jira] [Created] (KAFKA-6571) KafkaProducer.close(0) should be non-blocking
Dong Lin created KAFKA-6571:
-------------------------------

             Summary: KafkaProducer.close(0) should be non-blocking
                 Key: KAFKA-6571
                 URL: https://issues.apache.org/jira/browse/KAFKA-6571
             Project: Kafka
          Issue Type: Bug
            Reporter: Dong Lin
            Assignee: Dong Lin

According to the Java doc of producer.close(long timeout, TimeUnit timeUnit), "Specifying a timeout of zero means do not wait for pending send requests to complete". However, producer.close(0) can currently block waiting for the sender thread to exit, which in turn can block on the user's callback. We probably should not let producer.close(0) join the sender thread if the user has specified a zero timeout.
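The requested behavior can be sketched with plain threads. This is a toy stand-in, not the KafkaProducer implementation (the real close also takes a TimeUnit): close(0) signals shutdown but skips the join, so it cannot block on a sender thread that is itself stuck inside a user callback:

```java
// Sketch of a close() that honors a zero timeout by not joining the worker.
class NonBlockingCloseSketch {
    private final Thread sender;
    private volatile boolean running = true;

    NonBlockingCloseSketch(Runnable work) {
        // stand-in for the producer's sender thread, which may block in
        // a user-supplied callback
        sender = new Thread(() -> {
            while (running) {
                work.run();
            }
        }, "sender");
        sender.setDaemon(true);
        sender.start();
    }

    void close(long timeoutMs) throws InterruptedException {
        running = false;            // always request shutdown
        if (timeoutMs > 0) {
            sender.join(timeoutMs); // wait only when the caller asked to wait
        }
        // with timeoutMs == 0 we return immediately, even if the sender
        // is still blocked inside a callback
    }
}
```

With this shape, close(0) returns right away regardless of what the sender thread is doing, matching the documented "do not wait" contract.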