[GitHub] [kafka] dongjinleekr commented on pull request #10176: KAFKA-12359: Update Jetty to 11

2021-04-05 Thread GitBox


dongjinleekr commented on pull request #10176:
URL: https://github.com/apache/kafka/pull/10176#issuecomment-813274930


   For those who are curious about what the Jetty 11 upgrade would look like, 
here is the latest draft with Jetty 11.0.2.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-05 Thread Justin Mclean (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314720#comment-17314720
 ] 

Justin Mclean commented on KAFKA-12602:
---

Re the first two files - even if something is Apache licensed, it's best to 
mention it in the LICENSE file. There's no need to repeat the license text, 
however. Is it Apache code or 3rd-party code? If it comes from an ASF project 
(and there are changes to the original code), then that may have an impact on 
the NOTICE file as well, as you need to look at the NOTICE file in the project 
it comes from and copy the relevant parts across.

> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [~jmclean] raised this on the mailing list: 
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  
> We need to make  the license file match what we are actually shipping in 
> source and binary distributions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-05 Thread Justin Mclean (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314721#comment-17314721
 ] 

Justin Mclean commented on KAFKA-12602:
---

see [1] and [2]

1. https://infra.apache.org/licensing-howto.html#alv2-dep
2. https://infra.apache.org/licensing-howto.html#bundle-asf-product


> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [~jmclean] raised this on the mailing list: 
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  
> We need to make  the license file match what we are actually shipping in 
> source and binary distributions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ncliang opened a new pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource

2021-04-05 Thread GitBox


ncliang opened a new pull request #10475:
URL: https://github.com/apache/kafka/pull/10475


   The default implementation, which is inherited from ClassLoader, searches the
   classloader tree parent-first. This causes issues when the resource is
   available both on the classpath and the plugin path. Instead of attempting to
   load the resource from the plugin path first, the system classloader is
   consulted first and the resource is loaded from the classpath.
   
   A test case is added in PluginsTest to verify that this behavior is fixed by
   this commit.
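   As a rough illustration of the child-first lookup described above (a minimal
   sketch, not the actual PluginClassLoader code), the override boils down to
   consulting the plugin's own URLs before delegating to the parent:
   
   ```java
   import java.net.URL;
   import java.net.URLClassLoader;
   
   // Minimal sketch: child-first resource lookup for an isolated plugin classloader.
   public class ChildFirstResourceClassLoader extends URLClassLoader {
   
       public ChildFirstResourceClassLoader(URL[] urls, ClassLoader parent) {
           super(urls, parent);
       }
   
       @Override
       public URL getResource(String name) {
           // Look on the plugin path first ...
           URL resource = findResource(name);
           // ... and only fall back to the parent (application classpath) if not found.
           return resource != null ? resource : super.getResource(name);
       }
   }
   ```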
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ncliang commented on pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource

2021-04-05 Thread GitBox


ncliang commented on pull request #10475:
URL: https://github.com/apache/kafka/pull/10475#issuecomment-813282995


   @mageshn @C0urante @gharris1727 , please review when you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-05 Thread Justin Mclean (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314723#comment-17314723
 ] 

Justin Mclean commented on KAFKA-12602:
---

Looking at the proposed changes, some changes may need to be made to the 
binary NOTICE file due to the inclusion of ALv2 dependencies. I also see you 
include general MIT and BSD licenses. This is probably OK, but it would be 
better to include each individual license. Both licenses have copyright lines, 
and the BSD license text can vary a little from license to license.

In short, IMO it's much better than before. I think this would be OK to merge 
and make another release candidate. Perhaps consider refining in future 
releases?

> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [~jmclean] raised this on the mailing list: 
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  
> We need to make  the license file match what we are actually shipping in 
> source and binary distributions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-05 Thread Justin Mclean (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314723#comment-17314723
 ] 

Justin Mclean edited comment on KAFKA-12602 at 4/5/21, 9:12 AM:


Looking at the proposed changes, some additions may need to be made to the 
binary NOTICE file due to the inclusion of ALv2 dependencies. I also see you 
have included general MIT and BSD licenses. This is probably OK, but it would 
be better to include each individual license text. Both MIT and BSD licenses 
have copyright lines, and the BSD license text can vary a little from license 
to license.

In short, IMO it's much better than before. I think this would be OK to merge 
and make another release candidate. Perhaps consider refining in future 
releases?


was (Author: jmclean):
Looking at the proposed changes, some changes may need to be made to the  
binary NOTICE file due to including of ALv2 dependancies. I also see you 
include general MIT and BSD licenses. This is probably OK but it would be 
better to include each individual license. Both licenses have copyright lines 
and the BSD license text can varies a little from license to license.

In short IMO I think it's much better than before. I think this would be OK to 
merge and make another release candidate. Perhaps consider refining in future 
releases?

> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [~jmclean] raised this on the mailing list: 
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  
> We need to make  the license file match what we are actually shipping in 
> source and binary distributions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12615) Correct comments for the method Selector.clear

2021-04-05 Thread HaiyuanZhao (Jira)
HaiyuanZhao created KAFKA-12615:
---

 Summary: Correct comments for the method Selector.clear
 Key: KAFKA-12615
 URL: https://issues.apache.org/jira/browse/KAFKA-12615
 Project: Kafka
  Issue Type: Improvement
  Components: build
Reporter: HaiyuanZhao
Assignee: HaiyuanZhao


{code:java}

/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses {@link #clearCompletedSends()} and {@link 
#clearCompletedSends()} to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */
{code}
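The second {@link #clearCompletedSends()} reference above should point to 
clearCompletedReceives(); a sketch of how the corrected sentence might read 
(the exact wording of the eventual fix may differ):
{code:java}
 * SocketServer uses {@link #clearCompletedSends()} and {@link #clearCompletedReceives()} to
 * clear `completedSends` and `completedReceives` as soon as they are processed to avoid
 * holding onto large request/response buffers from multiple connections longer than necessary.
{code}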



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear

2021-04-05 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao updated KAFKA-12615:

Description: 
According to my understanding, the second clearCompletedSends should be 
clearCompletedReceives
{code:java}

/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses {@link #clearCompletedSends()} and {@link 
#clearCompletedSends()} to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */
{code}

  was:
{code:java}

/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses {@link #clearCompletedSends()} and {@link 
#clearCompletedSends()} to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */
{code}


> Correct comments for the method Selector.clear
> --
>
> Key: KAFKA-12615
> URL: https://issues.apache.org/jira/browse/KAFKA-12615
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
> According to my understanding, the second clearCompletedSends should be 
> clearCompletedReceives
> {code:java}
> /**
>  * Clears all the results from the previous poll. This is invoked by Selector 
> at the start of
>  * a poll() when all the results from the previous poll are expected to have 
> been handled.
>  * 
>  * SocketServer uses {@link #clearCompletedSends()} and {@link 
> #clearCompletedSends()} to
>  * clear `completedSends` and `completedReceives` as soon as they are 
> processed to avoid
>  * holding onto large request/response buffers from multiple connections 
> longer than necessary.
>  * Clients rely on Selector invoking {@link #clear()} at the start of each 
> poll() since memory usage
>  * is less critical and clearing once-per-poll provides the flexibility to 
> process these results in
>  * any order before the next poll.
>  */
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear

2021-04-05 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao updated KAFKA-12615:

Description: 
According to my understanding, the second clearCompletedSends should be 
clearCompletedReceives
{code:java}

/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses {@link #*clearCompletedSends*()} and {@link 
#*clearCompletedSends*()} to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */
{code}

  was:
According to my understanding, the second clearCompletedSends should be 
clearCompletedReceives
{code:java}

/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses {@link #clearCompletedSends()} and {@link 
#clearCompletedSends()} to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */
{code}


> Correct comments for the method Selector.clear
> --
>
> Key: KAFKA-12615
> URL: https://issues.apache.org/jira/browse/KAFKA-12615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
> According to my understanding, the second clearCompletedSends should be 
> clearCompletedReceives
> {code:java}
> /**
>  * Clears all the results from the previous poll. This is invoked by Selector 
> at the start of
>  * a poll() when all the results from the previous poll are expected to have 
> been handled.
>  * 
>  * SocketServer uses {@link #*clearCompletedSends*()} and {@link 
> #*clearCompletedSends*()} to
>  * clear `completedSends` and `completedReceives` as soon as they are 
> processed to avoid
>  * holding onto large request/response buffers from multiple connections 
> longer than necessary.
>  * Clients rely on Selector invoking {@link #clear()} at the start of each 
> poll() since memory usage
>  * is less critical and clearing once-per-poll provides the flexibility to 
> process these results in
>  * any order before the next poll.
>  */
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear

2021-04-05 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao updated KAFKA-12615:

Component/s: (was: build)
 clients

> Correct comments for the method Selector.clear
> --
>
> Key: KAFKA-12615
> URL: https://issues.apache.org/jira/browse/KAFKA-12615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
> According to my understanding, the second clearCompletedSends should be 
> clearCompletedReceives
> {code:java}
> /**
>  * Clears all the results from the previous poll. This is invoked by Selector 
> at the start of
>  * a poll() when all the results from the previous poll are expected to have 
> been handled.
>  * 
>  * SocketServer uses {@link #clearCompletedSends()} and {@link 
> #clearCompletedSends()} to
>  * clear `completedSends` and `completedReceives` as soon as they are 
> processed to avoid
>  * holding onto large request/response buffers from multiple connections 
> longer than necessary.
>  * Clients rely on Selector invoking {@link #clear()} at the start of each 
> poll() since memory usage
>  * is less critical and clearing once-per-poll provides the flexibility to 
> process these results in
>  * any order before the next poll.
>  */
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear

2021-04-05 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao updated KAFKA-12615:

Description: 
According to my understanding, the second clearCompletedSends should be 
clearCompletedReceives


/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses clearCompletedSends*() and *clearCompletedSends*() to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */


  was:
According to my understanding, the second clearCompletedSends should be 
clearCompletedReceives


/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses {@link #*clearCompletedSends*()} and {@link 
#*clearCompletedSends*()} to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */



> Correct comments for the method Selector.clear
> --
>
> Key: KAFKA-12615
> URL: https://issues.apache.org/jira/browse/KAFKA-12615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
> According to my understanding, the second clearCompletedSends should be 
> clearCompletedReceives
> /**
>  * Clears all the results from the previous poll. This is invoked by Selector 
> at the start of
>  * a poll() when all the results from the previous poll are expected to have 
> been handled.
>  * 
>  * SocketServer uses clearCompletedSends*() and *clearCompletedSends*() to
>  * clear `completedSends` and `completedReceives` as soon as they are 
> processed to avoid
>  * holding onto large request/response buffers from multiple connections 
> longer than necessary.
>  * Clients rely on Selector invoking {@link #clear()} at the start of each 
> poll() since memory usage
>  * is less critical and clearing once-per-poll provides the flexibility to 
> process these results in
>  * any order before the next poll.
>  */



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear

2021-04-05 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao updated KAFKA-12615:

Description: 
According to my understanding, the second clearCompletedSends should be 
clearCompletedReceives


/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses {@link #*clearCompletedSends*()} and {@link 
#*clearCompletedSends*()} to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */


  was:
According to my understanding, the second clearCompletedSends should be 
clearCompletedReceives
{code:java}

/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses {@link #*clearCompletedSends*()} and {@link 
#*clearCompletedSends*()} to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */
{code}


> Correct comments for the method Selector.clear
> --
>
> Key: KAFKA-12615
> URL: https://issues.apache.org/jira/browse/KAFKA-12615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
> According to my understanding, the second clearCompletedSends should be 
> clearCompletedReceives
> /**
>  * Clears all the results from the previous poll. This is invoked by Selector 
> at the start of
>  * a poll() when all the results from the previous poll are expected to have 
> been handled.
>  * 
>  * SocketServer uses {@link #*clearCompletedSends*()} and {@link 
> #*clearCompletedSends*()} to
>  * clear `completedSends` and `completedReceives` as soon as they are 
> processed to avoid
>  * holding onto large request/response buffers from multiple connections 
> longer than necessary.
>  * Clients rely on Selector invoking {@link #clear()} at the start of each 
> poll() since memory usage
>  * is less critical and clearing once-per-poll provides the flexibility to 
> process these results in
>  * any order before the next poll.
>  */



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear

2021-04-05 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao updated KAFKA-12615:

Description: 
According to my understanding, the second clearCompletedSends should be 
clearCompletedReceives


/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses clearCompletedSends() and *clearCompletedSends*() to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */


  was:
According to my understanding, the second clearCompletedSends should be 
clearCompletedReceives


/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses clearCompletedSends*() and *clearCompletedSends*() to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */



> Correct comments for the method Selector.clear
> --
>
> Key: KAFKA-12615
> URL: https://issues.apache.org/jira/browse/KAFKA-12615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
> According to my understanding, the second clearCompletedSends should be 
> clearCompletedReceives
> /**
>  * Clears all the results from the previous poll. This is invoked by Selector 
> at the start of
>  * a poll() when all the results from the previous poll are expected to have 
> been handled.
>  * 
>  * SocketServer uses clearCompletedSends() and *clearCompletedSends*() to
>  * clear `completedSends` and `completedReceives` as soon as they are 
> processed to avoid
>  * holding onto large request/response buffers from multiple connections 
> longer than necessary.
>  * Clients rely on Selector invoking {@link #clear()} at the start of each 
> poll() since memory usage
>  * is less critical and clearing once-per-poll provides the flexibility to 
> process these results in
>  * any order before the next poll.
>  */



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12615) Correct comments for the method Selector.clear

2021-04-05 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao updated KAFKA-12615:

Description: 
 According to my understanding, the second clearCompletedSends, which is 
highlighted as follows, should be clearCompletedReceives


/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses clearCompletedSends() and *clearCompletedSends*() to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */


  was:
According to my understanding, the second clearCompletedSends should be 
clearCompletedReceives


/**
 * Clears all the results from the previous poll. This is invoked by Selector 
at the start of
 * a poll() when all the results from the previous poll are expected to have 
been handled.
 * 
 * SocketServer uses clearCompletedSends() and *clearCompletedSends*() to
 * clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
 * holding onto large request/response buffers from multiple connections longer 
than necessary.
 * Clients rely on Selector invoking {@link #clear()} at the start of each 
poll() since memory usage
 * is less critical and clearing once-per-poll provides the flexibility to 
process these results in
 * any order before the next poll.
 */



> Correct comments for the method Selector.clear
> --
>
> Key: KAFKA-12615
> URL: https://issues.apache.org/jira/browse/KAFKA-12615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: HaiyuanZhao
>Assignee: HaiyuanZhao
>Priority: Minor
>
>  According to my understanding, the second clearCompletedSends, which is 
> highlighted as follows, should be clearCompletedReceives
> /**
>  * Clears all the results from the previous poll. This is invoked by Selector 
> at the start of
>  * a poll() when all the results from the previous poll are expected to have 
> been handled.
>  * 
>  * SocketServer uses clearCompletedSends() and *clearCompletedSends*() to
>  * clear `completedSends` and `completedReceives` as soon as they are 
> processed to avoid
>  * holding onto large request/response buffers from multiple connections 
> longer than necessary.
>  * Clients rely on Selector invoking {@link #clear()} at the start of each 
> poll() since memory usage
>  * is less critical and clearing once-per-poll provides the flexibility to 
> process these results in
>  * any order before the next poll.
>  */



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] zhaohaidao opened a new pull request #10476: KAFKA-12615: Correct comments for the method Selector.clear

2021-04-05 Thread GitBox


zhaohaidao opened a new pull request #10476:
URL: https://github.com/apache/kafka/pull/10476


   According to my understanding, the second clearCompletedSends, which is 
highlighted as follows, should be clearCompletedReceives
   
   /**
   
   Clears all the results from the previous poll. This is invoked by Selector 
at the start of
   a poll() when all the results from the previous poll are expected to have 
been handled.
   
   SocketServer uses clearCompletedSends() and clearCompletedSends() to
   clear `completedSends` and `completedReceives` as soon as they are processed 
to avoid
   holding onto large request/response buffers from multiple connections longer 
than necessary.
   Clients rely on Selector invoking {@link #clear()}
   at the start of each poll() since memory usage
   
   is less critical and clearing once-per-poll provides the flexibility to 
process these results in
   any order before the next poll.
   */


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] zhaohaidao opened a new pull request #10477: KAFKA-12615: Correct comments for the method Selector.clear

2021-04-05 Thread GitBox


zhaohaidao opened a new pull request #10477:
URL: https://github.com/apache/kafka/pull/10477


   According to my understanding, the second clearCompletedSends, which is 
highlighted as follows, should be clearCompletedReceives
   
   /**
   
   Clears all the results from the previous poll. This is invoked by Selector 
at the start of
   a poll() when all the results from the previous poll are expected to have 
been handled.
   
   SocketServer uses clearCompletedSends() and **clearCompletedSends**() to 
clear `completedSends` and `completedReceives` as soon as they are processed to 
avoid holding onto large request/response buffers from multiple connections 
longer than necessary. Clients rely on Selector invoking {@link #clear()} at 
the start of each poll() since memory usage
   
   is less critical and clearing once-per-poll provides the flexibility to 
process these results in
   any order before the next poll.
   */
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2021-04-05 Thread GitBox


ijuma commented on a change in pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#discussion_r607129364



##
File path: core/src/main/scala/kafka/server/RequestLocal.scala
##
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.utils.BufferSupplier
+
+case class RequestLocal(bufferSupplier: BufferSupplier) {

Review comment:
   Add documentation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2021-04-05 Thread GitBox


ijuma commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-813439278


   @chia7712 I introduced `RequestLocal` as discussed. Does this seem 
reasonable to you? If so, I propose the following next steps:
   
   1. In this PR, provide utility methods in `RequestLocal` for the two common 
defaults: `ThreadLocalCaching` and `NoCaching`. The latter should be used when 
the usage is not guaranteed to be within the same thread. In the future, we can 
consider a `ThreadSafeCaching`/`GlobalCaching` option, if that makes sense.
   
   2. In a separate PR, remove the default arguments. This will result in a lot 
of test changes, but no change in behavior. So, it probably makes sense to 
review separately.
   
   Thoughts?
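   
   To make the proposal concrete, here is a rough Java sketch of the two 
   defaults (the real class is Scala, and the method names here are placeholders, 
   not the final API):
   
   ```java
   import org.apache.kafka.common.utils.BufferSupplier;
   
   // Hypothetical sketch only: illustrates the two proposed defaults.
   public final class RequestLocalSketch {
       private final BufferSupplier bufferSupplier;
   
       private RequestLocalSketch(BufferSupplier bufferSupplier) {
           this.bufferSupplier = bufferSupplier;
       }
   
       // Caches and reuses buffers; only safe while the request stays on one handler thread.
       public static RequestLocalSketch withThreadLocalCaching() {
           return new RequestLocalSketch(BufferSupplier.create());
       }
   
       // No caching; for usages that are not guaranteed to stay on the same thread.
       public static RequestLocalSketch noCaching() {
           return new RequestLocalSketch(BufferSupplier.NO_CACHING);
       }
   
       public BufferSupplier bufferSupplier() {
           return bufferSupplier;
       }
   }
   ```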


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10476: KAFKA-12615: Correct comments for the method Selector.clear

2021-04-05 Thread GitBox


ijuma commented on pull request #10476:
URL: https://github.com/apache/kafka/pull/10476#issuecomment-813444072


   Thanks for the PR. This looks like a duplicate of 
https://github.com/apache/kafka/pull/10477, so closing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma closed pull request #10476: KAFKA-12615: Correct comments for the method Selector.clear

2021-04-05 Thread GitBox


ijuma closed pull request #10476:
URL: https://github.com/apache/kafka/pull/10476


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10477: KAFKA-12615: Correct comments for the method Selector.clear

2021-04-05 Thread GitBox


ijuma merged pull request #10477:
URL: https://github.com/apache/kafka/pull/10477


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10473: KAFKA-12614: Use Jenkinsfile for trunk and release branch builds

2021-04-05 Thread GitBox


ijuma commented on pull request #10473:
URL: https://github.com/apache/kafka/pull/10473#issuecomment-813460788


   @mumrah Good point. I don't expect the number of builds to change over time. 
Instead, we will replace some of the existing builds with new ones. That is, I 
expect us to have 3 Java versions and 2 Scala versions at any point in time. If 
this changes, we can consider matrix builds.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10473: KAFKA-12614: Use Jenkinsfile for trunk and release branch builds

2021-04-05 Thread GitBox


ijuma merged pull request #10473:
URL: https://github.com/apache/kafka/pull/10473


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10473: KAFKA-12614: Use Jenkinsfile for trunk and release branch builds

2021-04-05 Thread GitBox


ijuma commented on pull request #10473:
URL: https://github.com/apache/kafka/pull/10473#issuecomment-813463611


   Merged to trunk and 2.8 branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik opened a new pull request #10478: KAFKA-12553: Refactor recovery logic

2021-04-05 Thread GitBox


kowshik opened a new pull request #10478:
URL: https://github.com/apache/kafka/pull/10478


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #10393: KAFKA-12539: Refactor KafkaRaftClient handleVoteRequest to reduce cyclomatic complexity

2021-04-05 Thread GitBox


hachikuji merged pull request #10393:
URL: https://github.com/apache/kafka/pull/10393


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12539) Move some logic in handleVoteRequest to EpochState

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12539.
-
Resolution: Fixed

> Move some logic in handleVoteRequest to EpochState
> --
>
> Key: KAFKA-12539
> URL: https://issues.apache.org/jira/browse/KAFKA-12539
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>
> Reduce the cyclomatic complexity of KafkaRaftClient, see the comment for 
> details: https://github.com/apache/kafka/pull/10289#discussion_r597274570



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12344) Support SlidingWindows in the Scala API

2021-04-05 Thread Leah Thomas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314954#comment-17314954
 ] 

Leah Thomas commented on KAFKA-12344:
-

Hi [~ketulgupta], thanks for picking this up! It should be a relatively easy 
fix. If you look at 
[KGroupedStream.scala|https://github.com/apache/kafka/blob/trunk/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala#L155]
 you can see that there's a `windowedBy` method for both `Windows` and 
`SessionWindows`. These are scala wrapper classes that link back to the java 
code. We need this wrapper method for `SlidingWindows` as well. You should be 
able to model it off of the other two existing methods. 

I'm no scala expert but I think we should be able to utilize the existing 
`TimeWindowedKStream.scala` class instead of making a separate 
`SlidingWindowedKStreamScala` class, but maybe [~mjsax] can confirm. 

> Support SlidingWindows in the Scala API
> ---
>
> Key: KAFKA-12344
> URL: https://issues.apache.org/jira/browse/KAFKA-12344
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Leah Thomas
>Priority: Major
>  Labels: newbie, scala
>
> in KIP-450 we implemented sliding windows for the Java API but left out a few 
> crucial methods to allow sliding windows to work through the Scala API. We 
> need to add those methods to make the Scala API fully leverage sliding windows



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik commented on a change in pull request #10478: KAFKA-12553: Refactor recovery logic

2021-04-05 Thread GitBox


kowshik commented on a change in pull request #10478:
URL: https://github.com/apache/kafka/pull/10478#discussion_r607168537



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -0,0 +1,423 @@
+package kafka.log
+
+import java.io.{File, IOException}
+import java.nio.file.{Files, NoSuchFileException}
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.log.Log.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, 
isIndexFile, isLogFile, offsetFromFile, offsetFromFileName}
+import kafka.server.{LogDirFailureChannel, LogOffsetMetadata}
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.{CoreUtils, Logging, Scheduler}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.InvalidOffsetException
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Seq, Set, mutable}
+
+class LogLoader(dir: File,

Review comment:
   Documentation

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2763,6 +2191,320 @@ object Log {
 appendInfo.append(batch, firstOffsetMetadata)
   }
 
+  def maybeCreateLeaderEpochCache(dir: File,
+  topicPartition: TopicPartition,
+  logDirFailureChannel: LogDirFailureChannel,
+  recordVersion: RecordVersion): 
Option[LeaderEpochFileCache] = {
+val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
+
+def newLeaderEpochFileCache(): LeaderEpochFileCache = {
+  val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, 
logDirFailureChannel)
+  new LeaderEpochFileCache(topicPartition, checkpointFile)
+}
+
+if (recordVersion.precedes(RecordVersion.V2)) {
+  val currentCache = if (leaderEpochFile.exists())
+Some(newLeaderEpochFileCache())
+  else
+None
+
+  if (currentCache.exists(_.nonEmpty))
+warn(s"Deleting non-empty leader epoch cache due to incompatible 
message format $recordVersion")
+
+  Files.deleteIfExists(leaderEpochFile.toPath)
+  None
+} else {
+  Some(newLeaderEpochFileCache())
+}
+  }
+
+  /**
+   * Swap one or more new segment in place and delete one or more existing 
segments in a crash-safe manner. The old
+   * segments will be asynchronously deleted.
+   *
+   * This method does not need to convert IOException to KafkaStorageException 
because it is either called before all logs are loaded
+   * or the caller will catch and handle IOException
+   *
+   * The sequence of operations is:
+   * 
+   *Cleaner creates one or more new segments with suffix .cleaned and 
invokes replaceSegments().
+   *If broker crashes at this point, the clean-and-swap operation is 
aborted and
+   *the .cleaned files are deleted on recovery in loadSegments().
+   *New segments are renamed .swap. If the broker crashes before all 
segments were renamed to .swap, the
+   *clean-and-swap operation is aborted - .cleaned as well as .swap 
files are deleted on recovery in
+   *loadSegments(). We detect this situation by maintaining a specific 
order in which files are renamed from
+   *.cleaned to .swap. Basically, files are renamed in descending 
order of offsets. On recovery, all .swap files
+   *whose offset is greater than the minimum-offset .clean file are 
deleted.
+   *If the broker crashes after all new segments were renamed to 
.swap, the operation is completed, the swap
+   *operation is resumed on recovery as described in the next step.
+   *Old segment files are renamed to .deleted and asynchronous delete 
is scheduled.
+   *If the broker crashes, any .deleted files left behind are deleted 
on recovery in loadSegments().
+   *replaceSegments() is then invoked to complete the swap with 
newSegment recreated from
+   *the .swap file and oldSegments containing segments which were not 
renamed before the crash.
+   *Swap segment(s) are renamed to replace the existing segments, 
completing this operation.
+   *If the broker crashes, any .deleted files which may be left behind 
are deleted
+   *on recovery in loadSegments().
+   * 
+   *
+   * @param existingSegments The existing segments of the log
+   * @param newSegments The new log segment to add to the log
+   * @param oldSegments The old log segments to delete from the log
+   * @param isRecoveredSwapFile true if the new segment was created from a 
swap file during recovery after a crash
+   */
+  private[log] def replaceSegments(existingSegments: LogSegments,
+   newSegments: Seq[LogSegment],
+   oldSegments: Seq[LogSegment],
+   isRecoveredSwapFile: Boolean = false,
+   dir: File,
+   topicPartition: TopicPartition,
+   

[jira] [Updated] (KAFKA-12606) Some improvements for produce record validation

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12606:

Description: 
KIP-467 introduces an extension to the produce response to let the producer 
identify the specific records that had failed validation. While implementing 
KAFKA-12548, we realized a few shortcomings in this proposal:

1. Some records may fail only because they are part of a batch which had a 
record failing validation. In this case, we can return a better exception to 
the user so that they know the record is safe to be retried. For example, 
`RecordNotAppendedException` or something like that.
2. Records in the same batch may fail validation for different reasons. For 
example, one record may fail because of an invalid timestamp; another may fail 
because it uses a null key and the topic is compacted. However, the schema only 
allows a single error for each partition, so we cannot distinguish these cases 
in order to throw more specific exception types. We should consider allowing a 
record-level error code as well.

  was:
KIP-467 introduces an extension to the produce response to let the producer 
identify the specific records that had failed validation. While implementing 
KAFKA-12548, we realized a few shortcomings in this proposal:

1. Some records may fail only because they are part of a batch which had a 
record failing validation. In this case, we can return a better exception to 
the user so that they know the record is safe to be retried. For example, 
`RecordNotAppendedException` or something like that.
2. Records in the same batch may fail validation for different reasons. For 
example, one record may fail because of an invalid timestamp; another may fail 
because it uses a null key and the topic is compacted. However, the schema only 
allows a single error for each partition, so we cannot distinguish these cases 
in order to throw more specific exception types.


> Some improvements for produce record validation
> ---
>
> Key: KAFKA-12606
> URL: https://issues.apache.org/jira/browse/KAFKA-12606
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
>
> KIP-467 introduces an extension to the produce response to let the producer 
> identify the specific records that had failed validation. While implementing 
> KAFKA-12548, we realized a few shortcomings in this proposal:
> 1. Some records may fail only because they are part of a batch which had a 
> record failing validation. In this case, we can return a better exception to 
> the user so that they know the record is safe to be retried. For example, 
> `RecordNotAppendedException` or something like that.
> 2. Records in the same batch may fail validation for different reasons. For 
> example, one record may fail because of an invalid timestamp; another may 
> fail because it uses a null key and the topic is compacted. However, the 
> schema only allows a single error for each partition, so we cannot 
> distinguish these cases in order to throw more specific exception types. We 
> should consider allowing a record-level error code as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12604) Remove envelope handling from broker

2021-04-05 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314964#comment-17314964
 ] 

Jason Gustafson commented on KAFKA-12604:
-

I am going to close this without fixing. Although we do not need the broker to 
handle the Envelope API today, there will be a point where we want even the ZK 
brokers to forward to the controller.

> Remove envelope handling from broker
> 
>
> Key: KAFKA-12604
> URL: https://issues.apache.org/jira/browse/KAFKA-12604
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: kip-500
>
> We only need the envelope request to be handled on the controller endpoint. 
> We added it to the broker initially in order to allow testing of the 
> forwarding logic. Now that the integration testing framework for kip-500 is 
> in place, we should be able to get rid of this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12604) Remove envelope handling from broker

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12604.
-
Resolution: Won't Fix

> Remove envelope handling from broker
> 
>
> Key: KAFKA-12604
> URL: https://issues.apache.org/jira/browse/KAFKA-12604
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: kip-500
>
> We only need the envelope request to be handled on the controller endpoint. 
> We added it to the broker initially in order to allow testing of the 
> forwarding logic. Now that the integration testing framework for kip-500 is 
> in place, we should be able to get rid of this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12616) Convert integration tests to use ClusterTest

2021-04-05 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12616:
---

 Summary: Convert integration tests to use ClusterTest 
 Key: KAFKA-12616
 URL: https://issues.apache.org/jira/browse/KAFKA-12616
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


We would like to convert integration tests to use the new ClusterTest 
annotations so that we can easily test both the Zk and KRaft implementations. 
This will require adding a bunch of support to the ClusterTest framework as we 
go along.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12617) Convert MetadataRequestTest to use ClusterTest

2021-04-05 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12617:
---

 Summary: Convert MetadataRequestTest to use ClusterTest
 Key: KAFKA-12617
 URL: https://issues.apache.org/jira/browse/KAFKA-12617
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12616) Convert integration tests to use ClusterTest

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12616:

Labels: kip-500  (was: )

> Convert integration tests to use ClusterTest 
> -
>
> Key: KAFKA-12616
> URL: https://issues.apache.org/jira/browse/KAFKA-12616
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: kip-500
>
> We would like to convert integration tests to use the new ClusterTest 
> annotations so that we can easily test both the Zk and KRaft implementations. 
> This will require adding a bunch of support to the ClusterTest framework as 
> we go along.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12616) Convert integration tests to use ClusterTest

2021-04-05 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314967#comment-17314967
 ] 

Jason Gustafson commented on KAFKA-12616:
-

Anyone can feel free to add sub-tasks to this. There are a lot of integration 
tests that need to be converted.

> Convert integration tests to use ClusterTest 
> -
>
> Key: KAFKA-12616
> URL: https://issues.apache.org/jira/browse/KAFKA-12616
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> We would like to convert integration tests to use the new ClusterTest 
> annotations so that we can easily test both the Zk and KRaft implementations. 
> This will require adding a bunch of support to the ClusterTest framework as 
> we go along.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory

2021-04-05 Thread amuthan Ganeshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314977#comment-17314977
 ] 

amuthan Ganeshan commented on KAFKA-12559:
--

Hi [~ableegoldman] and [~mjsax]

I would like to work on this.

Could you guide me by giving some pointers to start with?

> Add a top-level Streams config for bounding off-heap memory
> ---
>
> Key: KAFKA-12559
> URL: https://issues.apache.org/jira/browse/KAFKA-12559
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> At the moment we provide an example of how to bound the memory usage of 
> rocksdb in the [Memory 
> Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb]
>  section of the docs. This requires implementing a custom RocksDBConfigSetter 
> class and setting a number of rocksdb options for relatively advanced 
> concepts and configurations. It seems a fair number of users either fail to 
> find this or consider it to be for more advanced use cases/users. But RocksDB 
> can eat up a lot of off-heap memory and it's not uncommon for users to come 
> across a {{RocksDBException: Cannot allocate memory}}
> It would probably be a much better user experience if we implemented this 
> memory bound out-of-the-box and just gave users a top-level StreamsConfig to 
> tune the off-heap memory given to rocksdb, like we have for on-heap cache 
> memory with cache.max.bytes.buffering. More advanced users can continue to 
> fine-tune their memory bounding and apply other configs with a custom config 
> setter, while new or more casual users can cap on the off-heap memory without 
> getting their hands dirty with rocksdb.
> I would propose to add the following top-level config:
> rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid 
> values are [0, inf]
> I'd also want to consider adding a second, lower priority top-level config to 
> give users a knob for adjusting how much of that total off-heap memory goes 
> to the block cache + index/filter blocks, and how much of it is afforded to 
> the write buffers. I'm struggling to come up with a good name for this 
> config, but it would be something like
> rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default 
> to 0.5, valid values are [0, 1]
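
For anyone picking this up: the existing workaround referenced in the ticket boils
down to something like the sketch below (a minimal example of the documented
RocksDBConfigSetter approach; the sizing constants are illustrative, not
recommendations). The proposed rocksdb.max.bytes.off.heap config would essentially
do this wiring out of the box.

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Cache;
    import org.rocksdb.LRUCache;
    import org.rocksdb.Options;
    import org.rocksdb.WriteBufferManager;

    public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
        // Shared across all stores so the bound applies to the whole Streams instance.
        private static final long TOTAL_OFF_HEAP_BYTES = 512 * 1024 * 1024L;  // illustrative
        private static final long TOTAL_MEMTABLE_BYTES = 128 * 1024 * 1024L;  // illustrative
        private static final Cache CACHE =
            new LRUCache(TOTAL_OFF_HEAP_BYTES, -1, false, 0.1);
        private static final WriteBufferManager WRITE_BUFFER_MANAGER =
            new WriteBufferManager(TOTAL_MEMTABLE_BYTES, CACHE);

        @Override
        public void setConfig(final String storeName, final Options options,
                              final Map<String, Object> configs) {
            BlockBasedTableConfig tableConfig =
                (BlockBasedTableConfig) options.tableFormatConfig();
            tableConfig.setBlockCache(CACHE);                     // block cache shared by all stores
            tableConfig.setCacheIndexAndFilterBlocks(true);       // index/filter blocks count against it
            options.setWriteBufferManager(WRITE_BUFFER_MANAGER);  // memtables count against it too
            options.setTableFormatConfig(tableConfig);
        }

        @Override
        public void close(final String storeName, final Options options) {
            // Do not close the shared Cache/WriteBufferManager here; every store reuses them.
        }
    }

The setter is registered via the existing rocksdb.config.setter
(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG) property.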



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

2021-04-05 Thread GitBox


hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r607216022



##
File path: 
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
##
@@ -219,6 +225,146 @@ class AutoTopicCreationManagerTest {
 testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
   }
 
+  @Test
+  def testTopicCreationWithMetadataContextPassPrincipal(): Unit = {
+autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+  config,
+  Some(brokerToController),
+  Some(adminManager),
+  Some(controller),
+  groupCoordinator,
+  transactionCoordinator)
+
+val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+val topicName = "topic"
+topicsCollection.add(getNewTopic(topicName))
+val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
+  .setApiKey(ApiKeys.CREATE_TOPICS.id)
+  .setMinVersion(0)
+  .setMaxVersion(0)
+Mockito.when(brokerToController.controllerApiVersions())
+  
.thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion
+
+Mockito.when(controller.isActive).thenReturn(false)
+
+val requestHeader = new RequestHeader(ApiKeys.METADATA, 
ApiKeys.METADATA.latestVersion,
+  "clientId", 0)
+
+val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
+val principalSerde = new KafkaPrincipalSerde {
+  override def serialize(principal: KafkaPrincipal): Array[Byte] = {
+assertEquals(principal, userPrincipal)
+Utils.utf8(principal.toString)
+  }
+  override def deserialize(bytes: Array[Byte]): KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+}
+
+val requestContext = new RequestContext(requestHeader, "1", 
InetAddress.getLocalHost,
+  userPrincipal, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+  SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false, 
Optional.of(principalSerde))
+
+autoTopicCreationManager.createTopics(

Review comment:
   This ensures that an attempt was made to serialize the principal, but 
how do we know that it made it to the envelope? I think it would be better to 
intercept the request that is sent to the channel and verify it directly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gharris1727 commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource

2021-04-05 Thread GitBox


gharris1727 commented on a change in pull request #10475:
URL: https://github.com/apache/kafka/pull/10475#discussion_r607216362



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
##
@@ -342,6 +343,47 @@ public void 
newPluginsShouldConfigureWithPluginClassLoader() {
 assertPluginClassLoaderAlwaysActive(samples);
 }
 
+@Test
+public void pluginClassLoaderReadVersionFromResource() {
+TestPlugins.assertAvailable();
+
+Map pluginProps = new HashMap<>();
+pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG,
+TestPlugins.pluginPath().stream()
+.filter(s -> 
s.contains("read-version-from-resource-v1"))
+.collect(Collectors.joining()));
+plugins = new Plugins(pluginProps);
+
+Converter converter = plugins.newPlugin(
+TestPlugins.READ_VERSION_FROM_RESOURCE,
+new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
+Converter.class
+);
+assertEquals("1.0.0",
+new String(converter.fromConnectData(null, null, null)));
+PluginClassLoader pluginClassLoader = plugins.delegatingLoader()
+.pluginClassLoader(TestPlugins.READ_VERSION_FROM_RESOURCE);
+assertNotNull(pluginClassLoader);
+
+
+// Re-initialize Plugins object with plugin class loader in the class 
loader tree. This is
+// to simulate the situation where jars exist on both system classpath 
and plugin path.
+pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG,
+TestPlugins.pluginPath().stream()
+.filter(s -> 
s.contains("read-version-from-resource-v2"))
+.collect(Collectors.joining()));
+plugins = new Plugins(pluginProps, pluginClassLoader);

Review comment:
   While I understand that the motivation here for explicitly providing the 
parent is to control the behavior of the parent ClassLoader, I think this test is 
for a pretty extreme case. This is testing a:
   
   1. PluginClassLoader whose parent is a
   2. DelegatingClassLoader whose parent is a
   3. PluginClassLoader whose parent is a
   4. Delegating ClassLoader whose parent is the
   5. App ClassLoader
   
   Does this test still succeed if we use a different ClassLoader that's not 
part of a chain like this?
   We could construct a plain `URLClassLoader` instance as the parent, but that 
would still require the new Plugins constructor.
   If we add a test resource file to the App ClassLoader we could skip the 
additional constructor. We could use multiple different resource file names in 
the different test plugins to test the three cases:
   
   1. parent has resource but plugin does not
   2. plugin has resource but parent does not
   3. parent and plugin both have the resource
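
   A minimal sketch of the plain-URLClassLoader idea, inside the test method
(the jar path helper and the Plugins(Map, ClassLoader) constructor are
assumptions, not final test code):

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;

    // The parent loader owns one copy of the resource, the plugin path owns another;
    // PluginClassLoader.getResource should hand back the plugin's copy, not the parent's.
    URL parentJar = new File(pathToResourceV2Jar).toURI().toURL();         // hypothetical path from TestPlugins
    ClassLoader parent = new URLClassLoader(new URL[] {parentJar}, null);  // null parent keeps the chain flat
    Plugins plugins = new Plugins(pluginProps, parent);                    // assumes the new constructor under review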

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
##
@@ -342,6 +343,47 @@ public void 
newPluginsShouldConfigureWithPluginClassLoader() {
 assertPluginClassLoaderAlwaysActive(samples);
 }
 
+@Test
+public void pluginClassLoaderReadVersionFromResource() {
+TestPlugins.assertAvailable();
+
+Map pluginProps = new HashMap<>();
+pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG,
+TestPlugins.pluginPath().stream()

Review comment:
   nit: TestPlugins has a Map from class name to File, we can add a method 
for getting the path to a single plugin jar to TestPlugins itself, so we don't 
have to iterate here.

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
##
@@ -114,6 +118,9 @@
 pluginJars.put(SAMPLING_HEADER_CONVERTER, 
createPluginJar("sampling-header-converter"));
 pluginJars.put(SAMPLING_CONFIG_PROVIDER, 
createPluginJar("sampling-config-provider"));
 pluginJars.put(SERVICE_LOADER, createPluginJar("service-loader"));
+// Create two versions of the same plugin reading version string 
from a resource
+pluginJars.put(READ_VERSION_FROM_RESOURCE + ".v1", 
createPluginJar("read-version-from-resource-v1"));

Review comment:
   nit: make each of these a separate constant




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #10474: KAFKA-12602: Fix LICENSE file

2021-04-05 Thread GitBox


vvcephei commented on pull request #10474:
URL: https://github.com/apache/kafka/pull/10474#issuecomment-813527301


   Hey @ableegoldman and @mimaison , 
   
   Since you're also both blocked on this, would you mind giving these changes 
a double-check?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

2021-04-05 Thread GitBox


hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r607224516



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -156,17 +169,44 @@ class DefaultAutoTopicCreationManager(
 .setTopics(topicsToCreate)
 )
 
-channelManager.get.sendRequest(createTopicsRequest, new 
ControllerRequestCompletionHandler {
+val requestCompletionHandler = new ControllerRequestCompletionHandler {
   override def onTimeout(): Unit = {
 debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
 clearInflightRequests(creatableTopics)
   }
 
   override def onComplete(response: ClientResponse): Unit = {
-debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
+debug(s"Auto topic creation completed for ${creatableTopics.keys} with 
response ${response.responseBody.toString}.")
 clearInflightRequests(creatableTopics)
   }
-})
+}
+
+val channelManager = this.channelManager.getOrElse {
+  throw new IllegalStateException("Channel manager must be defined in 
order to send CreateTopic requests.")
+}
+
+val request = metadataRequestContext.map { context =>
+  val requestVersion =
+channelManager.controllerApiVersions() match {
+  case None =>
+// We will rely on the Metadata request to be retried in the case
+// that the latest version is not usable by the controller.
+ApiKeys.CREATE_TOPICS.latestVersion()
+  case Some(nodeApiVersions) =>
+nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+}
+
+  // Borrow client information such as client id and correlation id from 
the original request,
+  // in order to correlate the create request with the original metadata 
request.
+  val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+requestVersion,
+context.clientId,
+context.correlationId)
+  ForwardingManager.buildEnvelopeRequest(context,
+
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))

Review comment:
   @dengziming It's not a bad idea. We could even simplify it a little 
since the api key and version can be obtained from the request. I tend to agree 
that this is kind of a niche usage though, so I'm not sure it calls for the 
generality. Perhaps you could submit a follow-up once this is merged and we can 
see what it looks like.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315000#comment-17315000
 ] 

John Roesler commented on KAFKA-12602:
--

Thank you, [~jmclean] ,

 

There weren't too many BSD or MIT licenses, so I've gone ahead and just copied 
the exact license files from each one of those dependencies. Thanks for the 
feedback.

I also added notices regarding the provenance and copyright of PureJavaCrc32C 
and Murmur3 to our NOTICE file.

I will create two follow-up tickets, one to revisit the NOTICE file, and 
another to add an automated check to the release script to make sure that our 
license file doesn't start to rot again in the future.

> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [~jmclean] raised this on the mailing list: 
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  
> We need to make  the license file match what we are actually shipping in 
> source and binary distributions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ncliang commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource

2021-04-05 Thread GitBox


ncliang commented on a change in pull request #10475:
URL: https://github.com/apache/kafka/pull/10475#discussion_r607232615



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
##
@@ -342,6 +343,47 @@ public void 
newPluginsShouldConfigureWithPluginClassLoader() {
 assertPluginClassLoaderAlwaysActive(samples);
 }
 
+@Test
+public void pluginClassLoaderReadVersionFromResource() {
+TestPlugins.assertAvailable();
+
+Map pluginProps = new HashMap<>();
+pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG,
+TestPlugins.pluginPath().stream()
+.filter(s -> 
s.contains("read-version-from-resource-v1"))
+.collect(Collectors.joining()));
+plugins = new Plugins(pluginProps);
+
+Converter converter = plugins.newPlugin(
+TestPlugins.READ_VERSION_FROM_RESOURCE,
+new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
+Converter.class
+);
+assertEquals("1.0.0",
+new String(converter.fromConnectData(null, null, null)));
+PluginClassLoader pluginClassLoader = plugins.delegatingLoader()
+.pluginClassLoader(TestPlugins.READ_VERSION_FROM_RESOURCE);
+assertNotNull(pluginClassLoader);
+
+
+// Re-initialize Plugins object with plugin class loader in the class 
loader tree. This is
+// to simulate the situation where jars exist on both system classpath 
and plugin path.
+pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG,
+TestPlugins.pluginPath().stream()
+.filter(s -> 
s.contains("read-version-from-resource-v2"))
+.collect(Collectors.joining()));
+plugins = new Plugins(pluginProps, pluginClassLoader);

Review comment:
   I agree that having the additional testcases would be valuable. However, 
I do think that adding the resource directly to the app classloader makes the 
test much less readable. I like the idea of constructing and using a plain 
`URLClassLoader` as the parent. While it still requires the additional 
constructor to be exposed on `Plugins`, we do already expose the parent 
parameter on `DelegatingClassLoader` specifically for the flexibility of 
controlling the parent loader.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10393: KAFKA-12539: Refactor KafkaRaftCllient handleVoteRequest to reduce cyclomatic complexity

2021-04-05 Thread GitBox


ijuma commented on a change in pull request #10393:
URL: https://github.com/apache/kafka/pull/10393#discussion_r607233215



##
File path: raft/src/main/java/org/apache/kafka/raft/CandidateState.java
##
@@ -235,6 +240,15 @@ public int epoch() {
 return highWatermark;
 }
 
+@Override
+public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+// Still reject vote request even candidateId = localId, Although the 
candidate votes for
+// itself, this vote is implicit and not "granted".
+log.debug("Rejecting vote request from candidate {} since we are 
already candidate in epoch {}",
+candidateId, epoch);

Review comment:
   As a general rule, methods like this should not log IMO. The calling 
method should log instead. That is, good to avoid the side effect from the 
"check" operation. It seems like it was like that before this PR. What was the 
motivation for changing it?
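
   For illustration, the pattern being described is roughly the following
(sketch only, names taken from the surrounding diff):

    // The state object only answers the question; the caller owns the logging side effect.
    boolean voteGranted = state.canGrantVote(candidateId, isLogUpToDate);
    if (!voteGranted) {
        logger.debug("Rejecting vote request from candidate {} in epoch {}",
            candidateId, state.epoch());
    }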




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-05 Thread GitBox


hachikuji commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r607232953



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -355,24 +373,29 @@ private void fireHandleResign(int epoch) {
 }
 
 @Override
-public void initialize() throws IOException {
-quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
+public void initialize() {
+try {
+quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
 
-long currentTimeMs = time.milliseconds();
-if (quorum.isLeader()) {
-throw new IllegalStateException("Voter cannot initialize as a 
Leader");
-} else if (quorum.isCandidate()) {
-onBecomeCandidate(currentTimeMs);
-} else if (quorum.isFollower()) {
-onBecomeFollower(currentTimeMs);
-}
+long currentTimeMs = time.milliseconds();
+if (quorum.isLeader()) {
+throw new IllegalStateException("Voter cannot initialize as a 
Leader");
+} else if (quorum.isCandidate()) {
+onBecomeCandidate(currentTimeMs);
+} else if (quorum.isFollower()) {
+onBecomeFollower(currentTimeMs);
+}
 
-// When there is only a single voter, become candidate immediately
-if (quorum.isVoter()
-&& quorum.remoteVoters().isEmpty()
-&& !quorum.isLeader()
-&& !quorum.isCandidate()) {
-transitionToCandidate(currentTimeMs);
+// When there is only a single voter, become candidate immediately
+if (quorum.isVoter()
+&& quorum.remoteVoters().isEmpty()
+&& !quorum.isLeader()

Review comment:
   Since we're in here already, this check is not needed because we already 
ruled it out above.

##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -68,20 +70,65 @@ public synchronized void increment() {
 @Override
 public synchronized void handleCommit(BatchReader reader) {
 try {
-int initialValue = this.committed;
+int initialValue = committed;
 while (reader.hasNext()) {
 BatchReader.Batch batch = reader.next();
 log.debug("Handle commit of batch with records {} at base 
offset {}",
 batch.records(), batch.baseOffset());
 for (Integer value : batch.records()) {
-if (value != this.committed + 1) {
-throw new AssertionError("Expected next committed 
value to be " +
-(this.committed + 1) + ", but instead found " + 
value + " on node " + nodeId);
+if (value != committed + 1) {
+throw new AssertionError(
+String.format(
+"Expected next committed value to be %s, but 
instead found %s on node %s",
+committed + 1,
+value,
+nodeId
+)
+);
 }
-this.committed = value;
+committed = value;
 }
+
+nextReadOffset = batch.lastOffset() + 1;
+readEpoch = batch.epoch();
 }
 log.debug("Counter incremented from {} to {}", initialValue, 
committed);
+
+if (lastSnapshotEndOffset + 10 < nextReadOffset) {

Review comment:
   Looks like we are trying to do snapshots every 10 records. We could 
probably get rid of `lastSnapshotEndOffset` and use `committed % 10` or 
something like that. It may also be useful to be able to control the frequency 
of snapshots with a parameter.
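
   i.e. roughly the following (illustrative; snapshotIntervalRecords and
takeSnapshot() are hypothetical names):

    // Take a snapshot every N committed records instead of tracking lastSnapshotEndOffset.
    private final int snapshotIntervalRecords;  // configurable, e.g. 10

    private void maybeTakeSnapshot() {
        if (committed > 0 && committed % snapshotIntervalRecords == 0) {
            takeSnapshot();  // hypothetical helper: snapshots state up to nextReadOffset/readEpoch
        }
    }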

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -326,6 +336,14 @@ private void updateListenersProgress(List 
listenerContexts, lon
 }
 }
 
+private Optional> latestSnapshot() {
+return log.latestSnapshotId().flatMap(snapshoId -> {

Review comment:
   nit: missing t in snapshotId

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2154,8 +2182,14 @@ private boolean maybeCompleteShutdown(long 
currentTimeMs) {
 return false;
 }
 
-private void maybeUpdateOldestSnapshotId() {
-log.latestSnapshotId().ifPresent(log::deleteBeforeSnapshot);
+private void maybeUpdateEarliestSnapshotId() {

Review comment:
   Hmm, I guess I still see this the other way around. Why would the raft 
client care about updating the log start offset if not to delete old snapshots? 
What would that even mean outside the context of snapshot deletion?

##
File p

[GitHub] [kafka] cmccabe commented on pull request #10366: KAFKA-12467: Add controller-side snapshot generation

2021-04-05 Thread GitBox


cmccabe commented on pull request #10366:
URL: https://github.com/apache/kafka/pull/10366#issuecomment-813567129


   Fix spotbugs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-05 Thread GitBox


junrao commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r606343187



##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to leader epochs.
+ * 
+ * Remote log segment can go through the state transitions as mentioned in 
{@link RemoteLogSegmentState}.
+ * 
+ * This class will have all the segments which did not reach terminal state 
viz DELETE_SEGMENT_FINISHED. That means,any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link 
#remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, 
{@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ * 
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * 
+ * Segment in this state indicate it is not yet copied successfully. So, these 
segments will not be

Review comment:
   indicate => indicates. Ditto in a few other places.

##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to leader epochs.
+ * 
+ * Remote log segment can go through the state transitions as mentioned in 
{@link RemoteLogSegmentState}.
+ * 
+ * This class will have all the segments which did not reach terminal state 
viz DELETE_SEGMENT_FINISHED. That means,any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link 
#remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, 
{@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ * 
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * 
+ * Segment in this state indicate it is not yet copied successfully. So, these 
segments will not be
+ * accessible for reads but these are considered for cleanups when a partition 
is deleted.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}:
+ *

[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607263189



##
File path: core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala
##
@@ -122,6 +122,14 @@ class ClientQuotaCache {
   entityFilters.put(entityType, entityMatch)
 }
 
+// Special case for non-strict empty filter, match everything

Review comment:
   What happens if we have an empty filter list and strict mode set?  It 
looks like an exception is thrown-- that doesn't seem correct?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12548) Invalid record error message is not getting sent to application

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12548:

Fix Version/s: 3.0.0

> Invalid record error message is not getting sent to application
> ---
>
> Key: KAFKA-12548
> URL: https://issues.apache.org/jira/browse/KAFKA-12548
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0
>
>
> The ProduceResponse includes a nice record error message when we return 
> INVALID_RECORD_ERROR. Sadly this is getting discarded by the producer, so the 
> user never gets a chance to see it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12548) Invalid record error message is not getting sent to application

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12548.
-
Resolution: Fixed

> Invalid record error message is not getting sent to application
> ---
>
> Key: KAFKA-12548
> URL: https://issues.apache.org/jira/browse/KAFKA-12548
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0
>
>
> The ProduceResponse includes a nice record error message when we return 
> INVALID_RECORD_ERROR. Sadly this is getting discarded by the producer, so the 
> user never gets a chance to see it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607263869



##
File path: 
core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
##
@@ -121,16 +121,16 @@ class ClientQuotaMetadataManager(private[metadata] val 
quotaManagers: QuotaManag
   return
 }
 
-// Update the cache
-quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, 
quotaRecord.remove)
-
 // Convert the value to an appropriate Option for the quota manager
 val newValue = if (quotaRecord.remove()) {
   None
 } else {
   Some(quotaRecord.value).map(_.toInt)
 }
 connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue)
+
+// Update the cache

Review comment:
   What's the purpose of moving this code?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607264162



##
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##
@@ -77,7 +77,7 @@ public String getDisplayName(int invocationIndex) {
 (BeforeTestExecutionCallback) context -> {
 KafkaClusterTestKit.Builder builder = new 
KafkaClusterTestKit.Builder(
 new TestKitNodes.Builder().
-setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+setNumBrokerNodes(clusterConfig.numBrokers()).

Review comment:
   indentation seems a little weird here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607264162



##
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##
@@ -77,7 +77,7 @@ public String getDisplayName(int invocationIndex) {
 (BeforeTestExecutionCallback) context -> {
 KafkaClusterTestKit.Builder builder = new 
KafkaClusterTestKit.Builder(
 new TestKitNodes.Builder().
-setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+setNumBrokerNodes(clusterConfig.numBrokers()).

Review comment:
   indentation seems a little weird here.  why would this be indented more 
than the following line?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607267726



##
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##
@@ -212,4 +214,103 @@ class RaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testClientQuotas(): Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(1).
+setNumControllerNodes(1).build()).build()
+try {
+  cluster.format()
+  cluster.startup()
+  TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == 
BrokerState.RUNNING,
+"Broker never made it to RUNNING state.")
+  val admin = Admin.create(cluster.clientProperties())
+  try {
+val entity = new ClientQuotaEntity(Map("user" -> "testkit").asJava)
+var filter = ClientQuotaFilter.containsOnly(
+  List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
+
+def alterThenDescribe(entity: ClientQuotaEntity,
+  quotas: Seq[ClientQuotaAlteration.Op],
+  filter: ClientQuotaFilter,
+  expectCount: Int): 
java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = {
+  val alterResult = admin.alterClientQuotas(Seq(new 
ClientQuotaAlteration(entity, quotas.asJava)).asJava)
+  try {
+alterResult.all().get()
+  } catch {
+case t: Throwable => fail("AlterClientQuotas request failed", t)
+  }
+
+  def describeOrFail(filter: ClientQuotaFilter): 
java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = {
+try {
+  admin.describeClientQuotas(filter).entities().get()
+} catch {
+  case t: Throwable => fail("DescribeClientQuotas request failed", 
t)
+}
+  }
+
+  val (describeResult, ok) = 
TestUtils.computeUntilTrue(describeOrFail(filter)) {
+results => results.getOrDefault(entity, 
java.util.Collections.emptyMap[String, java.lang.Double]()).size() == 
expectCount
+  }
+  assertTrue(ok, "Broker never saw new client quotas")
+  describeResult
+}
+
+var describeResult = alterThenDescribe(entity,
+  Seq(new ClientQuotaAlteration.Op("request_percentage", 0.99)), 
filter, 1)
+assertEquals(0.99, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", 0.97),
+  new ClientQuotaAlteration.Op("producer_byte_rate", 1),
+  new ClientQuotaAlteration.Op("consumer_byte_rate", 10001)
+), filter, 3)
+assertEquals(0.97, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+assertEquals(1.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
+assertEquals(10001.0, 
describeResult.get(entity).get("consumer_byte_rate"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", 0.95),
+  new ClientQuotaAlteration.Op("producer_byte_rate", null),
+  new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+), filter, 1)
+assertEquals(0.95, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0)
+
+describeResult = alterThenDescribe(entity,
+  Seq(new ClientQuotaAlteration.Op("producer_byte_rate", )), 
filter, 1)
+assertEquals(.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
+
+// Add another quota for a different entity with same user part
+val entity2 = new ClientQuotaEntity(Map("user" -> "testkit", 
"client-id" -> "some-client").asJava)
+filter = ClientQuotaFilter.containsOnly(
+  List(
+ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+ClientQuotaFilterComponent.ofEntity("client-id", "some-client"),
+  ).asJava)
+describeResult = alterThenDescribe(entity2,
+  Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)), 
filter, 1)
+assertEquals(9998.0, 
describeResult.get(entity2).get("producer_byte_rate"), 1e-6)
+
+// non-strict match
+filter = ClientQuotaFilter.contains(
+  List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
+
+val (describeResult2, ok) = 
TestUtils.computeUntilTrue(admin.describeClientQuotas(filter).entities().get()) 
{
+  results => results.size() == 2
+}
+assertTrue(ok, "Broker never saw two client quotas")

Review comment:
   The "ok" check feels a little clunky here.  Plus if this fa

[GitHub] [kafka] hachikuji merged pull request #10445: KAFKA-12548; Propagate record error messages to application

2021-04-05 Thread GitBox


hachikuji merged pull request #10445:
URL: https://github.com/apache/kafka/pull/10445


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607270847



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
##
@@ -170,8 +170,10 @@ private void alterClientQuotaEntity(
 }
 });
 
-outputRecords.addAll(newRecords);
-outputResults.put(entity, ApiError.NONE);
+// Only add the records to outputRecords if there were no errors

Review comment:
   This is a good fix.
   
   However, I think it would be better just to return immediately after setting 
the error, rather than waiting until the end of the function.  That's 
consistent with how we handle errors at the top of this function, and in other 
manager classes.  It makes it clear that only one error can be set for each 
entity.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10468: Kafka 12373:Improve KafkaRaftClient handling of graceful shutdown

2021-04-05 Thread GitBox


jsancio commented on a change in pull request #10468:
URL: https://github.com/apache/kafka/pull/10468#discussion_r607271981



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -1673,6 +1673,69 @@ public void testLeaderGracefulShutdownTimeout() throws 
Exception {
 assertFutureThrows(shutdownFuture, TimeoutException.class);
 }
 
+@Test
+public void testLeaderGracefulShutdownOnClose() throws Exception {
+int localId = 0;
+int otherNodeId = 1;
+int lingerMs = 50;
+Set voters = Utils.mkSet(localId, otherNodeId);
+
+RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+.withAppendLingerMs(lingerMs)
+.build();
+
+context.becomeLeader();
+assertEquals(OptionalInt.of(localId), context.currentLeader());
+assertEquals(1L, context.log.endOffset().offset);
+
+int epoch = context.currentEpoch();
+assertEquals(1L, context.client.scheduleAppend(epoch, 
singletonList("a")));
+
+context.client.poll();
+assertEquals(OptionalLong.of(lingerMs), 
context.messageQueue.lastPollTimeoutMs());
+
+context.time.sleep(20);
+
+// client closed now.
+context.client.close();
+
+// Flag for accepting appends should be toggled to false.
+assertFalse(context.client.canAcceptAppends());
+
+// acceptAppends flag set to false so no writes should be accepted by 
the Leader now.
+assertNull(context.client.scheduleAppend(epoch, singletonList("b")));
+
+// The leader should trigger a flush for whatever batches are present 
in the BatchAccumulator
+assertEquals(2L, context.log.endOffset().offset);
+
+// Now shutdown
+
+// We should still be running until we have had a chance to send 
EndQuorumEpoch
+assertTrue(context.client.isShuttingDown());
+assertTrue(context.client.isRunning());
+
+// Send EndQuorumEpoch request to the other voter
+context.pollUntilRequest();
+assertTrue(context.client.isShuttingDown());
+assertTrue(context.client.isRunning());
+context.assertSentEndQuorumEpochRequest(1, otherNodeId);
+
+// We should still be able to handle vote requests during graceful 
shutdown
+// in order to help the new leader get elected
+context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, 
epoch, 1L));
+context.client.poll();
+context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.empty(), true);
+

Review comment:
   Okay, thanks! I have limited time at the moment. I'll try to look at it 
this week.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


mumrah commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607274751



##
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##
@@ -77,7 +77,7 @@ public String getDisplayName(int invocationIndex) {
 (BeforeTestExecutionCallback) context -> {
 KafkaClusterTestKit.Builder builder = new 
KafkaClusterTestKit.Builder(
 new TestKitNodes.Builder().
-setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+setNumBrokerNodes(clusterConfig.numBrokers()).

Review comment:
   Not sure, it must have auto-formatted when I renamed to 
`setNumBrokerNodes`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


mumrah commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607278548



##
File path: core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala
##
@@ -122,6 +122,14 @@ class ClientQuotaCache {
   entityFilters.put(entityType, entityMatch)
 }
 
+// Special case for non-strict empty filter, match everything

Review comment:
   It should short-circuit and return an empty map on 
[L134](https://github.com/apache/kafka/pull/10254/files#diff-99bc678b86ad25d99b32bc192428120d2ff3f3d478159e2c353f4f5346b43718R133-R135)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


mumrah commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607281769



##
File path: 
core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
##
@@ -121,16 +121,16 @@ class ClientQuotaMetadataManager(private[metadata] val 
quotaManagers: QuotaManag
   return
 }
 
-// Update the cache
-quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, 
quotaRecord.remove)
-
 // Convert the value to an appropriate Option for the quota manager
 val newValue = if (quotaRecord.remove()) {
   None
 } else {
   Some(quotaRecord.value).map(_.toInt)
 }
 connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue)
+
+// Update the cache

Review comment:
   Good question. I could swear there was a review comment suggesting this, 
but I can't seem to find it. I believe I moved this to align it with 
handleUserClientQuota where we update the underlying quota manager and then 
update the cache. I don't have a strong opinion on which thing happens first, 
but they should both probably be the same.
   
   Thinking about it more, we might want to be more defensive when calling into 
the quota managers. If the quota manager cannot be updated, I think we should 
still update the cache since that reflects the true state of the quota 
according to the metadata log. Thoughts?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mageshn commented on a change in pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource

2021-04-05 Thread GitBox


mageshn commented on a change in pull request #10475:
URL: https://github.com/apache/kafka/pull/10475#discussion_r607285210



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
##
@@ -61,12 +61,24 @@ public Plugins(Map props) {
 delegatingLoader.initLoaders();
 }
 
+public Plugins(Map props, ClassLoader parent) {

Review comment:
   Is this primarily added for testing purposes?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-05 Thread GitBox


mumrah commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607290168



##
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##
@@ -212,4 +214,103 @@ class RaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testClientQuotas(): Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(1).
+setNumControllerNodes(1).build()).build()
+try {
+  cluster.format()
+  cluster.startup()
+  TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == 
BrokerState.RUNNING,
+"Broker never made it to RUNNING state.")
+  val admin = Admin.create(cluster.clientProperties())
+  try {
+val entity = new ClientQuotaEntity(Map("user" -> "testkit").asJava)
+var filter = ClientQuotaFilter.containsOnly(
+  List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
+
+def alterThenDescribe(entity: ClientQuotaEntity,
+  quotas: Seq[ClientQuotaAlteration.Op],
+  filter: ClientQuotaFilter,
+  expectCount: Int): 
java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = {
+  val alterResult = admin.alterClientQuotas(Seq(new 
ClientQuotaAlteration(entity, quotas.asJava)).asJava)
+  try {
+alterResult.all().get()
+  } catch {
+case t: Throwable => fail("AlterClientQuotas request failed", t)
+  }
+
+  def describeOrFail(filter: ClientQuotaFilter): 
java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = {
+try {
+  admin.describeClientQuotas(filter).entities().get()
+} catch {
+  case t: Throwable => fail("DescribeClientQuotas request failed", 
t)
+}
+  }
+
+  val (describeResult, ok) = 
TestUtils.computeUntilTrue(describeOrFail(filter)) {
+results => results.getOrDefault(entity, 
java.util.Collections.emptyMap[String, java.lang.Double]()).size() == 
expectCount
+  }
+  assertTrue(ok, "Broker never saw new client quotas")
+  describeResult
+}
+
+var describeResult = alterThenDescribe(entity,
+  Seq(new ClientQuotaAlteration.Op("request_percentage", 0.99)), 
filter, 1)
+assertEquals(0.99, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", 0.97),
+  new ClientQuotaAlteration.Op("producer_byte_rate", 1),
+  new ClientQuotaAlteration.Op("consumer_byte_rate", 10001)
+), filter, 3)
+assertEquals(0.97, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+assertEquals(1.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
+assertEquals(10001.0, 
describeResult.get(entity).get("consumer_byte_rate"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", 0.95),
+  new ClientQuotaAlteration.Op("producer_byte_rate", null),
+  new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+), filter, 1)
+assertEquals(0.95, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+
+describeResult = alterThenDescribe(entity, Seq(
+  new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0)
+
+describeResult = alterThenDescribe(entity,
+  Seq(new ClientQuotaAlteration.Op("producer_byte_rate", )), 
filter, 1)
+assertEquals(.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
+
+// Add another quota for a different entity with same user part
+val entity2 = new ClientQuotaEntity(Map("user" -> "testkit", 
"client-id" -> "some-client").asJava)
+filter = ClientQuotaFilter.containsOnly(
+  List(
+ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+ClientQuotaFilterComponent.ofEntity("client-id", "some-client"),
+  ).asJava)
+describeResult = alterThenDescribe(entity2,
+  Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)), 
filter, 1)
+assertEquals(9998.0, 
describeResult.get(entity2).get("producer_byte_rate"), 1e-6)
+
+// non-strict match
+filter = ClientQuotaFilter.contains(
+  List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
+
+val (describeResult2, ok) = 
TestUtils.computeUntilTrue(admin.describeClientQuotas(filter).entities().get()) 
{
+  results => results.size() == 2
+}
+assertTrue(ok, "Broker never saw two client quotas")

Review comment:
   Yea let me clean this up




-- 
This is an automated message from the Apache Git Service.

[GitHub] [kafka] mumrah commented on a change in pull request #10455: MINOR: Support ExponentialBackoff without jitter

2021-04-05 Thread GitBox


mumrah commented on a change in pull request #10455:
URL: https://github.com/apache/kafka/pull/10455#discussion_r607313601



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##
@@ -47,7 +47,8 @@ public long backoff(long attempts) {
 }
 double exp = Math.min(attempts, this.expMax);
 double term = initialInterval * Math.pow(multiplier, exp);
-double randomFactor = ThreadLocalRandom.current().nextDouble(1 - 
jitter, 1 + jitter);
+double randomFactor = jitter < Double.MIN_NORMAL ? 1.0 :

Review comment:
   I had to look this constant up :) 
   
   Can we just make it check if the jitter is equal to zero (or maybe `<=` 
zero)? A caller of this method setting jitter to something like 0.5 might be 
surprised that there is no jitter added.
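
   i.e. something along these lines (a sketch of the suggestion, not the final patch):

    double randomFactor = jitter <= 0 ? 1.0 :
        ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
    return (long) (randomFactor * term);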




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12618) Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito

2021-04-05 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-12618:
--

 Summary: Convert LogManager (and other EasyMocks) in 
ReplicaManagerTest to Mockito
 Key: KAFKA-12618
 URL: https://issues.apache.org/jira/browse/KAFKA-12618
 Project: Kafka
  Issue Type: Task
Reporter: Justine Olshan


[This 
commit|https://github.com/apache/kafka/commit/40f001cc537d6ff2efa71e609c2f84c6b934994d]
 introduced changes that have Partition calling getLog when there is no topic 
ID associated with the Partition. In this case, getLog will use a default 
argument. EasyMock (a Java framework) does not play well with Scala's default 
arguments. For now, we are manually creating a partition and associating it in 
the initializeLogAndTopicId method. But a better long term solution is to use 
Mockito which better supports default arguments.

It would be good to convert all EasyMocks over to Mockito as well. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7438) Replace EasyMock and PowerMock with Mockito in the clients module

2021-04-05 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7438:
---
Summary: Replace EasyMock and PowerMock with Mockito in the clients module  
(was: Replace EasyMock and PowerMock with Mockito)

> Replace EasyMock and PowerMock with Mockito in the clients module
> -
>
> Key: KAFKA-7438
> URL: https://issues.apache.org/jira/browse/KAFKA-7438
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>
> Development of EasyMock and PowerMock has stagnated while Mockito continues 
> to be actively developed. With the new Java cadence, it's a problem to depend 
> on libraries that do bytecode generation and are not actively maintained. In 
> addition, Mockito is also easier to use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7438) Replace EasyMock and PowerMock with Mockito

2021-04-05 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7438:
---
Summary: Replace EasyMock and PowerMock with Mockito  (was: Replace 
EasyMock and PowerMock with Mockito in the clients module)

> Replace EasyMock and PowerMock with Mockito
> ---
>
> Key: KAFKA-7438
> URL: https://issues.apache.org/jira/browse/KAFKA-7438
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>
> Development of EasyMock and PowerMock has stagnated while Mockito continues 
> to be actively developed. With the new Java cadence, it's a problem to depend 
> on libraries that do bytecode generation and are not actively maintained. In 
> addition, Mockito is also easier to use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12618) Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito

2021-04-05 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315087#comment-17315087
 ] 

Ismael Juma commented on KAFKA-12618:
-

Also see https://issues.apache.org/jira/browse/KAFKA-7438

> Convert LogManager (and other EasyMocks) in ReplicaManagerTest to Mockito
> -
>
> Key: KAFKA-12618
> URL: https://issues.apache.org/jira/browse/KAFKA-12618
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Minor
>
> [This 
> commit|https://github.com/apache/kafka/commit/40f001cc537d6ff2efa71e609c2f84c6b934994d]
>  introduced changes that have Partition calling getLog when there is no topic 
> ID associated to the Partition. In this case, getLog will use a default 
> argument. EasyMock (a Java framework) does not play well with scala's default 
> arguments. For now, we are manually creating a partition and associating it 
> in the initializeLogAndTopicId method. But a better long term solution is to 
> use Mockito which better supports default arguments.
> It would be good to convert all EasyMocks over to mockito as well. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12619) Ensure LeaderChange message is committed before initializing high watermark

2021-04-05 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12619:
---

 Summary: Ensure LeaderChange message is committed before 
initializing high watermark
 Key: KAFKA-12619
 URL: https://issues.apache.org/jira/browse/KAFKA-12619
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


KIP-595 describes an extra condition on commitment here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch.
 In order to ensure that the leader's committed entries cannot get lost, it 
must commit one record from its own epoch. This guarantees that its latest 
entry is larger (in terms of epoch/offset) than any previously written record 
which ensures that any future leader must also include it. This is the purpose 
of the LeaderChange record which is written to the log as soon as the leader 
gets elected.

We have this check implemented here: 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122.
 However, the check needs to be a strict inequality since the epoch start 
offset does not reflect the LeaderChange record itself. In other words, the 
check is off by one.
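As an editorial illustration of the off-by-one described above, a minimal sketch of the commitment rule (a hypothetical helper, not the actual LeaderState code):

{code:java}
final class EpochStartCommitSketch {
    // replicatedEndOffset: exclusive end offset known to be replicated to a majority.
    // epochStartOffset: offset at which the new leader's epoch begins; the
    // LeaderChange record is appended at this offset.
    static boolean leaderChangeCommitted(long replicatedEndOffset, long epochStartOffset) {
        // Strict ">" rather than ">=": reaching only the epoch start offset means
        // the LeaderChange record written there has not yet been replicated.
        return replicatedEndOffset > epochStartOffset;
    }
}
{code}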



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12619) Ensure LeaderChange message is committed before initializing high watermark

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12619:

Description: 
KIP-595 describes an extra condition on commitment here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch.
 In order to ensure that a newly elected leader's committed entries cannot get 
lost, it must commit one record from its own epoch. This guarantees that its 
latest entry is larger (in terms of epoch/offset) than any previously written 
record which ensures that any future leader must also include it. This is the 
purpose of the LeaderChange record which is written to the log as soon as the 
leader gets elected.

We have this check implemented here: 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122.
 However, the check needs to be a strict inequality since the epoch start 
offset does not reflect the LeaderChange record itself. In other words, the 
check is off by one.

  was:
KIP-595 describes an extra condition on commitment here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch.
 In order to ensure that the leader's committed entries cannot get lost, it 
must commit one record from its own epoch. This guarantees that its latest 
entry is larger (in terms of epoch/offset) than any previously written record 
which ensures that any future leader must also include it. This is the purpose 
of the LeaderChange record which is written to the log as soon as the leader 
gets elected.

We have this check implemented here: 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122.
 However, the check needs to be a strict inequality since the epoch start 
offset does not reflect the LeaderChange record itself. In other words, the 
check is off by one.


> Ensure LeaderChange message is committed before initializing high watermark
> ---
>
> Key: KAFKA-12619
> URL: https://issues.apache.org/jira/browse/KAFKA-12619
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> KIP-595 describes an extra condition on commitment here: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch.
>  In order to ensure that a newly elected leader's committed entries cannot 
> get lost, it must commit one record from its own epoch. This guarantees that 
> its latest entry is larger (in terms of epoch/offset) than any previously 
> written record which ensures that any future leader must also include it. 
> This is the purpose of the LeaderChange record which is written to the log as 
> soon as the leader gets elected.
> We have this check implemented here: 
> https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/LeaderState.java#L122.
>  However, the check needs to be a strict inequality since the epoch start 
> offset does not reflect the LeaderChange record itself. In other words, the 
> check is off by one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10455: MINOR: Support ExponentialBackoff without jitter

2021-04-05 Thread GitBox


cmccabe commented on a change in pull request #10455:
URL: https://github.com/apache/kafka/pull/10455#discussion_r607330220



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##
@@ -47,7 +47,8 @@ public long backoff(long attempts) {
 }
 double exp = Math.min(attempts, this.expMax);
 double term = initialInterval * Math.pow(multiplier, exp);
-double randomFactor = ThreadLocalRandom.current().nextDouble(1 - 
jitter, 1 + jitter);
+double randomFactor = jitter < Double.MIN_NORMAL ? 1.0 :

Review comment:
   `MIN_NORMAL` is 2^-1022, though.  So it certainly wouldn't affect 
someone setting jitter = 0.5.
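For readers following along, a small self-contained sketch of the computation being discussed (parameter names mirror the fields of ExponentialBackoff, but the class internals are simplified here):

{code:java}
import java.util.concurrent.ThreadLocalRandom;

final class BackoffSketch {
    static long backoff(long attempts, long initialInterval, double multiplier,
                        double expMax, double jitter) {
        double exp = Math.min(attempts, expMax);
        double term = initialInterval * Math.pow(multiplier, exp);
        // With jitter == 0 the random factor collapses to exactly 1.0, which is
        // the case the `jitter < Double.MIN_NORMAL` comparison above special-cases;
        // any realistic non-zero jitter (e.g. 0.2 or 0.5) is far above MIN_NORMAL.
        double randomFactor = jitter == 0 ? 1.0
            : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * term);
    }
}
{code}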




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9988) Connect incorrectly logs that task has failed when one takes too long to shutdown

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9988:
-
Labels: newbie  (was: )

> Connect incorrectly logs that task has failed when one takes too long to 
> shutdown
> -
>
> Key: KAFKA-9988
> URL: https://issues.apache.org/jira/browse/KAFKA-9988
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.2.3, 2.5.0, 2.3.2, 2.4.1, 2.4.2, 
> 2.5.1
>Reporter: Sanjana Kaundinya
>Priority: Major
>  Labels: newbie
>
> If the OffsetStorageReader is closed while the task is trying to shutdown, 
> and the task is trying to access the offsets from the OffsetStorageReader, 
> then we see the following in the logs.
> {code:java}
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
> at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader 
> closed while attempting to read offsets. This is likely because the task was 
> been scheduled to stop but has taken longer than the graceful shutdown period 
> to do so.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
> ... 14 more
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is 
> being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> This is a bit misleading, because the task is already on its way to being 
> shut down, and doesn't actually need manual intervention to be restarted. We 
> can see that as later on in the logs we see that it throws another 
> unrecoverable exception.
> {code:java}
> [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> If we know a task is on its way to shutting down, we should not throw a 
> ConnectException and instead log a warning, so that we don't log false 
> negatives.
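A minimal sketch of the suggested behavior (the stopRequested flag and the helper itself are assumptions for illustration, not the actual WorkerSourceTask fields):

{code:java}
import java.util.function.Supplier;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ShutdownAwareOffsetRead {
    private static final Logger log = LoggerFactory.getLogger(ShutdownAwareOffsetRead.class);

    // Returns null instead of failing the task when the reader was closed as
    // part of a requested shutdown.
    static <T> T readOrIgnore(Supplier<T> read, boolean stopRequested) {
        try {
            return read.get();
        } catch (ConnectException e) {
            if (stopRequested) {
                log.warn("Offset reader closed while the task was shutting down; ignoring.", e);
                return null;
            }
            throw e;
        }
    }
}
{code}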



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10715) Support Kafka connect converter for AVRO

2021-04-05 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315110#comment-17315110
 ] 

Randall Hauch commented on KAFKA-10715:
---

There are multiple Converter implementations outside of Kafka, and IMO there is 
no need for Kafka to own and maintain its own version of these when those other 
existing implementations can easily be used by simply installing them.

This is similar to how the Kafka project provides only example Connector 
implementations. See the [rejected alternatives of 
KIP-26|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767#KIP26AddKafkaConnectframeworkfordataimport/export-Maintainconnectorsintheprojectalongwithframework],
 which introduced the Connect framework (with Converters).

Therefore, I'm going to close this.

> Support Kafka connect converter for AVRO
> 
>
> Key: KAFKA-10715
> URL: https://issues.apache.org/jira/browse/KAFKA-10715
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ravindranath Kakarla
>Priority: Minor
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> I want to add support for Avro data format converter to Kafka Connect. Right 
> now, Kafka Connect supports a [JSON 
> converter|https://github.com/apache/kafka/tree/trunk/connect]. Since Avro 
> is a commonly used data format with Kafka, it would be great to have support 
> for it. 
>  
> Confluent Schema Registry libraries have 
> [support|https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java]
>  for it. The code seems to be pretty generic and can be used directly with 
> Kafka connect without schema registry. They are also licensed under Apache 
> 2.0.
>  
> Can they be copied to this repository and made available for all users of 
> Kafka Connect?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10715) Support Kafka connect converter for AVRO

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-10715.
---
Resolution: Won't Do

> Support Kafka connect converter for AVRO
> 
>
> Key: KAFKA-10715
> URL: https://issues.apache.org/jira/browse/KAFKA-10715
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ravindranath Kakarla
>Priority: Minor
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> I want to add support for Avro data format converter to Kafka Connect. Right 
> now, Kafka Connect supports a [JSON 
> converter|https://github.com/apache/kafka/tree/trunk/connect]. Since Avro 
> is a commonly used data format with Kafka, it would be great to have support 
> for it. 
>  
> Confluent Schema Registry libraries have 
> [support|https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java]
>  for it. The code seems to be pretty generic and can be used directly with 
> Kafka connect without schema registry. They are also licensed under Apache 
> 2.0.
>  
> Can they be copied to this repository and made available for all users of 
> Kafka Connect?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9017) We see timeout in kafka in production cluster

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9017:
-
Component/s: (was: KafkaConnect)
 core

> We see timeout in kafka in production cluster
> -
>
> Key: KAFKA-9017
> URL: https://issues.apache.org/jira/browse/KAFKA-9017
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
> Environment: Production
>Reporter: Suhas
>Priority: Critical
> Attachments: stderr (7), stdout (12)
>
>
> We see timeouts in Kafka in our production cluster. Kafka is running on 
> DC/OS (Mesos), 
> and below are the errors: 
> *+Exception 1: This from application logs+*
> 2019-10-07 10:01:59 Error: java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> ie-lrx-audit-evt-3: 30030 ms has passed since batch creation plus linger time
> *+Exception 2:This from application logs+*
>  {"eventTime":"2019-10-07 08:20:43.265", "logType":"ERROR", "stackMessage" : 
> "java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> ie-lrx-audit-evt-3: 30028 ms has passed since batch creation plus linger 
> time", "stackTrace" : 
> *+Exception (from log) We see this logs on broker logs+*
> [2019-10-10 06:32:10,844] INFO [ReplicaFetcher replicaId=4, leaderId=2, 
> fetcherId=0] Error sending fetch request (sessionId=919177392, epoch=INITIAL) 
> to node 2: java.io.IOException: Connection to 2 was disconnected before the 
> response was read. (org.apache.kafka.clients.FetchSessionHandler)[2019-10-10 
> 06:32:10,844] INFO [ReplicaFetcher replicaId=4, leaderId=2, fetcherId=0] 
> Error sending fetch request (sessionId=919177392, epoch=INITIAL) to node 2: 
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read. (org.apache.kafka.clients.FetchSessionHandler)[2019-10-10 06:32:10,849] 
> WARN [ReplicaFetcher replicaId=4, leaderId=2, fetcherId=0] Error in response 
> for fetch request (type=FetchRequest, replicaId=4, maxWait=500, minBytes=1, 
> maxBytes=10485760, fetchData=\{ie-lrx-rxer-audit-evt-0=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), 
> mft-hdfs-landing-evt-1=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[108]), dca-audit-evt-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[108]), 
> it-sou-audit-evt-7=(offset=94819, logStartOffset=94819, maxBytes=1048576, 
> currentLeaderEpoch=Optional[100]), intg-ie-lrx-rxer-audit-evt-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[78]), 
> prod-pipelines-errors-evt-0=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[117]), __consumer_offsets-36=(offset=3, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), 
> panel-data-change-evt-4=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[108]), gdcp-notification-evt-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), 
> data-transfer-change-evt-0=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[108]), __consumer_offsets-11=(offset=15, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[108]), 
> dca-heartbeat-evt-2=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[105]), ukwhs-error-topic-1=(offset=8, 
> logStartOffset=8, maxBytes=1048576, currentLeaderEpoch=Optional[105]), 
> intg-ie-lrx-audit-evt-4=(offset=21, logStartOffset=21, maxBytes=1048576, 
> currentLeaderEpoch=Optional[74]), __consumer_offsets-16=(offset=11329814, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104]), 
> __consumer_offsets-31=(offset=3472033, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[107]), ukpai-hdfs-evt-1=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[107]), 
> mft-pflow-evt-1=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[108]), ukwhs-hdfs-landing-evt-01-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[105]), 
> it-sou-audit-evt-2=(offset=490084, logStartOffset=490084, maxBytes=1048576, 
> currentLeaderEpoch=Optional[105]), ie-lrx-pat-audit-evt-4=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[104])}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=919177392, 
> epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)java.io.IOException: 
> Connection to 2 was disconnected before the response was read at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java

[jira] [Resolved] (KAFKA-8961) Unable to create secure JDBC connection through Kafka Connect

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8961.
--
Resolution: Won't Fix

This is not a problem of the Connect framework, and is instead an issue with 
the connector implementation – or more likely the _installation_ of the 
connector in the user's environment.

> Unable to create secure JDBC connection through Kafka Connect
> -
>
> Key: KAFKA-8961
> URL: https://issues.apache.org/jira/browse/KAFKA-8961
> Project: Kafka
>  Issue Type: Bug
>  Components: build, clients, KafkaConnect, network
>Affects Versions: 2.2.1
>Reporter: Monika Bainsala
>Priority: Major
>
> As per the article below for enabling a secure JDBC connection, we can use an updated 
> URL parameter while calling the create connector REST API.
> Example:
> jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(LOAD_BALANCE=YES)(FAILOVER=YES)(ADDRESS=(PROTOCOL=tcp)(HOST=X)(PORT=1520)))(CONNECT_DATA=(SERVICE_NAME=XXAP)));EncryptionLevel=requested;EncryptionTypes=RC4_256;DataIntegrityLevel=requested;DataIntegrityTypes=MD5"
>  
> But this approach is not working currently, kindly help in resolving this 
> issue.
>  
> Reference :
> [https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma opened a new pull request #10479: MINOR: Jenkinsfile's `post` needs `agent` to be set

2021-04-05 Thread GitBox


ijuma opened a new pull request #10479:
URL: https://github.com/apache/kafka/pull/10479


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-8867) Kafka Connect JDBC fails to create PostgreSQL table with default boolean value in schema

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8867.
--
Resolution: Won't Fix

The reported problem is for the Confluent JDBC source/sink connector, and 
should be reported via that connector's GitHub repository issues.

> Kafka Connect JDBC fails to create PostgreSQL table with default boolean 
> value in schema
> 
>
> Key: KAFKA-8867
> URL: https://issues.apache.org/jira/browse/KAFKA-8867
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Tudor
>Priority: Major
>
> The `CREATE TABLE ..` statement generated for JDBC sink connectors when 
> configured with `auto.create: true` generates field declarations that do not 
> conform to allowed PostgreSQL syntax when considering fields of type boolean 
> with default values.
> Example record value Avro schema:
> {code:java}
> {
>   "namespace": "com.test.avro.schema.v1",
>   "type": "record",
>   "name": "SomeEvent",
>   "fields": [
> {
>   "name": "boolean_field",
>   "type": "boolean",
>   "default": false
> }
>   ]
> }
> {code}
> The connector task fails with:  
> {code:java}
> ERROR WorkerSinkTask{id=test-events-sink-0} RetriableException from SinkTask: 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:551)
> org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: 
> org.postgresql.util.PSQLException: ERROR: column "boolean_field" is of type 
> boolean but default expression is of type integer
>   Hint: You will need to rewrite or cast the expression.
>   at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748){code}
>  
> The generated SQL statement is: 
> {code:java}
> CREATE TABLE "test_data" ("boolean_field" BOOLEAN DEFAULT 0){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8664) non-JSON format messages when streaming data from Kafka to Mongo

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8664.
--
Resolution: Won't Fix

The reported problem is for a connector implementation that is not owned by the 
Apache Kafka project. Please report the issue with the provider of the 
connector.

> non-JSON format messages when streaming data from Kafka to Mongo
> 
>
> Key: KAFKA-8664
> URL: https://issues.apache.org/jira/browse/KAFKA-8664
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.0
>Reporter: Vu Le
>Priority: Major
> Attachments: MongoSinkConnector.properties, 
> log_error_when_stream_data_not_a_json_format.txt
>
>
> Hi team,
> I can stream data from Kafka to MongoDB with JSON messages. I use MongoDB 
> Kafka Connector 
> ([https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md])
> However, if I send a message that is not in JSON format, the Connector dies. Please see 
> the log file for details.
> My config file:
> {code:java}
> name=mongo-sink
> topics=test
> connector.class=com.mongodb.kafka.connect.MongoSinkConnector
> tasks.max=1
> key.ignore=true
> # Specific global MongoDB Sink Connector configuration
> connection.uri=mongodb://localhost:27017
> database=test_kafka
> collection=transaction
> max.num.retries=3
> retries.defer.timeout=5000
> type.name=kafka-connect
> key.converter=org.apache.kafka.connect.json.JsonConverter
> key.converter.schemas.enable=false
> value.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter.schemas.enable=false
> {code}
> I have 2 separate questions:  
>  # how to ignore messages which are not in JSON format?
>  # how to define a default key for this kind of message (for example: abc -> 
> \{ "non-json": "abc" } )
> Thanks



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8551) Comments for connectors() in Herder interface

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8551.
--
Resolution: Won't Fix

Marking as won't fix, since the details are insufficient to try to address.

> Comments for connectors() in Herder interface 
> --
>
> Key: KAFKA-8551
> URL: https://issues.apache.org/jira/browse/KAFKA-8551
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.1
>Reporter: Luying Liu
>Priority: Major
>
> There are mistakes in the comments for connectors() in the Herder interface. The 
> mistakes are in the file 
> connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-6985) Error connection between cluster node

2021-04-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6985:
-
Component/s: (was: KafkaConnect)
 core

> Error connection between cluster node
> -
>
> Key: KAFKA-6985
> URL: https://issues.apache.org/jira/browse/KAFKA-6985
> Project: Kafka
>  Issue Type: Bug
>  Components: core
> Environment: Centos-7
>Reporter: Ranjeet Ranjan
>Priority: Major
>
> Hi, I have set up a multi-node Kafka cluster but am getting an error while connecting 
> one node to another, although I don't think there is an issue with the firewall or port. I am 
> able to telnet 
> WARN [ReplicaFetcherThread-0-1], Error in fetch 
> Kafka.server.ReplicaFetcherThread$FetchRequest@8395951 
> (Kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to Kafka-1:9092 (id: 1 rack: null) failed
>  
> {code:java}
>  
> at 
> kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
> at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
> at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
> at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
> at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {code}
> Here you go server.properties
> Node:1
>  
> {code:java}
> # Server Basics #
> # The id of the broker. This must be set to a unique integer for each broker.
> broker.id=1
> # Switch to enable topic deletion or not, default value is false
> delete.topic.enable=true
> # Socket Server Settings 
> #
> listeners=PLAINTEXT://kafka-1:9092
> advertised.listeners=PLAINTEXT://kafka-1:9092
> #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> # The number of threads handling network requests
> num.network.threads=3
> # The number of threads doing disk I/O
> num.io.threads=8
> # The send buffer (SO_SNDBUF) used by the socket server
> socket.send.buffer.bytes=102400
> # The receive buffer (SO_RCVBUF) used by the socket server
> socket.receive.buffer.bytes=102400
> # The maximum size of a request that the socket server will accept 
> (protection against OOM)
> socket.request.max.bytes=104857600
> # Log Basics #
> # A comma seperated list of directories under which to store log files
> log.dirs=/var/log/kafka
> # The default number of log partitions per topic. More partitions allow 
> greater
> # parallelism for consumption, but this will also result in more files across
> # the brokers.
> num.partitions=1
> # The number of threads per data directory to be used for log recovery at 
> startup and flushing at shutdown.
> # This value is recommended to be increased for installations with data dirs 
> located in RAID array.
> num.recovery.threads.per.data.dir=1
> # Log Retention Policy 
> #
> # The minimum age of a log file to be eligible for deletion due to age
> log.retention.hours=48
> # A size-based retention policy for logs. Segments are pruned from the log as 
> long as the remaining
> # segments don't drop below log.retention.bytes. Functions independently of 
> log.retention.hours.
> log.retention.bytes=1073741824
> # The maximum size of a log segment file. When this size is reached a new log 
> segment will be created.
> log.segment.bytes=1073741824
> # The interval at which log segments are checked to see if they can be 
> deleted according
> # to the retention policies
> log.retention.check.interval.ms=30
> # Zookeeper #
> # root directory for all kafka znodes.
> zookeeper.connect=10.130.82.28:2181
> # Timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=6000
> {code}
>  
>  
> Node-2
> {code:java}
> # Server Basics #
> # The id of the broker. This must be set to a unique integer for each broker.
> broker.id=2
> # Switch to enable topic deletion or not, default value is false
> delete.topic.enable=true
> # Socket Server Settings 
> #
> listeners=PLAINTEXT://kafka-2:9092
> advertised.listeners=PLAINTEXT://kafka-2:9092
> #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> # Th

[GitHub] [kafka] dielhennr opened a new pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


dielhennr opened a new pull request #10480:
URL: https://github.com/apache/kafka/pull/10480


   The KafkaRaftClient has a field for the BatchAccumulator that is only used 
and set when it is the leader. In other cases, leader-specific information was 
stored in LeaderState. In a recent change, EpochState, which LeaderState 
implements, was changed to be Closeable. QuorumState makes sure to always 
close the previous state before transitioning to the next state. This redesign 
was used to move the BatchAccumulator to the LeaderState and simplify some of 
the handling in KafkaRaftClient.
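An illustrative-only sketch of the ownership pattern described above (simplified names, not the actual raft classes): the leader-only accumulator lives in the leader state object, and closing that state during a transition releases it.

{code:java}
import java.util.Objects;
import org.apache.kafka.raft.internals.BatchAccumulator;

final class LeaderStateSketch<T> implements AutoCloseable {
    private final BatchAccumulator<T> accumulator;

    LeaderStateSketch(BatchAccumulator<T> accumulator) {
        // Leader state always has an accumulator, so fail fast on null.
        this.accumulator = Objects.requireNonNull(accumulator);
    }

    BatchAccumulator<T> accumulator() {
        return accumulator;
    }

    @Override
    public void close() {
        // Released exactly once, when QuorumState transitions away from leader.
        accumulator.close();
    }
}
{code}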
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rsomu commented on pull request #4040: KAFKA-6324: Change LogSegment.delete to deleteIfExists and harden log recovery

2021-04-05 Thread GitBox


rsomu commented on pull request #4040:
URL: https://github.com/apache/kafka/pull/4040#issuecomment-813671433


   Is this fix supposed to avoid the [NFS silly 
renames](https://sbg.technology/2018/07/10/kafka-nfs/) issue?  I am currently 
testing Kafka 2.7 on an NFS filesystem and encountered the same error message when 
deleting a topic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10474: KAFKA-12602: Fix LICENSE file

2021-04-05 Thread GitBox


ableegoldman commented on a change in pull request #10474:
URL: https://github.com/apache/kafka/pull/10474#discussion_r607355497



##
File path: licenses/DWTFYWTPL
##
@@ -0,0 +1,14 @@
+DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE

Review comment:
   😂 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


jsancio commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607353807



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -48,13 +49,16 @@
 private final Set grantingVoters = new HashSet<>();
 private final Logger log;
 
+private final BatchAccumulator accumulator;
+
 protected LeaderState(
 int localId,
 int epoch,
 long epochStartOffset,
 Set voters,
 Set grantingVoters,
-LogContext logContext
+LogContext logContext,
+BatchAccumulator accumulator

Review comment:
   I would keep `LogContext` as the last argument.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2252,9 +2247,12 @@ public Long scheduleAtomicAppend(int epoch, List 
records) {
 return append(epoch, records, true);
 }
 
+@SuppressWarnings("unchecked")
 private Long append(int epoch, List records, boolean isAtomic) {
-BatchAccumulator accumulator = this.accumulator;
-if (accumulator == null) {
+BatchAccumulator accumulator;
+try {
+accumulator =  (BatchAccumulator) 
quorum.leaderStateOrThrow().accumulator();

Review comment:
   I think you should be able to remove this cast.

##
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##
@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class LeaderStateTest {
+public class LeaderStateTest {
 private final int localId = 0;
 private final int epoch = 5;
 private final LogContext logContext = new LogContext();
 
-private LeaderState newLeaderState(
+private LeaderState newLeaderState(
 Set voters,
 long epochStartOffset
 ) {
-return new LeaderState(
+return new LeaderState<>(
 localId,
 epoch,
 epochStartOffset,
 voters,
 voters,
-logContext
+logContext,
+null

Review comment:
   I would not pass a `null` and add a test checking that this field is 
returned correctly.

##
File path: raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
##
@@ -269,7 +269,7 @@ public void testCandidateToLeader() throws IOException {
 assertTrue(state.isCandidate());
 assertEquals(1, state.epoch());
 
-state.transitionToLeader(0L);
+state.transitionToLeader(0L, null);

Review comment:
   Again, the code in `KafkaRaftClient` assumes that this field cannot be 
`null`.

##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -319,6 +328,10 @@ public String name() {
 }
 
 @Override
-public void close() {}
+public void close() {
+if (accumulator != null) {

Review comment:
   When would accumulator be null?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange

2021-04-05 Thread GitBox


hachikuji opened a new pull request #10481:
URL: https://github.com/apache/kafka/pull/10481


   KIP-595 describes an extra condition on commitment here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch.
 In order to ensure that a newly elected leader's committed entries cannot get 
lost, it must commit one record from its own epoch. This guarantees that its 
latest entry is larger (in terms of epoch/offset) than any previously written 
record which ensures that any future leader must also include it. This is the 
purpose of the `LeaderChange` record which is written to the log as soon as the 
leader gets elected.
   
   Although we had this check implemented, it was off by one. We only ensured 
that replication reached the epoch start offset, which does not reflect the 
appended `LeaderChange` record. This patch fixes the check and clarifies the 
point of the check. The rest of the patch is just fixing up test cases.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange

2021-04-05 Thread GitBox


guozhangwang commented on pull request #10481:
URL: https://github.com/apache/kafka/pull/10481#issuecomment-813683249


   Nice catch! Regarding the fix, WDYT to just remember the `LeaderChange` 
record's offset and compare against it instead of the epoch start offset? I'm 
thinking if in the future there are any scenarios that can fill in some more 
records between these two, we are still immune to any future bugs. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ewencp commented on a change in pull request #10474: KAFKA-12602: Fix LICENSE file

2021-04-05 Thread GitBox


ewencp commented on a change in pull request #10474:
URL: https://github.com/apache/kafka/pull/10474#discussion_r607362884



##
File path: LICENSE-binary
##
@@ -0,0 +1,602 @@
+
+ Apache License
+   Version 2.0, January 2004
+http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+  "License" shall mean the terms and conditions for use, reproduction,
+  and distribution as defined by Sections 1 through 9 of this document.
+
+  "Licensor" shall mean the copyright owner or entity authorized by
+  the copyright owner that is granting the License.
+
+  "Legal Entity" shall mean the union of the acting entity and all
+  other entities that control, are controlled by, or are under common
+  control with that entity. For the purposes of this definition,
+  "control" means (i) the power, direct or indirect, to cause the
+  direction or management of such entity, whether by contract or
+  otherwise, or (ii) ownership of fifty percent (50%) or more of the
+  outstanding shares, or (iii) beneficial ownership of such entity.
+
+  "You" (or "Your") shall mean an individual or Legal Entity
+  exercising permissions granted by this License.
+
+  "Source" form shall mean the preferred form for making modifications,
+  including but not limited to software source code, documentation
+  source, and configuration files.
+
+  "Object" form shall mean any form resulting from mechanical
+  transformation or translation of a Source form, including but
+  not limited to compiled object code, generated documentation,
+  and conversions to other media types.
+
+  "Work" shall mean the work of authorship, whether in Source or
+  Object form, made available under the License, as indicated by a
+  copyright notice that is included in or attached to the work
+  (an example is provided in the Appendix below).
+
+  "Derivative Works" shall mean any work, whether in Source or Object
+  form, that is based on (or derived from) the Work and for which the
+  editorial revisions, annotations, elaborations, or other modifications
+  represent, as a whole, an original work of authorship. For the purposes
+  of this License, Derivative Works shall not include works that remain
+  separable from, or merely link (or bind by name) to the interfaces of,
+  the Work and Derivative Works thereof.
+
+  "Contribution" shall mean any work of authorship, including
+  the original version of the Work and any modifications or additions
+  to that Work or Derivative Works thereof, that is intentionally
+  submitted to Licensor for inclusion in the Work by the copyright owner
+  or by an individual or Legal Entity authorized to submit on behalf of
+  the copyright owner. For the purposes of this definition, "submitted"
+  means any form of electronic, verbal, or written communication sent
+  to the Licensor or its representatives, including but not limited to
+  communication on electronic mailing lists, source code control systems,
+  and issue tracking systems that are managed by, or on behalf of, the
+  Licensor for the purpose of discussing and improving the Work, but
+  excluding communication that is conspicuously marked or otherwise
+  designated in writing by the copyright owner as "Not a Contribution."
+
+  "Contributor" shall mean Licensor and any individual or Legal Entity
+  on behalf of whom a Contribution has been received by Licensor and
+  subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+  this License, each Contributor hereby grants to You a perpetual,
+  worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+  copyright license to reproduce, prepare Derivative Works of,
+  publicly display, publicly perform, sublicense, and distribute the
+  Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+  this License, each Contributor hereby grants to You a perpetual,
+  worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+  (except as stated in this section) patent license to make, have made,
+  use, offer to sell, sell, import, and otherwise transfer the Work,
+  where such license applies only to those patent claims licensable
+  by such Contributor that are necessarily infringed by their
+  Contribution(s) alone or by combination of their Contribution(s)
+  with the Work to which such Contribution(s) was submitted. If You
+  institute patent litigation against any entity (including a
+  cross-claim or counterclaim in a lawsuit) alleging that the Work
+  or a Contribution

[GitHub] [kafka] hachikuji commented on pull request #10481: KAFKA-12619; Raft leader should expose hw only after committing LeaderChange

2021-04-05 Thread GitBox


hachikuji commented on pull request #10481:
URL: https://github.com/apache/kafka/pull/10481#issuecomment-813686724


   @guozhangwang Thanks for the quick comment. I did consider that. I can't say 
I had a particularly strong reason to reject it, but ultimately I convinced 
myself that modifying the existing check was good enough and probably simpler. 
The only invariant that we need to protect is that all records appended in the 
leader's epoch actually carry the right epoch tag. The refactor proposed here 
may even give us a stronger way to enforce the invariant: 
https://github.com/apache/kafka/pull/10480/files. What do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315140#comment-17315140
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12492:


The kafka-site repo is what the actual, live docs are built from. That's why 
there are separate folders like 27, 26, etc -- these correspond to the docs for 
versions 2.7 and 2.6, and so on. You only need to submit a PR to the kafka-site 
repo if you want your change to show up immediately -- if you don't mind 
waiting for the next release, you can just open a PR to fix the docs in the 
kafka repo directly. Then, these will be copied over to the kafka-site repo and 
made live when the next version is released.

Obviously it would be ideal if we could fix this in all versions, but it's 
probably sufficient to just fix it going forward. The 2.8 release is actually 
going on at the moment, so I would recommend submitting a PR to the kafka repo 
for now. If we can get it merged before 2.8 is released, then we're good -- 
otherwise you can open a followup PR with the same fix in just the 28 
subdirectory of the kafka-site repo.

I'm not sure why you're getting a 403; I was able to set up a local Apache 
server to test some docs, but that was a while ago. Since it's just a fix of an 
existing formatting error, I wouldn't worry about testing it too much. As long 
as you can figure out why the formatting was messed up to begin with, and feel 
reasonably confident in your fix, then that's good enough. Remember, once the 
fix is in kafka-site it'll be live so you can just see what it looks like then. 
If something is still off, you can always submit a followup PR to fix it right 
away in kafka-site

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12607) Allow votes to be granted in resigned state

2021-04-05 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12607:
--
Labels: newbie++  (was: )

> Allow votes to be granted in resigned state
> ---
>
> Key: KAFKA-12607
> URL: https://issues.apache.org/jira/browse/KAFKA-12607
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: dengziming
>Priority: Major
>  Labels: newbie++
>
> When the leader is shutting down, it transitions to a resigned state. 
> Currently all votes are rejected in this state, but we should allow the 
> resigned leader to help a candidate get elected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2021-04-05 Thread GitBox


hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r607374397



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -905,19 +907,33 @@ class GroupCoordinator(val brokerId: Int,
*
* @param offsetTopicPartitionId The partition we are now leading
*/
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-info(s"Elected as the group coordinator for partition 
$offsetTopicPartitionId")
-groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, 
onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
+  info(s"Elected as the group coordinator for partition 
$offsetTopicPartitionId in epoch $coordinatorEpoch")
+  groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, 
onGroupLoaded)
+  epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch)
+} else {
+  warn(s"Ignored election as group coordinator for partition 
$offsetTopicPartitionId " +
+s"in epoch $coordinatorEpoch since current epoch is $currentEpoch")
+}
   }
 
   /**
* Unload cached state for the given partition and stop handling requests 
for groups which map to it.
*
* @param offsetTopicPartitionId The partition we are no longer leading
*/
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-info(s"Resigned as the group coordinator for partition 
$offsetTopicPartitionId")
-groupManager.removeGroupsForPartition(offsetTopicPartitionId, 
onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: 
Option[Int]): Unit = {
+val currentEpoch = epochForPartitionId.get(offsetTopicPartitionId)
+if (currentEpoch.forall(currentEpoch => currentEpoch <= 
coordinatorEpoch.getOrElse(Int.MaxValue))) {
+  info(s"Resigned as the group coordinator for partition 
$offsetTopicPartitionId in epoch $coordinatorEpoch")
+  groupManager.removeGroupsForPartition(offsetTopicPartitionId, 
onGroupUnloaded)
+  epochForPartitionId.remove(offsetTopicPartitionId)

Review comment:
   Hmm.. Why remove the epoch after resignation? It seems like it would be 
useful to keep tracking it. Maybe it's useful to distinguish the case where the 
replica is to be deleted?

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -87,6 +87,8 @@ class GroupCoordinator(val brokerId: Int,
 
   private val isActive = new AtomicBoolean(false)
 
+  val epochForPartitionId = mutable.Map[Int, Int]()

Review comment:
   Does this need to be a concurrent collection? It does not look like we 
can count on a lock protecting `onElection` and `onResignation`.
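To make the concern concrete, here is a hypothetical sketch (not the actual GroupCoordinator code) of how the epoch check could be made safe without relying on an external lock, using an atomic compute on a concurrent map:

{code:java}
import java.util.concurrent.ConcurrentHashMap;

final class CoordinatorEpochFenceSketch {
    private final ConcurrentHashMap<Integer, Integer> epochForPartition = new ConcurrentHashMap<>();

    // Returns true only when the election carries a strictly newer epoch.
    boolean tryElect(int partition, int coordinatorEpoch) {
        boolean[] accepted = {false};
        // compute() is atomic per key, so concurrent election/resignation
        // callbacks cannot interleave their read-compare-write.
        epochForPartition.compute(partition, (p, current) -> {
            if (current == null || coordinatorEpoch > current) {
                accepted[0] = true;
                return coordinatorEpoch;
            }
            return current;
        });
        return accepted[0];
    }
}
{code}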




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha

2021-04-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315142#comment-17315142
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12574:


> This is well documented in 
> https://kafka.apache.org/27/documentation/streams/upgrade-guide

Ah, you're right. I must have been on an older version of the docs -- I find 
Google often takes me to 2.4 docs for some reason. We need better SEO :/ (I 
filed a ticket for this already a while back)

Also not sure how I forgot about EosBetaUpgradeIntegrationTest after spending 
so much time trying to help fix it -- sorry for doubting our test coverage and 
docs.

Regarding the proposal, I think users would find it very odd if we moved on to 
eos-v2 and then suddenly deprecated it and went back to just "eos" -- makes it 
seem like there was a problem with eos-v2. I would be fine with just staying on 
eos-v2 though. For one thing it leaves the door open to further developments in 
eos that need to be gated by a config, eg eos-v3, if we ever have need for that 
again. 
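For reference, a minimal sketch of the configs being discussed (the constant that ships at this point is EXACTLY_ONCE_BETA; the "eos-v2" naming above is the proposed rename, not an existing API):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final class EosConfigSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        // eos-alpha, the config this ticket proposes deprecating:
        // props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // KIP-447 processing mode (requires brokers on 2.5+):
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
        return props;
    }
}
{code}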


> Deprecate eos-alpha
> ---
>
> Key: KAFKA-12574
> URL: https://issues.apache.org/jira/browse/KAFKA-12574
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> In KIP-447 we introduced a new thread-producer which is capable of 
> exactly-once semantics across multiple tasks. The new mode of EOS, called 
> eos-beta, is intended to eventually be the preferred processing mode for EOS 
> as it improves the performance and scaling of partitions/tasks. The only 
> downside is that it requires brokers to be on version 2.5+ in order to 
> understand the latest APIs that are necessary for this thread-producer.
> We should consider deprecating the eos-alpha config, ie 
> StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate 
> to the new-and-improved processing mode, and upgrade their brokers if 
> necessary.
> Eventually we would like to be able to remove the eos-alpha code paths from 
> Streams as this will help to simplify the logic and reduce the processing 
> mode branching. But since this will break client-broker compatibility, and 
> 2.5 is still a relatively recent version, we probably can't actually remove 
> eos-alpha in the near future



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha

2021-04-05 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315144#comment-17315144
 ] 

Ismael Juma commented on KAFKA-12574:
-

{quote}Regarding the proposal, I think users would find it very odd if we moved 
on to eos-v2 and then suddenly deprecated it and went back to just "eos" – 
makes it seem like there was a problem with eos-v2. I would be fine with just 
staying on eos-v2 though. For one thing it leaves the door open to further 
developments in eos that need to be gated by a config, eg eos-v3, if we ever 
have need for that again.
{quote}
+1

> Deprecate eos-alpha
> ---
>
> Key: KAFKA-12574
> URL: https://issues.apache.org/jira/browse/KAFKA-12574
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> In KIP-447 we introduced a new thread-producer which is capable of 
> exactly-once semantics across multiple tasks. The new mode of EOS, called 
> eos-beta, is intended to eventually be the preferred processing mode for EOS 
> as it improves the performance and scaling of partitions/tasks. The only 
> downside is that it requires brokers to be on version 2.5+ in order to 
> understand the latest APIs that are necessary for this thread-producer.
> We should consider deprecating the eos-alpha config, ie 
> StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate 
> to the new-and-improved processing mode, and upgrade their brokers if 
> necessary.
> Eventually we would like to be able to remove the eos-alpha code paths from 
> Streams as this will help to simplify the logic and reduce the processing 
> mode branching. But since this will break client-broker compatibility, and 
> 2.5 is still a relatively recent version, we probably can't actually remove 
> eos-alpha in the near future



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10142: KAFKA-12294: forward auto topic request within envelope on behalf of clients

2021-04-05 Thread GitBox


hachikuji merged pull request #10142:
URL: https://github.com/apache/kafka/pull/10142


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation

2021-04-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12294.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

> Consider using the forwarding mechanism for metadata auto topic creation
> 
>
> Key: KAFKA-12294
> URL: https://issues.apache.org/jira/browse/KAFKA-12294
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Boyang Chen
>Priority: Major
> Fix For: 3.0.0
>
>
> Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to 
> improve the topic creation auditing by forwarding the CreateTopicsRequest 
> inside Envelope for the given client. Details in 
> [here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607380961



##
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##
@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class LeaderStateTest {
+public class LeaderStateTest {
 private final int localId = 0;
 private final int epoch = 5;
 private final LogContext logContext = new LogContext();
 
-private LeaderState newLeaderState(
+private LeaderState newLeaderState(
 Set voters,
 long epochStartOffset
 ) {
-return new LeaderState(
+return new LeaderState<>(
 localId,
 epoch,
 epochStartOffset,
 voters,
 voters,
-logContext
+logContext,
+null

Review comment:
   Yeah, let's add an explicit `requireNonNull` in the constructor. Here we 
could potentially use a mock.
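
A rough illustration of the explicit null check suggested here; the class and
field names below are stand-ins, not the actual LeaderState constructor
signature:

    import java.util.Objects;

    // Stand-in class; only the requireNonNull pattern is the point.
    class LeaderStateSketch {
        private final Object accumulator; // stand-in for BatchAccumulator<T>

        LeaderStateSketch(Object accumulator) {
            this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
        }
    }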




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607381875



##
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##
@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class LeaderStateTest {
+public class LeaderStateTest {

Review comment:
   I confess it's a little annoying to see the generic type leak down to 
here, but I guess that's the price we have to pay. There's a big part of me 
that wants to remove the generic and let `ApiMessageAndVersion` be the only 
supported type. Anyway, that is fuel for a separate issue/discussion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607383662



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2252,9 +2247,12 @@ public Long scheduleAtomicAppend(int epoch, List 
records) {
 return append(epoch, records, true);
 }
 
+@SuppressWarnings("unchecked")
 private Long append(int epoch, List records, boolean isAtomic) {
-BatchAccumulator accumulator = this.accumulator;
-if (accumulator == null) {
+BatchAccumulator accumulator;
+try {
+accumulator =  (BatchAccumulator) 
quorum.leaderStateOrThrow().accumulator();

Review comment:
   Removing the cast causes this...
   `Type Mismatch: Cannot convert from BatchAccumulator to 
BatchAccumulator `
   
   even though this is the signature of accumulator()
   `public BatchAccumulator accumulator()`
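
One way this kind of mismatch can arise, sketched with made-up names rather
than the actual Kafka types: when the enclosing object is referenced without
its type argument, its getter comes back wildcard-typed, and recovering the
`T`-typed view needs an unchecked cast.

    import java.util.ArrayList;
    import java.util.List;

    class Holder<T> {
        private final List<T> items = new ArrayList<>();
        List<T> items() { return items; }
    }

    class CastDemo<T> {
        @SuppressWarnings("unchecked")
        List<T> itemsOf(Holder<?> holder) {
            // holder.items() is List<capture of ?>; getting a List<T> back needs
            // an unchecked cast, mirroring the BatchAccumulator cast in the diff above.
            return (List<T>) holder.items();
        }
    }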
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-05 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r607386832



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -319,6 +328,10 @@ public String name() {
 }
 
 @Override
-public void close() {}
+public void close() {
+if (accumulator != null) {

Review comment:
   These were the QuorumState tests, where I was passing in null for the 
accumulator to get the code to compile. I'll see about using a mock.
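
A sketch of the mock-based alternative mentioned here, assuming Mockito (which
Kafka's tests already use); the BatchAccumulator import path and the Integer
type argument are assumptions for illustration:

    import static org.mockito.Mockito.mock;

    import org.apache.kafka.raft.internals.BatchAccumulator; // import path assumed

    class AccumulatorMockSketch {
        @SuppressWarnings("unchecked")
        static BatchAccumulator<Integer> mockAccumulator() {
            // Hand LeaderState a mocked accumulator instead of null in tests.
            return (BatchAccumulator<Integer>) mock(BatchAccumulator.class);
        }
    }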




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



