[GitHub] [kafka] dongjinleekr commented on pull request #10176: KAFKA-12359: Update Jetty to 11
dongjinleekr commented on pull request #10176: URL: https://github.com/apache/kafka/pull/10176#issuecomment-813274930 For those curious about what the Jetty 11 upgrade would look like, here is the latest draft with Jetty 11.0.2. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should
[ https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314720#comment-17314720 ] Justin Mclean commented on KAFKA-12602: --- Re the first two files - even if something is Apache licensed, it's best to mention it in the LICENSE file. There's no need to repeat the license text, however. Is it Apache code or 3rd-party code? If it comes from an ASF project (and there are changes to the original code), then that may have an impact on the NOTICE file as well, as you need to look at the NOTICE file in the project it comes from and copy the relevant parts across. > The LICENSE and NOTICE files don't list everything they should > -- > > Key: KAFKA-12602 > URL: https://issues.apache.org/jira/browse/KAFKA-12602 > Project: Kafka > Issue Type: Bug >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.8.0, 2.7.1, 2.6.2 > > > [~jmclean] raised this on the mailing list: > [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E] > > We need to make the license file match what we are actually shipping in > source and binary distributions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should
[ https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314721#comment-17314721 ] Justin Mclean commented on KAFKA-12602: --- see [1] and [2] 1. https://infra.apache.org/licensing-howto.html#alv2-dep 2. https://infra.apache.org/licensing-howto.html#bundle-asf-product
[GitHub] [kafka] ncliang opened a new pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource
ncliang opened a new pull request #10475: URL: https://github.com/apache/kafka/pull/10475 The default implementation, which is inherited from ClassLoader, searches the classloader tree parent-first. This causes issues when the resource is available both on the classpath and the plugin path: instead of the resource being loaded from the plugin path first, the system classloader is consulted first and the resource is loaded from the classpath. A test case is added in PluginsTest to verify that this commit fixes the behavior. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
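The fix described above amounts to inverting the usual parent-first delegation for resources. A minimal sketch of a child-first getResource override, assuming a URLClassLoader-backed plugin loader (the class name here is illustrative, not Kafka's actual PluginClassLoader):

```java
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative child-first resource lookup: check this loader's own URLs
// before delegating to the parent, so a plugin-path copy of a resource
// shadows a classpath copy.
class ChildFirstClassLoader extends URLClassLoader {
    ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    public URL getResource(String name) {
        // Search this loader's own URLs first...
        URL local = findResource(name);
        if (local != null) {
            return local;
        }
        // ...and only fall back to the normal parent-first lookup if the
        // resource is absent locally.
        return super.getResource(name);
    }
}
```

With an empty URL list the loader simply falls through to the parent, which is what the test below exercises.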
[GitHub] [kafka] ncliang commented on pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource
ncliang commented on pull request #10475: URL: https://github.com/apache/kafka/pull/10475#issuecomment-813282995 @mageshn @C0urante @gharris1727 , please review when you get a chance.
[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should
[ https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314723#comment-17314723 ] Justin Mclean commented on KAFKA-12602: --- Looking at the proposed changes, some changes may need to be made to the binary NOTICE file due to the inclusion of ALv2 dependencies. I also see you include general MIT and BSD licenses. This is probably OK, but it would be better to include each individual license. Both licenses have copyright lines, and the BSD license text can vary a little from license to license. In short, IMO it's much better than before. I think this would be OK to merge and make another release candidate. Perhaps consider refining in future releases?
[jira] [Comment Edited] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should
[ https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314723#comment-17314723 ] Justin Mclean edited comment on KAFKA-12602 at 4/5/21, 9:12 AM: Looking at the proposed changes, some additions may need to be made to the binary NOTICE file due to the inclusion of ALv2 dependencies. I also see you have included general MIT and BSD licenses. This is probably OK, but it would be better to include each individual license text. Both MIT and BSD licenses have copyright lines, and the BSD license text can vary a little from license to license. In short, IMO it's much better than before. I think this would be OK to merge and make another release candidate. Perhaps consider refining in future releases?
[jira] [Created] (KAFKA-12615) Correct comments for the method Selector.clear
HaiyuanZhao created KAFKA-12615: --- Summary: Correct comments for the method Selector.clear Key: KAFKA-12615 URL: https://issues.apache.org/jira/browse/KAFKA-12615 Project: Kafka Issue Type: Improvement Components: build Reporter: HaiyuanZhao Assignee: HaiyuanZhao {code:java} /** * Clears all the results from the previous poll. This is invoked by Selector at the start of * a poll() when all the results from the previous poll are expected to have been handled. * * SocketServer uses {@link #clearCompletedSends()} and {@link #clearCompletedSends()} to * clear `completedSends` and `completedReceives` as soon as they are processed to avoid * holding onto large request/response buffers from multiple connections longer than necessary. * Clients rely on Selector invoking {@link #clear()} at the start of each poll() since memory usage * is less critical and clearing once-per-poll provides the flexibility to process these results in * any order before the next poll. */ {code}
[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear
[ https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao updated KAFKA-12615: Description: According to my understanding, the second clearCompletedSends should be clearCompletedReceives {code:java} /** * Clears all the results from the previous poll. This is invoked by Selector at the start of * a poll() when all the results from the previous poll are expected to have been handled. * * SocketServer uses {@link #clearCompletedSends()} and {@link #clearCompletedSends()} to * clear `completedSends` and `completedReceives` as soon as they are processed to avoid * holding onto large request/response buffers from multiple connections longer than necessary. * Clients rely on Selector invoking {@link #clear()} at the start of each poll() since memory usage * is less critical and clearing once-per-poll provides the flexibility to process these results in * any order before the next poll. */ {code} > Correct comments for the method Selector.clear > -- > > Key: KAFKA-12615 > URL: https://issues.apache.org/jira/browse/KAFKA-12615 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: HaiyuanZhao >Assignee: HaiyuanZhao >Priority: Minor > > According to my understanding, the second clearCompletedSends should be > clearCompletedReceives > {code:java} > /** > * Clears all the results from the previous poll. This is invoked by Selector > at the start of > * a poll() when all the results from the previous poll are expected to have > been handled. > * > * SocketServer uses {@link #clearCompletedSends()} and {@link > #clearCompletedSends()} to > * clear `completedSends` and `completedReceives` as soon as they are > processed to avoid > * holding onto large request/response buffers from multiple connections > longer than necessary. > * Clients rely on Selector invoking {@link #clear()} at the start of each > poll() since memory usage > * is less critical and clearing once-per-poll provides the flexibility to > process these results in > * any order before the next poll. > */ > {code}
[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear
[ https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao updated KAFKA-12615: Description: According to my understanding, the second clearCompletedSends should be clearCompletedReceives {code:java} /** * Clears all the results from the previous poll. This is invoked by Selector at the start of * a poll() when all the results from the previous poll are expected to have been handled. * * SocketServer uses {@link #*clearCompletedSends*()} and {@link #*clearCompletedSends*()} to * clear `completedSends` and `completedReceives` as soon as they are processed to avoid * holding onto large request/response buffers from multiple connections longer than necessary. * Clients rely on Selector invoking {@link #clear()} at the start of each poll() since memory usage * is less critical and clearing once-per-poll provides the flexibility to process these results in * any order before the next poll. */ {code}
[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear
[ https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao updated KAFKA-12615: Component/s: (was: build) clients
[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear
[ https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao updated KAFKA-12615: Description: According to my understanding, the second clearCompletedSends should be clearCompletedReceives /** * Clears all the results from the previous poll. This is invoked by Selector at the start of * a poll() when all the results from the previous poll are expected to have been handled. * * SocketServer uses clearCompletedSends*() and *clearCompletedSends*() to * clear `completedSends` and `completedReceives` as soon as they are processed to avoid * holding onto large request/response buffers from multiple connections longer than necessary. * Clients rely on Selector invoking {@link #clear()} at the start of each poll() since memory usage * is less critical and clearing once-per-poll provides the flexibility to process these results in * any order before the next poll. */
[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear
[ https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao updated KAFKA-12615: Description: According to my understanding, the second clearCompletedSends should be clearCompletedReceives /** * Clears all the results from the previous poll. This is invoked by Selector at the start of * a poll() when all the results from the previous poll are expected to have been handled. * * SocketServer uses {@link #*clearCompletedSends*()} and {@link #*clearCompletedSends*()} to * clear `completedSends` and `completedReceives` as soon as they are processed to avoid * holding onto large request/response buffers from multiple connections longer than necessary. * Clients rely on Selector invoking {@link #clear()} at the start of each poll() since memory usage * is less critical and clearing once-per-poll provides the flexibility to process these results in * any order before the next poll. */
[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear
[ https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao updated KAFKA-12615: Description: According to my understanding, the second clearCompletedSends should be clearCompletedReceives /** * Clears all the results from the previous poll. This is invoked by Selector at the start of * a poll() when all the results from the previous poll are expected to have been handled. * * SocketServer uses clearCompletedSends() and *clearCompletedSends*() to * clear `completedSends` and `completedReceives` as soon as they are processed to avoid * holding onto large request/response buffers from multiple connections longer than necessary. * Clients rely on Selector invoking {@link #clear()} at the start of each poll() since memory usage * is less critical and clearing once-per-poll provides the flexibility to process these results in * any order before the next poll. */
[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear
[ https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao updated KAFKA-12615: Description: According to my understanding, the second clearCompletedSends, which is highlighted as follows, should be clearCompletedReceives /** * Clears all the results from the previous poll. This is invoked by Selector at the start of * a poll() when all the results from the previous poll are expected to have been handled. * * SocketServer uses clearCompletedSends() and *clearCompletedSends*() to * clear `completedSends` and `completedReceives` as soon as they are processed to avoid * holding onto large request/response buffers from multiple connections longer than necessary. * Clients rely on Selector invoking {@link #clear()} at the start of each poll() since memory usage * is less critical and clearing once-per-poll provides the flexibility to process these results in * any order before the next poll. */
[GitHub] [kafka] zhaohaidao opened a new pull request #10476: KAFKA-12615: Correct comments for the method Selector.clear
zhaohaidao opened a new pull request #10476: URL: https://github.com/apache/kafka/pull/10476 According to my understanding, the second clearCompletedSends, which is highlighted as follows, should be clearCompletedReceives /** Clears all the results from the previous poll. This is invoked by Selector at the start of a poll() when all the results from the previous poll are expected to have been handled. SocketServer uses clearCompletedSends() and clearCompletedSends() to clear `completedSends` and `completedReceives` as soon as they are processed to avoid holding onto large request/response buffers from multiple connections longer than necessary. Clients rely on Selector invoking {@link #clear()} at the start of each poll() since memory usage is less critical and clearing once-per-poll provides the flexibility to process these results in any order before the next poll. */
[GitHub] [kafka] zhaohaidao opened a new pull request #10477: KAFKA-12615: Correct comments for the method Selector.clear
zhaohaidao opened a new pull request #10477: URL: https://github.com/apache/kafka/pull/10477 According to my understanding, the second clearCompletedSends, which is highlighted as follows, should be clearCompletedReceives /** Clears all the results from the previous poll. This is invoked by Selector at the start of a poll() when all the results from the previous poll are expected to have been handled. SocketServer uses clearCompletedSends() and **clearCompletedSends**() to clear `completedSends` and `completedReceives` as soon as they are processed to avoid holding onto large request/response buffers from multiple connections longer than necessary. Clients rely on Selector invoking {@link #clear()} at the start of each poll() since memory usage is less critical and clearing once-per-poll provides the flexibility to process these results in any order before the next poll. */
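For reference, the intended correction replaces the second clearCompletedSends link with clearCompletedReceives, so the relevant lines of the Selector.clear javadoc would read:

```java
 * SocketServer uses {@link #clearCompletedSends()} and {@link #clearCompletedReceives()} to
 * clear `completedSends` and `completedReceives` as soon as they are processed to avoid
 * holding onto large request/response buffers from multiple connections longer than necessary.
```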
[GitHub] [kafka] ijuma commented on a change in pull request #9229: MINOR: Reduce allocations in requests via buffer caching
ijuma commented on a change in pull request #9229: URL: https://github.com/apache/kafka/pull/9229#discussion_r607129364 ## File path: core/src/main/scala/kafka/server/RequestLocal.scala ## @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.apache.kafka.common.utils.BufferSupplier + +case class RequestLocal(bufferSupplier: BufferSupplier) { Review comment: Add documentation.
[GitHub] [kafka] ijuma commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching
ijuma commented on pull request #9229: URL: https://github.com/apache/kafka/pull/9229#issuecomment-813439278 @chia7712 I introduced `RequestLocal` as discussed. Does this seem reasonable to you? If so, I propose the following next steps: 1. In this PR, provide utility methods in `RequestLocal` for the two common defaults: `ThreadLocalCaching` and `NoCaching`. The latter should be used when the usage is not guaranteed to be within the same thread. In the future, we can consider a `ThreadSafeCaching`/`GlobalCaching` option, if that makes sense. 2. In a separate PR, remove the default arguments. This will result in a lot of test changes, but no change in behavior. So, it probably makes sense to review separately. Thoughts?
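As a rough illustration of the two proposed defaults, a buffer supplier can either allocate a fresh buffer on every request (safe across threads) or reuse released buffers through a thread-local cache (safe only when all usage stays on one thread). The names and shapes below are assumptions for illustration, not the final Kafka API:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the buffer-caching idea behind RequestLocal.
public abstract class BufferSupplierSketch {
    public abstract ByteBuffer get(int capacity);
    public abstract void release(ByteBuffer buffer);

    // NoCaching: every get() allocates; release() is a no-op. Safe to share
    // across threads because no state is retained.
    public static BufferSupplierSketch noCaching() {
        return new BufferSupplierSketch() {
            @Override
            public ByteBuffer get(int capacity) {
                return ByteBuffer.allocate(capacity);
            }
            @Override
            public void release(ByteBuffer buffer) {
                // intentionally empty: nothing is cached
            }
        };
    }

    // ThreadLocalCaching: released buffers go into a per-thread pool and are
    // reused when a large-enough buffer is requested again on the same thread.
    public static BufferSupplierSketch threadLocalCaching() {
        return new BufferSupplierSketch() {
            private final ThreadLocal<Deque<ByteBuffer>> pool =
                ThreadLocal.withInitial(ArrayDeque::new);

            @Override
            public ByteBuffer get(int capacity) {
                Deque<ByteBuffer> cached = pool.get();
                ByteBuffer candidate = cached.peekFirst();
                if (candidate != null && candidate.capacity() >= capacity) {
                    cached.pollFirst();
                    candidate.clear();
                    return candidate; // reuse instead of allocating
                }
                return ByteBuffer.allocate(capacity);
            }

            @Override
            public void release(ByteBuffer buffer) {
                pool.get().addFirst(buffer);
            }
        };
    }
}
```

This is why the caching variant must stay thread-confined: the pool is keyed by thread, so a buffer released on one thread and fetched from another would bypass the cache entirely (or, with a shared pool, race).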
[GitHub] [kafka] ijuma commented on pull request #10476: KAFKA-12615: Correct comments for the method Selector.clear
ijuma commented on pull request #10476: URL: https://github.com/apache/kafka/pull/10476#issuecomment-813444072 Thanks for the PR. This looks like a duplicate of https://github.com/apache/kafka/pull/10477, so closing.
[GitHub] [kafka] ijuma closed pull request #10476: KAFKA-12615: Correct comments for the method Selector.clear
ijuma closed pull request #10476: URL: https://github.com/apache/kafka/pull/10476
[GitHub] [kafka] ijuma merged pull request #10477: KAFKA-12615: Correct comments for the method Selector.clear
ijuma merged pull request #10477: URL: https://github.com/apache/kafka/pull/10477
[GitHub] [kafka] ijuma commented on pull request #10473: KAFKA-12614: Use Jenkinsfile for trunk and release branch builds
ijuma commented on pull request #10473: URL: https://github.com/apache/kafka/pull/10473#issuecomment-813460788 @mumrah Good point. I don't expect the number of builds to change over time. Instead, we will replace some of the existing builds with new ones. That is, I expect us to have 3 Java versions and 2 Scala versions at any point in time. If this changes, we can consider matrix builds.
[GitHub] [kafka] ijuma merged pull request #10473: KAFKA-12614: Use Jenkinsfile for trunk and release branch builds
ijuma merged pull request #10473: URL: https://github.com/apache/kafka/pull/10473
[GitHub] [kafka] ijuma commented on pull request #10473: KAFKA-12614: Use Jenkinsfile for trunk and release branch builds
ijuma commented on pull request #10473: URL: https://github.com/apache/kafka/pull/10473#issuecomment-813463611 Merged to trunk and 2.8 branches.
[GitHub] [kafka] kowshik opened a new pull request #10478: KAFKA-12553: Refactor recovery logic
kowshik opened a new pull request #10478: URL: https://github.com/apache/kafka/pull/10478
[GitHub] [kafka] hachikuji merged pull request #10393: KAFKA-12539: Refactor KafkaRaftClient handleVoteRequest to reduce cyclomatic complexity
hachikuji merged pull request #10393: URL: https://github.com/apache/kafka/pull/10393
[jira] [Resolved] (KAFKA-12539) Move some logic in handleVoteRequest to EpochState
[ https://issues.apache.org/jira/browse/KAFKA-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12539. - Resolution: Fixed > Move some logic in handleVoteRequest to EpochState > -- > > Key: KAFKA-12539 > URL: https://issues.apache.org/jira/browse/KAFKA-12539 > Project: Kafka > Issue Type: Improvement >Reporter: dengziming >Assignee: dengziming >Priority: Minor > > Reduce the cyclomatic complexity of KafkaRaftClient, see the comment for > details: https://github.com/apache/kafka/pull/10289#discussion_r597274570 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12344) Support SlidingWindows in the Scala API
[ https://issues.apache.org/jira/browse/KAFKA-12344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314954#comment-17314954 ] Leah Thomas commented on KAFKA-12344: - Hi [~ketulgupta], thanks for picking this up! It should be a relatively easy fix. If you look at [KGroupedStream.scala|https://github.com/apache/kafka/blob/trunk/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala#L155] you can see that there's a `windowedBy` method for both `Windows` and `SessionWindows`. These are scala wrapper classes that link back to the java code. We need this wrapper method for `SlidingWindows` as well. You should be able to model it off of the other two existing methods. I'm no scala expert but I think we should be able to utilize the existing `TimeWindowedKStream.scala` class instead of making a separate `SlidingWindowedKStreamScala` class, but maybe [~mjsax] can confirm. > Support SlidingWindows in the Scala API > --- > > Key: KAFKA-12344 > URL: https://issues.apache.org/jira/browse/KAFKA-12344 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Leah Thomas >Priority: Major > Labels: newbie, scala > > in KIP-450 we implemented sliding windows for the Java API but left out a few > crucial methods to allow sliding windows to work through the Scala API. We > need to add those methods to make the Scala API fully leverage sliding windows -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kowshik commented on a change in pull request #10478: KAFKA-12553: Refactor recovery logic
kowshik commented on a change in pull request #10478: URL: https://github.com/apache/kafka/pull/10478#discussion_r607168537 ## File path: core/src/main/scala/kafka/log/LogLoader.scala ## @@ -0,0 +1,423 @@ +package kafka.log + +import java.io.{File, IOException} +import java.nio.file.{Files, NoSuchFileException} + +import kafka.common.LogSegmentOffsetOverflowException +import kafka.log.Log.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile, offsetFromFileName} +import kafka.server.{LogDirFailureChannel, LogOffsetMetadata} +import kafka.server.epoch.LeaderEpochFileCache +import kafka.utils.{CoreUtils, Logging, Scheduler} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.InvalidOffsetException +import org.apache.kafka.common.utils.Time + +import scala.collection.{Seq, Set, mutable} + +class LogLoader(dir: File, Review comment: Documentation ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -2763,6 +2191,320 @@ object Log { appendInfo.append(batch, firstOffsetMetadata) } + def maybeCreateLeaderEpochCache(dir: File, + topicPartition: TopicPartition, + logDirFailureChannel: LogDirFailureChannel, + recordVersion: RecordVersion): Option[LeaderEpochFileCache] = { +val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir) + +def newLeaderEpochFileCache(): LeaderEpochFileCache = { + val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel) + new LeaderEpochFileCache(topicPartition, checkpointFile) +} + +if (recordVersion.precedes(RecordVersion.V2)) { + val currentCache = if (leaderEpochFile.exists()) +Some(newLeaderEpochFileCache()) + else +None + + if (currentCache.exists(_.nonEmpty)) +warn(s"Deleting non-empty leader epoch cache due to incompatible message format $recordVersion") + + Files.deleteIfExists(leaderEpochFile.toPath) + None +} else { + Some(newLeaderEpochFileCache()) +} + } + + /** + * Swap one or more new segment in place and delete one or 
more existing segments in a crash-safe manner. The old + * segments will be asynchronously deleted. + * + * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded + * or the caller will catch and handle IOException + * + * The sequence of operations is: + * + *Cleaner creates one or more new segments with suffix .cleaned and invokes replaceSegments(). + *If broker crashes at this point, the clean-and-swap operation is aborted and + *the .cleaned files are deleted on recovery in loadSegments(). + *New segments are renamed .swap. If the broker crashes before all segments were renamed to .swap, the + *clean-and-swap operation is aborted - .cleaned as well as .swap files are deleted on recovery in + *loadSegments(). We detect this situation by maintaining a specific order in which files are renamed from + *.cleaned to .swap. Basically, files are renamed in descending order of offsets. On recovery, all .swap files + *whose offset is greater than the minimum-offset .clean file are deleted. + *If the broker crashes after all new segments were renamed to .swap, the operation is completed, the swap + *operation is resumed on recovery as described in the next step. + *Old segment files are renamed to .deleted and asynchronous delete is scheduled. + *If the broker crashes, any .deleted files left behind are deleted on recovery in loadSegments(). + *replaceSegments() is then invoked to complete the swap with newSegment recreated from + *the .swap file and oldSegments containing segments which were not renamed before the crash. + *Swap segment(s) are renamed to replace the existing segments, completing this operation. + *If the broker crashes, any .deleted files which may be left behind are deleted + *on recovery in loadSegments(). 
+ * + * + * @param existingSegments The existing segments of the log + * @param newSegments The new log segment to add to the log + * @param oldSegments The old log segments to delete from the log + * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash + */ + private[log] def replaceSegments(existingSegments: LogSegments, + newSegments: Seq[LogSegment], + oldSegments: Seq[LogSegment], + isRecoveredSwapFile: Boolean = false, + dir: File, + topicPartition: TopicPartition, +
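The descending-offset rename invariant in the comment above can be sketched in isolation. This is a hypothetical `<offset>.cleaned` file layout for illustration, not the actual Log code; the point is only that files are renamed highest offset first, so on recovery any `.swap` file above the minimum remaining `.cleaned` offset is known to be complete:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SwapRenameSketch {

    // Renames every "<offset>.cleaned" file in dir to "<offset>.swap",
    // processing higher offsets first. Returns the offsets in rename order.
    public static List<Long> renameCleanedToSwap(Path dir) throws IOException {
        List<Long> offsetsDescending;
        try (Stream<Path> files = Files.list(dir)) {
            offsetsDescending = files
                .map(p -> p.getFileName().toString())
                .filter(name -> name.endsWith(".cleaned"))
                .map(name -> Long.parseLong(name.replace(".cleaned", "")))
                .sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
        }
        for (long offset : offsetsDescending) {
            // Atomic rename, so a crash leaves either .cleaned or .swap,
            // never a half-renamed file.
            Files.move(dir.resolve(offset + ".cleaned"),
                       dir.resolve(offset + ".swap"),
                       StandardCopyOption.ATOMIC_MOVE);
        }
        return offsetsDescending;
    }
}
```

Because renames go high-to-low, a crash mid-way leaves `.swap` files only at the top of the offset range, which is exactly what the recovery rule ("delete `.swap` files whose offset is greater than the minimum-offset `.cleaned` file") relies on.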
[jira] [Updated] (KAFKA-12606) Some improvements for produce record validation
[ https://issues.apache.org/jira/browse/KAFKA-12606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12606: Description: KIP-467 introduces an extension to the produce response to let the producer identify the specific records that had failed validation. While implementing KAFKA-12548, we realized a few shortcomings in this proposal: 1. Some records may fail only because they are part of a batch which had a record failing validation. In this case, we can return a better exception to the user so that they know the record is safe to be retried. For example, `RecordNotAppendedException` or something like that. 2. Records in the same batch may fail validation for different reasons. For example, one record may fail because of an invalid timestamp; another may fail because it uses a null key and the topic is compacted. However, the schema only allows a single error for each partition, so we cannot distinguish these cases in order to throw more specific exception types. We should consider allowing a record-level error code as well. was: KIP-467 introduces an extension to the produce response to let the producer identify the specific records that had failed validation. While implementing KAFKA-12548, we realized a few shortcomings in this proposal: 1. Some records may fail only because they are part of a batch which had a record failing validation. In this case, we can return a better exception to the user so that they know the record is safe to be retried. For example, `RecordNotAppendedException` or something like that. 2. Records in the same batch may fail validation for different reasons. For example, one record may fail because of an invalid timestamp; another may fail because it uses a null key and the topic is compacted. However, the schema only allows a single error for each partition, so we cannot distinguish these cases in order to throw more specific exception types. 
> Some improvements for produce record validation > --- > > Key: KAFKA-12606 > URL: https://issues.apache.org/jira/browse/KAFKA-12606 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > Labels: needs-kip > > KIP-467 introduces an extension to the produce response to let the producer > identify the specific records that had failed validation. While implementing > KAFKA-12548, we realized a few shortcomings in this proposal: > 1. Some records may fail only because they are part of a batch which had a > record failing validation. In this case, we can return a better exception to > the user so that they know the record is safe to be retried. For example, > `RecordNotAppendedException` or something like that. > 2. Records in the same batch may fail validation for different reasons. For > example, one record may fail because of an invalid timestamp; another may > fail because it uses a null key and the topic is compacted. However, the > schema only allows a single error for each partition, so we cannot > distinguish these cases in order to throw more specific exception types. We > should consider allowing a record-level error code as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12604) Remove envelope handling from broker
[ https://issues.apache.org/jira/browse/KAFKA-12604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314964#comment-17314964 ] Jason Gustafson commented on KAFKA-12604: - I am going to close this without fixing. Although we do not need the broker to handle the Envelope api today, there will be a point where we want even the zk brokers to forward to the controller. > Remove envelope handling from broker > > > Key: KAFKA-12604 > URL: https://issues.apache.org/jira/browse/KAFKA-12604 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Labels: kip-500 > > We only need the envelope request to be handled on the controller endpoint. > We added it to the broker initially in order to allow testing of the > forwarding logic. Now that the integration testing framework for kip-500 is > in place, we should be able to get rid of this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12604) Remove envelope handling from broker
[ https://issues.apache.org/jira/browse/KAFKA-12604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12604. - Resolution: Won't Fix > Remove envelope handling from broker > > > Key: KAFKA-12604 > URL: https://issues.apache.org/jira/browse/KAFKA-12604 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Labels: kip-500 > > We only need the envelope request to be handled on the controller endpoint. > We added it to the broker initially in order to allow testing of the > forwarding logic. Now that the integration testing framework for kip-500 is > in place, we should be able to get rid of this.
[jira] [Created] (KAFKA-12616) Convert integration tests to use ClusterTest
Jason Gustafson created KAFKA-12616: --- Summary: Convert integration tests to use ClusterTest Key: KAFKA-12616 URL: https://issues.apache.org/jira/browse/KAFKA-12616 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson We would like to convert integration tests to use the new ClusterTest annotations so that we can easily test both the Zk and KRaft implementations. This will require adding a bunch of support to the ClusterTest framework as we go along.
[jira] [Created] (KAFKA-12617) Convert MetadataRequestTest to use ClusterTest
Jason Gustafson created KAFKA-12617: --- Summary: Convert MetadataRequestTest to use ClusterTest Key: KAFKA-12617 URL: https://issues.apache.org/jira/browse/KAFKA-12617 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson
[jira] [Updated] (KAFKA-12616) Convert integration tests to use ClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12616: Labels: kip-500 (was: ) > Convert integration tests to use ClusterTest > - > > Key: KAFKA-12616 > URL: https://issues.apache.org/jira/browse/KAFKA-12616 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > Labels: kip-500 > > We would like to convert integration tests to use the new ClusterTest > annotations so that we can easily test both the Zk and KRaft implementations. > This will require adding a bunch of support to the ClusterTest framework as > we go along. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12616) Convert integration tests to use ClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314967#comment-17314967 ] Jason Gustafson commented on KAFKA-12616: - Anyone can feel free to add sub-tasks to this. There are a lot of integration tests that need to be converted. > Convert integration tests to use ClusterTest > - > > Key: KAFKA-12616 > URL: https://issues.apache.org/jira/browse/KAFKA-12616 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > We would like to convert integration tests to use the new ClusterTest > annotations so that we can easily test both the Zk and KRaft implementations. > This will require adding a bunch of support to the ClusterTest framework as > we go along. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory
[ https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314977#comment-17314977 ] amuthan Ganeshan commented on KAFKA-12559: -- Hi [~ableegoldman] and [~mjsax] I would like to work on this. Would you guide me in this by giving some pointers to start with... > Add a top-level Streams config for bounding off-heap memory > --- > > Key: KAFKA-12559 > URL: https://issues.apache.org/jira/browse/KAFKA-12559 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: needs-kip, newbie, newbie++ > > At the moment we provide an example of how to bound the memory usage of > rocskdb in the [Memory > Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb] > section of the docs. This requires implementing a custom RocksDBConfigSetter > class and setting a number of rocksdb options for relatively advanced > concepts and configurations. It seems a fair number of users either fail to > find this or consider it to be for more advanced use cases/users. But RocksDB > can eat up a lot of off-heap memory and it's not uncommon for users to come > across a {{RocksDBException: Cannot allocate memory}} > It would probably be a much better user experience if we implemented this > memory bound out-of-the-box and just gave users a top-level StreamsConfig to > tune the off-heap memory given to rocksdb, like we have for on-heap cache > memory with cache.max.bytes.buffering. More advanced users can continue to > fine-tune their memory bounding and apply other configs with a custom config > setter, while new or more casual users can cap on the off-heap memory without > getting their hands dirty with rocksdb. 
> I would propose to add the following top-level config: > rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid > values are [0, inf] > I'd also want to consider adding a second, lower priority top-level config to > give users a knob for adjusting how much of that total off-heap memory goes > to the block cache + index/filter blocks, and how much of it is afforded to > the write buffers. I'm struggling to come up with a good name for this > config, but it would be something like > rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default > to 0.5, valid values are [0, 1] -- This message was sent by Atlassian Jira (v8.3.4#803005)
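Until such a top-level config exists, the documented workaround the ticket refers to is a custom `RocksDBConfigSetter` that routes all stores through one shared cache and write-buffer manager. The sketch below follows the shape of the Streams memory-management docs; the sizes are illustrative assumptions, not recommendations, and it requires the kafka-streams and rocksdbjni artifacts on the classpath:

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

// Sketch: bound RocksDB off-heap memory with a single cache shared by all
// store instances in the process. Sizes here are placeholders.
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    private static final long TOTAL_OFF_HEAP_MEMORY = 512 * 1024 * 1024L;
    private static final long TOTAL_MEMTABLE_MEMORY = 256 * 1024 * 1024L;

    // static so every store in this JVM shares the same bounded pool
    private static final Cache CACHE = new LRUCache(TOTAL_OFF_HEAP_MEMORY);
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
        new WriteBufferManager(TOTAL_MEMTABLE_MEMORY, CACHE);

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig =
            (BlockBasedTableConfig) options.tableFormatConfig();
        // Count memtable usage against the shared block cache.
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
        // Keep index/filter blocks in the cache so they are bounded too.
        tableConfig.setBlockCache(CACHE);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // CACHE and WRITE_BUFFER_MANAGER are shared; do not close them here.
    }
}
```

A single top-level `rocksdb.max.bytes.off.heap` config would effectively wire up something like this out of the box, sparing users the config-setter boilerplate.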
[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients
hachikuji commented on a change in pull request #10142: URL: https://github.com/apache/kafka/pull/10142#discussion_r607216022 ## File path: core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala ## @@ -219,6 +225,146 @@ class AutoTopicCreationManagerTest { testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true) } + @Test + def testTopicCreationWithMetadataContextPassPrincipal(): Unit = { +autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + Some(brokerToController), + Some(adminManager), + Some(controller), + groupCoordinator, + transactionCoordinator) + +val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection +val topicName = "topic" +topicsCollection.add(getNewTopic(topicName)) +val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion() + .setApiKey(ApiKeys.CREATE_TOPICS.id) + .setMinVersion(0) + .setMaxVersion(0) +Mockito.when(brokerToController.controllerApiVersions()) + .thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion + +Mockito.when(controller.isActive).thenReturn(false) + +val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion, + "clientId", 0) + +val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user") +val principalSerde = new KafkaPrincipalSerde { + override def serialize(principal: KafkaPrincipal): Array[Byte] = { +assertEquals(principal, userPrincipal) +Utils.utf8(principal.toString) + } + override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes)) +} + +val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost, + userPrincipal, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false, Optional.of(principalSerde)) + +autoTopicCreationManager.createTopics( Review comment: This ensures that an 
attempt was made to serialize the principal, but how do we know that it made it to the envelope? I think it would be better to intercept the request that is sent to the channel and verify it directly.
[GitHub] [kafka] gharris1727 commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource
gharris1727 commented on a change in pull request #10475: URL: https://github.com/apache/kafka/pull/10475#discussion_r607216362 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java ## @@ -342,6 +343,47 @@ public void newPluginsShouldConfigureWithPluginClassLoader() { assertPluginClassLoaderAlwaysActive(samples); } +@Test +public void pluginClassLoaderReadVersionFromResource() { +TestPlugins.assertAvailable(); + +Map pluginProps = new HashMap<>(); +pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, +TestPlugins.pluginPath().stream() +.filter(s -> s.contains("read-version-from-resource-v1")) +.collect(Collectors.joining())); +plugins = new Plugins(pluginProps); + +Converter converter = plugins.newPlugin( +TestPlugins.READ_VERSION_FROM_RESOURCE, +new AbstractConfig(new ConfigDef(), Collections.emptyMap()), +Converter.class +); +assertEquals("1.0.0", +new String(converter.fromConnectData(null, null, null))); +PluginClassLoader pluginClassLoader = plugins.delegatingLoader() +.pluginClassLoader(TestPlugins.READ_VERSION_FROM_RESOURCE); +assertNotNull(pluginClassLoader); + + +// Re-initialize Plugins object with plugin class loader in the class loader tree. This is +// to simulate the situation where jars exist on both system classpath and plugin path. +pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, +TestPlugins.pluginPath().stream() +.filter(s -> s.contains("read-version-from-resource-v2")) +.collect(Collectors.joining())); +plugins = new Plugins(pluginProps, pluginClassLoader); Review comment: While I understand that the motivation here for explicitly providing the parent is control the behavior of the parent ClassLoader, I think this test is for a pretty extreme case. This is testing a: 1. PluginClassLoader whose parent is a 2. DelegatingClassLoader whose parent is a 3. PluginClassLoader whose parent is a 4. Delegating ClassLoader whose parent is the 5. 
App ClassLoader Does this test still succeed if we use a different ClassLoader that's not part of a chain like this? We could construct a plain `URLClassLoader` instance as the parent, but that would still require the new Plugins constructor. If we add a test resource file to the App ClassLoader we could skip the additional constructor. We could use multiple different resource file names in the different test plugins to test the three cases: 1. parent has resource but plugin does not 2. plugin has resource but parent does not 3. parent and plugin both have the resource ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java ## @@ -342,6 +343,47 @@ public void newPluginsShouldConfigureWithPluginClassLoader() { assertPluginClassLoaderAlwaysActive(samples); } +@Test +public void pluginClassLoaderReadVersionFromResource() { +TestPlugins.assertAvailable(); + +Map pluginProps = new HashMap<>(); +pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, +TestPlugins.pluginPath().stream() Review comment: nit: TestPlugins has a Map from class name to File, we can add a method for getting the path to a single plugin jar to TestPlugins itself, so we don't have to iterate here. ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java ## @@ -114,6 +118,9 @@ pluginJars.put(SAMPLING_HEADER_CONVERTER, createPluginJar("sampling-header-converter")); pluginJars.put(SAMPLING_CONFIG_PROVIDER, createPluginJar("sampling-config-provider")); pluginJars.put(SERVICE_LOADER, createPluginJar("service-loader")); +// Create two versions of the same plugin reading version string from a resource +pluginJars.put(READ_VERSION_FROM_RESOURCE + ".v1", createPluginJar("read-version-from-resource-v1")); Review comment: nit: make each of these a separate constant -- This is an automated message from the Apache Git Service. 
[GitHub] [kafka] vvcephei commented on pull request #10474: KAFKA-12602: Fix LICENSE file
vvcephei commented on pull request #10474: URL: https://github.com/apache/kafka/pull/10474#issuecomment-813527301 Hey @ableegoldman and @mimaison, since you're also both blocked on this, would you mind giving these changes a double-check?
[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients
hachikuji commented on a change in pull request #10142: URL: https://github.com/apache/kafka/pull/10142#discussion_r607224516 ## File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala ## @@ -156,17 +169,44 @@ class DefaultAutoTopicCreationManager( .setTopics(topicsToCreate) ) -channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler { +val requestCompletionHandler = new ControllerRequestCompletionHandler { override def onTimeout(): Unit = { debug(s"Auto topic creation timed out for ${creatableTopics.keys}.") clearInflightRequests(creatableTopics) } override def onComplete(response: ClientResponse): Unit = { -debug(s"Auto topic creation completed for ${creatableTopics.keys}.") +debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody.toString}.") clearInflightRequests(creatableTopics) } -}) +} + +val channelManager = this.channelManager.getOrElse { + throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.") +} + +val request = metadataRequestContext.map { context => + val requestVersion = +channelManager.controllerApiVersions() match { + case None => +// We will rely on the Metadata request to be retried in the case +// that the latest version is not usable by the controller. +ApiKeys.CREATE_TOPICS.latestVersion() + case Some(nodeApiVersions) => +nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS) +} + + // Borrow client information such as client id and correlation id from the original request, + // in order to correlate the create request with the original metadata request. + val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, +requestVersion, +context.clientId, +context.correlationId) + ForwardingManager.buildEnvelopeRequest(context, + createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader)) Review comment: @dengziming It's not a bad idea. 
We could even simplify it a little since the api key and version can be obtained from the request. I tend to agree that this is kind of a niche usage though, so I'm not sure it calls for the generality. Perhaps you could submit a follow-up once this is merged and we can see what it looks like.
[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should
[ https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315000#comment-17315000 ] John Roesler commented on KAFKA-12602: -- Thank you, [~jmclean]. There weren't too many BSD or MIT licenses, so I've gone ahead and just copied the exact license files from each one of those dependencies. Thanks for the feedback. I also added notices regarding the provenance and copyright of PureJavaCrc32C and Murmur3 to our NOTICE file. I will create two follow-up tickets, one to revisit the NOTICE file, and another to add an automated check to the release script to make sure that our license file doesn't start to rot again in the future. > The LICENSE and NOTICE files don't list everything they should > -- > > Key: KAFKA-12602 > URL: https://issues.apache.org/jira/browse/KAFKA-12602 > Project: Kafka > Issue Type: Bug >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.8.0, 2.7.1, 2.6.2 > > > [~jmclean] raised this on the mailing list: > [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E] > > We need to make the license file match what we are actually shipping in > source and binary distributions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ncliang commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource
ncliang commented on a change in pull request #10475: URL: https://github.com/apache/kafka/pull/10475#discussion_r607232615 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java ## @@ -342,6 +343,47 @@ public void newPluginsShouldConfigureWithPluginClassLoader() { assertPluginClassLoaderAlwaysActive(samples); } +@Test +public void pluginClassLoaderReadVersionFromResource() { +TestPlugins.assertAvailable(); + +Map pluginProps = new HashMap<>(); +pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, +TestPlugins.pluginPath().stream() +.filter(s -> s.contains("read-version-from-resource-v1")) +.collect(Collectors.joining())); +plugins = new Plugins(pluginProps); + +Converter converter = plugins.newPlugin( +TestPlugins.READ_VERSION_FROM_RESOURCE, +new AbstractConfig(new ConfigDef(), Collections.emptyMap()), +Converter.class +); +assertEquals("1.0.0", +new String(converter.fromConnectData(null, null, null))); +PluginClassLoader pluginClassLoader = plugins.delegatingLoader() +.pluginClassLoader(TestPlugins.READ_VERSION_FROM_RESOURCE); +assertNotNull(pluginClassLoader); + + +// Re-initialize Plugins object with plugin class loader in the class loader tree. This is +// to simulate the situation where jars exist on both system classpath and plugin path. +pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, +TestPlugins.pluginPath().stream() +.filter(s -> s.contains("read-version-from-resource-v2")) +.collect(Collectors.joining())); +plugins = new Plugins(pluginProps, pluginClassLoader); Review comment: I agree that having the additional testcases would be valuable. However, I do think that adding the resource directly to the app classloader makes the test much less readable. I like the idea of constructing and using a plain `URLClassLoader` as the parent. 
While it still requires the additional constructor to be exposed on `Plugins`, we do already expose the parent parameter on `DelegatingClassLoader` specifically for the flexibility of controlling the parent loader.
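A minimal sketch of the idea discussed above, with a hypothetical jar path: build a plain `URLClassLoader` over the "v2" plugin jar and hand it to `Plugins` as the parent, simulating a jar that is visible on both the application classpath and the plugin path without touching the real app class loader.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

public class ParentLoaderSketch {
    // Builds an isolated parent loader holding only the given jar.
    // The jar path here is illustrative; the loader can be constructed
    // even if the file does not exist yet.
    public static URLClassLoader parentWithJar(String jarPath) throws MalformedURLException {
        URL[] urls = { new URL("file://" + jarPath) };
        // A null parent keeps this loader isolated from the application classpath.
        return new URLClassLoader(urls, null);
    }
}
```

The test could then pass `parentWithJar(...)` through the extra `Plugins` constructor instead of adding the resource to the app class loader directly.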
[GitHub] [kafka] ijuma commented on a change in pull request #10393: KAFKA-12539: Refactor KafkaRaftCllient handleVoteRequest to reduce cyclomatic complexity
ijuma commented on a change in pull request #10393: URL: https://github.com/apache/kafka/pull/10393#discussion_r607233215 ## File path: raft/src/main/java/org/apache/kafka/raft/CandidateState.java ## @@ -235,6 +240,15 @@ public int epoch() { return highWatermark; } +@Override +public boolean canGrantVote(int candidateId, boolean isLogUpToDate) { +// Still reject vote request even candidateId = localId, Although the candidate votes for +// itself, this vote is implicit and not "granted". +log.debug("Rejecting vote request from candidate {} since we are already candidate in epoch {}", +candidateId, epoch); Review comment: As a general rule, methods like this should not log IMO. The calling method should log instead. That is, good to avoid the side effect from the "check" operation. It seems like it was like that before this PR. What was the motivation for changing it?
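A minimal sketch of the pattern the review suggests, with hypothetical names rather than the real `CandidateState` API: keep the "check" method a pure predicate and do any logging at the call site, so the check has no side effects.

```java
public class CandidateVoteCheck {
    private final int epoch;

    public CandidateVoteCheck(int epoch) {
        this.epoch = epoch;
    }

    // Pure predicate: no logging, no mutation, safe to call from anywhere.
    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
        // A candidate never grants votes to others; its own vote is implicit.
        return false;
    }

    // The caller owns the side effect and logs the decision.
    public boolean handleVoteRequest(int candidateId, boolean isLogUpToDate) {
        boolean granted = canGrantVote(candidateId, isLogUpToDate);
        if (!granted) {
            System.out.printf(
                "Rejecting vote request from candidate %d since we are already candidate in epoch %d%n",
                candidateId, epoch);
        }
        return granted;
    }
}
```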
[GitHub] [kafka] hachikuji commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
hachikuji commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r607232953 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -355,24 +373,29 @@ private void fireHandleResign(int epoch) { } @Override -public void initialize() throws IOException { -quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); +public void initialize() { +try { +quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); -long currentTimeMs = time.milliseconds(); -if (quorum.isLeader()) { -throw new IllegalStateException("Voter cannot initialize as a Leader"); -} else if (quorum.isCandidate()) { -onBecomeCandidate(currentTimeMs); -} else if (quorum.isFollower()) { -onBecomeFollower(currentTimeMs); -} +long currentTimeMs = time.milliseconds(); +if (quorum.isLeader()) { +throw new IllegalStateException("Voter cannot initialize as a Leader"); +} else if (quorum.isCandidate()) { +onBecomeCandidate(currentTimeMs); +} else if (quorum.isFollower()) { +onBecomeFollower(currentTimeMs); +} -// When there is only a single voter, become candidate immediately -if (quorum.isVoter() -&& quorum.remoteVoters().isEmpty() -&& !quorum.isLeader() -&& !quorum.isCandidate()) { -transitionToCandidate(currentTimeMs); +// When there is only a single voter, become candidate immediately +if (quorum.isVoter() +&& quorum.remoteVoters().isEmpty() +&& !quorum.isLeader() Review comment: Since we're in here already, this check is not needed since we already ruled it out above. 
## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ## @@ -68,20 +70,65 @@ public synchronized void increment() { @Override public synchronized void handleCommit(BatchReader reader) { try { -int initialValue = this.committed; +int initialValue = committed; while (reader.hasNext()) { BatchReader.Batch batch = reader.next(); log.debug("Handle commit of batch with records {} at base offset {}", batch.records(), batch.baseOffset()); for (Integer value : batch.records()) { -if (value != this.committed + 1) { -throw new AssertionError("Expected next committed value to be " + -(this.committed + 1) + ", but instead found " + value + " on node " + nodeId); +if (value != committed + 1) { +throw new AssertionError( +String.format( +"Expected next committed value to be %s, but instead found %s on node %s", +committed + 1, +value, +nodeId +) +); } -this.committed = value; +committed = value; } + +nextReadOffset = batch.lastOffset() + 1; +readEpoch = batch.epoch(); } log.debug("Counter incremented from {} to {}", initialValue, committed); + +if (lastSnapshotEndOffset + 10 < nextReadOffset) { Review comment: Looks like we are trying to do snapshots every 10 records. We could probably get rid of `lastSnapshotEndOffset` and use `committed % 10` or something like that. It may also be useful to be able to control the frequency of snapshots with a parameter. 
## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -326,6 +336,14 @@ private void updateListenersProgress(List listenerContexts, lon } } +private Optional> latestSnapshot() { +return log.latestSnapshotId().flatMap(snapshoId -> { Review comment: nit: missing t in snapshotId ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2154,8 +2182,14 @@ private boolean maybeCompleteShutdown(long currentTimeMs) { return false; } -private void maybeUpdateOldestSnapshotId() { -log.latestSnapshotId().ifPresent(log::deleteBeforeSnapshot); +private void maybeUpdateEarliestSnapshotId() { Review comment: Hmm, I guess I still see this the other way around. Why would the raft client care about updating the log start offset if not to delete old snapshots? What would that even mean outside the context of snapshot deletion? ## File p
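The snapshot-frequency suggestion above can be sketched as follows; this is a toy model under assumed names, not the actual `ReplicatedCounter` code. The trigger is derived from the committed value itself (`committed % N`) instead of a separately tracked `lastSnapshotEndOffset`, and the frequency is a constructor parameter.

```java
public class SnapshotPolicy {
    private final int snapshotFrequencyInRecords;

    public SnapshotPolicy(int snapshotFrequencyInRecords) {
        if (snapshotFrequencyInRecords <= 0) {
            throw new IllegalArgumentException("Snapshot frequency must be positive");
        }
        this.snapshotFrequencyInRecords = snapshotFrequencyInRecords;
    }

    // Snapshot whenever the committed counter lands on a multiple of the frequency.
    public boolean shouldSnapshot(int committed) {
        return committed > 0 && committed % snapshotFrequencyInRecords == 0;
    }
}
```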
[GitHub] [kafka] cmccabe commented on pull request #10366: KAFKA-12467: Add controller-side snapshot generation
cmccabe commented on pull request #10366: URL: https://github.com/apache/kafka/pull/10366#issuecomment-813567129 Fix spotbugs
[GitHub] [kafka] junrao commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
junrao commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r606343187 ## File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java ## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * This class provides an in-memory cache of remote log segment metadata. This maintains the lineage of segments + * with respect to leader epochs. + * + * Remote log segment can go through the state transitions as mentioned in {@link RemoteLogSegmentState}. + * + * This class will have all the segments which did not reach terminal state viz DELETE_SEGMENT_FINISHED. That means,any + * segment reaching the terminal state will get cleared from this instance. 
+ * This class provides different methods to fetch segment metadata like {@link #remoteLogSegmentMetadata(int, long)}, + * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, {@link #listAllRemoteLogSegments()}. Those + * methods have different semantics to fetch the segment based on its state. + * + * + * + * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}: + * + * Segment in this state indicate it is not yet copied successfully. So, these segments will not be Review comment: indicate => indicates Ditto in a few other places. ## File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java ## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * This class provides an in-memory cache of remote log segment metadata. 
This maintains the lineage of segments + * with respect to leader epochs. + * + * Remote log segment can go through the state transitions as mentioned in {@link RemoteLogSegmentState}. + * + * This class will have all the segments which did not reach terminal state viz DELETE_SEGMENT_FINISHED. That means,any + * segment reaching the terminal state will get cleared from this instance. + * This class provides different methods to fetch segment metadata like {@link #remoteLogSegmentMetadata(int, long)}, + * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, {@link #listAllRemoteLogSegments()}. Those + * methods have different semantics to fetch the segment based on its state. + * + * + * + * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}: + * + * Segment in this state indicate it is not yet copied successfully. So, these segments will not be + * accessible for reads but these are considered for cleanups when a partition is deleted. + * + * + * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}: + *
[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607263189 ## File path: core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala ## @@ -122,6 +122,14 @@ class ClientQuotaCache { entityFilters.put(entityType, entityMatch) } +// Special case for non-strict empty filter, match everything Review comment: What happens if we have an empty filter list and strict mode set? It looks like an exception is thrown -- that doesn't seem correct?
[jira] [Updated] (KAFKA-12548) Invalid record error message is not getting sent to application
[ https://issues.apache.org/jira/browse/KAFKA-12548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12548: Fix Version/s: 3.0.0 > Invalid record error message is not getting sent to application > --- > > Key: KAFKA-12548 > URL: https://issues.apache.org/jira/browse/KAFKA-12548 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0 > > > The ProduceResponse includes a nice record error message when we return > INVALID_RECORD_ERROR. Sadly this is getting discarded by the producer, so the > user never gets a chance to see it.
[jira] [Resolved] (KAFKA-12548) Invalid record error message is not getting sent to application
[ https://issues.apache.org/jira/browse/KAFKA-12548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12548. - Resolution: Fixed > Invalid record error message is not getting sent to application > --- > > Key: KAFKA-12548 > URL: https://issues.apache.org/jira/browse/KAFKA-12548 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0 > > > The ProduceResponse includes a nice record error message when we return > INVALID_RECORD_ERROR. Sadly this is getting discarded by the producer, so the > user never gets a chance to see it.
[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607263869 ## File path: core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala ## @@ -121,16 +121,16 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag return } -// Update the cache -quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, quotaRecord.remove) - // Convert the value to an appropriate Option for the quota manager val newValue = if (quotaRecord.remove()) { None } else { Some(quotaRecord.value).map(_.toInt) } connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue) + +// Update the cache Review comment: What's the purpose of moving this code?
[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607264162 ## File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java ## @@ -77,7 +77,7 @@ public String getDisplayName(int invocationIndex) { (BeforeTestExecutionCallback) context -> { KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). -setNumKip500BrokerNodes(clusterConfig.numBrokers()). +setNumBrokerNodes(clusterConfig.numBrokers()). Review comment: indentation seems a little weird here. why would this be indented more than the following line?
[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607267726 ## File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala ## @@ -212,4 +214,103 @@ class RaftClusterTest { cluster.close() } } + + @Test + def testClientQuotas(): Unit = { +val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). +setNumBrokerNodes(1). +setNumControllerNodes(1).build()).build() +try { + cluster.format() + cluster.startup() + TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == BrokerState.RUNNING, +"Broker never made it to RUNNING state.") + val admin = Admin.create(cluster.clientProperties()) + try { +val entity = new ClientQuotaEntity(Map("user" -> "testkit").asJava) +var filter = ClientQuotaFilter.containsOnly( + List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava) + +def alterThenDescribe(entity: ClientQuotaEntity, + quotas: Seq[ClientQuotaAlteration.Op], + filter: ClientQuotaFilter, + expectCount: Int): java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = { + val alterResult = admin.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, quotas.asJava)).asJava) + try { +alterResult.all().get() + } catch { +case t: Throwable => fail("AlterClientQuotas request failed", t) + } + + def describeOrFail(filter: ClientQuotaFilter): java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = { +try { + admin.describeClientQuotas(filter).entities().get() +} catch { + case t: Throwable => fail("DescribeClientQuotas request failed", t) +} + } + + val (describeResult, ok) = TestUtils.computeUntilTrue(describeOrFail(filter)) { +results => results.getOrDefault(entity, java.util.Collections.emptyMap[String, java.lang.Double]()).size() == expectCount + } + assertTrue(ok, "Broker never saw new client quotas") + describeResult +} + +var describeResult = alterThenDescribe(entity, + Seq(new 
ClientQuotaAlteration.Op("request_percentage", 0.99)), filter, 1) +assertEquals(0.99, describeResult.get(entity).get("request_percentage"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", 0.97), + new ClientQuotaAlteration.Op("producer_byte_rate", 1), + new ClientQuotaAlteration.Op("consumer_byte_rate", 10001) +), filter, 3) +assertEquals(0.97, describeResult.get(entity).get("request_percentage"), 1e-6) +assertEquals(1.0, describeResult.get(entity).get("producer_byte_rate"), 1e-6) +assertEquals(10001.0, describeResult.get(entity).get("consumer_byte_rate"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", 0.95), + new ClientQuotaAlteration.Op("producer_byte_rate", null), + new ClientQuotaAlteration.Op("consumer_byte_rate", null) +), filter, 1) +assertEquals(0.95, describeResult.get(entity).get("request_percentage"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0) + +describeResult = alterThenDescribe(entity, + Seq(new ClientQuotaAlteration.Op("producer_byte_rate", )), filter, 1) +assertEquals(.0, describeResult.get(entity).get("producer_byte_rate"), 1e-6) + +// Add another quota for a different entity with same user part +val entity2 = new ClientQuotaEntity(Map("user" -> "testkit", "client-id" -> "some-client").asJava) +filter = ClientQuotaFilter.containsOnly( + List( +ClientQuotaFilterComponent.ofEntity("user", "testkit"), +ClientQuotaFilterComponent.ofEntity("client-id", "some-client"), + ).asJava) +describeResult = alterThenDescribe(entity2, + Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)), filter, 1) +assertEquals(9998.0, describeResult.get(entity2).get("producer_byte_rate"), 1e-6) + +// non-strict match +filter = ClientQuotaFilter.contains( + List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava) + +val (describeResult2, ok) = 
TestUtils.computeUntilTrue(admin.describeClientQuotas(filter).entities().get()) { + results => results.size() == 2 +} +assertTrue(ok, "Broker never saw two client quotas") Review comment: The "ok" check feels a little clunky here. Plus if this fa
[GitHub] [kafka] hachikuji merged pull request #10445: KAFKA-12548; Propagate record error messages to application
hachikuji merged pull request #10445: URL: https://github.com/apache/kafka/pull/10445
[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607270847 ## File path: metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java ## @@ -170,8 +170,10 @@ private void alterClientQuotaEntity( } }); -outputRecords.addAll(newRecords); -outputResults.put(entity, ApiError.NONE); +// Only add the records to outputRecords if there were no errors Review comment: This is a good fix. However, I think it would be better just to return immediately after setting the error, rather than waiting until the end of the function. That's consistent with how we handle errors at the top of this function, and in other manager classes. It makes it clear that only one error can be set for each entity.
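The "return immediately after setting the error" style suggested above can be sketched like this; the names and error strings are illustrative, not the real `ClientQuotaControlManager` API. Each validation failure records exactly one error for the entity and bails out before any records are staged.

```java
import java.util.List;
import java.util.Map;

public class QuotaAlterSketch {
    public static void alterEntity(String entity, boolean valid, List<String> newRecords,
                                   List<String> outputRecords, Map<String, String> outputResults) {
        if (entity == null || entity.isEmpty()) {
            outputResults.put(String.valueOf(entity), "INVALID_REQUEST");
            return; // early return: nothing staged for a failed entity
        }
        if (!valid) {
            outputResults.put(entity, "INVALID_CONFIG");
            return; // at most one error is ever recorded per entity
        }
        // Only reached when there were no errors for this entity.
        outputRecords.addAll(newRecords);
        outputResults.put(entity, "NONE");
    }
}
```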
[GitHub] [kafka] jsancio commented on a change in pull request #10468: Kafka 12373:Improve KafkaRaftClient handling of graceful shutdown
jsancio commented on a change in pull request #10468: URL: https://github.com/apache/kafka/pull/10468#discussion_r607271981 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -1673,6 +1673,69 @@ public void testLeaderGracefulShutdownTimeout() throws Exception { assertFutureThrows(shutdownFuture, TimeoutException.class); } +@Test +public void testLeaderGracefulShutdownOnClose() throws Exception { +int localId = 0; +int otherNodeId = 1; +int lingerMs = 50; +Set voters = Utils.mkSet(localId, otherNodeId); + +RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) +.withAppendLingerMs(lingerMs) +.build(); + +context.becomeLeader(); +assertEquals(OptionalInt.of(localId), context.currentLeader()); +assertEquals(1L, context.log.endOffset().offset); + +int epoch = context.currentEpoch(); +assertEquals(1L, context.client.scheduleAppend(epoch, singletonList("a"))); + +context.client.poll(); +assertEquals(OptionalLong.of(lingerMs), context.messageQueue.lastPollTimeoutMs()); + +context.time.sleep(20); + +// client closed now. +context.client.close(); + +// Flag for accepting appends should be toggled to false. +assertFalse(context.client.canAcceptAppends()); + +// acceptAppends flag set to false so no writes should be accepted by the Leader now. 
+assertNull(context.client.scheduleAppend(epoch, singletonList("b"))); + +// The leader should trigger a flush for whatever batches are present in the BatchAccumulator +assertEquals(2L, context.log.endOffset().offset); + +// Now shutdown + +// We should still be running until we have had a chance to send EndQuorumEpoch +assertTrue(context.client.isShuttingDown()); +assertTrue(context.client.isRunning()); + +// Send EndQuorumEpoch request to the other voter +context.pollUntilRequest(); +assertTrue(context.client.isShuttingDown()); +assertTrue(context.client.isRunning()); +context.assertSentEndQuorumEpochRequest(1, otherNodeId); + +// We should still be able to handle vote requests during graceful shutdown +// in order to help the new leader get elected +context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 1L)); +context.client.poll(); +context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true); + Review comment: Okay, thanks! I have limited time at the moment. I'll try to look at it this week.
[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
mumrah commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607274751 ## File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java ## @@ -77,7 +77,7 @@ public String getDisplayName(int invocationIndex) { (BeforeTestExecutionCallback) context -> { KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). -setNumKip500BrokerNodes(clusterConfig.numBrokers()). +setNumBrokerNodes(clusterConfig.numBrokers()). Review comment: Not sure, it must have auto-formatted when I renamed to `setNumBrokerNodes`
[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
mumrah commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607278548 ## File path: core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala ## @@ -122,6 +122,14 @@ class ClientQuotaCache { entityFilters.put(entityType, entityMatch) } +// Special case for non-strict empty filter, match everything Review comment: It should short-circuit and return an empty map on [L134](https://github.com/apache/kafka/pull/10254/files#diff-99bc678b86ad25d99b32bc192428120d2ff3f3d478159e2c353f4f5346b43718R133-R135)
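A toy model of the filter semantics under discussion, not the real `ClientQuotaCache` code: with no filter components, a non-strict filter matches every entity, while a strict filter short-circuits to an empty result rather than throwing.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class QuotaFilterSketch {
    public static Map<String, Double> describe(Map<String, Double> allQuotas,
                                               List<String> components, boolean strict) {
        if (components.isEmpty()) {
            // Non-strict empty filter matches everything; strict returns an empty map.
            return strict ? Map.of() : allQuotas;
        }
        // Non-empty filter: keep entities whose name mentions every component.
        return allQuotas.entrySet().stream()
            .filter(e -> components.stream().allMatch(c -> e.getKey().contains(c)))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```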
[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
mumrah commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607281769 ## File path: core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala ## @@ -121,16 +121,16 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag return } -// Update the cache -quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, quotaRecord.remove) - // Convert the value to an appropriate Option for the quota manager val newValue = if (quotaRecord.remove()) { None } else { Some(quotaRecord.value).map(_.toInt) } connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue) + +// Update the cache Review comment: Good question. I could swear there was a review comment suggesting this, but I can't seem to find it. I believe I moved this to align it with handleUserClientQuota, where we update the underlying quota manager and then update the cache. I don't have a strong opinion on which thing happens first, but they should both probably be the same. Thinking about it more, we might want to be more defensive when calling the quota managers. If the quota manager cannot be updated, I think we should still update the cache since that reflects the true state of the quota according to the metadata log. Thoughts?
[GitHub] [kafka] mageshn commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource
mageshn commented on a change in pull request #10475: URL: https://github.com/apache/kafka/pull/10475#discussion_r607285210 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java ## @@ -61,12 +61,24 @@ public Plugins(Map props) { delegatingLoader.initLoaders(); } +public Plugins(Map props, ClassLoader parent) { Review comment: Is this primarily added for testing purpose?
[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker
mumrah commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607290168 ## File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala ## @@ -212,4 +214,103 @@ class RaftClusterTest { cluster.close() } } + + @Test + def testClientQuotas(): Unit = { +val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). +setNumBrokerNodes(1). +setNumControllerNodes(1).build()).build() +try { + cluster.format() + cluster.startup() + TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == BrokerState.RUNNING, +"Broker never made it to RUNNING state.") + val admin = Admin.create(cluster.clientProperties()) + try { +val entity = new ClientQuotaEntity(Map("user" -> "testkit").asJava) +var filter = ClientQuotaFilter.containsOnly( + List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava) + +def alterThenDescribe(entity: ClientQuotaEntity, + quotas: Seq[ClientQuotaAlteration.Op], + filter: ClientQuotaFilter, + expectCount: Int): java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = { + val alterResult = admin.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, quotas.asJava)).asJava) + try { +alterResult.all().get() + } catch { +case t: Throwable => fail("AlterClientQuotas request failed", t) + } + + def describeOrFail(filter: ClientQuotaFilter): java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = { +try { + admin.describeClientQuotas(filter).entities().get() +} catch { + case t: Throwable => fail("DescribeClientQuotas request failed", t) +} + } + + val (describeResult, ok) = TestUtils.computeUntilTrue(describeOrFail(filter)) { +results => results.getOrDefault(entity, java.util.Collections.emptyMap[String, java.lang.Double]()).size() == expectCount + } + assertTrue(ok, "Broker never saw new client quotas") + describeResult +} + +var describeResult = alterThenDescribe(entity, + Seq(new 
ClientQuotaAlteration.Op("request_percentage", 0.99)), filter, 1) +assertEquals(0.99, describeResult.get(entity).get("request_percentage"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", 0.97), + new ClientQuotaAlteration.Op("producer_byte_rate", 1), + new ClientQuotaAlteration.Op("consumer_byte_rate", 10001) +), filter, 3) +assertEquals(0.97, describeResult.get(entity).get("request_percentage"), 1e-6) +assertEquals(1.0, describeResult.get(entity).get("producer_byte_rate"), 1e-6) +assertEquals(10001.0, describeResult.get(entity).get("consumer_byte_rate"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", 0.95), + new ClientQuotaAlteration.Op("producer_byte_rate", null), + new ClientQuotaAlteration.Op("consumer_byte_rate", null) +), filter, 1) +assertEquals(0.95, describeResult.get(entity).get("request_percentage"), 1e-6) + +describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0) + +describeResult = alterThenDescribe(entity, + Seq(new ClientQuotaAlteration.Op("producer_byte_rate", )), filter, 1) +assertEquals(.0, describeResult.get(entity).get("producer_byte_rate"), 1e-6) + +// Add another quota for a different entity with same user part +val entity2 = new ClientQuotaEntity(Map("user" -> "testkit", "client-id" -> "some-client").asJava) +filter = ClientQuotaFilter.containsOnly( + List( +ClientQuotaFilterComponent.ofEntity("user", "testkit"), +ClientQuotaFilterComponent.ofEntity("client-id", "some-client"), + ).asJava) +describeResult = alterThenDescribe(entity2, + Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)), filter, 1) +assertEquals(9998.0, describeResult.get(entity2).get("producer_byte_rate"), 1e-6) + +// non-strict match +filter = ClientQuotaFilter.contains( + List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava) + +val (describeResult2, ok) = 
TestUtils.computeUntilTrue(admin.describeClientQuotas(filter).entities().get()) { + results => results.size() == 2 +} +assertTrue(ok, "Broker never saw two client quotas") Review comment: Yea let me clean this up
[GitHub] [kafka] mumrah commented on a change in pull request #10455: MINOR: Support ExponentialBackoff without jitter
mumrah commented on a change in pull request #10455: URL: https://github.com/apache/kafka/pull/10455#discussion_r607313601 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java ## @@ -47,7 +47,8 @@ public long backoff(long attempts) { } double exp = Math.min(attempts, this.expMax); double term = initialInterval * Math.pow(multiplier, exp); -double randomFactor = ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter); +double randomFactor = jitter < Double.MIN_NORMAL ? 1.0 : Review comment: I had to look this constant up :) Can we just make it check if the jitter is equal to zero (or maybe `<=` zero)? A caller of this method setting jitter to something like 0.5 might be surprised that there is no jitter added.
[jira] [Created] (KAFKA-12618) Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito
Justine Olshan created KAFKA-12618: -- Summary: Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito Key: KAFKA-12618 URL: https://issues.apache.org/jira/browse/KAFKA-12618 Project: Kafka Issue Type: Task Reporter: Justine Olshan [This commit|https://github.com/apache/kafka/commit/40f001cc537d6ff2efa71e609c2f84c6b934994d] introduced changes that have Partition calling getLog when there is no topic ID associated with the Partition. In this case, getLog will use a default argument. EasyMock (a Java framework) does not play well with Scala's default arguments. For now, we are manually creating a partition and associating it in the initializeLogAndTopicId method. But a better long-term solution is to use Mockito, which better supports default arguments. It would be good to convert all EasyMocks over to Mockito as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-7438) Replace EasyMock and PowerMock with Mockito in the clients module
[ https://issues.apache.org/jira/browse/KAFKA-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7438: --- Summary: Replace EasyMock and PowerMock with Mockito in the clients module (was: Replace EasyMock and PowerMock with Mockito) > Replace EasyMock and PowerMock with Mockito in the clients module > - > > Key: KAFKA-7438 > URL: https://issues.apache.org/jira/browse/KAFKA-7438 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Priority: Major > > Development of EasyMock and PowerMock has stagnated while Mockito continues > to be actively developed. With the new Java cadence, it's a problem to depend > on libraries that do bytecode generation and are not actively maintained. In > addition, Mockito is also easier to use.
[jira] [Updated] (KAFKA-7438) Replace EasyMock and PowerMock with Mockito
[ https://issues.apache.org/jira/browse/KAFKA-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7438: --- Summary: Replace EasyMock and PowerMock with Mockito (was: Replace EasyMock and PowerMock with Mockito in the clients module) > Replace EasyMock and PowerMock with Mockito > --- > > Key: KAFKA-7438 > URL: https://issues.apache.org/jira/browse/KAFKA-7438 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Priority: Major > > Development of EasyMock and PowerMock has stagnated while Mockito continues > to be actively developed. With the new Java cadence, it's a problem to depend > on libraries that do bytecode generation and are not actively maintained. In > addition, Mockito is also easier to use.
[jira] [Commented] (KAFKA-12618) Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito
[ https://issues.apache.org/jira/browse/KAFKA-12618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315087#comment-17315087 ] Ismael Juma commented on KAFKA-12618: - Also see https://issues.apache.org/jira/browse/KAFKA-7438 > Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito > - > > Key: KAFKA-12618 > URL: https://issues.apache.org/jira/browse/KAFKA-12618 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Priority: Minor > > [This > commit|https://github.com/apache/kafka/commit/40f001cc537d6ff2efa71e609c2f84c6b934994d] > introduced changes that have Partition calling getLog when there is no topic > ID associated to the Partition. In this case, getLog will use a default > argument. EasyMock (a Java framework) does not play well with scala's default > arguments. For now, we are manually creating a partition and associating it > in the initializeLogAndTopicId method. But a better long term solution is to > use Mockito which better supports default arguments. > It would be good to convert all EasyMocks over to mockito as well.
[jira] [Created] (KAFKA-12619) Ensure LeaderChange message is committed before initializing high watermark
Jason Gustafson created KAFKA-12619: --- Summary: Ensure LeaderChange message is committed before initializing high watermark Key: KAFKA-12619 URL: https://issues.apache.org/jira/browse/KAFKA-12619 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson KIP-595 describes an extra condition on commitment here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. In order to ensure that the leader's committed entries cannot get lost, it must commit one record from its own epoch. This guarantees that its latest entry is larger (in terms of epoch/offset) than any previously written record which ensures that any future leader must also include it. This is the purpose of the LeaderChange record which is written to the log as soon as the leader gets elected. We have this check implemented here: https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122. However, the check needs to be a strict inequality since the epoch start offset does not reflect the LeaderChange record itself. In other words, the check is off by one. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12619) Ensure LeaderChange message is committed before initializing high watermark
[ https://issues.apache.org/jira/browse/KAFKA-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12619: Description: KIP-595 describes an extra condition on commitment here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. In order to ensure that a newly elected leader's committed entries cannot get lost, it must commit one record from its own epoch. This guarantees that its latest entry is larger (in terms of epoch/offset) than any previously written record which ensures that any future leader must also include it. This is the purpose of the LeaderChange record which is written to the log as soon as the leader gets elected. We have this check implemented here: https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122. However, the check needs to be a strict inequality since the epoch start offset does not reflect the LeaderChange record itself. In other words, the check is off by one. was: KIP-595 describes an extra condition on commitment here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. In order to ensure that the leader's committed entries cannot get lost, it must commit one record from its own epoch. This guarantees that its latest entry is larger (in terms of epoch/offset) than any previously written record which ensures that any future leader must also include it. This is the purpose of the LeaderChange record which is written to the log as soon as the leader gets elected. We have this check implemented here: https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122. However, the check needs to be a strict inequality since the epoch start offset does not reflect the LeaderChange record itself. 
In other words, the check is off by one. > Ensure LeaderChange message is committed before initializing high watermark > --- > > Key: KAFKA-12619 > URL: https://issues.apache.org/jira/browse/KAFKA-12619 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > KIP-595 describes an extra condition on commitment here: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. > In order to ensure that a newly elected leader's committed entries cannot > get lost, it must commit one record from its own epoch. This guarantees that > its latest entry is larger (in terms of epoch/offset) than any previously > written record which ensures that any future leader must also include it. > This is the purpose of the LeaderChange record which is written to the log as > soon as the leader gets elected. > We have this check implemented here: > https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122. > However, the check needs to be a strict inequality since the epoch start > offset does not reflect the LeaderChange record itself. In other words, the > check is off by one.
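The off-by-one described above can be illustrated with a hedged standalone sketch (hypothetical names, not the actual LeaderState code): since the LeaderChange record sits at the epoch start offset itself, a non-strict check would declare the new epoch committed before any record from it has actually been replicated.

```java
public class CommitCheckDemo {
    // The LeaderChange record is written at epochStartOffset, so commitment
    // of the new epoch requires replication strictly past that offset.
    // A >= check would be satisfied by pre-epoch data alone (off by one).
    static boolean canInitializeHighWatermark(long replicatedEndOffset,
                                              long epochStartOffset) {
        return replicatedEndOffset > epochStartOffset; // strict, not >=
    }

    public static void main(String[] args) {
        long epochStart = 100;
        // End offset equal to the epoch start: LeaderChange not yet counted.
        assert !canInitializeHighWatermark(100, epochStart);
        // One past the epoch start: the LeaderChange record is replicated.
        assert canInitializeHighWatermark(101, epochStart);
    }
}
```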
[GitHub] [kafka] cmccabe commented on a change in pull request #10455: MINOR: Support ExponentialBackoff without jitter
cmccabe commented on a change in pull request #10455: URL: https://github.com/apache/kafka/pull/10455#discussion_r607330220 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java ## @@ -47,7 +47,8 @@ public long backoff(long attempts) { } double exp = Math.min(attempts, this.expMax); double term = initialInterval * Math.pow(multiplier, exp); -double randomFactor = ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter); +double randomFactor = jitter < Double.MIN_NORMAL ? 1.0 : Review comment: `MIN_NORMAL` is 2^-1022, though. So it certainly wouldn't affect someone setting jitter = 0.5.
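The exchange above can be illustrated with a small standalone sketch of the guarded random-factor computation (JitterDemo is a hypothetical harness, not Kafka's ExponentialBackoff class). The guard exists because ThreadLocalRandom.nextDouble(origin, bound) requires bound to exceed origin, so calling it with jitter == 0 would throw; and since Double.MIN_NORMAL is roughly 2^-1022, any realistic positive jitter such as 0.5 still gets randomized.

```java
import java.util.concurrent.ThreadLocalRandom;

public class JitterDemo {
    // Skip the RNG when jitter is effectively zero: nextDouble(1, 1)
    // throws IllegalArgumentException because the bound must be greater
    // than the origin. Any jitter >= Double.MIN_NORMAL (~2^-1022) is
    // treated normally, so ordinary values like 0.5 are unaffected.
    static double randomFactor(double jitter) {
        return jitter < Double.MIN_NORMAL
                ? 1.0
                : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
    }

    public static void main(String[] args) {
        assert randomFactor(0.0) == 1.0;  // zero jitter: deterministic factor
        double f = randomFactor(0.5);     // 0.5 >> MIN_NORMAL, so it jitters
        assert f >= 0.5 && f < 1.5;       // nextDouble range is [0.5, 1.5)
    }
}
```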
[jira] [Updated] (KAFKA-9988) Connect incorrectly logs that task has failed when one takes too long to shutdown
[ https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9988: - Labels: newbie (was: ) > Connect incorrectly logs that task has failed when one takes too long to > shutdown > - > > Key: KAFKA-9988 > URL: https://issues.apache.org/jira/browse/KAFKA-9988 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.2.3, 2.5.0, 2.3.2, 2.4.1, 2.4.2, > 2.5.1 >Reporter: Sanjana Kaundinya >Priority: Major > Labels: newbie > > If the OffsetStorageReader is closed while the task is trying to shutdown, > and the task is trying to access the offsets from the OffsetStorageReader, > then we see the following in the logs. > {code:java} > [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw > an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets. > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114) > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader > closed while attempting to read offsets. 
This is likely because the task was > been scheduled to stop but has taken longer than the graceful shutdown period > to do so. > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103) > ... 14 more > [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is > being killed and will not recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask) > {code} > This is a bit misleading, because the task is already on its way of being > shutdown, and doesn't actually need manual intervention to be restarted. We > can see that as later on in the logs we see that it throws another > unrecoverable exception. > {code:java} > [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw > an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > {code} > If we know a task is on its way of shutting down, we should not throw a > ConnectException and instead log a warning so that we don't log false > negatives.
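A minimal sketch of the behavior the ticket suggests (hypothetical names, not Connect's actual WorkerSourceTask API): consult the task's stopping flag before escalating an offset-read failure, so a shutdown-related failure is logged as a warning rather than a misleading fatal error.

```java
public class ShutdownAwareTask {
    private volatile boolean stopping = false;

    // Mark the task as scheduled to stop (as the worker would on shutdown).
    void stop() {
        stopping = true;
    }

    // Only treat an offset-read failure as fatal when the task was not
    // already asked to stop; during shutdown a warning is sufficient.
    String onOffsetReadFailure(RuntimeException cause) {
        if (stopping) {
            return "WARN offset reader closed during shutdown: " + cause.getMessage();
        }
        return "ERROR task failed and will not recover: " + cause.getMessage();
    }

    public static void main(String[] args) {
        ShutdownAwareTask task = new ShutdownAwareTask();
        RuntimeException e = new RuntimeException("reader closed");
        assert task.onOffsetReadFailure(e).startsWith("ERROR");
        task.stop();
        assert task.onOffsetReadFailure(e).startsWith("WARN");
    }
}
```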
[jira] [Commented] (KAFKA-10715) Support Kafka connect converter for AVRO
[ https://issues.apache.org/jira/browse/KAFKA-10715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315110#comment-17315110 ] Randall Hauch commented on KAFKA-10715: --- There are multiple Converter implementations outside of Kafka, and IMO there is no need for Kafka to own and maintain its own version of these when those other existing implementations can easily be used by simply installing them. This is similar to how the Kafka project provides only example Connector implementations. See the [rejected alternatives of KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767#KIP26AddKafkaConnectframeworkfordataimport/export-Maintainconnectorsintheprojectalongwithframework], which introduced the Connect framework (with Converters). Therefore, I'm going to close this. > Support Kafka connect converter for AVRO > > > Key: KAFKA-10715 > URL: https://issues.apache.org/jira/browse/KAFKA-10715 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ravindranath Kakarla >Priority: Minor > Original Estimate: 72h > Remaining Estimate: 72h > > I want to add support for Avro data format converter to Kafka Connect. Right > now, Kafka connect supports [JSON > converter|[https://github.com/apache/kafka/tree/trunk/connect].] Since, Avro > is a commonly used data format with Kafka, it will be great to have support > for it. > > Confluent Schema Registry libraries have > [support|https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java] > for it. The code seems to be pretty generic and can be used directly with > Kafka connect without schema registry. They are also licensed under Apache > 2.0. > > Can they be copied to this repository and made available for all users of > Kafka Connect?
[jira] [Resolved] (KAFKA-10715) Support Kafka connect converter for AVRO
[ https://issues.apache.org/jira/browse/KAFKA-10715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-10715. --- Resolution: Won't Do > Support Kafka connect converter for AVRO > > > Key: KAFKA-10715 > URL: https://issues.apache.org/jira/browse/KAFKA-10715 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Ravindranath Kakarla >Priority: Minor > Original Estimate: 72h > Remaining Estimate: 72h > > I want to add support for Avro data format converter to Kafka Connect. Right > now, Kafka connect supports [JSON > converter|[https://github.com/apache/kafka/tree/trunk/connect].] Since, Avro > is a commonly used data format with Kafka, it will be great to have support > for it. > > Confluent Schema Registry libraries have > [support|https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java] > for it. The code seems to be pretty generic and can be used directly with > Kafka connect without schema registry. They are also licensed under Apache > 2.0. > > Can they be copied to this repository and made available for all users of > Kafka Connect?
[jira] [Updated] (KAFKA-9017) We see timeout in kafka in production cluster
[ https://issues.apache.org/jira/browse/KAFKA-9017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9017: - Component/s: (was: KafkaConnect) core > We see timeout in kafka in production cluster > - > > Key: KAFKA-9017 > URL: https://issues.apache.org/jira/browse/KAFKA-9017 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 > Environment: Production >Reporter: Suhas >Priority: Critical > Attachments: stderr (7), stdout (12) > > > We see timeout in kafka in production cluster and Kafka is running on > DC/OS(MESOS) > and below are the errors > *+Exception 1: This from application logs+* > 2019-10-07 10:01:59 Error: java.lang.RuntimeException: > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > ie-lrx-audit-evt-3: 30030 ms has passed since batch creation plus linger time > *+Exception 2:This from application logs+* > {"eventTime":"2019-10-07 08:20:43.265", "logType":"ERROR", "stackMessage" : > "java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > ie-lrx-audit-evt-3: 30028 ms has passed since batch creation plus linger > time", "stackTrace" : > *+Exception (from log) We see this logs on broker logs+* > [2019-10-10 06:32:10,844] INFO [ReplicaFetcher replicaId=4, leaderId=2, > fetcherId=0] Error sending fetch request (sessionId=919177392, epoch=INITIAL) > to node 2: java.io.IOException: Connection to 2 was disconnected before the > response was read. (org.apache.kafka.clients.FetchSessionHandler)[2019-10-10 > 06:32:10,844] INFO [ReplicaFetcher replicaId=4, leaderId=2, fetcherId=0] > Error sending fetch request (sessionId=919177392, epoch=INITIAL) to node 2: > java.io.IOException: Connection to 2 was disconnected before the response was > read. 
(org.apache.kafka.clients.FetchSessionHandler)[2019-10-10 06:32:10,849] > WARN [ReplicaFetcher replicaId=4, leaderId=2, fetcherId=0] Error in response > for fetch request (type=FetchRequest, replicaId=4, maxWait=500, minBytes=1, > maxBytes=10485760, fetchData=\{ie-lrx-rxer-audit-evt-0=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), > mft-hdfs-landing-evt-1=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[108]), dca-audit-evt-2=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[108]), > it-sou-audit-evt-7=(offset=94819, logStartOffset=94819, maxBytes=1048576, > currentLeaderEpoch=Optional[100]), intg-ie-lrx-rxer-audit-evt-2=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[78]), > prod-pipelines-errors-evt-0=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[117]), __consumer_offsets-36=(offset=3, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), > panel-data-change-evt-4=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[108]), gdcp-notification-evt-2=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), > data-transfer-change-evt-0=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[108]), __consumer_offsets-11=(offset=15, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[108]), > dca-heartbeat-evt-2=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[105]), ukwhs-error-topic-1=(offset=8, > logStartOffset=8, maxBytes=1048576, currentLeaderEpoch=Optional[105]), > intg-ie-lrx-audit-evt-4=(offset=21, logStartOffset=21, maxBytes=1048576, > currentLeaderEpoch=Optional[74]), __consumer_offsets-16=(offset=11329814, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), > __consumer_offsets-31=(offset=3472033, logStartOffset=0, maxBytes=1048576, > 
currentLeaderEpoch=Optional[107]), ukpai-hdfs-evt-1=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[107]), > mft-pflow-evt-1=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[108]), ukwhs-hdfs-landing-evt-01-2=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[105]), > it-sou-audit-evt-2=(offset=490084, logStartOffset=490084, maxBytes=1048576, > currentLeaderEpoch=Optional[105]), ie-lrx-pat-audit-evt-4=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104])}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=919177392, > epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)java.io.IOException: > Connection to 2 was disconnected before the response was read at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java
[jira] [Resolved] (KAFKA-8961) Unable to create secure JDBC connection through Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-8961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8961. -- Resolution: Won't Fix This is not a problem of the Connect framework, and is instead an issue with the connector implementation – or more likely the _installation_ of the connector in the user's environment. > Unable to create secure JDBC connection through Kafka Connect > - > > Key: KAFKA-8961 > URL: https://issues.apache.org/jira/browse/KAFKA-8961 > Project: Kafka > Issue Type: Bug > Components: build, clients, KafkaConnect, network >Affects Versions: 2.2.1 >Reporter: Monika Bainsala >Priority: Major > > As per below article for enabling JDBC secure connection, we can use updated > URL parameter while calling the create connector REST API. > Exampl: > jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(LOAD_BALANCE=YES)(FAILOVER=YES)(ADDRESS=(PROTOCOL=tcp)(HOST=X)(PORT=1520)))(CONNECT_DATA=(SERVICE_NAME=XXAP)));EncryptionLevel=requested;EncryptionTypes=RC4_256;DataIntegrityLevel=requested;DataIntegrityTypes=MD5" > > But this approach is not working currently, kindly help in resolving this > issue. > > Reference : > [https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma opened a new pull request #10479: MINOR: Jenkinsfile's `post` needs `agent` to be set
ijuma opened a new pull request #10479: URL: https://github.com/apache/kafka/pull/10479 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-8867) Kafka Connect JDBC fails to create PostgreSQL table with default boolean value in schema
[ https://issues.apache.org/jira/browse/KAFKA-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8867. -- Resolution: Won't Fix The reported problem is for the Confluent JDBC source/sink connector, and should be reported via that connector's GitHub repository issues. > Kafka Connect JDBC fails to create PostgreSQL table with default boolean > value in schema > > > Key: KAFKA-8867 > URL: https://issues.apache.org/jira/browse/KAFKA-8867 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Tudor >Priority: Major > > The `CREATE TABLE ..` statement generated for JDBC sink connectors when > configured with `auto.create: true` generates field declarations that do not > conform to allowed PostgreSQL syntax when considering fields of type boolean > with default values. > Example record value Avro schema: > {code:java} > { > "namespace": "com.test.avro.schema.v1", > "type": "record", > "name": "SomeEvent", > "fields": [ > { > "name": "boolean_field", > "type": "boolean", > "default": false > } > ] > } > {code} > The connector task fails with: > {code:java} > ERROR WorkerSinkTask{id=test-events-sink-0} RetriableException from SinkTask: > (org.apache.kafka.connect.runtime.WorkerSinkTask:551) > org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: > org.postgresql.util.PSQLException: ERROR: column "boolean_field" is of type > boolean but default expression is of type integer > Hint: You will need to rewrite or cast the expression. 
> at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748){code} > > The generated SQL statement is: > {code:java} > CREATE TABLE "test_data" ("boolean_field" BOOLEAN DEFAULT 0){code}
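A hedged sketch of one possible fix on the connector side (defaultLiteral is a hypothetical helper, not the JDBC connector's actual code): render boolean schema defaults as TRUE/FALSE literals, which PostgreSQL accepts for BOOLEAN columns, instead of the integer 0/1 that triggers the type error quoted above.

```java
public class DefaultLiteralDemo {
    // PostgreSQL requires TRUE/FALSE for boolean column defaults; emitting
    // the Avro default 'false' as the integer 0 produces "column ... is of
    // type boolean but default expression is of type integer".
    static String defaultLiteral(Object value) {
        if (value instanceof Boolean) {
            return ((Boolean) value) ? "TRUE" : "FALSE";
        }
        return String.valueOf(value); // numbers etc. pass through unchanged
    }

    public static void main(String[] args) {
        String ddl = "CREATE TABLE \"test_data\" (\"boolean_field\" BOOLEAN DEFAULT "
                + defaultLiteral(false) + ")";
        assert ddl.endsWith("DEFAULT FALSE)");
        System.out.println(ddl);
    }
}
```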
[jira] [Resolved] (KAFKA-8664) non-JSON format messages when streaming data from Kafka to Mongo
[ https://issues.apache.org/jira/browse/KAFKA-8664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8664. -- Resolution: Won't Fix The reported problem is for a connector implementation that is not owned by the Apache Kafka project. Please report the issue with the provider of the connector. > non-JSON format messages when streaming data from Kafka to Mongo > > > Key: KAFKA-8664 > URL: https://issues.apache.org/jira/browse/KAFKA-8664 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.0 >Reporter: Vu Le >Priority: Major > Attachments: MongoSinkConnector.properties, > log_error_when_stream_data_not_a_json_format.txt > > > Hi team, > I can stream data from Kafka to MongoDB with JSON messages. I use MongoDB > Kafka Connector > ([https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md]) > However, if I send a non-JSON format message the Connector died. Please see > the log file for details. > My config file: > {code:java} > name=mongo-sink > topics=test > connector.class=com.mongodb.kafka.connect.MongoSinkConnector > tasks.max=1 > key.ignore=true > # Specific global MongoDB Sink Connector configuration > connection.uri=mongodb://localhost:27017 > database=test_kafka > collection=transaction > max.num.retries=3 > retries.defer.timeout=5000 > type.name=kafka-connect > key.converter=org.apache.kafka.connect.json.JsonConverter > key.converter.schemas.enable=false > value.converter=org.apache.kafka.connect.json.JsonConverter > value.converter.schemas.enable=false > {code} > I have 2 separate questions: > # how to ignore the message which is non-json format? > # how to define a default-key for this kind of message (for example: abc -> > \{ "non-json": "abc" } ) > Thanks -- This message was sent by Atlassian Jira (v8.3.4#803005)
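The first question (skipping non-JSON messages) can likely be addressed at the framework level rather than in the connector: since Apache Kafka 2.0 (KIP-298), Connect applies its error-tolerance settings to converter failures in sink connectors. A hedged sketch of the relevant sink configuration, assuming the failure occurs during JSON conversion (the DLQ topic name below is illustrative):

```properties
# Skip records that fail conversion (e.g. non-JSON bytes hitting the
# JsonConverter) instead of killing the task.
errors.tolerance=all
# Optionally route the skipped records to a dead letter queue topic
# for later inspection (sink connectors only).
errors.deadletterqueue.topic.name=dlq-mongo-sink
errors.deadletterqueue.context.headers.enable=true
```

The second question (wrapping a non-JSON payload in a default key) is not covered by these settings; it would require a custom converter or a single message transform.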
[jira] [Resolved] (KAFKA-8551) Comments for connectors() in Herder interface
[ https://issues.apache.org/jira/browse/KAFKA-8551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8551. -- Resolution: Won't Fix Marking as won't fix, since the details are insufficient to try to address. > Comments for connectors() in Herder interface > -- > > Key: KAFKA-8551 > URL: https://issues.apache.org/jira/browse/KAFKA-8551 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.1 >Reporter: Luying Liu >Priority: Major > > There are mistakes in the comments for connectors() in Herder interface. The > mistakes are in the file > connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-6985) Error connection between cluster node
[ https://issues.apache.org/jira/browse/KAFKA-6985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6985: - Component/s: (was: KafkaConnect) core > Error connection between cluster node > - > > Key: KAFKA-6985 > URL: https://issues.apache.org/jira/browse/KAFKA-6985 > Project: Kafka > Issue Type: Bug > Components: core > Environment: Centos-7 >Reporter: Ranjeet Ranjan >Priority: Major > > Hi Have setup multi-node Kafka cluster but getting an error while connecting > one node to another although there is an issue with firewall or port. I am > able to telnet > WARN [ReplicaFetcherThread-0-1], Error in fetch > Kafka.server.ReplicaFetcherThread$FetchRequest@8395951 > (Kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to Kafka-1:9092 (id: 1 rack: null) failed > > {code:java} > > at > kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84) > at > kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94) > at > kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244) > at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234) > at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > {code} > Here you go server.properties > Node:1 > > {code:java} > # Server Basics # > # The id of the broker. This must be set to a unique integer for each broker. 
> broker.id=1 > # Switch to enable topic deletion or not, default value is false > delete.topic.enable=true > # Socket Server Settings > # > listeners=PLAINTEXT://kafka-1:9092 > advertised.listeners=PLAINTEXT://kafka-1:9092 > #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > # The number of threads handling network requests > num.network.threads=3 > # The number of threads doing disk I/O > num.io.threads=8 > # The send buffer (SO_SNDBUF) used by the socket server > socket.send.buffer.bytes=102400 > # The receive buffer (SO_RCVBUF) used by the socket server > socket.receive.buffer.bytes=102400 > # The maximum size of a request that the socket server will accept > (protection against OOM) > socket.request.max.bytes=104857600 > # Log Basics # > # A comma seperated list of directories under which to store log files > log.dirs=/var/log/kafka > # The default number of log partitions per topic. More partitions allow > greater > # parallelism for consumption, but this will also result in more files across > # the brokers. > num.partitions=1 > # The number of threads per data directory to be used for log recovery at > startup and flushing at shutdown. > # This value is recommended to be increased for installations with data dirs > located in RAID array. > num.recovery.threads.per.data.dir=1 > # Log Retention Policy > # > # The minimum age of a log file to be eligible for deletion due to age > log.retention.hours=48 > # A size-based retention policy for logs. Segments are pruned from the log as > long as the remaining > # segments don't drop below log.retention.bytes. Functions independently of > log.retention.hours. > log.retention.bytes=1073741824 > # The maximum size of a log segment file. When this size is reached a new log > segment will be created. 
> log.segment.bytes=1073741824 > # The interval at which log segments are checked to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=30 > # Zookeeper # > # root directory for all kafka znodes. > zookeeper.connect=10.130.82.28:2181 > # Timeout in ms for connecting to zookeeper > zookeeper.connection.timeout.ms=6000 > {code} > > > Node-2 > {code:java} > # Server Basics # > # The id of the broker. This must be set to a unique integer for each broker. > broker.id=2 > # Switch to enable topic deletion or not, default value is false > delete.topic.enable=true > # Socket Server Settings > # > listeners=PLAINTEXT://kafka-2:9092 > advertised.listeners=PLAINTEXT://kafka-2:9092 > #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > # Th
[GitHub] [kafka] dielhennr opened a new pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr opened a new pull request #10480: URL: https://github.com/apache/kafka/pull/10480 The KafkaRaftClient has a field for the BatchAccumulator that is only used and set when it is the leader. In other cases, leader-specific information was stored in LeaderState. In a recent change, EpochState, which LeaderState implements, was changed to be a Closeable. QuorumState makes sure to always close the previous state before transitioning to the next state. This redesign was used to move the BatchAccumulator to the LeaderState and simplify some of the handling in KafkaRaftClient. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rsomu commented on pull request #4040: KAFKA-6324: Change LogSegment.delete to deleteIfExists and harden log recovery
rsomu commented on pull request #4040: URL: https://github.com/apache/kafka/pull/4040#issuecomment-813671433 Is this fix supposed to avoid the [NFS silly renames](https://sbg.technology/2018/07/10/kafka-nfs/) issue? I am currently testing Kafka 2.7 on an NFS filesystem and encountered the same error message when deleting a topic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10474: KAFKA-12602: Fix LICENSE file
ableegoldman commented on a change in pull request #10474: URL: https://github.com/apache/kafka/pull/10474#discussion_r607355497 ## File path: licenses/DWTFYWTPL ## @@ -0,0 +1,14 @@ +DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE Review comment: 😂 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
jsancio commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r607353807 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -48,13 +49,16 @@ private final Set grantingVoters = new HashSet<>(); private final Logger log; +private final BatchAccumulator accumulator; + protected LeaderState( int localId, int epoch, long epochStartOffset, Set voters, Set grantingVoters, -LogContext logContext +LogContext logContext, +BatchAccumulator accumulator Review comment: I would keep `LogContext` as the last argument. ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -2252,9 +2247,12 @@ public Long scheduleAtomicAppend(int epoch, List records) { return append(epoch, records, true); } +@SuppressWarnings("unchecked") private Long append(int epoch, List records, boolean isAtomic) { -BatchAccumulator accumulator = this.accumulator; -if (accumulator == null) { +BatchAccumulator accumulator; +try { +accumulator = (BatchAccumulator) quorum.leaderStateOrThrow().accumulator(); Review comment: I think you should be able to remove this cast. ## File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java ## @@ -36,30 +36,31 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class LeaderStateTest { +public class LeaderStateTest { private final int localId = 0; private final int epoch = 5; private final LogContext logContext = new LogContext(); -private LeaderState newLeaderState( +private LeaderState newLeaderState( Set voters, long epochStartOffset ) { -return new LeaderState( +return new LeaderState<>( localId, epoch, epochStartOffset, voters, voters, -logContext +logContext, +null Review comment: I would not pass a `null` and add a test checking that this field is returned correctly. 
## File path: raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java ## @@ -269,7 +269,7 @@ public void testCandidateToLeader() throws IOException { assertTrue(state.isCandidate()); assertEquals(1, state.epoch()); -state.transitionToLeader(0L); +state.transitionToLeader(0L, null); Review comment: Again, the code in `KafkaRaftClient` assumes that this field cannot be `null`. ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -319,6 +328,10 @@ public String name() { } @Override -public void close() {} +public void close() { +if (accumulator != null) { Review comment: When would accumulator be null? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
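The reviewers' suggestion, failing fast on a null accumulator in the constructor rather than letting a `null` leak into `KafkaRaftClient`, can be sketched as follows (illustrative names, not the real `LeaderState` signature):

```java
import java.util.Objects;

// Hedged sketch of the review suggestion: reject a null accumulator at
// construction time so callers can rely on it being present, and tests must
// pass a mock instead of null.
class LeaderStateSketch {
    private final Object accumulator;

    LeaderStateSketch(Object accumulator) {
        // Fail fast here instead of deferring a NullPointerException to append time.
        this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
    }

    Object accumulator() {
        return accumulator;
    }
}
```

With this check in place, a test like `LeaderStateTest` can no longer pass `null` and instead supplies a mock accumulator, which also enables asserting that the field is returned correctly.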
[GitHub] [kafka] hachikuji opened a new pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange
hachikuji opened a new pull request #10481: URL: https://github.com/apache/kafka/pull/10481 KIP-595 describes an extra condition on commitment here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. In order to ensure that a newly elected leader's committed entries cannot get lost, it must commit one record from its own epoch. This guarantees that its latest entry is larger (in terms of epoch/offset) than any previously written record which ensures that any future leader must also include it. This is the purpose of the `LeaderChange` record which is written to the log as soon as the leader gets elected. Although we had this check implemented, it was off by one. We only ensured that replication reached the epoch start offset, which does not reflect the appended `LeaderChange` record. This patch fixes the check and clarifies the point of the check. The rest of the patch is just fixing up test cases. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
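The off-by-one can be illustrated with a minimal predicate (a hedged sketch with illustrative names, not the actual KafkaRaftClient logic): the `LeaderChange` record sits at the epoch start offset, so replication must strictly exceed that offset before the record itself is covered.

```java
// Hedged sketch: the leader may only advance/expose its high watermark once
// replication covers the LeaderChange record appended at epochStartOffset.
class HighWatermarkCheck {
    static boolean canExposeHighWatermark(long replicatedEndOffset, long epochStartOffset) {
        // Buggy check (the off-by-one): replicatedEndOffset >= epochStartOffset,
        // which is satisfied before the LeaderChange record is replicated.
        // Fixed check: require at least one record from the leader's own epoch.
        return replicatedEndOffset >= epochStartOffset + 1;
    }
}
```

Under this predicate, reaching exactly the epoch start offset is not enough; only once the end offset moves past it has the leader committed a record from its own epoch.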
[GitHub] [kafka] guozhangwang commented on pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange
guozhangwang commented on pull request #10481: URL: https://github.com/apache/kafka/pull/10481#issuecomment-813683249 Nice catch! Regarding the fix, WDYT to just remember the `LeaderChange` record's offset and compare against it instead of the epoch start offset? I'm thinking if in the future there are any scenarios that can fill in some more records between these two, we are still immune to any future bugs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ewencp commented on a change in pull request #10474: KAFKA-12602: Fix LICENSE file
ewencp commented on a change in pull request #10474: URL: https://github.com/apache/kafka/pull/10474#discussion_r607362884 ## File path: LICENSE-binary ## @@ -0,0 +1,602 @@ + + Apache License + Version 2.0, January 2004 +http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution
[GitHub] [kafka] hachikuji commented on pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange
hachikuji commented on pull request #10481: URL: https://github.com/apache/kafka/pull/10481#issuecomment-813686724 @guozhangwang Thanks for the quick comment. I did consider that. I can't say I had a particularly strong reason to reject it, but ultimately I convinced myself that modifying the existing check was good enough and probably simpler. The only invariant that we need to protect is that all records appended in the leader's epoch actually carry the right epoch tag. The refactor proposed here may even give us a stronger way to enforce the invariant: https://github.com/apache/kafka/pull/10480/files. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up
[ https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315140#comment-17315140 ] A. Sophie Blee-Goldman commented on KAFKA-12492: The kafka-site repo is what the actual, live docs are built from. That's why there are separate folders like 27, 26, etc -- these correspond to the docs for versions 2.7 and 2.6, and so on. You only need to submit a PR to the kafka-site repo if you want your change to show up immediately -- if you don't mind waiting for the next release, you can just open a PR to fix the docs in the kafka repo directly. Then, these will be copied over to the kafka-site repo and made live when the next version is released. Obviously it would be ideal if we could fix this in all versions, but it's probably sufficient to just fix it going forward. The 2.8 release is actually going on at the moment, so I would recommend submitting a PR to the kafka repo for now. If we can get it merged before 2.8 is released, then we're good -- otherwise you can open a followup PR with the same fix in just the 28 subdirectory of the kafka-site repo. I'm not sure why you're getting a 403, I was able to setup a local apache server to test some docs but that was a while ago. Since it's just a fix of an existing formatting error, I wouldn't worry about testing it too much. As long as you can figure out why the formatting was messed up to begin with, and feel reasonably confident in your fix, then that's good enough. Remember, once the fix is in kafka-site it'll be live so you can just see what it looks like then. If something is still off, you can always submit a followup PR to fix it right away in kafka-site > Formatting of example RocksDBConfigSetter is messed up > -- > > Key: KAFKA-12492 > URL: https://issues.apache.org/jira/browse/KAFKA-12492 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Reporter: A. 
Sophie Blee-Goldman >Assignee: Ben Chen >Priority: Trivial > Labels: docs, newbie > > See the example implementation class CustomRocksDBConfig in the docs for the > rocksdb.config.setter > https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12607) Allow votes to be granted in resigned state
[ https://issues.apache.org/jira/browse/KAFKA-12607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-12607: -- Labels: newbie++ (was: ) > Allow votes to be granted in resigned state > --- > > Key: KAFKA-12607 > URL: https://issues.apache.org/jira/browse/KAFKA-12607 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: dengziming >Priority: Major > Labels: newbie++ > > When the leader is shutting down, it transitions to a resigned state. > Currently all votes are rejected in this state, but we should allow the > resigned leader to help a candidate get elected. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order
hachikuji commented on a change in pull request #9441: URL: https://github.com/apache/kafka/pull/9441#discussion_r607374397 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -905,19 +907,33 @@ class GroupCoordinator(val brokerId: Int, * * @param offsetTopicPartitionId The partition we are now leading */ - def onElection(offsetTopicPartitionId: Int): Unit = { -info(s"Elected as the group coordinator for partition $offsetTopicPartitionId") -groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded) + def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = { +val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId) +if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) { + info(s"Elected as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch") + groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded) + epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch) +} else { + warn(s"Ignored election as group coordinator for partition $offsetTopicPartitionId " + +s"in epoch $coordinatorEpoch since current epoch is $currentEpoch") +} } /** * Unload cached state for the given partition and stop handling requests for groups which map to it. 
* * @param offsetTopicPartitionId The partition we are no longer leading */ - def onResignation(offsetTopicPartitionId: Int): Unit = { -info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId") -groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded) + def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = { +val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId) +if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) { + info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch") + groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded) + epochForPartitionId.remove(offsetTopicPartitionId) Review comment: Hmm.. Why remove the epoch after resignation? It seems like it would be useful to keep tracking it. Maybe it's useful to distinguish the case where the replica is to be deleted? ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -87,6 +87,8 @@ class GroupCoordinator(val brokerId: Int, private val isActive = new AtomicBoolean(false) + val epochForPartitionId = mutable.Map[Int, Int]() Review comment: Does this need to be a concurrent collection? It does not look like we can count on a lock protecting `onElection` and `onResignation`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
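The fencing pattern under review can be sketched in isolation (illustrative code, not the actual `GroupCoordinator`): an election callback is honored only if its coordinator epoch is strictly newer than any epoch already observed for that partition, and a concurrent map is used because, per the second review comment, no single lock is guaranteed to order `onElection` and `onResignation`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hedged sketch of coordinator-epoch fencing: stale election callbacks for a
// partition are ignored once a newer epoch has been observed.
class EpochFence {
    private final ConcurrentMap<Integer, Integer> epochForPartition = new ConcurrentHashMap<>();

    // Returns true if the election should proceed (load group state),
    // false if the callback is stale and must be ignored.
    boolean tryElect(int partition, int coordinatorEpoch) {
        Integer current = epochForPartition.get(partition);
        if (current == null || coordinatorEpoch > current) {
            epochForPartition.put(partition, coordinatorEpoch);
            return true;
        }
        return false;
    }
}
```

Note this sketch keeps the epoch after a successful election; whether to drop it on resignation is exactly the open question in the first review comment, since retaining it preserves fencing across a resign/re-elect cycle.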
[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha
[ https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315142#comment-17315142 ] A. Sophie Blee-Goldman commented on KAFKA-12574: > This is well documented in > https://kafka.apache.org/27/documentation/streams/upgrade-guide Ah, you're right. I must have been on an older version of the docs -- I find Google often takes me to 2.4 docs for some reason. We need better SEO :/ (I filed a ticket for this already a while back) Also not sure how I forgot about EosBetaUpgradeIntegrationTest after spending so much time trying to help fix it -- sorry for doubting our test coverage and docs. Regarding the proposal, I think users would find it very odd if we moved on to eos-v2 and then suddenly deprecated it and went back to just "eos" -- makes it seem like there was a problem with eos-v2. I would be fine with just staying on eos-v2 though. For one thing it leaves the door open to further developments in eos that need to be gated by a config, eg eos-v3, if we ever have need for that again. > Deprecate eos-alpha > --- > > Key: KAFKA-12574 > URL: https://issues.apache.org/jira/browse/KAFKA-12574 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > In KIP-447 we introduced a new thread-producer which is capable of > exactly-once semantics across multiple tasks. The new mode of EOS, called > eos-beta, is intended to eventually be the preferred processing mode for EOS > as it improves the performance and scaling of partitions/tasks. The only > downside is that it requires brokers to be on version 2.5+ in order to > understand the latest APIs that are necessary for this thread-producer. > We should consider deprecating the eos-alpha config, ie > StreamsConfig.EXACTLY_ONCE, to encourage new 
& existing EOS users to migrate > to the new-and-improved processing mode, and upgrade their brokers if > necessary. > Eventually we would like to be able to remove the eos-alpha code paths from > Streams as this will help to simplify the logic and reduce the processing > mode branching. But since this will break client-broker compatibility, and > 2.5 is still a relatively recent version, we probably can't actually remove > eos-alpha in the near future -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha
[ https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315144#comment-17315144 ] Ismael Juma commented on KAFKA-12574: - {quote}Regarding the proposal, I think users would find it very odd if we moved on to eos-v2 and then suddenly deprecated it and went back to just "eos" – makes it seem like there was a problem with eos-v2. I would be fine with just staying on eos-v2 though. For one thing it leaves the door open to further developments in eos that need to be gated by a config, eg eos-v3, if we ever have need for that again. {quote} +1 > Deprecate eos-alpha > --- > > Key: KAFKA-12574 > URL: https://issues.apache.org/jira/browse/KAFKA-12574 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > In KIP-447 we introduced a new thread-producer which is capable of > exactly-once semantics across multiple tasks. The new mode of EOS, called > eos-beta, is intended to eventually be the preferred processing mode for EOS > as it improves the performance and scaling of partitions/tasks. The only > downside is that it requires brokers to be on version 2.5+ in order to > understand the latest APIs that are necessary for this thread-producer. > We should consider deprecating the eos-alpha config, ie > StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate > to the new-and-improved processing mode, and upgrade their brokers if > necessary. > Eventually we would like to be able to remove the eos-alpha code paths from > Streams as this will help to simplify the logic and reduce the processing > mode branching. But since this will break client-broker compatibility, and > 2.5 is still a relatively recent version, we probably can't actually remove > eos-alpha in the near future -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients
hachikuji merged pull request #10142: URL: https://github.com/apache/kafka/pull/10142 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation
[ https://issues.apache.org/jira/browse/KAFKA-12294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12294.

Fix Version/s: 3.0.0
Resolution: Fixed

> Consider using the forwarding mechanism for metadata auto topic creation
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-12294
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12294
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Boyang Chen
>            Priority: Major
>             Fix For: 3.0.0
>
> Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to
> improve the topic creation auditing by forwarding the CreateTopicsRequest
> inside Envelope for the given client. Details in
> [here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780]
[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607380961

## File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java

@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;

-public class LeaderStateTest {
+public class LeaderStateTest {
     private final int localId = 0;
     private final int epoch = 5;
     private final LogContext logContext = new LogContext();

-    private LeaderState newLeaderState(
+    private LeaderState newLeaderState(
         Set voters,
         long epochStartOffset
     ) {
-        return new LeaderState(
+        return new LeaderState<>(
             localId,
             epoch,
             epochStartOffset,
             voters,
             voters,
-            logContext
+            logContext,
+            null

Review comment: Yeah, let's add an explicit `requireNonNull` in the constructor. Here we could potentially use a mock.
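The `requireNonNull` suggestion above is the standard fail-fast constructor-validation idiom. A hedged, self-contained sketch (the class and field names are illustrative stand-ins, not the actual `LeaderState` signature):

```java
import java.util.Objects;

// Illustrative holder for a collaborator that must never be null,
// mirroring the suggestion to validate the accumulator at construction.
class AccumulatorHolder<T> {
    private final T accumulator;

    AccumulatorHolder(T accumulator) {
        // Fail fast here instead of throwing NPE on first use later.
        this.accumulator = Objects.requireNonNull(accumulator, "accumulator must not be null");
    }

    T accumulator() {
        return accumulator;
    }
}

public class RequireNonNullSketch {
    public static void main(String[] args) {
        AccumulatorHolder<String> holder = new AccumulatorHolder<>("batch");
        System.out.println(holder.accumulator());
        try {
            new AccumulatorHolder<String>(null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```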
[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607381875

## File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java

@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;

-public class LeaderStateTest {
+public class LeaderStateTest {

Review comment: I confess it's a little annoying to see the generic type leak down to here, but I guess that's the price we have to pay. There's a big part of me that wants to remove the generic and let `ApiMessageAndVersion` be the only supported type. Anyway, that is fuel for a separate issue/discussion.
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607383662

## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

@@ -2252,9 +2247,12 @@
 public Long scheduleAtomicAppend(int epoch, List records) {
     return append(epoch, records, true);
 }

+@SuppressWarnings("unchecked")
 private Long append(int epoch, List records, boolean isAtomic) {
-    BatchAccumulator accumulator = this.accumulator;
-    if (accumulator == null) {
+    BatchAccumulator accumulator;
+    try {
+        accumulator = (BatchAccumulator) quorum.leaderStateOrThrow().accumulator();

Review comment: Removing the cast causes this...
`Type Mismatch: Cannot convert from BatchAccumulator to BatchAccumulator`
even though this is the signature of accumulator():
`public BatchAccumulator accumulator()`
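The mismatch described above is the usual generic-capture situation: once a value comes back through a reference whose type parameter is a wildcard (or a different type variable), the compiler cannot prove it matches the caller's `T`, so recovering the typed view requires an unchecked cast under `@SuppressWarnings`. A self-contained illustration with a hypothetical `Box` class standing in for `BatchAccumulator`:

```java
// Hypothetical stand-in for BatchAccumulator: the point is only that a
// wildcard-typed reference cannot be converted back to Box<T> implicitly.
class Box<T> {
    private final T value;
    Box(T value) { this.value = value; }
    T value() { return value; }
}

public class UncheckedCastSketch {
    // Widens the type parameter to a wildcard, the way a holder class
    // with its own (unrelated) type variable effectively does.
    static Box<?> widened(Box<String> box) {
        return box;
    }

    @SuppressWarnings("unchecked")
    static <T> Box<T> narrowed(Box<?> box) {
        // Without this cast the method does not compile: the compiler
        // cannot prove the wildcard's capture equals the caller's T.
        return (Box<T>) box;
    }

    public static void main(String[] args) {
        Box<String> typed = narrowed(widened(new Box<>("records")));
        System.out.println(typed.value());
    }
}
```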
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607386832

## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java

@@ -319,6 +328,10 @@
 public String name() {
 }

 @Override
-public void close() {}
+public void close() {
+    if (accumulator != null) {

Review comment: These were the QuorumState tests, where I was passing in null for the accumulator to get the code to compile. I'll see about using a mock.
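The null guard under review keeps `close()` safe while tests still construct the state with a null accumulator. A minimal sketch of the pattern, with a hand-rolled lambda stub that a test could pass instead of `null` (illustrative only; the PR discussion leans toward a mock instead):

```java
// Illustrative closer that tolerates a null collaborator, mirroring the
// guarded close() under review.
class SafeCloser implements AutoCloseable {
    private final AutoCloseable accumulator; // may be null in some tests

    SafeCloser(AutoCloseable accumulator) {
        this.accumulator = accumulator;
    }

    @Override
    public void close() throws Exception {
        if (accumulator != null) {
            accumulator.close();
        }
    }
}

public class CloseSketch {
    public static void main(String[] args) throws Exception {
        final boolean[] closed = {false};
        // A lambda stands in for a real (or mocked) accumulator.
        SafeCloser withStub = new SafeCloser(() -> closed[0] = true);
        withStub.close();
        System.out.println(closed[0]);
        new SafeCloser(null).close(); // no NPE thanks to the null guard
        System.out.println("ok");
    }
}
```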