[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset
[ https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-2164:
-------------------------------------
    Description: 
If log.logEndOffset < leaderStartOffset the follower resets its offset and prints the following:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread)
{code}
I think the right message should be:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [topic,11] reset its fetch offset from 54369274 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread)
{code}
This occurs because ReplicaFetcherThread resets the offset and then prints the log message.
Possible solution:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
      * Roll out a new log at the follower with the start offset equal to the
      */
     val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPar
-    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
     warn("Replica %d for partition %s reset its fetch offset from %d to curre
       .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.
+    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
     leaderStartOffset
   }
 }
{code}

was:
If log.logEndOffset < leaderStartOffset the follower resets its offset and prints the following:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread)
{code}
I think the right message should be:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 54369274 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread)
{code}
This occurs because ReplicaFetcherThread resets the offset and then prints the log message.
Possible solution:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
      * Roll out a new log at the follower with the start offset equal to the
      */
     val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPar
-    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
     warn("Replica %d for partition %s reset its fetch offset from %d to curre
       .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.
+    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
     leaderStartOffset
   }
 }
{code}

> ReplicaFetcherThread: suspicious log message on reset offset
> ------------------------------------------------------------
>
>                 Key: KAFKA-2164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2164
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2.1
>            Reporter: Alexey Ozeritskiy
>         Attachments: KAFKA-2164.patch
>
> If log.logEndOffset < leaderStartOffset the follower resets its offset and
> prints the following:
> {code}
> [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for
> partition [topic,11] reset its fetch offset from 49322124 to current leader
> 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset
> 54369274 for partition [topic,11] out of range;
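The ordering bug in this report is easy to demonstrate in miniature: if the log is truncated before the warning is formatted, the "from" offset has already been clobbered. A minimal sketch of the fix, with illustrative names only (not the actual ReplicaFetcherThread code):

```scala
// Sketch: read the follower's log-end offset and emit the warning *before*
// truncating the log, so the "from" offset in the message is the real
// pre-reset offset. All names here are illustrative, not Kafka internals.
object OffsetResetSketch {
  def resetFetchOffset(logEndOffset: Long,
                       leaderStartOffset: Long,
                       truncateTo: Long => Unit,
                       warn: String => Unit): Long = {
    // log the old offset first ...
    warn(s"reset its fetch offset from $logEndOffset to leader's start offset $leaderStartOffset")
    // ... and only then mutate the log
    truncateTo(leaderStartOffset)
    leaderStartOffset
  }
}
```

With the numbers from the report, the warning now carries 54369274 rather than the already-reset 49322124.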
Re: [DISCUSS] KIP-21 Configuration Management
Hey Aditya,

This is great! A couple of comments:

1. Leaving the file config in place is definitely the least disturbance. But let's really think about getting rid of the files and just having one config mechanism. There is always a tendency to make everything pluggable, which so often just leads to two mediocre solutions. Can we do the exercise of trying to consider fully getting rid of file config and seeing what goes wrong?

2. Do we need to model defaults? The current approach is that if you have a global config x it is overridden for a topic xyz by /topics/xyz/x, and I think this could be extended to /brokers/0/x. I think this is simpler. We need to specify the precedence for these overrides, e.g. if you override at the broker and topic level I think the topic level takes precedence.

3. I recommend we have the producer and consumer config just be an override under client.id. The override is by client id and we can have separate properties for controlling quotas for producers and consumers.

4. Some configs can be changed just by updating the reference; others may require some action. An example of this is that if you want to disable log compaction (assuming we wanted to make that dynamic) we need to call shutdown() on the cleaner. I think it may be required to register a listener callback that gets called when the config changes.

5. For handling the reference, can you explain your plan a bit? Currently we have an immutable KafkaConfig object with a bunch of vals. That object, or individual values from it, gets injected all over the code base. I was thinking something like this:
a. We retain the KafkaConfig object as an immutable object just as today.
b. It is no longer legit to grab values out of that config if they are changeable.
c. Instead of making KafkaConfig itself mutable, we make KafkaConfiguration, which has a single volatile reference to the current KafkaConfig. KafkaConfiguration is what gets passed into various components. 
So to access a config you do something like config.instance.myValue. When the config changes the config manager updates this reference. d. The KafkaConfiguration is the thing that allows doing the configuration.onChange("my.config", callback) -Jay On Tue, Apr 28, 2015 at 3:57 PM, Aditya Auradkar < aaurad...@linkedin.com.invalid> wrote: > Hey everyone, > > Wrote up a KIP to update topic, client and broker configs dynamically via > Zookeeper. > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration > > Please read and provide feedback. > > Thanks, > Aditya > > PS: I've intentionally kept this discussion separate from KIP-5 since I'm > not sure if that is actively being worked on and I wanted to start with a > clean slate. >
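The scheme in point 5 can be sketched as follows; the class shape, field names, and example config keys are assumptions for illustration, not actual Kafka code:

```scala
import java.util.concurrent.ConcurrentHashMap

// Immutable snapshot (as in a/b) plus a holder with a single volatile
// reference (as in c), with the onChange registration from (d).
// Hypothetical names throughout.
case class KafkaConfig(logCleanerEnable: Boolean, logRetentionMs: Long)

class KafkaConfiguration(initial: KafkaConfig) {
  @volatile private var currentConfig: KafkaConfig = initial
  private val listeners = new ConcurrentHashMap[String, KafkaConfig => Unit]()

  // Components read config.instance.myValue; each read sees one
  // consistent, immutable snapshot.
  def instance: KafkaConfig = currentConfig

  // configuration.onChange("my.config", callback) from point (d)
  def onChange(key: String, callback: KafkaConfig => Unit): Unit =
    listeners.put(key, callback)

  // Invoked by the config manager when the ZK-backed config changes:
  // swap the reference, then notify listeners with the new snapshot.
  def update(newConfig: KafkaConfig): Unit = {
    currentConfig = newConfig
    listeners.values().forEach(cb => cb(newConfig))
  }
}
```

The volatile reference keeps reads cheap and lock-free; components that cache values across the swap are exactly the ones that need the callback.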
Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review
+1! -Jay On Thu, Apr 30, 2015 at 6:12 AM, Ismael Juma wrote: > Hi all, > > Kafka currently uses a combination of Review Board and JIRA for > contributions and code review. In my opinion, this makes contribution and > code review a bit harder than it has to be. > > I think the approach used by Spark would improve the current situation: > > "Generally, Spark uses JIRA to track logical issues, including bugs and > improvements, and uses Github pull requests to manage the review and merge > of specific code changes. That is, JIRAs are used to describe what should > be fixed or changed, and high-level approaches, and pull requests describe > how to implement that change in the project's source code. For example, > major design decisions are discussed in JIRA."[1] > > It's worth reading the wiki page for all the details, but I will summarise > the suggested workflow for code changes: > >1. Fork the Github repository at http://github.com/apache/kafka (if you >haven't already) >2. git checkout -b kafka-XXX >3. Make one or more commits (smaller commits can be easier to review and >reviewboard makes that hard) >4. git push origin kafka-XXX >5. Create PR against upstream/trunk (this will update JIRA >automatically[2] and it will send an email to the dev mailing list too) >6. A CI build will be triggered[3] >7. Review process happens on GitHub (it's quite handy to be able to >comment on both commit or PR-level, unlike Review Board) >8. Once all feedback has been addressed and the build is green, a >variant of the `merge_spark_pr.py`[4] script is used to squash, merge, >push, close the PR and JIRA issue. 
> The squashed commit generated by the script includes a bunch of useful
> information including links to the original commits[5] (in the future, I
> think it's worth reconsidering the squashing of commits, but retaining the
> information in the commit is already an improvement)
>
> Neha merged a couple of commits via GitHub already and it went smoothly
> although we are still missing a few of the pieces described above:
>
>    1. CI builds triggered by GitHub PRs (this is supported by Apache Infra,
>    we need to request it for Kafka and provide whatever configuration is
>    needed)
>    2. Adapting Spark's merge_spark_pr script and integrating it into the
>    kafka Git repository
>    3. Updating the Kafka contribution wiki and adding a CONTRIBUTING.md to
>    the Git repository (this is shown when someone is creating a pull
>    request)
>    4. Go through existing GitHub pull requests and close the ones that are
>    no longer relevant (there are quite a few as people have been opening
>    them over the years, but nothing was done about most of them)
>    5. Other things I may be missing
>
> I am volunteering to help with the above if people agree that this is the
> right direction for Kafka. Thoughts?
>
> Best.
> Ismael
>
> P.S. I was told in the Apache Infra HipChat that it's not currently
> possible (and there are no plans to change that in the near future) to use
> the GitHub merge button to merge PRs. The merge script does quite a few
> useful things that the merge button does not in any case.
>
> [1]
> https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark
> [2]
> https://issues.apache.org/jira/browse/KAFKA-1054?focusedCommentId=14513614&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14513614
> [3] https://blogs.apache.org/infra/entry/github_pull_request_builds_now
> [4] https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py
> [5]
> https://github.com/apache/spark/commit/59b7cfc41b2c06fbfbf6aca16c1619496a8d1d00
>
[jira] [Updated] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception
[ https://issues.apache.org/jira/browse/KAFKA-2165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-2165:
-------------------------------------
    Status: Patch Available  (was: Open)

> ReplicaFetcherThread: data loss on unknown exception
> ----------------------------------------------------
>
>                 Key: KAFKA-2165
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2165
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2.1
>            Reporter: Alexey Ozeritskiy
>         Attachments: KAFKA-2165.patch
>
> Sometimes in our cluster a replica gets out of the ISR. Then the broker
> re-downloads the partition from the beginning. We got the following messages
> in the logs:
> {code}
> # The leader:
> [2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when
> processing fetch request for partition [topic,11] offset 54369274 from
> follower with correlation id 2634499. Possible cause: Request for offset
> 54369274 but we only have log segments in the range 49322124 to 54369273.
> (kafka.server.ReplicaManager)
> {code}
> {code}
> # The follower:
> [2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for
> partition [topic,11] reset its fetch offset from 49322124 to current leader
> 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset
> 54369274 for partition [topic,11] out of range; reset offset to 49322124
> (kafka.server.ReplicaFetcherThread)
> {code}
> This occurs because we update fetchOffset
> [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124]
> and then try to process the message.
> If any exception except OffsetOutOfRangeCode occurs we get unsynchronized
> fetchOffset and replica.logEndOffset.
> On the next fetch iteration we can get
> fetchOffset>replica.logEndOffset==leaderEndOffset and OffsetOutOfRangeCode.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception
[ https://issues.apache.org/jira/browse/KAFKA-2165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-2165:
-------------------------------------
    Attachment: KAFKA-2165.patch

> ReplicaFetcherThread: data loss on unknown exception
> ----------------------------------------------------
>
>                 Key: KAFKA-2165
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2165
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2.1
>            Reporter: Alexey Ozeritskiy
>         Attachments: KAFKA-2165.patch
>
> Sometimes in our cluster a replica gets out of the ISR. Then the broker
> re-downloads the partition from the beginning. We got the following messages
> in the logs:
> {code}
> # The leader:
> [2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when
> processing fetch request for partition [topic,11] offset 54369274 from
> follower with correlation id 2634499. Possible cause: Request for offset
> 54369274 but we only have log segments in the range 49322124 to 54369273.
> (kafka.server.ReplicaManager)
> {code}
> {code}
> # The follower:
> [2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for
> partition [topic,11] reset its fetch offset from 49322124 to current leader
> 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset
> 54369274 for partition [topic,11] out of range; reset offset to 49322124
> (kafka.server.ReplicaFetcherThread)
> {code}
> This occurs because we update fetchOffset
> [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124]
> and then try to process the message.
> If any exception except OffsetOutOfRangeCode occurs we get unsynchronized
> fetchOffset and replica.logEndOffset.
> On the next fetch iteration we can get
> fetchOffset>replica.logEndOffset==leaderEndOffset and OffsetOutOfRangeCode.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (KAFKA-2165) ReplicaFetcherThread: data loss on unknown exception
Alexey Ozeritskiy created KAFKA-2165:
----------------------------------------

             Summary: ReplicaFetcherThread: data loss on unknown exception
                 Key: KAFKA-2165
                 URL: https://issues.apache.org/jira/browse/KAFKA-2165
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8.2.1
            Reporter: Alexey Ozeritskiy

Sometimes in our cluster a replica gets out of the ISR. Then the broker re-downloads the partition from the beginning. We got the following messages in the logs:
{code}
# The leader:
[2015-03-25 11:11:07,796] ERROR [Replica Manager on Broker 21]: Error when processing fetch request for partition [topic,11] offset 54369274 from follower with correlation id 2634499. Possible cause: Request for offset 54369274 but we only have log segments in the range 49322124 to 54369273. (kafka.server.ReplicaManager)
{code}
{code}
# The follower:
[2015-03-25 11:11:08,816] WARN [ReplicaFetcherThread-0-21], Replica 31 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,816] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread)
{code}
This occurs because we update fetchOffset [here|https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L124] and then try to process the message.
If any exception except OffsetOutOfRangeCode occurs we get unsynchronized fetchOffset and replica.logEndOffset.
On the next fetch iteration we can get fetchOffset>replica.logEndOffset==leaderEndOffset and OffsetOutOfRangeCode.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
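The race described in this report can be sketched in isolation; `append` stands in for the message-processing step, and the names are illustrative, not the real AbstractFetcherThread code:

```scala
// If fetchOffset is advanced before the fetched messages are appended, any
// failure other than an offset-out-of-range error leaves
// fetchOffset > logEndOffset, and the next fetch trips the leader's
// out-of-range check. Advancing only after a successful append keeps the
// two in sync. Illustrative sketch, not Kafka code.
object FetcherSketch {
  def processPartitionData(fetchOffset: Long,
                           nextOffset: Long,
                           append: () => Unit): Long =
    try {
      append()     // apply fetched messages to the local log first
      nextOffset   // advance the fetch offset only on success
    } catch {
      case _: RuntimeException => fetchOffset  // leave the offset unchanged
    }
}
```

On failure the fetch is simply retried from the same offset instead of diverging from the local log.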
[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset
[ https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-2164:
-------------------------------------
    Status: Patch Available  (was: Open)

> ReplicaFetcherThread: suspicious log message on reset offset
> ------------------------------------------------------------
>
>                 Key: KAFKA-2164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2164
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2.1
>            Reporter: Alexey Ozeritskiy
>         Attachments: KAFKA-2164.patch
>
> If log.logEndOffset < leaderStartOffset the follower resets its offset and
> prints the following:
> {code}
> [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for
> partition [topic,11] reset its fetch offset from 49322124 to current leader
> 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset
> 54369274 for partition [topic,11] out of range; reset offset to 49322124
> (kafka.server.ReplicaFetcherThread)
> {code}
> I think the right message should be:
> {code}
> [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for
> partition [rt3.iva--yandex--access-log,11] reset its fetch offset from
> 54369274 to current leader 21's start offset 49322124
> (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset
> 54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset
> offset to 49322124 (kafka.server.ReplicaFetcherThread)
> {code}
> This occurs because ReplicaFetcherThread resets the offset and then prints
> the log message.
> Possible solution:
> {code}
> diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
> b/core/
> index b31b432..181cbc1 100644
> --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
> +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
> @@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
>      * Roll out a new log at the follower with the start offset equal to
> the
>      */
>    val leaderStartOffset =
> simpleConsumer.earliestOrLatestOffset(topicAndPar
> - replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition,
> leaderSt
>    warn("Replica %d for partition %s reset its fetch offset from %d to
> curre
> .format(brokerConfig.brokerId, topicAndPartition,
> replica.logEndOffset.
> + replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition,
> leaderSt
>    leaderStartOffset
> }
>}
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset
[ https://issues.apache.org/jira/browse/KAFKA-2164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-2164:
-------------------------------------
    Attachment: KAFKA-2164.patch

> ReplicaFetcherThread: suspicious log message on reset offset
> ------------------------------------------------------------
>
>                 Key: KAFKA-2164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2164
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2.1
>            Reporter: Alexey Ozeritskiy
>         Attachments: KAFKA-2164.patch
>
> If log.logEndOffset < leaderStartOffset the follower resets its offset and
> prints the following:
> {code}
> [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for
> partition [topic,11] reset its fetch offset from 49322124 to current leader
> 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset
> 54369274 for partition [topic,11] out of range; reset offset to 49322124
> (kafka.server.ReplicaFetcherThread)
> {code}
> I think the right message should be:
> {code}
> [2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for
> partition [rt3.iva--yandex--access-log,11] reset its fetch offset from
> 54369274 to current leader 21's start offset 49322124
> (kafka.server.ReplicaFetcherThread)
> [2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset
> 54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset
> offset to 49322124 (kafka.server.ReplicaFetcherThread)
> {code}
> This occurs because ReplicaFetcherThread resets the offset and then prints
> the log message.
> Possible solution:
> {code}
> diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
> b/core/
> index b31b432..181cbc1 100644
> --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
> +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
> @@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
>      * Roll out a new log at the follower with the start offset equal to
> the
>      */
>    val leaderStartOffset =
> simpleConsumer.earliestOrLatestOffset(topicAndPar
> - replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition,
> leaderSt
>    warn("Replica %d for partition %s reset its fetch offset from %d to
> curre
> .format(brokerConfig.brokerId, topicAndPartition,
> replica.logEndOffset.
> + replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition,
> leaderSt
>    leaderStartOffset
> }
>}
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Re: [DISCUSS] Using GitHub Pull Requests for contributions and code review
Ismael, We will also need to figure out if we need CONTRIBUTING.md like the following to take care of the Apache licensing stuff. https://github.com/apache/spark/blob/master/CONTRIBUTING.md As for merging changes, I think squashing the commits will be ideal during merge. Also, it would be great if we can always put the latest changes on top during the merge. Thanks, Jun On Thu, Apr 30, 2015 at 8:12 AM, Ismael Juma wrote: > Hi all, > > Kafka currently uses a combination of Review Board and JIRA for > contributions and code review. In my opinion, this makes contribution and > code review a bit harder than it has to be. > > I think the approach used by Spark would improve the current situation: > > "Generally, Spark uses JIRA to track logical issues, including bugs and > improvements, and uses Github pull requests to manage the review and merge > of specific code changes. That is, JIRAs are used to describe what should > be fixed or changed, and high-level approaches, and pull requests describe > how to implement that change in the project's source code. For example, > major design decisions are discussed in JIRA."[1] > > It's worth reading the wiki page for all the details, but I will summarise > the suggested workflow for code changes: > >1. Fork the Github repository at http://github.com/apache/kafka (if you >haven't already) >2. git checkout -b kafka-XXX >3. Make one or more commits (smaller commits can be easier to review and >reviewboard makes that hard) >4. git push origin kafka-XXX >5. Create PR against upstream/trunk (this will update JIRA >automatically[2] and it will send an email to the dev mailing list too) >6. A CI build will be triggered[3] >7. Review process happens on GitHub (it's quite handy to be able to >comment on both commit or PR-level, unlike Review Board) >8. Once all feedback has been addressed and the build is green, a >variant of the `merge_spark_pr.py`[4] script is used to squash, merge, >push, close the PR and JIRA issue. 
> The squashed commit generated by the script includes a bunch of useful
> information including links to the original commits[5] (in the future, I
> think it's worth reconsidering the squashing of commits, but retaining the
> information in the commit is already an improvement)
>
> Neha merged a couple of commits via GitHub already and it went smoothly
> although we are still missing a few of the pieces described above:
>
>    1. CI builds triggered by GitHub PRs (this is supported by Apache Infra,
>    we need to request it for Kafka and provide whatever configuration is
>    needed)
>    2. Adapting Spark's merge_spark_pr script and integrating it into the
>    kafka Git repository
>    3. Updating the Kafka contribution wiki and adding a CONTRIBUTING.md to
>    the Git repository (this is shown when someone is creating a pull
>    request)
>    4. Go through existing GitHub pull requests and close the ones that are
>    no longer relevant (there are quite a few as people have been opening
>    them over the years, but nothing was done about most of them)
>    5. Other things I may be missing
>
> I am volunteering to help with the above if people agree that this is the
> right direction for Kafka. Thoughts?
>
> Best.
> Ismael
>
> P.S. I was told in the Apache Infra HipChat that it's not currently
> possible (and there are no plans to change that in the near future) to use
> the GitHub merge button to merge PRs. The merge script does quite a few
> useful things that the merge button does not in any case.
>
> [1]
> https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark
> [2]
> https://issues.apache.org/jira/browse/KAFKA-1054?focusedCommentId=14513614&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14513614
> [3] https://blogs.apache.org/infra/entry/github_pull_request_builds_now
> [4] https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py
> [5]
> https://github.com/apache/spark/commit/59b7cfc41b2c06fbfbf6aca16c1619496a8d1d00
>
[jira] [Created] (KAFKA-2164) ReplicaFetcherThread: suspicious log message on reset offset
Alexey Ozeritskiy created KAFKA-2164:
----------------------------------------

             Summary: ReplicaFetcherThread: suspicious log message on reset offset
                 Key: KAFKA-2164
                 URL: https://issues.apache.org/jira/browse/KAFKA-2164
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8.2.1
            Reporter: Alexey Ozeritskiy

If log.logEndOffset < leaderStartOffset the follower resets its offset and prints the following:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [topic,11] reset its fetch offset from 49322124 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [topic,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread)
{code}
I think the right message should be:
{code}
[2015-03-25 11:11:08,975] WARN [ReplicaFetcherThread-0-21], Replica 30 for partition [rt3.iva--yandex--access-log,11] reset its fetch offset from 54369274 to current leader 21's start offset 49322124 (kafka.server.ReplicaFetcherThread)
[2015-03-25 11:11:08,976] ERROR [ReplicaFetcherThread-0-21], Current offset 54369274 for partition [rt3.iva--yandex--access-log,11] out of range; reset offset to 49322124 (kafka.server.ReplicaFetcherThread)
{code}
This occurs because ReplicaFetcherThread resets the offset and then prints the log message.
Possible solution:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/
index b31b432..181cbc1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
      * Roll out a new log at the follower with the start offset equal to the
      */
     val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPar
-    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
     warn("Replica %d for partition %s reset its fetch offset from %d to curre
       .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.
+    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderSt
     leaderStartOffset
   }
 }
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14525421#comment-14525421 ]

Jiangjie Qin commented on KAFKA-1788:
-------------------------------------
I took a shot at incorporating the solution to this problem in KAFKA-2142. The approach I took there is to just use the metadata timeout instead of adding a new timeout, because I think this is essentially metadata not being available, so we should treat it the same as in send(). This also saves us another timeout configuration.
[~ewencp] My concern about having a cap on the buffer for each topic-partition is: what if the traffic of each topic-partition is not balanced? If so, we might end up waiting on a busy topic-partition's buffer allocation while we actually have plenty of memory to use. That could hurt performance a lot.

> producer record can stay in RecordAccumulator forever if leader is no
> available
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-1788
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1788
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer
>    Affects Versions: 0.8.2.0
>            Reporter: Jun Rao
>            Assignee: Parth Brahmbhatt
>              Labels: newbie++
>             Fix For: 0.8.3
>
>         Attachments: KAFKA-1788.patch, KAFKA-1788_2015-01-06_13:42:37.patch,
> KAFKA-1788_2015-01-06_13:44:41.patch
>
> In the new producer, when a partition has no leader for a long time (e.g.,
> all replicas are down), the records for that partition will stay in the
> RecordAccumulator until the leader is available. This may cause the
> bufferpool to be full and the callback for the produced message to block for
> a long time.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
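The expiry idea under discussion can be sketched like this; the shape below is hypothetical, not the actual RecordAccumulator API. Batches whose partition still has a known leader, or which have waited less than the (metadata) timeout, are kept; the rest would be expired and their callbacks failed:

```scala
// Hypothetical sketch: expire accumulator batches that have waited longer
// than the timeout while their partition has no known leader, so such a
// partition cannot pin buffer memory forever.
case class Batch(partition: Int, createdMs: Long)

object AccumulatorSketch {
  // Returns (kept, expired), preserving batch order within each group.
  def expire(batches: List[Batch], nowMs: Long, timeoutMs: Long,
             hasLeader: Int => Boolean): (List[Batch], List[Batch]) =
    batches.partition(b => hasLeader(b.partition) || nowMs - b.createdMs < timeoutMs)
}
```

Note this expires per batch rather than capping per topic-partition, which sidesteps the unbalanced-traffic concern raised above.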
[jira] [Commented] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerGroupCommand
[ https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14525420#comment-14525420 ] Ashish K Singh commented on KAFKA-313: -- [~nehanarkhede] and [~gwenshap], pinging again for review. > Add JSON/CSV output and looping options to ConsumerGroupCommand > --- > > Key: KAFKA-313 > URL: https://issues.apache.org/jira/browse/KAFKA-313 > Project: Kafka > Issue Type: Improvement >Reporter: Dave DeMaagd >Assignee: Ashish K Singh >Priority: Minor > Labels: newbie, patch > Fix For: 0.8.3 > > Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, > KAFKA-313.patch, KAFKA-313_2015-02-23_18:11:32.patch > > > Adds: > * '--loop N' - causes the program to loop forever, sleeping for up to N > seconds between loops (loop time minus collection time, unless that's less > than 0, at which point it will just run again immediately) > * '--asjson' - display as a JSON string instead of the more human readable > output format. > Neither of the above depend on each other (you can loop in the human > readable output, or do a single shot execution with JSON output). Existing > behavior/output maintained if neither of the above are used. Diff Attached. > Impacted files: > core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Perf testing flush() - issues found
Thanks @Jay for suggesting changes to batch.size and linger.ms. I tried them out. It appears one can do better than the default batch.size for this synchronous batch mode with flush(). These new measurements give more "rational" numbers with which I can reason and infer some thumb rules (for batch-sync mode using flush). Here are my observations:
- The new producer API does much better than the older one for *single threaded* producers. (The best number I saw with the old API is ~68MB/s, with the new ~140MB/s.)
- Higher linger.ms sometimes helps perf and at other times hurts. No simple rule here. Best to try it out and decide whether the default is good for your case or not.
- For a single-threaded producer: to get the most throughput, set batch.size = (total bytes between flushes / partition count).
- Running more single-threaded producer processes helped (up to about 3-4 processes).
- 1 producer going to a single partition is faster than 1 producer going to multiple partitions.
- The number of bytes between two explicit flushes (i.e. the flush interval) made a much smaller impact than the buffer.size. Something to be learnt here; my speculation is that with smaller flush intervals this might change. (Having two knobs, batch.size and flush interval, is a bit confusing for end users trying to tune it; it would be good if we can find some simple guidance.)
- Other than some inconveniences previously mentioned, I feel flush() could be used as a way to simulate sync-batch behavior.

Producer limits:
- Able to exceed 1GigE capacity, but not 10GigE. Does not appear to go beyond ~460MB/s. Verified my test machines are able to achieve 1GB/s.

Todo:
- Need to try a multi-threaded producer.
- I did some testing of the consumer APIs as well with the 0.8.1 consumer-perf tool. Wasn't able to push it beyond 30MB/s. When producers ran in parallel it fell to under 10MB/s. Need to dig deeper. Will report back. Suggestions welcome.
Measurements:
- See attachment
- Also available on pastebin: http://pastebin.com/p3kSAjy6

Settings: acks=1, single broker, single-threaded producer (new API)
Machines: 32 cores, 256GB RAM, 10 gigE, 6x 15000 rpm disks

1 partition (MB/s):
                                         FlushInt=4MB  FlushInt=8MB  FlushInt=16MB
  linger=def  batch.size=default              57            54            52
  linger=1s   batch.size=default              57            61            59
  linger=def  batch.size=flushInt/parts      136           125           116
  linger=1s   batch.size=flushInt/parts       92            77            56
  linger=def  batch.size=flushInt            140           123           124
  linger=def  batch.size=10MB                140           123           124
  linger=def  batch.size=20MB                 31            30            42

4 partitions (MB/s):
  linger=def  batch.size=default              95            82            80
  linger=1s   batch.size=default              85            83            85
  linger=def  batch.size=batch/#part         127           133            90
  linger=1s   batch.size=batch/#part          94           100           101
  linger=def  batch.size=flushInt            6086
  linger=def  batch.size=10M                 7 77
  linger=def  batch.size=20M                 6 65

8 partitions (MB/s):
  linger=def  batch.size=default             100            89            96
  linger=1s   batch.size=default             105            97            98
  linger=def  batch.size=batch/#part         114           128            78
  linger=1s   batch.size=batch/#part          95            94           102
  linger=def  batch.size=flushInt            7 88
  linger=def  batch.size=10M                 7 87
  linger=def  batch.size=20M                 6 66

With multiple producers (each single-threaded):
  1 partition:  1 process = 136 MB/s, 3 processes = 344 MB/s, 4 processes = 290 MB/s
  4 partitions: 1 process = 127 MB/s, 3 processes = 345 MB/s, 4 processes = 372 MB/s
  8 partitions: 1 process = 128 MB/s, 3 processes = 304 MB/s, 4 processes = 460 MB/s
[jira] [Commented] (KAFKA-2142) Follow-up patch for KAFKA-2138 Refactor the drain message logic in new producer
[ https://issues.apache.org/jira/browse/KAFKA-2142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14525408#comment-14525408 ] Jiangjie Qin commented on KAFKA-2142:
--

Updated reviewboard https://reviews.apache.org/r/33552/diff/ against branch origin/trunk

> Follow-up patch for KAFKA-2138 Refactor the drain message logic in new producer
> ---
>
> Key: KAFKA-2142
> URL: https://issues.apache.org/jira/browse/KAFKA-2142
> Project: Kafka
> Issue Type: Bug
> Reporter: Jiangjie Qin
> Assignee: Jiangjie Qin
> Attachments: KAFKA-2142.patch, KAFKA-2142_2015-04-25_11:48:09.patch,
> KAFKA-2142_2015-05-02_11:59:39.patch
>
> This is the follow-up patch for KAFKA-2138. Currently the logic for the sender to drain messages from the accumulator is a little bit awkward; we want to refactor it a little bit. Copy/paste of Guozhang's suggestion below:
> {quote}
> 1. While handling the metadata response and updating the metadata, check for ANY partitions whose leader is not known; if there are any, set metadata.requestUpdate. So we do not need to do this step anymore at the start of run().
> 2. Get all the ready nodes based on their connection state only (i.e. no peeking into RecordAccumulator), and record the node_backoff as min(reconnection_backoff - time_waited) over all nodes; if one of these nodes is connected or connecting, this backoff should be 0.
> 3. For each of the ready nodes, try to drain their corresponding partitions in RecordAccumulator while considering all kinds of conditions (full, expired, exhausted, etc...), and record the data_backoff as min(retry_backoff - time_waited) over all partitions; if one of the partitions is immediately sendable, this backoff should be 0.
> 4. Formulate the produce request and call client.poll() with timeout = reconnection_backoff > 0 ? reconnection_backoff : retry_backoff.
> 5. In NetworkClient.poll(), the logic of "maybeUpdateMetadata" while updating metadataTimeout can also be simplified.
> {quote}
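Step 4 of the suggestion above picks the poll timeout from the two backoffs computed in steps 2 and 3. A minimal sketch of that selection, with hypothetical names (this is not Kafka's actual Sender/NetworkClient API):

```java
// Sketch of step 4 above: choose the client.poll() timeout from the node
// backoff (step 2) and the data backoff (step 3). Names are hypothetical.
public class PollTimeout {
    // nodeBackoffMs:  min(reconnection_backoff - time_waited) over all nodes,
    //                 0 if any node is already connected or connecting.
    // dataBackoffMs:  min(retry_backoff - time_waited) over all partitions,
    //                 0 if any partition is immediately sendable.
    public static long pollTimeout(long nodeBackoffMs, long dataBackoffMs) {
        // timeout = reconnection_backoff > 0 ? reconnection_backoff : retry_backoff
        return nodeBackoffMs > 0 ? nodeBackoffMs : dataBackoffMs;
    }

    public static void main(String[] args) {
        System.out.println(pollTimeout(50, 100)); // 50: still backing off reconnects
        System.out.println(pollTimeout(0, 100));  // 100: fall back to the retry backoff
    }
}
```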
[jira] [Updated] (KAFKA-2142) Follow-up patch for KAFKA-2138 Refactor the drain message logic in new producer
[ https://issues.apache.org/jira/browse/KAFKA-2142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-2142:
--

Attachment: KAFKA-2142_2015-05-02_11:59:39.patch

> Follow-up patch for KAFKA-2138 Refactor the drain message logic in new producer
> ---
>
> Key: KAFKA-2142
> URL: https://issues.apache.org/jira/browse/KAFKA-2142
> Project: Kafka
> Issue Type: Bug
> Reporter: Jiangjie Qin
> Assignee: Jiangjie Qin
> Attachments: KAFKA-2142.patch, KAFKA-2142_2015-04-25_11:48:09.patch,
> KAFKA-2142_2015-05-02_11:59:39.patch
Re: Review Request 33552: Patch for KAFKA-2142
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33552/
---

(Updated May 2, 2015, 6:59 p.m.)

Review request for kafka.

Bugs: KAFKA-2142
    https://issues.apache.org/jira/browse/KAFKA-2142

Repository: kafka

Description (updated)
---

Patch for KAFKA-2142

Patch for KAFKA-2142

Minor change in comments

Fix null pointer

Add fix to KAFKA-1788

Rebase on trunk

Diffs (updated)
---

  clients/src/main/java/org/apache/kafka/clients/Metadata.java 07f1cdb1fe920b0c7a5f2d101ddc40c689e1b247
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 42b12928781463b56fc4a45d96bb4da2745b6d95
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 49a98838767615dd952da20825f6985698137710
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 06182db1c3a5da85648199b4c0c98b80ea7c6c0c
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689
  clients/src/test/java/org/apache/kafka/clients/MetadataTest.java 928087d29deb80655ca83726c1ebc45d76468c1f
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 8b278892883e63899b53e15efb9d8c926131e858
  clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb
  clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java baa48e7c1b7ac5da8f3aca29f653c3fff88f8009
  clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4
  clients/src/test/resources/log4j.properties b1d5b7f2b4091040bdcfb0a60fd5879f45a0
  core/src/test/resources/log4j.properties 1b7d5d8f7d5fae7d272849715714781cad05d77b

Diff: https://reviews.apache.org/r/33552/diff/

Testing
---

Unit tests passed.

Thanks,

Jiangjie Qin