[GitHub] [kafka] showuon commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

2021-11-15 Thread GitBox


showuon commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r749936830



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
 throttledSockets += DelayedCloseSocket(socketChannel, 
endThrottleTimeMs)
 None
+  case e: IOException =>
+info(s"Encounter IOException", e)

Review comment:
   nit: `Encountered IOException. Closing connection.`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11490: KAFKA-13439: Deprecate eager rebalance protocol in kafka stream

2021-11-15 Thread GitBox


ableegoldman commented on a change in pull request #11490:
URL: https://github.com/apache/kafka/pull/11490#discussion_r749923117



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##
@@ -105,13 +105,15 @@ public RebalanceProtocol rebalanceProtocol() {
 case StreamsConfig.UPGRADE_FROM_21:
 case StreamsConfig.UPGRADE_FROM_22:
 case StreamsConfig.UPGRADE_FROM_23:
-log.info("Eager rebalancing enabled now for upgrade from 
{}.x", upgradeFrom);
+log.info("Eager rebalancing protocol is enabled now for 
upgrade from {}.x", upgradeFrom);
+log.warn("Eagar rebalancing protocol is deprecated in 
Kafka Stream since V3.1, and will not get supported" +
+"in future release. Please remove the 'upgrade.from' 
setting soon.");

Review comment:
   ```suggestion
   log.warn("The eager rebalancing protocol is deprecated 
and will stop being supported in a future release. " +
   "Please be prepared to remove the 'upgrade.from' 
config soon.");
   ```

##
File path: docs/upgrade.html
##
@@ -26,6 +26,8 @@ Notable changes in 3
 and iotime-total. Please use 
bufferpool-wait-time-ns-total, io-wait-time-ns-total,
 and io-time-ns-total instead. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-773%3A+Differentiate+consistently+metric+latency+measured+in+millis+and+nanos;>KIP-773
 for more details.
+Eagar rebalancing protocol is deprecated in Kafka Stream since V3.1, 
and will not get supported in future release.
+Please remove the upgrade.from setting soon.

Review comment:
   I think we should be a little more specific about how this does and 
doesn't affect users, and a little less specific about the deprecation -- if we 
say this is deprecated since 3.1 then we're more likely to have to commit to 
waiting for the full usual deprecation period AND the next major version bump. 
But technically, continuing to support EAGER was always intended to be 
temporary ever since COOPERATIVE was introduced, we just needed to give users 
an upgrade path and also initially a way to disable COOPERATIVE if serious bugs 
were found after it was first rolled out. 
   
   I think we should instead position 3.1 as (close to, if not exactly) the 
_last_ version in which EAGER will be available, not the first version in which 
it's deprecated. Kind of a fine line, but maybe something like
   ```suggestion
   The cooperative rebalancing protocol has been the default since 2.4, 
but we have continued to support the eager rebalancing protocol to provide 
users an upgrade path. This support will be dropped in a future release, so any 
users still on the eager protocol should prepare to finish upgrading their 
applications to the cooperative protocol in version 3.1. This only affects 
users who are still on a version older than 2.4, and users who have upgraded 
already but have not yet removed the upgrade.from config that they 
set when upgrading from a version below 2.4. Users fitting into the latter case 
will simply need to unset this config when upgrading beyond 3.1, while users in 
the former will need to follow a slightly different upgrade path if they 
attempt to upgrade from 2.3 or below to a version beyond 3.1. Those 
applications will need to go through a bridge release, by first upgrading to a 
version between 2.4 - 3.1 and setting the upgrade.from config, 
 then removing that config and upgrading to the final version above 3.1. 
   ```
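   For illustration, a minimal sketch of that two-phase bridge upgrade in code, using the Kafka Streams config constants (the application id, bootstrap servers, and starting version 2.3 are assumptions for the example, not from this PR):
   ```scala
   import java.util.Properties
   import org.apache.kafka.streams.StreamsConfig

   // Phase 1: upgrade from 2.3 to a bridge version in [2.4, 3.1],
   // keeping upgrade.from set during the rolling bounce.
   val phase1 = new Properties()
   phase1.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")          // hypothetical
   phase1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")  // hypothetical
   phase1.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23)

   // Phase 2: once every instance runs the bridge version, unset
   // upgrade.from and roll again onto the final version above 3.1.
   val phase2 = new Properties()
   phase2.putAll(phase1)
   phase2.remove(StreamsConfig.UPGRADE_FROM_CONFIG)
   ```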




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11505: KAFKA-13449: Comment optimization for parameter log.cleaner.delete.retention.ms

2021-11-15 Thread GitBox


RivenSun2 commented on pull request #11505:
URL: https://github.com/apache/kafka/pull/11505#issuecomment-969861680


   @showuon 
   Sure, Luke.
   Please help review the changes when you have time.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13406) Cooperative sticky assignor got stuck due to assignment validation failed

2021-11-15 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13406:
---
Fix Version/s: 2.8.2

> Cooperative sticky assignor got stuck due to assignment validation failed
> -
>
> Key: KAFKA-13406
> URL: https://issues.apache.org/jira/browse/KAFKA-13406
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.1.0, 3.0.1, 2.8.2
>
>
> We perform validateCooperativeAssignment for cooperative assignors, where we
> validate whether previously owned partitions are transferred directly to other
> consumers without a "revoke" step. However, the "ownedPartitions" field in the
> subscription might contain out-of-date data, which can cause the validation to
> always fail.
> As a short-term fix, we should consider disabling validateCooperativeAssignment
> for the built-in CooperativeStickyAssignor, because the assignor already takes
> the generation into account and discards ownedPartitions from old generations.
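For illustration, a hypothetical Scala sketch of the generation handling described above (names are illustrative; the real logic lives in the built-in assignor):

{code:java}
case class MemberData(ownedPartitions: List[String], generation: Option[Int])

// Only trust ownedPartitions reported with the highest generation seen;
// stale entries are discarded instead of failing validation.
def partitionsToConsider(members: Map[String, MemberData]): Map[String, List[String]] = {
  val maxGeneration = members.values.flatMap(_.generation).maxOption.getOrElse(-1)
  members.collect {
    case (id, data) if data.generation.contains(maxGeneration) =>
      id -> data.ownedPartitions
  }
}
{code}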



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] ableegoldman commented on pull request #11439: KAFKA-13406: skip assignment validation for built-in cooperativeStickyAssignor

2021-11-15 Thread GitBox


ableegoldman commented on pull request #11439:
URL: https://github.com/apache/kafka/pull/11439#issuecomment-969761211


   Merged to trunk and ported back to 3.0 and 2.8, just waiting for David's 
approval before cherrypicking to the 3.1 branch


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13406) Cooperative sticky assignor got stuck due to assignment validation failed

2021-11-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444245#comment-17444245
 ] 

Luke Chen commented on KAFKA-13406:
---

[~ableegoldman], thank you very much! :)

> Cooperative sticky assignor got stuck due to assignment validation failed
> -
>
> Key: KAFKA-13406
> URL: https://issues.apache.org/jira/browse/KAFKA-13406
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.1.0, 3.0.1
>
>
> We perform validateCooperativeAssignment for cooperative assignors, where we
> validate whether previously owned partitions are transferred directly to other
> consumers without a "revoke" step. However, the "ownedPartitions" field in the
> subscription might contain out-of-date data, which can cause the validation to
> always fail.
> As a short-term fix, we should consider disabling validateCooperativeAssignment
> for the built-in CooperativeStickyAssignor, because the assignor already takes
> the generation into account and discards ownedPartitions from old generations.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13406) Cooperative sticky assignor got stuck due to assignment validation failed

2021-11-15 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13406:
---
Fix Version/s: 3.1.0
   (was: 3.2.0)

> Cooperative sticky assignor got stuck due to assignment validation failed
> -
>
> Key: KAFKA-13406
> URL: https://issues.apache.org/jira/browse/KAFKA-13406
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.1.0, 3.0.1
>
>
> We perform validateCooperativeAssignment for cooperative assignors, where we
> validate whether previously owned partitions are transferred directly to other
> consumers without a "revoke" step. However, the "ownedPartitions" field in the
> subscription might contain out-of-date data, which can cause the validation to
> always fail.
> As a short-term fix, we should consider disabling validateCooperativeAssignment
> for the built-in CooperativeStickyAssignor, because the assignor already takes
> the generation into account and discards ownedPartitions from old generations.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13406) Cooperative sticky assignor got stuck due to assignment validation failed

2021-11-15 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444241#comment-17444241
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13406:


Yes, we should definitely get this fix into 3.1. I'll also try to port it back 
to earlier branches if the cherrypick is smooth

> Cooperative sticky assignor got stuck due to assignment validation failed
> -
>
> Key: KAFKA-13406
> URL: https://issues.apache.org/jira/browse/KAFKA-13406
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.0.1, 3.2.0
>
>
> We perform validateCooperativeAssignment for cooperative assignors, where we
> validate whether previously owned partitions are transferred directly to other
> consumers without a "revoke" step. However, the "ownedPartitions" field in the
> subscription might contain out-of-date data, which can cause the validation to
> always fail.
> As a short-term fix, we should consider disabling validateCooperativeAssignment
> for the built-in CooperativeStickyAssignor, because the assignor already takes
> the generation into account and discards ownedPartitions from old generations.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] ableegoldman merged pull request #11439: KAFKA-13406: skip assignment validation for built-in cooperativeStickyAssignor

2021-11-15 Thread GitBox


ableegoldman merged pull request #11439:
URL: https://github.com/apache/kafka/pull/11439


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11439: KAFKA-13406: skip assignment validation for built-in cooperativeStickyAssignor

2021-11-15 Thread GitBox


ableegoldman commented on pull request #11439:
URL: https://github.com/apache/kafka/pull/11439#issuecomment-969744052


   Failure is unrelated, merging to trunk. Will also need to cherrypick back to 
3.1 cc @dajac 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13406) Cooperative sticky assignor got stuck due to assignment validation failed

2021-11-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444229#comment-17444229
 ] 

Luke Chen commented on KAFKA-13406:
---

[~ableegoldman], I think this fix should go into v3.1.0. Otherwise, users might 
get stuck during rebalance again. What do you think?

> Cooperative sticky assignor got stuck due to assignment validation failed
> -
>
> Key: KAFKA-13406
> URL: https://issues.apache.org/jira/browse/KAFKA-13406
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.0.1, 3.2.0
>
>
> We perform validateCooperativeAssignment for cooperative assignors, where we
> validate whether previously owned partitions are transferred directly to other
> consumers without a "revoke" step. However, the "ownedPartitions" field in the
> subscription might contain out-of-date data, which can cause the validation to
> always fail.
> As a short-term fix, we should consider disabling validateCooperativeAssignment
> for the built-in CooperativeStickyAssignor, because the assignor already takes
> the generation into account and discards ownedPartitions from old generations.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-11-15 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444226#comment-17444226
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13128:


[~wcarlson5]  unassigning you in case someone else wants to pick this up and 
fix the remaining source(s?) of failure

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.1, 3.0.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.2.0
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-11-15 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13128:
---
Fix Version/s: 3.2.0
   (was: 3.1.0)

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.1, 3.0.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.2.0
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-11-15 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13128:
---
Priority: Critical  (was: Blocker)

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.1, 3.0.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.1.0
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-11-15 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-13128:
--

Assignee: (was: Walker Carlson)

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.1, 3.0.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.1.0
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13449) Comment optimization for parameter log.cleaner.delete.retention.ms

2021-11-15 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444223#comment-17444223
 ] 

RivenSun commented on KAFKA-13449:
--

Hi [~showuon],
Please help review the PR linked in 'Issue Links' when you have time.
Thanks.

> Comment optimization for parameter log.cleaner.delete.retention.ms 
> ---
>
> Key: KAFKA-13449
> URL: https://issues.apache.org/jira/browse/KAFKA-13449
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> You can view this parameter's description on Kafka's official website:
> https://kafka.apache.org/documentation/#brokerconfigs_log.cleaner.delete.retention.ms
> {code:java}
> log.cleaner.delete.retention.ms
> How long are delete records retained? {code}
> I think it should be consistent with the description of the topic-level
> parameter *delete.retention.ms*:
> https://kafka.apache.org/documentation/#topicconfigs_delete.retention.ms



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] RivenSun2 opened a new pull request #11505: KAFKA-13449: Comment optimization for parameter log.cleaner.delete.retention.ms

2021-11-15 Thread GitBox


RivenSun2 opened a new pull request #11505:
URL: https://github.com/apache/kafka/pull/11505


   Story JIRA: https://issues.apache.org/jira/browse/KAFKA-13449
   
   Author: RivenSun2 riven@zoom.us
   
   Reviewers: Luke Chen show...@gmail.com
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-15 Thread GitBox


jsancio commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r749838162



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -2007,8 +2007,13 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
 require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
   " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-  s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
+if (usesSelfManagedQuorum) {

Review comment:
   Okay. How about this algorithm/check:
   
   If `process.roles=broker`, then Kafka requires that `controller.listener.names` 
be set to exactly one name and does not allow `listeners` to contain any of the 
`controller.listener.names`. This is consistent, since controller ports are only 
specified in `controller.quorum.voters`.
   
   When `process.roles=controller,*`, then Kafka requires that 
`controller.listener.names` be set and that `listeners` contain the 
`controller.listener.names`. Additionally, check that the port in 
`controller.quorum.voters` for `node.id` matches the port in `listeners` for one 
of the `controller.listener.names`. Fix `RaftManager` to pick the right 
controller listener name based on the port of its id in the voters.
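   A rough standalone sketch of that check (hedged: names and structure are illustrative, not the actual KafkaConfig code):
   ```scala
   object ListenerValidation {
     def validate(processRoles: Set[String],
                  listeners: Map[String, Int],          // listener name -> port
                  controllerListenerNames: Seq[String],
                  voterPortForThisNode: Int): Unit = {
       if (processRoles == Set("broker")) {
         require(controllerListenerNames.size == 1,
           "broker-only nodes must name exactly one controller listener")
         require(!controllerListenerNames.exists(listeners.contains),
           "broker-only nodes must not bind controller listeners; " +
             "their ports come from controller.quorum.voters")
       } else if (processRoles.contains("controller")) {
         require(controllerListenerNames.nonEmpty &&
             controllerListenerNames.forall(listeners.contains),
           "controller nodes must bind every controller listener")
         require(controllerListenerNames.exists(n => listeners(n) == voterPortForThisNode),
           "the voters entry for node.id must match a controller listener port")
       }
     }
   }
   ```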




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] functioner opened a new pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

2021-11-15 Thread GitBox


functioner opened a new pull request #11504:
URL: https://github.com/apache/kafka/pull/11504


   A patch for [KAFKA-13457](https://issues.apache.org/jira/browse/KAFKA-13457)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-15 Thread GitBox


hachikuji commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r749827016



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -2007,8 +2007,13 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
 require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
   " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-  s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
+if (usesSelfManagedQuorum) {

Review comment:
   I think that's fine. When the test cases override 
`controller.quorum.voters`, they also override the controller listener. For 
example, see here: 
https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala#L220.
 So it seems reasonable to validate port consistency in any case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13422) Even if the correct username and password are configured, when ClientBroker or KafkaClient tries to establish a SASL connection to ServerBroker, an exception is thrown

2021-11-15 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444212#comment-17444212
 ] 

RivenSun commented on KAFKA-13422:
--

 Hi [~rajinisiva...@gmail.com], [~ijuma], [~hachikuji] and [~manikumar],

Could you read through this issue and give some suggestions?
Thanks.

> Even if the correct username and password are configured, when ClientBroker 
> or KafkaClient tries to establish a SASL connection to ServerBroker, an 
> exception is thrown: (Authentication failed: Invalid username or password)
> --
>
> Key: KAFKA-13422
> URL: https://issues.apache.org/jira/browse/KAFKA-13422
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.7.1, 3.0.0
>Reporter: RivenSun
>Priority: Major
> Attachments: CustomerAuthCallbackHandler.java, 
> LoginContext_login_debug.png, SaslClientCallbackHandler_handle_debug.png
>
>
>  
> h1. Foreword:
> When deploying a Kafka cluster on a newer version (2.7.1), I ran into an 
> inter-broker authentication failure. The problem can also be reproduced on 
> the current latest version, 3.0.0.
> h1. Reproducing the problem:
> h2. 1) Broker version 3.0.0
> h3. Every broker has exactly the same kafka_server_jaas.conf, with the 
> following content:
>  
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
> h3. broker server.properties:
> One broker's configuration file is shown below; the other brokers' 
> configurations differ only in the localPublicIp of advertised.listeners.
>  
> {code:java}
> broker.id=1
> broker.rack=us-east-1a
> advertised.listeners=SASL_PLAINTEXT://localPublicIp:9779,SASL_SSL://localPublicIp:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://localPublicIp:9669
> log.dirs=/asyncmq/kafka/data_1,/asyncmq/kafka/data_2
> zookeeper.connect=***
> listeners=SASL_PLAINTEXT://:9779,SASL_SSL://:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://:9669
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL
> listener.name.plain_plugin_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler
> #ssl config
> ssl.keystore.password=***
> ssl.key.password=***
> ssl.truststore.password=***
> ssl.keystore.location=***
> ssl.truststore.location=***
> ssl.client.auth=none
> ssl.endpoint.identification.algorithm=
> #broker communicate config
> #security.inter.broker.protocol=SASL_PLAINTEXT
> inter.broker.listener.name=INTERNAL_SSL
> sasl.mechanism.inter.broker.protocol=PLAIN
> #sasl authentication config
> sasl.kerberos.service.name=kafka
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
> delegation.token.master.key=***
> delegation.token.expiry.time.ms=8640
> delegation.token.max.lifetime.ms=31536
> {code}
>  
>  
> Then all brokers are started at the same time. Each broker actually starts 
> successfully, but authentication keeps failing when the controller node 
> establishes connections to the brokers. Inter-broker connections cannot be 
> established normally, so the entire Kafka cluster cannot provide external 
> service.
> h3. The server log keeps printing this error over and over:
> (The broker's real IP in the log is sensitive, so I replace it with ** here.)
>  
> {code:java}
> [2021-10-29 14:16:19,831] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Started socket server acceptors and processors 
> (kafka.network.SocketServer)
> [2021-10-29 14:16:19,836] INFO Kafka version: 3.0.0 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka commitId: 8cb0a5e9d3441962 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka startTimeMs: 1635516979831 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,837] INFO [KafkaServer id=3] started 
> (kafka.server.KafkaServer)
> [2021-10-29 14:16:20,249] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with /** (Authentication failed: Invalid 
> username or password) 

[jira] [Created] (KAFKA-13457) socketChannel in Acceptor#accept is not closed upon IOException

2021-11-15 Thread Haoze Wu (Jira)
Haoze Wu created KAFKA-13457:


 Summary: socketChannel in Acceptor#accept is not closed upon 
IOException
 Key: KAFKA-13457
 URL: https://issues.apache.org/jira/browse/KAFKA-13457
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 2.8.0
Reporter: Haoze Wu


When the kafka.network.Acceptor in SocketServer.scala accepts a new connection 
in the `accept` function, it handles `TooManyConnectionsException` and 
`ConnectionThrottledException`. However, line 717, as well as the socketChannel 
operations within the try block, may also throw an IOException, which is not 
handled.

 
{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
// Acceptor class
  private def accept(key: SelectionKey): Option[SocketChannel] = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()     // line 717
    try {
      connectionQuotas.inc(endPoint.listenerName, 
socketChannel.socket.getInetAddress, blockedPercentMeter)
      socketChannel.configureBlocking(false)             
      socketChannel.socket().setTcpNoDelay(true)         
      socketChannel.socket().setKeepAlive(true)          
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)
      Some(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>       
        info(s"Rejected connection from ${e.ip}, address already has the 
configured maximum of ${e.count} connections.")
        close(endPoint.listenerName, socketChannel)
        None
      case e: ConnectionThrottledException => 
        val ip = socketChannel.socket.getInetAddress
        debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} 
ms")
        val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
        throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
        None
    }
  }
{code}
This IOException is caught by the caller, `acceptNewConnections`, at line 706, 
which only logs an error message. The socketChannel that threw the IOException 
is never closed.

 
{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
  private def acceptNewConnections(): Unit = {
    val ready = nioSelector.select(500)
    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        try {
          val key = iter.next
          iter.remove()
          if (key.isAcceptable) {
            accept(key).foreach { socketChannel =>
              do {
                ...
              } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
            }
          } else
            throw new IllegalStateException("Unrecognized key state for acceptor thread.")
        } catch {
          case e: Throwable => error("Error while accepting connection", e)   // line 706
        }
      }
    }
  }
{code}
During testing we found that this caused our Kafka clients to hit errors 
(InvalidReplicationFactorException) for 40+ seconds when creating new topics. 
After 40 seconds, the clients could create new topics successfully again.

We verified that after adding socketChannel.close() upon IOException the 
symptoms disappear, so the clients no longer need to wait ~40 seconds to start 
working again.
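A minimal sketch of that fix, assuming the patch simply adds an IOException case that closes the channel, mirroring the existing handlers (the exact change in PR #11504 may differ):

{code:java}
import java.io.IOException
import java.nio.channels.SocketChannel

// Hedged sketch, not the actual patch: close the accepted channel on
// IOException instead of letting it leak.
def configureOrClose(socketChannel: SocketChannel): Option[SocketChannel] =
  try {
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    Some(socketChannel)
  } catch {
    case e: IOException =>
      // Without this close(), the channel stays open after the error.
      socketChannel.close()
      None
  }
{code}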

 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13422) Even if the correct username and password are configured, when ClientBroker or KafkaClient tries to establish a SASL connection to ServerBroker, an exception is thrown

2021-11-15 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444209#comment-17444209
 ] 

RivenSun commented on KAFKA-13422:
--

[~showuon], I'm very sorry to bother you, but the author of this module seems 
to be busy and has not responded to me.
Let me try to ping them again and seek their advice.
Thank you again, Luke.

(By "ping" I mean to @-mention them here, right?)

> Even if the correct username and password are configured, when ClientBroker 
> or KafkaClient tries to establish a SASL connection to ServerBroker, an 
> exception is thrown: (Authentication failed: Invalid username or password)
> --
>
> Key: KAFKA-13422
> URL: https://issues.apache.org/jira/browse/KAFKA-13422
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.7.1, 3.0.0
>Reporter: RivenSun
>Priority: Major
> Attachments: CustomerAuthCallbackHandler.java, 
> LoginContext_login_debug.png, SaslClientCallbackHandler_handle_debug.png
>
>
>  
> h1. Foreword:
> When deploying a Kafka cluster on a newer version (2.7.1), I ran into an 
> inter-broker authentication failure. The problem can also be reproduced on 
> the current latest version, 3.0.0.
> h1. Reproducing the problem:
> h2. 1) Broker version 3.0.0
> h3. Every broker has exactly the same kafka_server_jaas.conf, with the 
> following content:
>  
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
> h3. broker server.properties:
> One broker's configuration file is shown below; the other brokers' 
> configurations differ only in the localPublicIp of advertised.listeners.
>  
> {code:java}
> broker.id=1
> broker.rack=us-east-1a
> advertised.listeners=SASL_PLAINTEXT://localPublicIp:9779,SASL_SSL://localPublicIp:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://localPublicIp:9669
> log.dirs=/asyncmq/kafka/data_1,/asyncmq/kafka/data_2
> zookeeper.connect=***
> listeners=SASL_PLAINTEXT://:9779,SASL_SSL://:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://:9669
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL
> listener.name.plain_plugin_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler
> #ssl config
> ssl.keystore.password=***
> ssl.key.password=***
> ssl.truststore.password=***
> ssl.keystore.location=***
> ssl.truststore.location=***
> ssl.client.auth=none
> ssl.endpoint.identification.algorithm=
> #broker communicate config
> #security.inter.broker.protocol=SASL_PLAINTEXT
> inter.broker.listener.name=INTERNAL_SSL
> sasl.mechanism.inter.broker.protocol=PLAIN
> #sasl authentication config
> sasl.kerberos.service.name=kafka
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
> delegation.token.master.key=***
> delegation.token.expiry.time.ms=8640
> delegation.token.max.lifetime.ms=31536
> {code}
>  
>  
> Then all brokers are started at the same time. Each broker actually starts 
> successfully, but authentication keeps failing when the controller node 
> establishes connections to the brokers. Inter-broker connections cannot be 
> established normally, so the entire Kafka cluster cannot provide external 
> service.
> h3. The server log keeps printing this error over and over:
> (The broker's real IP in the log is sensitive, so I replace it with ** here.)
>  
> {code:java}
> [2021-10-29 14:16:19,831] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Started socket server acceptors and processors 
> (kafka.network.SocketServer)
> [2021-10-29 14:16:19,836] INFO Kafka version: 3.0.0 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka commitId: 8cb0a5e9d3441962 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka startTimeMs: 1635516979831 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,837] INFO [KafkaServer id=3] started 
> (kafka.server.KafkaServer)
> [2021-10-29 14:16:20,249] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with 

[jira] [Commented] (KAFKA-13437) Broker parameter optimization: security.inter.broker.protocol and num.network.threads

2021-11-15 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444207#comment-17444207
 ] 

RivenSun commented on KAFKA-13437:
--

[~showuon] Hi Luke, please take a look at this issue when you have time.
What do you think?
Thanks.

> Broker parameter optimization: security.inter.broker.protocol and 
> num.network.threads
> -
>
> Key: KAFKA-13437
> URL: https://issues.apache.org/jira/browse/KAFKA-13437
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> h1. 1. security.inter.broker.protocol
> First, see this parameter's documentation:
> {code:java}
> security.inter.broker.protocolSecurity protocol used to communicate between 
> brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. It is an 
> error to set this and inter.broker.listener.name properties at the same time. 
> {code}
> The documentation does not make clear that when this configuration is used, 
> the final value of InterBrokerListenerName is simply the value of 
> security.inter.broker.protocol. I originally thought it would find a suitable 
> listenerName from the listener.security.protocol.map configuration.
> Instead, the broker fails to start:
>  
> {code:java}
> [2021-11-09 06:28:44,058] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.lang.IllegalArgumentException: requirement failed: 
> inter.broker.listener.name must be a listener name defined in 
> advertised.listeners. The valid options based on currently configured 
> listeners are SASL_PLAINTEXT,INTERNAL_SSL,PLAIN_PLUGIN_SSL
>         at scala.Predef$.require(Predef.scala:337)
>         at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1952)
>         at kafka.server.KafkaConfig.(KafkaConfig.scala:1897)
>         at kafka.server.KafkaConfig.(KafkaConfig.scala:1394)
>         at kafka.Kafka$.buildServer(Kafka.scala:67)
>         at kafka.Kafka$.main(Kafka.scala:87)
>         at kafka.Kafka.main(Kafka.scala)
>  {code}
>  
>  
> h1. 2. num.network.threads
> The network threads controlled by this parameter are not shared across all 
> listeners; instead, each listener creates the same number of 
> networkProcessors. This makes the Kafka process open many unnecessary threads 
> and wastes resources.
> For example:
> listenerNameA: used for communication between brokers
> listenerNameB: used by clients to produce and fetch messages
> listenerNameC: used by Kafka operations personnel to manage the cluster and 
> send control-type requests, such as deleting topics or adding partitions
> As expected, num.network.threads for listenerNameB should be increased, while 
> the network threads of the other two listeners can be reduced appropriately.
>  
> h1. Rootcause:
> 1. See "getInterBrokerListenerNameAndSecurityProtocol" method in 
> KafkaConfig.scala
> {code:java}
> private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, 
> SecurityProtocol) = {
>   Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
> case Some(_) if 
> originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
>   throw new ConfigException(s"Only one of 
> ${KafkaConfig.InterBrokerListenerNameProp} and " +
> s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
> case Some(name) =>
>   val listenerName = ListenerName.normalised(name)
>   val securityProtocol = 
> listenerSecurityProtocolMap.getOrElse(listenerName,
> throw new ConfigException(s"Listener with name ${listenerName.value} 
> defined in " +
>   s"${KafkaConfig.InterBrokerListenerNameProp} not found in 
> ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
>   (listenerName, securityProtocol)
> case None =>
>   val securityProtocol = 
> getSecurityProtocol(getString(KafkaConfig.InterBrokerSecurityProtocolProp),
> KafkaConfig.InterBrokerSecurityProtocolProp)
>   (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
>   }
> } {code}
> ListenerName.forSecurityProtocol(securityProtocol) limits the value of 
> InterBrokerListenerName to the value of securityProtocol.name
> 2. See "addDataPlaneProcessors" method in SocketServer.scala
> In this method, newProcessorsPerListener processors are created for each 
> EndPoint, where newProcessorsPerListener is config.numNetworkThreads.
>  
> h1. Suggestion
>  # Optimize the getInterBrokerListenerNameAndSecurityProtocol method.
> Use listenerSecurityProtocolMap to find a suitable listenerName for 
> security.inter.broker.protocol.
> If there are multiple keys in the listenerSecurityProtocolMap with 
> 
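A hedged sketch of the lookup that suggestion appears to describe (hypothetical helper; the full proposal is cut off above):

{code:java}
// Pick an inter-broker listener name by searching
// listener.security.protocol.map for the configured security protocol.
def listenerNameFor(protocol: String,
                    listenerSecurityProtocolMap: Map[String, String]): Option[String] =
  listenerSecurityProtocolMap.collectFirst {
    case (listenerName, p) if p == protocol => listenerName
  }
{code}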

[GitHub] [kafka] junrao merged pull request #11487: KAFKA-13445: Add ECDSA test for JWT validation

2021-11-15 Thread GitBox


junrao merged pull request #11487:
URL: https://github.com/apache/kafka/pull/11487


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-15 Thread GitBox


jsancio commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r749777697



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -2007,8 +2007,13 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
 require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
   " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-  s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
+if (usesSelfManagedQuorum) {

Review comment:
   > 2. Verify port consistency between the controller listener and the 
quorum voter string. As you noted offline, the port gets duplicated, so we 
should be sure they match.
   
   One complexity is when the controllers let the OS assign a port (`0`) to the 
listener. This feature is heavily used in the tests to allow multiple 
controllers to run on the same host.
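   For example, a strict port match would need to tolerate OS-assigned ports (hypothetical helper, not actual Kafka code):
   ```scala
   // Port 0 means "let the OS pick", so skip the strict comparison then.
   def portsConsistent(listenerPort: Int, voterPort: Int): Boolean =
     listenerPort == 0 || listenerPort == voterPort
   ```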




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13444) Fix OAuthCompatibilityTool help and add SSL options

2021-11-15 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444186#comment-17444186
 ] 

Kirk True commented on KAFKA-13444:
---

[~dajac] - can we get this into the 3.1.0 release too? Otherwise, whichever 
version this Jira slots into will be backward-incompatible.

> Fix OAuthCompatibilityTool help and add SSL options
> ---
>
> Key: KAFKA-13444
> URL: https://issues.apache.org/jira/browse/KAFKA-13444
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 3.1.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
> Fix For: 3.1.0
>
>
> The {{OAuthCompatibilityTool}} is missing the SSL configuration options. In 
> addition, the help text is incorrect.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] junrao commented on pull request #11486: KAFKA-13444: Fix OAuthCompatibilityTool help and add SSL options

2021-11-15 Thread GitBox


junrao commented on pull request #11486:
URL: https://github.com/apache/kafka/pull/11486#issuecomment-969447178


   @kirktrue : Could you let David Jacot know if this needs to be in 3.1.0? 
Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-15 Thread GitBox


hachikuji commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r749765601



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -2007,8 +2007,13 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
 require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
   " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-  s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
+if (usesSelfManagedQuorum) {

Review comment:
   A few additional validations we can do:
   
   1. Ensure control plane listener is empty. We have this check in 
BrokerServer already, but we should check it here as well.
   2. Verify port consistency between the controller listener and the quorum 
voter string. As you noted offline, the port gets duplicated, so we should be 
sure they match. 
   3. We currently assume that the first listener in 
`controller.listener.names` is the one that matches the voter string. We should 
either require this to be the case, or change the logic in `RaftManager` to 
choose the right listener.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao merged pull request #11486: KAFKA-13444: Fix OAuthCompatibilityTool help and add SSL options

2021-11-15 Thread GitBox


junrao merged pull request #11486:
URL: https://github.com/apache/kafka/pull/11486


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12226) High-throughput source tasks fail to commit offsets

2021-11-15 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-12226:
--
Fix Version/s: 3.0.1
   3.2.0

> High-throughput source tasks fail to commit offsets
> ---
>
> Key: KAFKA-12226
> URL: https://issues.apache.org/jira/browse/KAFKA-12226
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.1.0, 3.0.1, 3.2.0
>
>
> The current source task thread has the following workflow:
>  # Poll messages from the source task
>  # Queue these messages to the producer and send them to Kafka asynchronously.
>  # Add the message to outstandingMessages, or if a flush is currently active, 
> outstandingMessagesBacklog
>  # When the producer completes the send of a record, remove it from 
> outstandingMessages
> The commit offsets thread has the following workflow:
>  # Wait a flat timeout for outstandingMessages to flush completely
>  # If this times out, add all of the outstandingMessagesBacklog to the 
> outstandingMessages and reset
>  # If it succeeds, commit the source task offsets to the backing store.
>  # Retry the above on a fixed schedule
> If the source task is producing records quickly (faster than the producer can 
> send), then the producer will throttle the task thread by blocking in its 
> {{send}} method, waiting at most {{max.block.ms}} for space in the 
> {{buffer.memory}} to be available. This means that the number of records in 
> {{outstandingMessages}} + {{outstandingMessagesBacklog}} is proportional to 
> the size of the producer memory buffer.
> This amount of data might take more than {{offset.flush.timeout.ms}} to 
> flush, and thus the flush will never succeed while the source task is 
> rate-limited by the producer memory. This means that we may write multiple 
> hours of data to Kafka and not ever commit source offsets for the connector. 
> When the task is lost due to a worker failure, hours of data will be 
> re-processed that otherwise were successfully written to Kafka.
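
To make the interplay concrete, here is an illustrative Connect worker configuration (the values shown are just the defaults, written out explicitly; producer overrides in a worker config use the "producer." prefix):

{code}
# offsets must flush within 5 seconds...
offset.flush.timeout.ms=5000
# ...but the producer may buffer up to 32 MiB and block sends for up to 60 seconds
producer.buffer.memory=33554432
producer.max.block.ms=60000
{code}

With these defaults, a producer-throttled task can hold far more unacknowledged data than can be flushed within the 5-second timeout, so the commit repeatedly times out.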





[GitHub] [kafka] dongjinleekr commented on pull request #11472: TRIVIAL: Remove unused parameters, exceptions, comments, etc.

2021-11-15 Thread GitBox


dongjinleekr commented on pull request #11472:
URL: https://github.com/apache/kafka/pull/11472#issuecomment-969438136


   @mimaison Here is the fix; the redundant parameters in the calls to `DumpLogSegments#[dumpLog, dumpTimeIndex]` from `DumpLogSegments#main` also had to be removed. After removing them, it works like a charm.
   
![2026-082947](https://user-images.githubusercontent.com/2375128/141869153-b7f66d4f-f067-4290-9603-8c995c8ba326.png)
   I am still curious why the Scala compiler let these explicit syntax errors through. It may be related to the `DumpLogSegments.dumpTimeIndex( ... Int.MaxValue)` call I removed earlier, but I can't be certain yet.






[GitHub] [kafka] guozhangwang commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

2021-11-15 Thread GitBox


guozhangwang commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749758167



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -246,4 +246,11 @@ default boolean commitRequested() {
  * @return This returns the time the task started idling. If it is not 
idling it returns empty.
  */
 Optional<Long> timeCurrentIdlingStarted();
+
+/**
+ * Gets the exception that caused the failure of the task.
+ *
+ * @return exception that caused the failure of the task
+ */
+Optional getException();

Review comment:
   I think we can narrow down the scope of this exception if it is only going to be used during restoration; one possible shape is sketched below. But nevertheless we can discuss this later.
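   
   One possible way to narrow the scope -- purely a sketch; `RestorationFailure` is a hypothetical name, not anything in this PR -- would be to keep the exception off the `Task` interface entirely and have the state updater report restoration failures as task/exception pairs:
   
   ```java
   // Hypothetical container: a state updater could return these from a
   // drainFailures()-style method instead of widening the Task interface.
   public final class RestorationFailure {
       private final Task task;
       private final RuntimeException cause;
   
       public RestorationFailure(final Task task, final RuntimeException cause) {
           this.task = task;
           this.cause = cause;
       }
   
       public Task task() { return task; }
   
       public RuntimeException cause() { return cause; }
   }
   ```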








[GitHub] [kafka] kirktrue commented on pull request #11484: KAFKA-13443: Kafka broker exits when OAuth enabled and certain configuration not specified

2021-11-15 Thread GitBox


kirktrue commented on pull request #11484:
URL: https://github.com/apache/kafka/pull/11484#issuecomment-969380314


   > @kirktrue Should we add or extend tests in KafkaConfigTest to cover them?
   
   They're already present on lines 780 and 781 in `KafkaConfigTest.scala`:
   
   ```scala
   case KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp =>
   case KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMsProp =>
   ```






[jira] [Commented] (KAFKA-13294) Upgrade Netty to 4.1.68 for CVE fixes

2021-11-15 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444139#comment-17444139
 ] 

Dongjin Lee commented on KAFKA-13294:
-

[~mimaison] +1.

> Upgrade Netty to 4.1.68 for CVE fixes
> -
>
> Key: KAFKA-13294
> URL: https://issues.apache.org/jira/browse/KAFKA-13294
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Utkarsh Khare
>Assignee: Dongjin Lee
>Priority: Minor
> Fix For: 3.1.0
>
>
> netty has reported a couple of CVEs regarding the usage of Bzip2Decoder and 
> SnappyFrameDecoder. 
> Reference :
> [CVE-2021-37136 - 
> https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv|https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv]
> [CVE-2021-37137 - 
> https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363|https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363]
>  
> Can we upgrade Netty to version 4.1.68.Final to fix this ? 





[GitHub] [kafka] kirktrue commented on a change in pull request #11486: KAFKA-13444: Fix OAuthCompatibilityTool help and add SSL options

2021-11-15 Thread GitBox


kirktrue commented on a change in pull request #11486:
URL: https://github.com/apache/kafka/pull/11486#discussion_r749722788



##
File path: 
tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
##
@@ -208,71 +170,217 @@ public static void main(String[] args) {
 
 if (t instanceof ConfigException) {
 System.out.printf("%n");
-parser.printHelp();
+argsHandler.parser.printHelp();
 }
 
 Exit.exit(1);
 }
 }
 
-private static Map<String, Object> getConfigs(Namespace namespace) {
-Map<String, Object> c = new HashMap<>();
-maybeAddInt(namespace, "connectTimeoutMs", c, 
SASL_LOGIN_CONNECT_TIMEOUT_MS);
-maybeAddInt(namespace, "readTimeoutMs", c, SASL_LOGIN_READ_TIMEOUT_MS);
-maybeAddLong(namespace, "loginRetryBackoffMs", c, 
SASL_LOGIN_RETRY_BACKOFF_MS);
-maybeAddLong(namespace, "loginRetryBackoffMax", c, 
SASL_LOGIN_RETRY_BACKOFF_MAX_MS);
-maybeAddString(namespace, "scopeClaimName", c, 
SASL_OAUTHBEARER_SCOPE_CLAIM_NAME);
-maybeAddString(namespace, "subClaimName", c, 
SASL_OAUTHBEARER_SUB_CLAIM_NAME);
-maybeAddString(namespace, "tokenEndpointUrl", c, 
SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);
-maybeAddString(namespace, "jwksEndpointUrl", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_URL);
-maybeAddLong(namespace, "jwksEndpdointRefreshMs", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS);
-maybeAddLong(namespace, "jwksEndpdointRetryBackoffMaxMs", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS);
-maybeAddLong(namespace, "jwksEndpdointRetryBackoffMs", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS);
-maybeAddInt(namespace, "clockSkewSeconds", c, 
SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS);
-maybeAddStringList(namespace, "expectedAudience", c, 
SASL_OAUTHBEARER_EXPECTED_AUDIENCE);
-maybeAddString(namespace, "expectedIssuer", c, 
SASL_OAUTHBEARER_EXPECTED_ISSUER);
-
-// This here is going to fill in all the defaults for the values we 
don't specify...
-ConfigDef cd = new ConfigDef();
-SaslConfigs.addClientSaslSupport(cd);
-AbstractConfig config = new AbstractConfig(cd, c);
-return config.values();
-}
 
-private static void maybeAddInt(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) {
-Integer value = namespace.getInt(namespaceKey);
+private static class ArgsHandler {
 
-if (value != null)
-configs.put(configsKey, value);
-}
+private static final String DESCRIPTION = String.format(
+"This tool is used to verify OAuth/OIDC provider 
compatibility.%n%n" +
+"Run the following script to determine the configuration 
options:%n%n" +
+"./bin/kafka-run-class.sh %s --help",
+OAuthCompatibilityTool.class.getName());
 
-private static void maybeAddLong(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) {
-Long value = namespace.getLong(namespaceKey);
+private final ArgumentParser parser;
 
-if (value != null)
-configs.put(configsKey, value);
-}
+private ArgsHandler() {
+this.parser = ArgumentParsers
+.newArgumentParser("oauth-compatibility-tool")
+.defaultHelp(true)
+.description(DESCRIPTION);
+}
 
-private static void maybeAddString(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) {
-String value = namespace.getString(namespaceKey);
+private Namespace parseArgs(String[] args) throws 
ArgumentParserException {
+// SASL/OAuth
+addArgument(SASL_LOGIN_CONNECT_TIMEOUT_MS, 
SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC, Integer.class);
+addArgument(SASL_LOGIN_READ_TIMEOUT_MS, 
SASL_LOGIN_READ_TIMEOUT_MS_DOC, Integer.class);
+addArgument(SASL_LOGIN_RETRY_BACKOFF_MAX_MS, 
SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC, Long.class);
+addArgument(SASL_LOGIN_RETRY_BACKOFF_MS, 
SASL_LOGIN_RETRY_BACKOFF_MS_DOC, Long.class);
+addArgument(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, 
SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC, Integer.class);
+addArgument(SASL_OAUTHBEARER_EXPECTED_AUDIENCE, 
SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
+.action(Arguments.append());
+addArgument(SASL_OAUTHBEARER_EXPECTED_ISSUER, 
SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
+addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC, Long.class);
+addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC, Long.class);
+addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC, Long.class);
+

[GitHub] [kafka] kirktrue commented on a change in pull request #11487: KAFKA-13445: Add ECDSA test for JWT validation

2021-11-15 Thread GitBox


kirktrue commented on a change in pull request #11487:
URL: https://github.com/apache/kafka/pull/11487#discussion_r749721422



##
File path: 
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenBuilder.java
##
@@ -49,22 +49,24 @@
 
 private Long expirationSeconds;
 
-private RsaJsonWebKey jwk;
+private PublicJsonWebKey jwk;
 
-public AccessTokenBuilder() throws JoseException {
+public AccessTokenBuilder() {
 this(new MockTime());
 }
 
-public AccessTokenBuilder(Time time) throws JoseException {
+public AccessTokenBuilder(Time time) {
 this.issuedAtSeconds = time.milliseconds() / 1000;
 this.expirationSeconds = this.issuedAtSeconds + 60;
-this.jwk = createJwk();
 }
 
-public static RsaJsonWebKey createJwk() throws JoseException {
-RsaJsonWebKey jwk = RsaJwkGenerator.generateJwk(2048);
-jwk.setKeyId("key-1");
-return jwk;
+public String alg() {
+return alg;
+}
+
+public AccessTokenBuilder alg(String alg) {

Review comment:
   They're there for completeness. I've removed them now.








[GitHub] [kafka] kirktrue commented on a change in pull request #11487: KAFKA-13445: Add ECDSA test for JWT validation

2021-11-15 Thread GitBox


kirktrue commented on a change in pull request #11487:
URL: https://github.com/apache/kafka/pull/11487#discussion_r749721059



##
File path: 
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidatorTest.java
##
@@ -38,14 +40,35 @@ protected AccessTokenValidator 
createAccessTokenValidator(AccessTokenBuilder bui
 }
 
 @Test
-public void testBasicEncryption() throws Exception {
-AccessTokenBuilder builder = new AccessTokenBuilder();
+public void testRsaEncryptionAlgorithm() throws Exception {
+PublicJsonWebKey jwk = createRsaJwk();
+testEncryptionAlgorithm(jwk, AlgorithmIdentifiers.RSA_USING_SHA256);
+}
+
+@Test
+public void testEcdsaEncryptionAlgorithm() throws Exception {
+PublicJsonWebKey jwk = createEcJwk();
+testEncryptionAlgorithm(jwk, 
AlgorithmIdentifiers.ECDSA_USING_P256_CURVE_AND_SHA256);
+}
+
+@Test
+public void testInvalidEncryptionAlgorithm() throws Exception {
+PublicJsonWebKey jwk = createRsaJwk();
+
+assertThrowsWithMessage(InvalidAlgorithmException.class,
+() -> testEncryptionAlgorithm(jwk, "fake"),
+"fake is an unknown, unsupported or unavailable alg algorithm");
+}
+
+private void testEncryptionAlgorithm(PublicJsonWebKey jwk, String alg) 
throws Exception {
+AccessTokenBuilder builder = new 
AccessTokenBuilder().jwk(jwk).alg(alg);
 AccessTokenValidator validator = createAccessTokenValidator(builder);
 
 JsonWebSignature jws = new JsonWebSignature();
 jws.setKey(builder.jwk().getPrivateKey());
 jws.setKeyIdHeaderValue(builder.jwk().getKeyId());
-jws.setAlgorithmHeaderValue(AlgorithmIdentifiers.RSA_USING_SHA256);
+jws.setAlgorithmHeaderValue(alg);
+jws.setPayload("{}");

Review comment:
   It isn't. I've removed it.








[GitHub] [kafka] rondagostino opened a new pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-15 Thread GitBox


rondagostino opened a new pull request #11503:
URL: https://github.com/apache/kafka/pull/11503


   The `controller.listener.names` config is currently checked for existence 
when the `process.roles` contains the controller role (i.e. 
`process.roles=controller` or `process.roles=broker,controller`); it is not 
checked for existence when `process.roles=broker`. However, KRaft brokers have 
to talk to KRaft controllers, of course, and they do so by taking the first 
entry in the `controller.listener.names` list. Therefore, 
`controller.listener.names` is required in KRaft mode even when 
`process.roles=broker`.
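   
   For reference, the quickstart broker-only file ends up looking roughly like the following (values are illustrative, not the exact file contents; the point is that `controller.listener.names` must be present even on a node that only has the broker role):
   
   ```
   # illustrative broker-only KRaft settings
   process.roles=broker
   node.id=2
   controller.quorum.voters=1@localhost:9093
   controller.listener.names=CONTROLLER
   listeners=PLAINTEXT://localhost:9092
   listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
   ```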
   
   This patch extends the check being done in the `process.roles=controller` and `process.roles=broker,controller` cases to the `process.roles=broker` case: 
we make sure `controller.listener.names` is a non-empty subset of the listeners 
appearing in the `listeners` config.  This entailed adding an entry to the 
KRaft Quickstart broker-only config file (confirmed to now work via a manual 
test) as well as adding the entry to the KRaft broker-only system test configs, 
which I tested locally with the following success:
   
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.8.1
   session_id:   2021-11-15--015
   run time: 6 minutes 10.745 seconds
   tests run:5
   passed:   5
   failed:   0
   ignored:  0
   

   test_id:
kafkatest.sanity_checks.test_bounce.TestBounce.test_simple_run.metadata_quorum=COLOCATED_KRAFT.quorum_size=3
   status: PASS
   run time:   1 minute 41.968 seconds
   

   test_id:
kafkatest.sanity_checks.test_bounce.TestBounce.test_simple_run.metadata_quorum=REMOTE_KRAFT.quorum_size=3
   status: PASS
   run time:   1 minute 35.705 seconds
   

   test_id:
kafkatest.sanity_checks.test_bounce.TestBounce.test_simple_run.metadata_quorum=COLOCATED_KRAFT.quorum_size=1
   status: PASS
   run time:   56.347 seconds
   

   test_id:
kafkatest.sanity_checks.test_bounce.TestBounce.test_simple_run.metadata_quorum=REMOTE_KRAFT.quorum_size=1
   status: PASS
   run time:   1 minute 8.743 seconds
   

   test_id:
kafkatest.sanity_checks.test_bounce.TestBounce.test_simple_run.metadata_quorum=ZK.quorum_size=1
   status: PASS
   run time:   47.454 seconds
   

   ``` 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] C0urante commented on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-11-15 Thread GitBox


C0urante commented on pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#issuecomment-969330955


   I've rebased onto the latest trunk. I think with the latest changes 
(especially [these logging 
improvements](https://github.com/apache/kafka/blob/a3ab7d5b42d02f194a8bdf3047c374f938ca0dad/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L479-L504))
 most of the changes in this PR were made redundant. The only remaining room 
for improvement IMO is skipping log messages for failed tasks; the other issues 
(squatting on the source task offset commit thread too long for failed messages 
to be acknowledged, and misleading users with messages about flushing 0 
records) have already been addressed.
   
   I've force-pushed a single commit that brings this PR up to date with the 
latest trunk; going to push an additional commit later this week that addresses 
the review comments that have been left on it.






[GitHub] [kafka] jolshan commented on a change in pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

2021-11-15 Thread GitBox


jolshan commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r749683412



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -1206,7 +1205,8 @@ private void validatePositionsOnMetadataChange() {
 fetchable.put(node, builder);
 }
 
-builder.add(partition, 
topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID), new 
FetchRequest.PartitionData(position.offset,
+Uuid topicId = metadata.topicId(partition.topic());

Review comment:
   It would have been a problem before KAFKA-13111, when we assumed only one topic ID per request build for a given topic name (we kept a mapping), but maybe it is OK now that we store the ID in the partition data and use it to build the request.








[GitHub] [kafka] hachikuji commented on a change in pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

2021-11-15 Thread GitBox


hachikuji commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r749668458



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +379,49 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> 
assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+@Test
+public void testEpochUpdateOnChangedTopicIds() {
+TopicPartition tp = new TopicPartition("topic-1", 0);
+Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+MetadataResponse metadataResponse = emptyMetadataResponse();
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+// Start with a topic with no topic ID
+metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+// We should treat an added topic ID as though it is the same topic. 
Handle only when epoch increases.
+// Don't update to an older one
+metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, 
topicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+// Don't cause update if it's the same one
+metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, 
topicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
+assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+// Update if we see newer epoch
+metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12, 
topicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 4L);
+assertEquals(Optional.of(12), metadata.lastSeenLeaderEpoch(tp));
+
+// We should also update if we see a new topicId even if the epoch is 
lower
+Map<String, Uuid> newTopicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 3, 
newTopicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 5L);
+assertEquals(Optional.of(3), metadata.lastSeenLeaderEpoch(tp));
+
+// Finally, update when the topic ID is new and the epoch is higher
+Map<String, Uuid> newTopicIds2 = Collections.singletonMap("topic-1", Uuid.randomUuid());
+metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, 
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 20, 
newTopicIds2);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 6L);
+assertEquals(Optional.of(20), metadata.lastSeenLeaderEpoch(tp));
+

Review comment:
   nit: unneeded newline

##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -375,17 +382,25 @@ else if (metadata.error() == 
Errors.TOPIC_AUTHORIZATION_FAILED)
 
 /**
  * Compute the latest partition metadata to cache given ordering by leader 
epochs (if both
- * available and reliable).
+ * available and reliable) and whether the topic ID changed.
  */
 private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
 MetadataResponse.PartitionMetadata partitionMetadata,
-boolean hasReliableLeaderEpoch) {
+boolean hasReliableLeaderEpoch,
+Uuid topicId,
+Uuid oldTopicId) {
 TopicPartition tp = partitionMetadata.topicPartition;
 if (hasReliableLeaderEpoch && 
partitionMetadata.leaderEpoch.isPresent()) {
 int newEpoch = partitionMetadata.leaderEpoch.get();
-// If the received leader epoch is at least the same as the 
previous one, update the metadata
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (currentEpoch == null || newEpoch >= currentEpoch) {
+if (topicId != null && oldTopicId != null && 
!topicId.equals(oldTopicId)) {
+// If both topic IDs were valid and the topic ID changed, 
update the metadata
+log.debug("Topic ID for partition {} changed from {} to {}, so 
this topic must have been recreated. " +

Review comment:
   Can we change this level to INFO?

##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -77,6 +79,11 @@ private static 

[GitHub] [kafka] hachikuji commented on pull request #11502: KAFKA-13071; Deprecate support for changing acls through the authorizer

2021-11-15 Thread GitBox


hachikuji commented on pull request #11502:
URL: https://github.com/apache/kafka/pull/11502#issuecomment-969291197


   cc @dajac It would be nice to do this in 3.1 if possible.






[GitHub] [kafka] hachikuji opened a new pull request #11502: KAFKA-13071; Deprecate support for changing acls through the authorizer

2021-11-15 Thread GitBox


hachikuji opened a new pull request #11502:
URL: https://github.com/apache/kafka/pull/11502


   This patch marks the following arguments as deprecated in kafka-acls.sh as 
documented in 
[KIP-604](https://cwiki.apache.org/confluence/display/KAFKA/KIP-604%3A+Remove+ZooKeeper+Flags+from+the+Administrative+Tools):
 --authorizer, --authorizer-properties, and --zk-tls-config-file.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Resolved] (KAFKA-10104) Remove deprecated --zookeeper flags as specified in KIP-604

2021-11-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10104.
-
Resolution: Duplicate

> Remove deprecated --zookeeper flags as specified in KIP-604
> ---
>
> Key: KAFKA-10104
> URL: https://issues.apache.org/jira/browse/KAFKA-10104
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>
> Remove deprecated --zookeeper flags as specified in KIP-604





[GitHub] [kafka] jolshan opened a new pull request #11501: MINOR: Fix FetchSessionBenchmark

2021-11-15 Thread GitBox


jolshan opened a new pull request #11501:
URL: https://github.com/apache/kafka/pull/11501


   There were a few small things I didn't clean up in KAFKA-13111; this PR cleans them up.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Updated] (KAFKA-13456) controller.listener.names is required for all KRaft nodes, not just controllers

2021-11-15 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-13456:
--
Description: The controller.listener.names config is currently checked for 
existence when the process.roles contains the controller role (i.e. 
process.roles=controller or process.roles=broker,controller); it is not 
checked for existence when process.roles=broker.  However, KRaft brokers have 
to talk to KRaft controllers, of course, and they do so by taking the first 
entry in the controller.listener.names list.  Therefore, 
controller.listener.names is required in KRaft mode even when 
process.roles=broker.  (was: The controller.listener.names config is currently 
checked for existence when the process.roles contains the controller role (i.e. 
process.roles=controller or process.roles=broker,controller); it is not 
checked for existence when process.roles=broker.  However, KRaft brokers have 
to talk to KRaft controllers, of course, and they do so by taking the first 
entry in the controller.listener.names list.  Therefore, 
controller.listener.names is required in KRaft mode even when 
process.roles.broker.)

> controller.listener.names is required for all KRaft nodes, not just 
> controllers
> ---
>
> Key: KAFKA-13456
> URL: https://issues.apache.org/jira/browse/KAFKA-13456
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 3.1.0, 3.0.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
>
> The controller.listener.names config is currently checked for existence when 
> the process.roles contains the controller role (i.e. process.roles=controller 
> or process.roles=broker,controller); it is not checked for existence when 
> process.roles=broker.  However, KRaft brokers have to talk to KRaft 
> controllers, of course, and they do so by taking the first entry in the 
> controller.listener.names list.  Therefore, controller.listener.names is 
> required in KRaft mode even when process.roles=broker.





[GitHub] [kafka] junrao merged pull request #11480: MINOR: fix comment in TimingWheel

2021-11-15 Thread GitBox


junrao merged pull request #11480:
URL: https://github.com/apache/kafka/pull/11480


   






[GitHub] [kafka] junrao commented on pull request #11482: KAFKA-13448: Align behaviour for producer callbacks with documented behaviour

2021-11-15 Thread GitBox


junrao commented on pull request #11482:
URL: https://github.com/apache/kafka/pull/11482#issuecomment-969213761


   @soceanainn : Thanks for the KIP.  Making ProducerBatch accept 
InterceptorCallback seems fine since it's an internal class.






[jira] [Updated] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name

2021-11-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12257:

Fix Version/s: 2.8.2

>  Consumer mishandles topics deleted and recreated with the same name
> 
>
> Key: KAFKA-12257
> URL: https://issues.apache.org/jira/browse/KAFKA-12257
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.1, 2.7.1, 2.8.1
>Reporter: Ryan Leslie
>Assignee: lqjacklee
>Priority: Blocker
> Fix For: 3.1.0, 3.0.0, 2.8.2
>
> Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch
>
>
> In KAFKA-7738, caching of leader epochs (KIP-320) was added to 
> o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the 
> last seen epoch.
> The current implementation can cause problems in cases where a consumer is 
> subscribed to a topic that has been deleted and then recreated with the same 
> name. This is something seen more often in consumers that subscribe to a 
> multitude of topics using a wildcard.
> Currently, when a topic is deleted and the Fetcher receives 
> UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If at a later 
> time while the consumer is still running a topic is created with the same 
> name, the leader epochs are set to 0 for the new topics partitions, and are 
> likely smaller than those for the previous topic. For example, if a broker 
> had restarted during the lifespan of the previous topic, the leader epoch 
> would be at least 1 or 2. In this case the metadata will be ignored since it 
> is incorrectly considered stale. Of course, the user will sometimes get 
> lucky, and if a topic was only recently created so that the epoch is still 0, 
> no problem will occur on recreation. The issue is also not seen when 
> consumers happen to have been restarted in between deletion and recreation.
> The most common side effect of the new metadata being disregarded is that the 
> new partitions end up assigned but the Fetcher is unable to fetch data 
> because it does not know the leaders. When recreating a topic with the same 
> name it is likely that the partition leaders are not the same as for the 
> previous topic, and the number of partitions may even be different. Besides 
> not being able to retrieve data for the new topic, there is a more sinister 
> side effect of the Fetcher triggering a metadata update after the fetch 
> fails. The subsequent update will again ignore the topic's metadata if the 
> leader epoch is still smaller than the cached value. This metadata refresh 
> loop can continue indefinitely and with a sufficient number of consumers may 
> even put a strain on a cluster since the requests are occurring in a tight 
> loop. This can also be hard for clients to identify since there is nothing 
> logged by default that would indicate what's happening. Both the Metadata 
> class's logging of "_Not replacing existing epoch_", and the Fetcher's 
> logging of "_Leader for partition  is unknown_" are at DEBUG level.
> A second possible side effect was observed where if the consumer is acting as 
> leader of the group and happens to not have any current data for the previous 
> topic, e.g. it was cleared due to a metadata error from a broker failure, 
> then the new topic's partitions may simply end up unassigned within the 
> group. This is because while the subscription list contains the recreated 
> topic the metadata for it was previously ignored due to the leader epochs. In 
> this case the user would see logs such as:
> {noformat}
> WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, 
> groupId=myGroup] The following subscribed topics are not assigned to any 
> members: [myTopic]{noformat}
> Interestingly, I believe the Producer is less affected by this problem since 
> o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in 
> retainTopics() after each metadata expiration. ConsumerMetadata does no such 
> thing.
> To reproduce this issue:
>  # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and 
> org.apache.kafka.clients.Metadata
>  # Begin a consumer for a topic (or multiple topics)
>  # Restart a broker that happens to be a leader for one of the topic's 
> partitions
>  # Delete the topic
>  # Create another topic with the same name
>  # Publish data for the new topic
>  # The consumer will not receive data for the new topic, and there will be a 
> high rate of metadata requests.
>  # The issue can be corrected by restarting the consumer or restarting 
> brokers until leader epochs are large enough
> I believe KIP-516 (unique topic ids) will likely fix this problem, since 
> after those changes the leader epoch map should be keyed off of 

[jira] [Assigned] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh

2021-11-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-13071:
---

Assignee: Jason Gustafson  (was: HaiyuanZhao)

> Deprecate and remove --authorizer option in kafka-acls.sh
> -
>
> Key: KAFKA-13071
> URL: https://issues.apache.org/jira/browse/KAFKA-13071
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
>
> Now that we have all of the ACL APIs implemented through the admin client, we 
> should consider deprecating and removing support for the --authorizer flag in 
> kafka-acls.sh.





[jira] [Commented] (KAFKA-13071) Deprecate and remove --authorizer option in kafka-acls.sh

2021-11-15 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444032#comment-17444032
 ] 

Jason Gustafson commented on KAFKA-13071:
-

[~zhaohaidao] I'm going to pick this up so that we can get it into 3.1. Please 
let me know if you were planning to submit something soon.

> Deprecate and remove --authorizer option in kafka-acls.sh
> -
>
> Key: KAFKA-13071
> URL: https://issues.apache.org/jira/browse/KAFKA-13071
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: HaiyuanZhao
>Priority: Major
>  Labels: needs-kip
>
> Now that we have all of the ACL APIs implemented through the admin client, we 
> should consider deprecating and removing support for the --authorizer flag in 
> kafka-acls.sh.





[jira] [Commented] (KAFKA-13407) Kafka controller out of service after ZK leader restart

2021-11-15 Thread Daniel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17444014#comment-17444014
 ] 

Daniel commented on KAFKA-13407:


Thanks, [~vinsonZhang]! Declaring what we had in the JAAS file directly in server.properties seems to solve the problem for us. I can no longer reproduce the issue, and the log looks much cleaner when it reconnects.
{code:java}
Nov 15 17:46:28 ip-10-226-69-184 kafka[6489]: [2021-11-15 17:46:28,458] INFO 
Unable to read additional data from server sessionid 0x4d7d0049, likely server 
has closed socket, closing socket connection and attempting reconnect 
(org.apache.zoo>
Nov 15 17:46:29 ip-10-226-69-184 kafka[6489]: [2021-11-15 17:46:29,673] INFO 
Opening socket connection to server 
zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.226.84.12:2181. Will 
not attempt to authenticate using SASL (unknown >
Nov 15 17:46:29 ip-10-226-69-184 kafka[6489]: [2021-11-15 17:46:29,675] INFO 
Socket error occurred: 
zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.226.84.12:2181: 
Connection refused (org.apache.zookeeper.ClientCnxn)
Nov 15 17:46:31 ip-10-226-69-184 kafka[6489]: [2021-11-15 17:46:31,510] INFO 
Opening socket connection to server 
zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.226.76.12:2181. Will 
not attempt to authenticate using SASL (unknown >
Nov 15 17:46:31 ip-10-226-69-184 kafka[6489]: [2021-11-15 17:46:31,512] INFO 
Socket connection established, initiating session, client: /10.10.69.184:45330, 
server: 
zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.226.76.12:2181 (>
Nov 15 17:46:31 ip-10-226-69-184 kafka[6489]: [2021-11-15 17:46:31,513] INFO 
Session establishment complete on server 
zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.226.76.12:2181, 
sessionid = 0x4d7d0049, negotiated timeout = 180>
Nov 15 17:48:05 ip-10-226-69-184 kafka[6489]: [2021-11-15 17:48:05,988] INFO 
[Controller id=1001] Processing automatic preferred replica leader election 
(kafka.controller.KafkaController) {code}
This is what I added to my server.properties in my lab environment:
{code:java}
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
 required \
  username="admin" \
  password="admin-secret";
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
 required \
  username="admin" \
  password="admin-secret" \
  user_demo="demo-secret";
{code}
 

> Kafka controller out of service after ZK leader restart
> ---
>
> Key: KAFKA-13407
> URL: https://issues.apache.org/jira/browse/KAFKA-13407
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 2.8.1
> Environment: Ubuntu 20.04
>Reporter: Daniel
>Priority: Critical
>
> When the Zookeeper leader disappears, a new instance becomes the leader, the 
> instances need to reconnect to Zookeeper, but the Kafka "Controller" gets 
> lost in limbo state after re-establishing connection.
> See below for how I manage to reproduce this over and over.
> *Prerequisites*
> Have a Kafka cluster with 3 instances running version 2.8.1. Figure out which 
> one is the Controller. I'm using Kafkacat 1.5.0 and get this info using the 
> `-L` flag.
> Zookeeper runs with 3 instances on version 3.5.9. Figure out which one is 
> leader by checking
>  
> {code:java}
> echo stat | nc -v localhost 2181
> {code}
>  
>  
> *Reproduce*
> 1. Stop the leader Zookeeper service.
> 2. Watch the logs of the Kafka Controller and ensure that it reconnects and 
> registers again.
>  
> {code:java}
> Oct 27 09:13:08 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:08,882] INFO 
> Unable to read additional data from server sessionid 0x1f2a12870003, likely 
> server has closed socket, closing socket connection and attempting reconnect 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] WARN 
> SASL configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/opt/kafka/config/kafka_server_jaas.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it. (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] INFO 
> Opening socket connection to server 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] ERROR 
> [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,549] INFO 
> Socket connection 

[GitHub] [kafka] junrao commented on a change in pull request #11487: KAFKA-13445: Add ECDSA test for JWT validation

2021-11-15 Thread GitBox


junrao commented on a change in pull request #11487:
URL: https://github.com/apache/kafka/pull/11487#discussion_r749554644



##
File path: 
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenBuilder.java
##
@@ -49,22 +49,24 @@
 
 private Long expirationSeconds;
 
-private RsaJsonWebKey jwk;
+private PublicJsonWebKey jwk;
 
-public AccessTokenBuilder() throws JoseException {
+public AccessTokenBuilder() {
 this(new MockTime());
 }
 
-public AccessTokenBuilder(Time time) throws JoseException {
+public AccessTokenBuilder(Time time) {
 this.issuedAtSeconds = time.milliseconds() / 1000;
 this.expirationSeconds = this.issuedAtSeconds + 60;
-this.jwk = createJwk();
 }
 
-public static RsaJsonWebKey createJwk() throws JoseException {
-RsaJsonWebKey jwk = RsaJwkGenerator.generateJwk(2048);
-jwk.setKeyId("key-1");
-return jwk;
+public String alg() {
+return alg;
+}
+
+public AccessTokenBuilder alg(String alg) {

Review comment:
   While you are at this, a few other methods like audience() and subjectClaimName() seem never to be used. Is that expected?

##
File path: 
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidatorTest.java
##
@@ -38,14 +40,35 @@ protected AccessTokenValidator 
createAccessTokenValidator(AccessTokenBuilder bui
 }
 
 @Test
-public void testBasicEncryption() throws Exception {
-AccessTokenBuilder builder = new AccessTokenBuilder();
+public void testRsaEncryptionAlgorithm() throws Exception {
+PublicJsonWebKey jwk = createRsaJwk();
+testEncryptionAlgorithm(jwk, AlgorithmIdentifiers.RSA_USING_SHA256);
+}
+
+@Test
+public void testEcdsaEncryptionAlgorithm() throws Exception {
+PublicJsonWebKey jwk = createEcJwk();
+testEncryptionAlgorithm(jwk, 
AlgorithmIdentifiers.ECDSA_USING_P256_CURVE_AND_SHA256);
+}
+
+@Test
+public void testInvalidEncryptionAlgorithm() throws Exception {
+PublicJsonWebKey jwk = createRsaJwk();
+
+assertThrowsWithMessage(InvalidAlgorithmException.class,
+() -> testEncryptionAlgorithm(jwk, "fake"),
+"fake is an unknown, unsupported or unavailable alg algorithm");
+}
+
+private void testEncryptionAlgorithm(PublicJsonWebKey jwk, String alg) 
throws Exception {
+AccessTokenBuilder builder = new 
AccessTokenBuilder().jwk(jwk).alg(alg);
 AccessTokenValidator validator = createAccessTokenValidator(builder);
 
 JsonWebSignature jws = new JsonWebSignature();
 jws.setKey(builder.jwk().getPrivateKey());
 jws.setKeyIdHeaderValue(builder.jwk().getKeyId());
-jws.setAlgorithmHeaderValue(AlgorithmIdentifiers.RSA_USING_SHA256);
+jws.setAlgorithmHeaderValue(alg);
+jws.setPayload("{}");

Review comment:
   Hmm, how is jws being used?








[GitHub] [kafka] mkandaswamy commented on pull request #11494: MINOR: Improve KafkaStreamsTest: testInitializesAndDestroysMetricsReporters

2021-11-15 Thread GitBox


mkandaswamy commented on pull request #11494:
URL: https://github.com/apache/kafka/pull/11494#issuecomment-969156394


   Test failures: org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic and kafka.admin.LeaderElectionCommandTest.[1] don't seem to be related to this PR.






[GitHub] [kafka] junrao commented on a change in pull request #11486: KAFKA-13444: Fix OAuthCompatibilityTool help and add SSL options

2021-11-15 Thread GitBox


junrao commented on a change in pull request #11486:
URL: https://github.com/apache/kafka/pull/11486#discussion_r749549502



##
File path: 
tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
##
@@ -208,71 +170,217 @@ public static void main(String[] args) {
 
 if (t instanceof ConfigException) {
 System.out.printf("%n");
-parser.printHelp();
+argsHandler.parser.printHelp();
 }
 
 Exit.exit(1);
 }
 }
 
-private static Map<String, Object> getConfigs(Namespace namespace) {
-Map<String, Object> c = new HashMap<>();
-maybeAddInt(namespace, "connectTimeoutMs", c, 
SASL_LOGIN_CONNECT_TIMEOUT_MS);
-maybeAddInt(namespace, "readTimeoutMs", c, SASL_LOGIN_READ_TIMEOUT_MS);
-maybeAddLong(namespace, "loginRetryBackoffMs", c, 
SASL_LOGIN_RETRY_BACKOFF_MS);
-maybeAddLong(namespace, "loginRetryBackoffMax", c, 
SASL_LOGIN_RETRY_BACKOFF_MAX_MS);
-maybeAddString(namespace, "scopeClaimName", c, 
SASL_OAUTHBEARER_SCOPE_CLAIM_NAME);
-maybeAddString(namespace, "subClaimName", c, 
SASL_OAUTHBEARER_SUB_CLAIM_NAME);
-maybeAddString(namespace, "tokenEndpointUrl", c, 
SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);
-maybeAddString(namespace, "jwksEndpointUrl", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_URL);
-maybeAddLong(namespace, "jwksEndpdointRefreshMs", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS);
-maybeAddLong(namespace, "jwksEndpdointRetryBackoffMaxMs", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS);
-maybeAddLong(namespace, "jwksEndpdointRetryBackoffMs", c, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS);
-maybeAddInt(namespace, "clockSkewSeconds", c, 
SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS);
-maybeAddStringList(namespace, "expectedAudience", c, 
SASL_OAUTHBEARER_EXPECTED_AUDIENCE);
-maybeAddString(namespace, "expectedIssuer", c, 
SASL_OAUTHBEARER_EXPECTED_ISSUER);
-
-// This here is going to fill in all the defaults for the values we 
don't specify...
-ConfigDef cd = new ConfigDef();
-SaslConfigs.addClientSaslSupport(cd);
-AbstractConfig config = new AbstractConfig(cd, c);
-return config.values();
-}
 
-private static void maybeAddInt(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) {
-Integer value = namespace.getInt(namespaceKey);
+private static class ArgsHandler {
 
-if (value != null)
-configs.put(configsKey, value);
-}
+private static final String DESCRIPTION = String.format(
+"This tool is used to verify OAuth/OIDC provider 
compatibility.%n%n" +
+"Run the following script to determine the configuration 
options:%n%n" +
+"./bin/kafka-run-class.sh %s --help",
+OAuthCompatibilityTool.class.getName());
 
-private static void maybeAddLong(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) {
-Long value = namespace.getLong(namespaceKey);
+private final ArgumentParser parser;
 
-if (value != null)
-configs.put(configsKey, value);
-}
+private ArgsHandler() {
+this.parser = ArgumentParsers
+.newArgumentParser("oauth-compatibility-tool")
+.defaultHelp(true)
+.description(DESCRIPTION);
+}
 
-private static void maybeAddString(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) {
-String value = namespace.getString(namespaceKey);
+private Namespace parseArgs(String[] args) throws 
ArgumentParserException {
+// SASL/OAuth
+addArgument(SASL_LOGIN_CONNECT_TIMEOUT_MS, 
SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC, Integer.class);
+addArgument(SASL_LOGIN_READ_TIMEOUT_MS, 
SASL_LOGIN_READ_TIMEOUT_MS_DOC, Integer.class);
+addArgument(SASL_LOGIN_RETRY_BACKOFF_MAX_MS, 
SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC, Long.class);
+addArgument(SASL_LOGIN_RETRY_BACKOFF_MS, 
SASL_LOGIN_RETRY_BACKOFF_MS_DOC, Long.class);
+addArgument(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, 
SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC, Integer.class);
+addArgument(SASL_OAUTHBEARER_EXPECTED_AUDIENCE, 
SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
+.action(Arguments.append());
+addArgument(SASL_OAUTHBEARER_EXPECTED_ISSUER, 
SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
+addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC, Long.class);
+addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC, Long.class);
+addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, 
SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC, Long.class);
+

[jira] [Commented] (KAFKA-12226) High-throughput source tasks fail to commit offsets

2021-11-15 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17443975#comment-17443975
 ] 

Randall Hauch commented on KAFKA-12226:
---

Thanks for catching this, [~dajac]. I did miss merging this to the `3.1` branch -- I recall at the time looking for the branch and not seeing it. I'm in the process of building the branch after merging locally, so you should see this a bit later today.


> High-throughput source tasks fail to commit offsets
> ---
>
> Key: KAFKA-12226
> URL: https://issues.apache.org/jira/browse/KAFKA-12226
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.1.0
>
>
> The current source task thread has the following workflow:
>  # Poll messages from the source task
>  # Queue these messages to the producer and send them to Kafka asynchronously.
>  # Add the message to outstandingMessages, or if a flush is currently active, 
> outstandingMessagesBacklog
>  # When the producer completes the send of a record, remove it from 
> outstandingMessages
> The commit offsets thread has the following workflow:
>  # Wait a flat timeout for outstandingMessages to flush completely
>  # If this times out, add all of the outstandingMessagesBacklog to the 
> outstandingMessages and reset
>  # If it succeeds, commit the source task offsets to the backing store.
>  # Retry the above on a fixed schedule
> If the source task is producing records quickly (faster than the producer can 
> send), then the producer will throttle the task thread by blocking in its 
> {{send}} method, waiting at most {{max.block.ms}} for space in the 
> {{buffer.memory}} to be available. This means that the number of records in 
> {{outstandingMessages}} + {{outstandingMessagesBacklog}} is proportional to 
> the size of the producer memory buffer.
> This amount of data might take more than {{offset.flush.timeout.ms}} to 
> flush, and thus the flush will never succeed while the source task is 
> rate-limited by the producer memory. This means that we may write multiple 
> hours of data to Kafka and not ever commit source offsets for the connector. 
> When the task is lost due to a worker failure, hours of data will be 
> re-processed that otherwise were successfully written to Kafka.





[jira] [Assigned] (KAFKA-13455) The Apache Kafka quickstart guide does not contain any steps for running Kafka Connect

2021-11-15 Thread Kate Stanley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kate Stanley reassigned KAFKA-13455:


Assignee: Kate Stanley

> The Apache Kafka quickstart guide does not contain any steps for running 
> Kafka Connect
> --
>
> Key: KAFKA-13455
> URL: https://issues.apache.org/jira/browse/KAFKA-13455
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 2.7.2, 2.8.1, 3.0.0
>Reporter: Kate Stanley
>Assignee: Kate Stanley
>Priority: Major
>
> The Apache Kafka quickstart guide does not contain any steps for running 
> Kafka Connect. Instead it links to the User guide, which just links back to 
> the quickstart. The steps are present in version 2.6 of the documentation, 
> but not in the latest.
> See [https://kafka.apache.org/26/documentation/#quickstart_kafkaconnect] vs 
> https://kafka.apache.org/documentation/#quickstart_kafkaconnect
>  





[GitHub] [kafka] katheris opened a new pull request #11500: KAFKA-13455: Add steps to run Kafka Connect to quickstart

2021-11-15 Thread GitBox


katheris opened a new pull request #11500:
URL: https://github.com/apache/kafka/pull/11500


   Signed-off-by: Katherine Stanley <11195226+kathe...@users.noreply.github.com>
   






[jira] [Created] (KAFKA-13456) controller.listener.names is required for all KRaft nodes, not just controllers

2021-11-15 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-13456:
-

 Summary: controller.listener.names is required for all KRaft 
nodes, not just controllers
 Key: KAFKA-13456
 URL: https://issues.apache.org/jira/browse/KAFKA-13456
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.0.0, 2.8.0, 3.1.0
Reporter: Ron Dagostino
Assignee: Ron Dagostino


The controller.listener.names config is currently checked for existence when 
the process.roles contains the controller role (i.e. process.roles=controller 
or process.roles=broker,controller); it is not checked for existence when 
process.roles=broker.  However, KRaft brokers have to talk to KRaft 
controllers, of course, and they do so by taking the first entry in the 
controller.listener.names list.  Therefore, controller.listener.names is 
required in KRaft mode even when process.roles=broker.





[jira] [Created] (KAFKA-13455) The Apache Kafka quickstart guide does not contain any steps for running Kafka Connect

2021-11-15 Thread Kate Stanley (Jira)
Kate Stanley created KAFKA-13455:


 Summary: The Apache Kafka quickstart guide does not contain any 
steps for running Kafka Connect
 Key: KAFKA-13455
 URL: https://issues.apache.org/jira/browse/KAFKA-13455
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Affects Versions: 3.0.0, 2.8.1, 2.7.2
Reporter: Kate Stanley


The Apache Kafka quickstart guide does not contain any steps for running Kafka 
Connect. Instead it links to the User guide, which just links back to the 
quickstart. The steps are present in version 2.6 of the documentation, but not 
in the latest.

See [https://kafka.apache.org/26/documentation/#quickstart_kafkaconnect] vs 
https://kafka.apache.org/documentation/#quickstart_kafkaconnect

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] cadonna commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

2021-11-15 Thread GitBox


cadonna commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749486443



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {

Review comment:
   An implementation of the state updater is passed to the task manager in 
its constructor. Since the threading model of the state updater is encapsulated, 
and the same state updater can be passed to multiple task managers, there is no 
fixed 1:1 relationship between a restoration thread and a stream thread.  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

2021-11-15 Thread GitBox


cadonna commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749480160



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {
+
+/**
+ * Adds a task (active or standby) to the state updater.
+ *
+ * The state of the task will be updated.
+ *
+ * @param task task
+ */
+void add(final Task task);
+
+/**
+ * Removes a task (active and standby) from the state updater.
+ *
+ * A task is removed from the state updater irrespective of whether its 
state is up-to-date or not.
+ *
+ * @param task tasks to remove
+ */
+void remove(final Task task);
+
+/**
+ * Gets restored active tasks from state restoration/update
+ *
+ * @param timeout duration how long the calling thread should wait for 
restored active tasks
+ *
+ * @return list of active tasks with up-to-date states
+ */
+List<Task> getRestoredActiveTasks(final Duration timeout);
+
+/**
+ * Gets a list of tasks that failed during restoration.
+ *
+ * The exception that caused the failure can be retrieved by {@link 
Task#getException()}
+ *
+ * @return failed tasks
+ */
+List<Task> getFailedTasks();

Review comment:
   If a task fails during restoration, it is returned by this method. The 
exception that caused the failure can be retrieved from the task itself with 
`Task#getException()`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

2021-11-15 Thread GitBox


cadonna commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749478922



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {
+
+/**
+ * Adds a task (active or standby) to the state updater.
+ *
+ * The state of the task will be updated.
+ *
+ * @param task task
+ */
+void add(final Task task);
+
+/**
+ * Removes a task (active and standby) from the state updater.
+ *
+ * A task is removed from the state updater irrespective of whether its 
state is up-to-date or not.
+ *
+ * @param task tasks to remove
+ */
+void remove(final Task task);
+
+/**
+ * Gets restored active tasks from state restoration/update
+ *
+ * @param timeout duration how long the calling thread should wait for 
restored active tasks
+ *
+ * @return list of active tasks with up-to-date states
+ */
+List<Task> getRestoredActiveTasks(final Duration timeout);

Review comment:
   When the restoration of an active task is completed, the task is 
returned by a call to this method. The calling thread can decide how long to wait 
for the next restored active tasks. 
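
   To make the calling pattern concrete, here is a rough usage sketch (editor's 
illustration, not part of the PR; it assumes the interface lands as drafted, that 
the sketch lives in the same package as `StateUpdater` and `Task`, and that the 
generic type stripped by the mail archive was `Task`):
   
   ```java
   import java.time.Duration;
   import java.util.List;
   
   // Hypothetical caller: a stream thread periodically draining tasks whose
   // restoration has completed, and checking for restoration failures.
   final class RestorationPoller {
   
       private final StateUpdater stateUpdater;
   
       RestorationPoller(final StateUpdater stateUpdater) {
           this.stateUpdater = stateUpdater;
       }
   
       void pollOnce() {
           // Block for at most 100 ms waiting for newly restored active tasks.
           final List<Task> restored = stateUpdater.getRestoredActiveTasks(Duration.ofMillis(100));
           // ... hand the restored tasks back to the task manager for processing ...
   
           for (final Task failed : stateUpdater.getFailedTasks()) {
               // React to the failure, e.g. rethrow or log failed.getException().
           }
       }
   }
   ```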




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

2021-11-15 Thread GitBox


cadonna commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749476568



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {
+
+/**
+ * Adds a task (active or standby) to the state updater.
+ *
+ * The state of the task will be updated.
+ *
+ * @param task task
+ */
+void add(final Task task);
+
+/**
+ * Removes a task (active and standby) from the state updater.
+ *
+ * A task is removed from the state updater irrespective of whether its 
state is up-to-date or not.
+ *
+ * @param task tasks to remove
+ */
+void remove(final Task task);

Review comment:
   Each time a task is suspended in state `RESTORING`, we need to remove it 
from the state updater.  

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {
+
+/**
+ * Adds a task (active or standby) to the state updater.
+ *
+ * The state of the task will be updated.
+ *
+ * @param task task
+ */
+void add(final Task task);

Review comment:
   Each time an active or standby task transitions to state `RESTORING`, it 
needs to be added to the state updater.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

2021-11-15 Thread GitBox


cadonna commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749468726



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {
+
+/**
+ * Adds a task (active or standby) to the state updater.
+ *
+ * The state of the task will be updated.
+ *
+ * @param task task
+ */
+void add(final Task task);

Review comment:
   Each time an active or standby task transitions to state `RESTORING`, it 
should be added to the state updater.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna opened a new pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

2021-11-15 Thread GitBox


cadonna opened a new pull request #11499:
URL: https://github.com/apache/kafka/pull/11499


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #11466: :memo: Update Docs

2021-11-15 Thread GitBox


mimaison merged pull request #11466:
URL: https://github.com/apache/kafka/pull/11466


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11467: MINOR: fix java doc in kafkaProducer

2021-11-15 Thread GitBox


mimaison commented on a change in pull request #11467:
URL: https://github.com/apache/kafka/pull/11467#discussion_r749454721



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -117,15 +115,16 @@
  * as well as a background I/O thread that is responsible for turning these 
records into requests and transmitting them
  * to the cluster. Failure to close the producer after use will leak these 
resources.
  * 
- * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called it adds the record to a buffer of pending record sends
+ * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called, it adds the record to a buffer of pending sending record
  * and immediately returns. This allows the producer to batch together 
individual records for efficiency.
  * 
- * The acks config controls the criteria under which requests are 
considered complete. The "all" setting
- * we have specified will result in blocking on the full commit of the record, 
the slowest but most durable setting.
+ * The acks config controls the criteria under which requests are 
considered complete. The default "all" setting

Review comment:
   `The default "all" setting` -> `The default setting "all"`

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -117,15 +115,16 @@
  * as well as a background I/O thread that is responsible for turning these 
records into requests and transmitting them
  * to the cluster. Failure to close the producer after use will leak these 
resources.
  * 
- * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called it adds the record to a buffer of pending record sends
+ * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called, it adds the record to a buffer of pending sending record
  * and immediately returns. This allows the producer to batch together 
individual records for efficiency.
  * 
- * The acks config controls the criteria under which requests are 
considered complete. The "all" setting
- * we have specified will result in blocking on the full commit of the record, 
the slowest but most durable setting.
+ * The acks config controls the criteria under which requests are 
considered complete. The default "all" setting
+ * will result in blocking on the full commit of the record, the slowest but 
most durable setting.
  * 
- * If the request fails, the producer can automatically retry, though since we 
have specified retries
- * as 0 it won't. Enabling retries also opens up the possibility of duplicates 
(see the documentation on
- * http://kafka.apache.org/documentation.html#semantics;>message 
delivery semantics for details).
+ * If the request fails, the producer can automatically retry. We have 
specified retries default as Integer.MAX_VALUE, and

Review comment:
   `We have specified retries default as Integer.MAX_VALUE, 
and` -> `The retries setting defaults to 
Integer.MAX_VALUE, and`

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -117,15 +115,16 @@
  * as well as a background I/O thread that is responsible for turning these 
records into requests and transmitting them
  * to the cluster. Failure to close the producer after use will leak these 
resources.
  * 
- * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called it adds the record to a buffer of pending record sends
+ * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called, it adds the record to a buffer of pending sending record

Review comment:
   I think `record sends` reads better

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -156,7 +155,7 @@
  * to multiple partitions (and topics!) atomically.
  * 
  * 
- * To enable idempotence, the enable.idempotence configuration 
must be set to true. If set, the
+ * In Kafka 3.0, the enable.idempotence configuration is default 
to true. When enabling idempotence,

Review comment:
   `In Kafka 3.0,` -> `From Kafka 3.0,`
   `is default to true` -> `defaults to true`
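
   As an aside, here is a minimal sketch of a producer relying on the defaults 
discussed above, spelled out explicitly (editor's illustration; the bootstrap 
address and topic name are made up):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.serialization.StringSerializer;
   
   public class ProducerDefaultsExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ProducerConfig.ACKS_CONFIG, "all");              // default: blocks on full commit
           props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // default from Kafka 3.0
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
           try (Producer<String, String> producer = new KafkaProducer<>(props)) {
               producer.send(new ProducerRecord<>("my-topic", "key", "value"));
           }
       }
   }
   ```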

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -117,15 +115,16 @@
  * as well as a background I/O thread that is responsible for turning these 
records into requests and transmitting them
  * to the cluster. Failure to close the producer after use will leak these 
resources.
  * 
- * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called it adds the record to a buffer of pending record sends
+ * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called, it adds the record to a buffer of pending sending record
  * and immediately returns. This allows the producer to batch together 
individual records for efficiency.
  * 
- * The acks 

[jira] [Commented] (KAFKA-13294) Upgrade Netty to 4.1.68 for CVE fixes

2021-11-15 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17443906#comment-17443906
 ] 

Mickael Maison commented on KAFKA-13294:


Thanks [~dongjin], I've updated the fix version field. The fix is going to be in 
Kafka 3.1.0.

> Upgrade Netty to 4.1.68 for CVE fixes
> -
>
> Key: KAFKA-13294
> URL: https://issues.apache.org/jira/browse/KAFKA-13294
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Utkarsh Khare
>Assignee: Dongjin Lee
>Priority: Minor
> Fix For: 3.1.0
>
>
> netty has reported a couple of CVEs regarding the usage of Bzip2Decoder and 
> SnappyFrameDecoder. 
> Reference :
> [CVE-2021-37136 - 
> https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv|https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv]
> [CVE-2021-37137 - 
> https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363|https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363]
>  
> Can we upgrade Netty to version 4.1.68.Final to fix this ? 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13294) Upgrade Netty to 4.1.68 for CVE fixes

2021-11-15 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-13294:
---
Fix Version/s: 3.1.0

> Upgrade Netty to 4.1.68 for CVE fixes
> -
>
> Key: KAFKA-13294
> URL: https://issues.apache.org/jira/browse/KAFKA-13294
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Utkarsh Khare
>Assignee: Dongjin Lee
>Priority: Minor
> Fix For: 3.1.0
>
>
> netty has reported a couple of CVEs regarding the usage of Bzip2Decoder and 
> SnappyFrameDecoder. 
> Reference :
> [CVE-2021-37136 - 
> https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv|https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv]
> [CVE-2021-37137 - 
> https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363|https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363]
>  
> Can we upgrade Netty to version 4.1.68.Final to fix this ? 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13294) Upgrade Netty to 4.1.68 for CVE fixes

2021-11-15 Thread Dominique Mongelli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17443874#comment-17443874
 ] 

Dominique Mongelli edited comment on KAFKA-13294 at 11/15/21, 3:01 PM:
---

Hi, what will be the fix version for this CVE?


was (Author: dominique):
Hi, what will be the target version of this fix ?

> Upgrade Netty to 4.1.68 for CVE fixes
> -
>
> Key: KAFKA-13294
> URL: https://issues.apache.org/jira/browse/KAFKA-13294
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Utkarsh Khare
>Assignee: Dongjin Lee
>Priority: Minor
>
> netty has reported a couple of CVEs regarding the usage of Bzip2Decoder and 
> SnappyFrameDecoder. 
> Reference :
> [CVE-2021-37136 - 
> https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv|https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv]
> [CVE-2021-37137 - 
> https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363|https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363]
>  
> Can we upgrade Netty to version 4.1.68.Final to fix this ? 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13294) Upgrade Netty to 4.1.68 for CVE fixes

2021-11-15 Thread Dominique Mongelli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17443874#comment-17443874
 ] 

Dominique Mongelli commented on KAFKA-13294:


Hi, what will be the target version of this fix ?

> Upgrade Netty to 4.1.68 for CVE fixes
> -
>
> Key: KAFKA-13294
> URL: https://issues.apache.org/jira/browse/KAFKA-13294
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Utkarsh Khare
>Assignee: Dongjin Lee
>Priority: Minor
>
> netty has reported a couple of CVEs regarding the usage of Bzip2Decoder and 
> SnappyFrameDecoder. 
> Reference :
> [CVE-2021-37136 - 
> https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv|https://github.com/netty/netty/security/advisories/GHSA-grg4-wf29-r9vv]
> [CVE-2021-37137 - 
> https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363|https://github.com/netty/netty/security/advisories/GHSA-9vjp-v76f-g363]
>  
> Can we upgrade Netty to version 4.1.68.Final to fix this ? 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13140) KRaft brokers do not expose kafka.controller metrics, breaking backwards compatibility

2021-11-15 Thread Ron Dagostino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17443850#comment-17443850
 ] 

Ron Dagostino edited comment on KAFKA-13140 at 11/15/21, 2:25 PM:
--

No longer applies due to the adoption of [KIP-771: KRaft brokers without the 
"controller" role should not expose controller 
metrics|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=188743985]


was (Author: rndgstn):
No longer applies due to the adoption of [KIP-7761: KRaft brokers without the 
"controller" role should not expose controller 
metrics|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=188743985]

> KRaft brokers do not expose kafka.controller metrics, breaking backwards 
> compatibility
> --
>
> Key: KAFKA-13140
> URL: https://issues.apache.org/jira/browse/KAFKA-13140
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 2.8.0, 3.0.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 3.2.0
>
>
> The following controller metrics are exposed on every broker in a 
> ZooKeeper-based (i.e. non-KRaft) cluster regardless of whether the broker is 
> the active controller or not, but these metrics are not exposed on KRaft 
> nodes that have process.roles=broker (i.e. KRaft nodes that do not implement 
> the controller role).  For backwards compatibility, KRaft nodes that are just 
> brokers should expose these metrics with values all equal to 0: just like 
> ZooKeeper-based brokers do when they are not the active controller.
> kafka.controller:type=KafkaController,name=ActiveControllerCount
> kafka.controller:type=KafkaController,name=GlobalTopicCount
> kafka.controller:type=KafkaController,name=GlobalPartitionCount
> kafka.controller:type=KafkaController,name=OfflinePartitionsCount
> kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] rondagostino commented on pull request #11133: KAFKA-13140: KRaft brokers do not expose kafka.controller metrics

2021-11-15 Thread GitBox


rondagostino commented on pull request #11133:
URL: https://github.com/apache/kafka/pull/11133#issuecomment-968959242


   Closing as [KIP-771: KRaft brokers without the "controller" role should not 
expose controller 
metrics](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=188743985)
 was adopted.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino closed pull request #11133: KAFKA-13140: KRaft brokers do not expose kafka.controller metrics

2021-11-15 Thread GitBox


rondagostino closed pull request #11133:
URL: https://github.com/apache/kafka/pull/11133


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] GuoPhilipse commented on pull request #11483: KAFKA-13441: improve upgrade doc

2021-11-15 Thread GitBox


GuoPhilipse commented on pull request #11483:
URL: https://github.com/apache/kafka/pull/11483#issuecomment-968928642


   > For tiny fixes like this you don't need to open a JIRA, just submit a PR. 
Thanks
   
   Thanks @mimaison for your kind advice, I will pay more attention in the future.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] edoardocomar commented on pull request #11476: KAFKA-13407: Always start controller when broker wins election

2021-11-15 Thread GitBox


edoardocomar commented on pull request #11476:
URL: https://github.com/apache/kafka/pull/11476#issuecomment-968925912


   Hi @mimaison!
   We managed to reproduce this using 3.0 in our cloud clusters, but not locally 
on a laptop with 3 ZooKeepers and 3 Kafkas, so latencies may play a part.
   
   As for doing the fix in trunk, we attempted to reproduce the issue using the 
ducktape-on-Docker Kafka system tests, but we had no luck:
   https://github.com/edoardocomar/kafka/tree/controller_system_test
   
   Any suggestions are welcome. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11472: TRIVIAL: Remove unused parameters, exceptions, comments, etc.

2021-11-15 Thread GitBox


mimaison commented on pull request #11472:
URL: https://github.com/apache/kafka/pull/11472#issuecomment-968910011


   Thanks @dongjinleekr for the cleanup. It looks like you missed some changes 
in `DumpLogSegments.scala`:
   ```
   [Error] 
/Users/mickael/github/kafka/core/src/main/scala/kafka/tools/DumpLogSegments.scala:62:
 too many arguments (found 7, expected 6) for method dumpLog: (file: 
java.io.File, printContents: Boolean, nonConsecutivePairsForLogFilesMap: 
scala.collection.mutable.Map[String,List[(Long, Long)]], isDeepIteration: 
Boolean, parser: kafka.tools.DumpLogSegments.MessageParser[_, _], 
skipRecordMetadata: Boolean): Unit
   [Error] 
/Users/mickael/github/kafka/core/src/main/scala/kafka/tools/DumpLogSegments.scala:66:
 too many arguments (found 5, expected 4) for method dumpTimeIndex: (file: 
java.io.File, indexSanityOnly: Boolean, verifyOnly: Boolean, 
timeIndexDumpErrors: kafka.tools.DumpLogSegments.TimeIndexDumpErrors): Unit
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11483: KAFKA-13441: improve upgrade doc

2021-11-15 Thread GitBox


mimaison commented on pull request #11483:
URL: https://github.com/apache/kafka/pull/11483#issuecomment-968903648


   For tiny fixes like this you don't need to open a JIRA, just submit a PR. 
Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-13441) improve upgrade doc

2021-11-15 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-13441:
--

Assignee: guo

> improve upgrade doc
> ---
>
> Key: KAFKA-13441
> URL: https://issues.apache.org/jira/browse/KAFKA-13441
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 3.0.0
>Reporter: guo
>Assignee: guo
>Priority: Minor
> Fix For: 3.2.0
>
>
> improve upgrade doc



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison merged pull request #11483: KAFKA-13441: improve upgrade doc

2021-11-15 Thread GitBox


mimaison merged pull request #11483:
URL: https://github.com/apache/kafka/pull/11483


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13441) improve upgrade doc

2021-11-15 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-13441.

Fix Version/s: 3.2.0
   Resolution: Fixed

> improve upgrade doc
> ---
>
> Key: KAFKA-13441
> URL: https://issues.apache.org/jira/browse/KAFKA-13441
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 3.0.0
>Reporter: guo
>Priority: Minor
> Fix For: 3.2.0
>
>
> improve upgrade doc



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison commented on pull request #11476: KAFKA-13407: Always start controller when broker wins election

2021-11-15 Thread GitBox


mimaison commented on pull request #11476:
URL: https://github.com/apache/kafka/pull/11476#issuecomment-968895925


   Is this also an issue on trunk? If so let's fix trunk first then we can 
backport it to older branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11491: KAFKA-13442: REST API endpoint for fetching a connector's config def

2021-11-15 Thread GitBox


mimaison commented on pull request #11491:
URL: https://github.com/apache/kafka/pull/11491#issuecomment-968892334


   That looks similar to the KIP I proposed a few weeks ago: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+API+to+retrieve+connector+configuration+definitions


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ocadaruma commented on a change in pull request #11422: KAFKA-9648: Add configuration to adjust listen backlog size for Acceptor

2021-11-15 Thread GitBox


ocadaruma commented on a change in pull request #11422:
URL: https://github.com/apache/kafka/pull/11422#discussion_r749271976



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -660,7 +662,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
   serverChannel.socket().setReceiveBufferSize(recvBufferSize)
 
 try {
-  serverChannel.socket.bind(socketAddress)
+  serverChannel.socket.bind(socketAddress, listenBacklogSize)

Review comment:
   @dajac Added a test. Please take a look.
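
   For context, the overload used here is the standard JDK 
`ServerSocket.bind(SocketAddress, int)`. A minimal sketch of what the second 
argument does (editor's illustration; the address, port, and backlog value are 
arbitrary):
   
   ```java
   import java.io.IOException;
   import java.net.InetSocketAddress;
   import java.nio.channels.ServerSocketChannel;
   
   public class BacklogBindExample {
       public static void main(String[] args) throws IOException {
           ServerSocketChannel serverChannel = ServerSocketChannel.open();
           // The second bind argument caps the OS queue of connections that have
           // been established but not yet accept()ed (the listen backlog).
           serverChannel.socket().bind(new InetSocketAddress("0.0.0.0", 9092), 128);
       }
   }
   ```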




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13407) Kafka controller out of service after ZK leader restart

2021-11-15 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17443781#comment-17443781
 ] 

Edoardo Comar commented on KAFKA-13407:
---

[~vinsonZhang] are you suggesting loss of connection with ZK should not cause a 
controller to resign? 

> Kafka controller out of service after ZK leader restart
> ---
>
> Key: KAFKA-13407
> URL: https://issues.apache.org/jira/browse/KAFKA-13407
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 2.8.1
> Environment: Ubuntu 20.04
>Reporter: Daniel
>Priority: Critical
>
> When the Zookeeper leader disappears, a new instance becomes the leader, the 
> instances need to reconnect to Zookeeper, but the Kafka "Controller" gets 
> lost in limbo state after re-establishing connection.
> See below for how I manage to reproduce this over and over.
> *Prerequisites*
> Have a Kafka cluster with 3 instances running version 2.8.1. Figure out which 
> one is the Controller. I'm using Kafkacat 1.5.0 and get this info using the 
> `-L` flag.
> Zookeeper runs with 3 instances on version 3.5.9. Figure out which one is 
> leader by checking
>  
> {code:java}
> echo stat | nc -v localhost 2181
> {code}
>  
>  
> *Reproduce*
> 1. Stop the leader Zookeeper service.
> 2. Watch the logs of the Kafka Controller and ensure that it reconnects and 
> registers again.
>  
> {code:java}
> Oct 27 09:13:08 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:08,882] INFO 
> Unable to read additional data from server sessionid 0x1f2a12870003, likely 
> server has closed socket, closing socket connection and attempting reconnect 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] WARN 
> SASL configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/opt/kafka/config/kafka_server_jaas.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it. (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] INFO 
> Opening socket connection to server 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] ERROR 
> [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,549] INFO 
> Socket connection established, initiating session, client: 
> /10.10.85.215:39338, server: 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,569] INFO 
> Session establishment complete on server 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181, 
> sessionid = 0x1f2a12870003, negotiated timeout = 18000 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,548] INFO 
> [ZooKeeperClient Kafka server] Reinitializing due to auth failure. 
> (kafka.zookeeper.ZooKeeperClient)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,550] INFO 
> [PartitionStateMachine controllerId=1003] Stopped partition state machine 
> (kafka.controller.ZkPartitionStateMachine)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,550] INFO 
> [ReplicaStateMachine controllerId=1003] Stopped replica state machine 
> (kafka.controller.ZkReplicaStateMachine)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Shutting down 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Stopped 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Shutdown completed 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,552] INFO 
> [RequestSendThread controllerId=1003] Shutting down 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,552] INFO 
> [RequestSendThread controllerId=1003] Stopped 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,552] INFO 
> [RequestSendThread controllerId=1003] Shutdown completed 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,554] INFO 
> [RequestSendThread 

[jira] [Comment Edited] (KAFKA-13407) Kafka controller out of service after ZK leader restart

2021-11-15 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17443780#comment-17443780
 ] 

Edoardo Comar edited comment on KAFKA-13407 at 11/15/21, 11:44 AM:
---

Hi [~vinsonZhang],
the ZooKeeper "Auth failed" error appears in the logs every time authentication 
is not configured for ZK ("SASL configuration failed"); the ZooKeeper client then 
falls back to an unauthenticated connection.
I don't see it as the cause of the resignation.


was (Author: ecomar):
[~vinsonZhang]the zookeeper "Auth failed" error in the logs appears every time 
authentication is not configured for ZK "SASL configuration failed" then the 
zookeeper clients falls back to unauthenticated connection 
I don't see it as the cause of resignation

> Kafka controller out of service after ZK leader restart
> ---
>
> Key: KAFKA-13407
> URL: https://issues.apache.org/jira/browse/KAFKA-13407
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 2.8.1
> Environment: Ubuntu 20.04
>Reporter: Daniel
>Priority: Critical
>
> When the Zookeeper leader disappears, a new instance becomes the leader, the 
> instances need to reconnect to Zookeeper, but the Kafka "Controller" gets 
> lost in limbo state after re-establishing connection.
> See below for how I manage to reproduce this over and over.
> *Prerequisites*
> Have a Kafka cluster with 3 instances running version 2.8.1. Figure out which 
> one is the Controller. I'm using Kafkacat 1.5.0 and get this info using the 
> `-L` flag.
> Zookeeper runs with 3 instances on version 3.5.9. Figure out which one is 
> leader by checking
>  
> {code:java}
> echo stat | nc -v localhost 2181
> {code}
>  
>  
> *Reproduce*
> 1. Stop the leader Zookeeper service.
> 2. Watch the logs of the Kafka Controller and ensure that it reconnects and 
> registers again.
>  
> {code:java}
> Oct 27 09:13:08 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:08,882] INFO 
> Unable to read additional data from server sessionid 0x1f2a12870003, likely 
> server has closed socket, closing socket connection and attempting reconnect 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] WARN 
> SASL configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/opt/kafka/config/kafka_server_jaas.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it. (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] INFO 
> Opening socket connection to server 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] ERROR 
> [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,549] INFO 
> Socket connection established, initiating session, client: 
> /10.10.85.215:39338, server: 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,569] INFO 
> Session establishment complete on server 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181, 
> sessionid = 0x1f2a12870003, negotiated timeout = 18000 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,548] INFO 
> [ZooKeeperClient Kafka server] Reinitializing due to auth failure. 
> (kafka.zookeeper.ZooKeeperClient)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,550] INFO 
> [PartitionStateMachine controllerId=1003] Stopped partition state machine 
> (kafka.controller.ZkPartitionStateMachine)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,550] INFO 
> [ReplicaStateMachine controllerId=1003] Stopped replica state machine 
> (kafka.controller.ZkReplicaStateMachine)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Shutting down 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Stopped 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Shutdown completed 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,552] INFO 
> [RequestSendThread controllerId=1003] Shutting down 
> 

[jira] [Commented] (KAFKA-13407) Kafka controller out of service after ZK leader restart

2021-11-15 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17443780#comment-17443780
 ] 

Edoardo Comar commented on KAFKA-13407:
---

[~vinsonZhang]the zookeeper "Auth failed" error in the logs appears every time 
authentication is not configured for ZK "SASL configuration failed" then the 
zookeeper clients falls back to unauthenticated connection 
I don't see it as the cause of resignation

> Kafka controller out of service after ZK leader restart
> ---
>
> Key: KAFKA-13407
> URL: https://issues.apache.org/jira/browse/KAFKA-13407
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 2.8.1
> Environment: Ubuntu 20.04
>Reporter: Daniel
>Priority: Critical
>
> When the Zookeeper leader disappears, a new instance becomes the leader, the 
> instances need to reconnect to Zookeeper, but the Kafka "Controller" gets 
> lost in limbo state after re-establishing connection.
> See below for how I manage to reproduce this over and over.
> *Prerequisites*
> Have a Kafka cluster with 3 instances running version 2.8.1. Figure out which 
> one is the Controller. I'm using Kafkacat 1.5.0 and get this info using the 
> `-L` flag.
> Zookeeper runs with 3 instances on version 3.5.9. Figure out which one is 
> leader by checking
>  
> {code:java}
> echo stat | nc -v localhost 2181
> {code}
>  
>  
> *Reproduce*
> 1. Stop the leader Zookeeper service.
> 2. Watch the logs of the Kafka Controller and ensure that it reconnects and 
> registers again.
>  
> {code:java}
> Oct 27 09:13:08 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:08,882] INFO 
> Unable to read additional data from server sessionid 0x1f2a12870003, likely 
> server has closed socket, closing socket connection and attempting reconnect 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] WARN 
> SASL configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/opt/kafka/config/kafka_server_jaas.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it. (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] INFO 
> Opening socket connection to server 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] ERROR 
> [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,549] INFO 
> Socket connection established, initiating session, client: 
> /10.10.85.215:39338, server: 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,569] INFO 
> Session establishment complete on server 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181, 
> sessionid = 0x1f2a12870003, negotiated timeout = 18000 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,548] INFO 
> [ZooKeeperClient Kafka server] Reinitializing due to auth failure. 
> (kafka.zookeeper.ZooKeeperClient)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,550] INFO 
> [PartitionStateMachine controllerId=1003] Stopped partition state machine 
> (kafka.controller.ZkPartitionStateMachine)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,550] INFO 
> [ReplicaStateMachine controllerId=1003] Stopped replica state machine 
> (kafka.controller.ZkReplicaStateMachine)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Shutting down 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Stopped 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Shutdown completed 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,552] INFO 
> [RequestSendThread controllerId=1003] Shutting down 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,552] INFO 
> [RequestSendThread controllerId=1003] Stopped 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,552] INFO 
> [RequestSendThread controllerId=1003] Shutdown completed 

[jira] [Comment Edited] (KAFKA-13454) kafka has duplicate configuration information log information printing during startup

2021-11-15 Thread zhangzhanchang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17443769#comment-17443769
 ] 

zhangzhanchang edited comment on KAFKA-13454 at 11/15/21, 11:21 AM:


Hi [~ijuma], [~guozhang], can you help me look at this problem?


was (Author: zzccctv):
HI,[~ijuma]  Can you help me look at this problem?

> kafka has duplicate configuration information log information printing during 
> startup
> -
>
> Key: KAFKA-13454
> URL: https://issues.apache.org/jira/browse/KAFKA-13454
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.5.0, 3.0.0
>Reporter: zhangzhanchang
>Priority: Minor
>
> kafka has duplicate configuration information log information printing during 
> startup,repeated information printing will bring confusion to users.It is 
> better to add log information before and after repeating the configuration 
> information
> [2021-11-15 12:07:29,133] INFO KafkaConfig values: 
>         advertised.host.name = null
>         advertised.listeners = null
>         advertised.port = null
>         alter.config.policy.class.name = null
>         alter.log.dirs.replication.quota.window.num = 11
>         alter.log.dirs.replication.quota.window.size.seconds = 1
>         authorizer.class.name = 
>         ...
>         zookeeper.ssl.truststore.type = null
>         zookeeper.sync.time.ms = 2000
>  (kafka.server.KafkaConfig)
> [2021-11-15 12:07:29,161] INFO KafkaConfig values: 
>         advertised.host.name = null
>         advertised.listeners = null
>         advertised.port = null
>         alter.config.policy.class.name = null
>         alter.log.dirs.replication.quota.window.num = 11
>         alter.log.dirs.replication.quota.window.size.seconds = 1
>         authorizer.class.name = 
>         auto.create.topics.enable = true
>         auto.leader.rebalance.enable = true
>         background.threads = 10
>         ..
> (kafka.server.KafkaConfig)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] zzccctv commented on pull request #11496: KAFKA-13454: kafka has duplicate configuration information log information printin…

2021-11-15 Thread GitBox


zzccctv commented on pull request #11496:
URL: https://github.com/apache/kafka/pull/11496#issuecomment-968806449


   @ijuma @guozhangwang Can you help me look at this problem?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13454) kafka has duplicate configuration information log information printing during startup

2021-11-15 Thread zhangzhanchang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17443769#comment-17443769
 ] 

zhangzhanchang commented on KAFKA-13454:


HI,[~ijuma]  Can you help me look at this problem?

> kafka has duplicate configuration information log information printing during 
> startup
> -
>
> Key: KAFKA-13454
> URL: https://issues.apache.org/jira/browse/KAFKA-13454
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.5.0, 3.0.0
>Reporter: zhangzhanchang
>Priority: Minor
>
> kafka has duplicate configuration information log information printing during 
> startup,repeated information printing will bring confusion to users.It is 
> better to add log information before and after repeating the configuration 
> information
> [2021-11-15 12:07:29,133] INFO KafkaConfig values: 
>         advertised.host.name = null
>         advertised.listeners = null
>         advertised.port = null
>         alter.config.policy.class.name = null
>         alter.log.dirs.replication.quota.window.num = 11
>         alter.log.dirs.replication.quota.window.size.seconds = 1
>         authorizer.class.name = 
>         ...
>         zookeeper.ssl.truststore.type = null
>         zookeeper.sync.time.ms = 2000
>  (kafka.server.KafkaConfig)
> [2021-11-15 12:07:29,161] INFO KafkaConfig values: 
>         advertised.host.name = null
>         advertised.listeners = null
>         advertised.port = null
>         alter.config.policy.class.name = null
>         alter.log.dirs.replication.quota.window.num = 11
>         alter.log.dirs.replication.quota.window.size.seconds = 1
>         authorizer.class.name = 
>         auto.create.topics.enable = true
>         auto.leader.rebalance.enable = true
>         background.threads = 10
>         ..
> (kafka.server.KafkaConfig)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13454) kafka has duplicate configuration information log information printing during startup

2021-11-15 Thread zhangzhanchang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzhanchang updated KAFKA-13454:
---
Reviewer: Ismael Juma  (was: Guozhang Wang)

> kafka has duplicate configuration information log information printing during 
> startup
> -
>
> Key: KAFKA-13454
> URL: https://issues.apache.org/jira/browse/KAFKA-13454
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.5.0, 3.0.0
>Reporter: zhangzhanchang
>Priority: Minor
>
> kafka has duplicate configuration information log information printing during 
> startup,repeated information printing will bring confusion to users.It is 
> better to add log information before and after repeating the configuration 
> information
> [2021-11-15 12:07:29,133] INFO KafkaConfig values: 
>         advertised.host.name = null
>         advertised.listeners = null
>         advertised.port = null
>         alter.config.policy.class.name = null
>         alter.log.dirs.replication.quota.window.num = 11
>         alter.log.dirs.replication.quota.window.size.seconds = 1
>         authorizer.class.name = 
>         ...
>         zookeeper.ssl.truststore.type = null
>         zookeeper.sync.time.ms = 2000
>  (kafka.server.KafkaConfig)
> [2021-11-15 12:07:29,161] INFO KafkaConfig values: 
>         advertised.host.name = null
>         advertised.listeners = null
>         advertised.port = null
>         alter.config.policy.class.name = null
>         alter.log.dirs.replication.quota.window.num = 11
>         alter.log.dirs.replication.quota.window.size.seconds = 1
>         authorizer.class.name = 
>         auto.create.topics.enable = true
>         auto.leader.rebalance.enable = true
>         background.threads = 10
>         ..
> (kafka.server.KafkaConfig)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13454) kafka has duplicate configuration information log information printing during startup

2021-11-15 Thread zhangzhanchang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzhanchang updated KAFKA-13454:
---
Description: 
Kafka prints duplicate configuration information in the log during startup; the 
repeated output can confuse users. It would be better to add log messages before 
and after the repeated configuration information.

[2021-11-15 12:07:29,133] INFO KafkaConfig values: 
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        alter.log.dirs.replication.quota.window.num = 11
        alter.log.dirs.replication.quota.window.size.seconds = 1
        authorizer.class.name = 

        ...

        zookeeper.ssl.truststore.type = null
        zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)

[2021-11-15 12:07:29,161] INFO KafkaConfig values: 
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        alter.log.dirs.replication.quota.window.num = 11
        alter.log.dirs.replication.quota.window.size.seconds = 1
        authorizer.class.name = 
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10

        ..

(kafka.server.KafkaConfig)

  was:
kafka has duplicate configuration information log information printing during 
startup,repeated information printing will bring confusion to users.It is 
better to add log information before and after repeating the configuration 
information

[2021-11-15 12:07:29,133] INFO KafkaConfig values: 
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        alter.log.dirs.replication.quota.window.num = 11
        alter.log.dirs.replication.quota.window.size.seconds = 1
        authorizer.class.name = 

        ...

[2021-11-15 12:07:29,161] INFO KafkaConfig values: 
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        alter.log.dirs.replication.quota.window.num = 11
        alter.log.dirs.replication.quota.window.size.seconds = 1
        authorizer.class.name = 
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10

 


> kafka has duplicate configuration information log information printing during 
> startup
> -
>
> Key: KAFKA-13454
> URL: https://issues.apache.org/jira/browse/KAFKA-13454
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.5.0, 3.0.0
>Reporter: zhangzhanchang
>Priority: Minor
>
> kafka has duplicate configuration information log information printing during 
> startup,repeated information printing will bring confusion to users.It is 
> better to add log information before and after repeating the configuration 
> information
> [2021-11-15 12:07:29,133] INFO KafkaConfig values: 
>         advertised.host.name = null
>         advertised.listeners = null
>         advertised.port = null
>         alter.config.policy.class.name = null
>         alter.log.dirs.replication.quota.window.num = 11
>         alter.log.dirs.replication.quota.window.size.seconds = 1
>         authorizer.class.name = 
>         ...
>         zookeeper.ssl.truststore.type = null
>         zookeeper.sync.time.ms = 2000
>  (kafka.server.KafkaConfig)
> [2021-11-15 12:07:29,161] INFO KafkaConfig values: 
>         advertised.host.name = null
>         advertised.listeners = null
>         advertised.port = null
>         alter.config.policy.class.name = null
>         alter.log.dirs.replication.quota.window.num = 11
>         alter.log.dirs.replication.quota.window.size.seconds = 1
>         authorizer.class.name = 
>         auto.create.topics.enable = true
>         auto.leader.rebalance.enable = true
>         background.threads = 10
>         ..
> (kafka.server.KafkaConfig)
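
To make the suggestion concrete, here is a minimal sketch of a labelled config dump (plain Java with slf4j logging; the LabelledConfigDump class, the logConfigValues helper, and the phase names are hypothetical illustrations, not the actual KafkaConfig code, whose dump comes from Kafka's common config machinery). Printing a phase marker with each dump would tell users which startup step produced which copy.

{code:java}
import java.util.Map;
import java.util.TreeMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LabelledConfigDump {
    private static final Logger log = LoggerFactory.getLogger(LabelledConfigDump.class);

    // Hypothetical helper: dump all config values sorted by key (as the real
    // "KafkaConfig values:" printout is), but prefixed with the startup phase
    // that triggered the dump, so repeated printouts are distinguishable.
    static void logConfigValues(String phase, Map<String, Object> values) {
        StringBuilder sb = new StringBuilder("KafkaConfig values (" + phase + "):\n");
        for (Map.Entry<String, Object> e : new TreeMap<>(values).entrySet()) {
            sb.append('\t').append(e.getKey()).append(" = ").append(e.getValue()).append('\n');
        }
        log.info(sb.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> config = Map.of(
                "auto.create.topics.enable", true,
                "background.threads", 10);
        logConfigValues("initial parse", config);            // first dump
        logConfigValues("after dynamic overrides", config);  // second dump, now labelled
    }
}
{code}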



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison commented on a change in pull request #11431: KAFKA-13397: Honor 'replication.policy.separator' configuration when creating MirrorMaker2 internal topics

2021-11-15 Thread GitBox


mimaison commented on a change in pull request #11431:
URL: https://github.com/apache/kafka/pull/11431#discussion_r749214261



##
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
 
 // fill in reasonable defaults
 props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
-props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
-+ sourceAndTarget.source() + ".internal");
-props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "mm2-status."
-+ sourceAndTarget.source() + ".internal");
-props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
-+ sourceAndTarget.source() + ".internal");
+
+String separator = originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, REPLICATION_POLICY_SEPARATOR_DEFAULT);
+if (separator.equals("-")) {
+throw new ConfigException("You should not use a single dash as a " + REPLICATION_POLICY_SEPARATOR);
+}
+
+props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets" + separator

Review comment:
   I think the fix @OmniaGM proposed above 
(https://github.com/apache/kafka/pull/11431#discussion_r737592730) makes sense 
and I think it's fully compatible. WDYT?
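
As background for readers following along, here is a self-contained sketch of the naming scheme the diff above introduces (plain Java, no Connect APIs; the internalTopicNames helper is illustrative, and it assumes the configured separator replaces both hardcoded dots, which the truncated diff suggests but does not show in full):

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class InternalTopicNames {

    // Illustrative re-implementation of the naming logic from the diff: the
    // configured separator replaces the previously hardcoded "." between the
    // prefix, the source alias, and the "internal" suffix.
    static Map<String, String> internalTopicNames(String sourceAlias, String separator) {
        if (separator.equals("-")) {
            // Guard mirrored from the diff (the real code throws Connect's
            // ConfigException); presumably a lone dash would clash with the
            // dashes already present in the fixed prefixes.
            throw new IllegalArgumentException(
                    "You should not use a single dash as a replication.policy.separator");
        }
        Map<String, String> names = new LinkedHashMap<>();
        names.put("offset.storage.topic", "mm2-offsets" + separator + sourceAlias + separator + "internal");
        names.put("status.storage.topic", "mm2-status" + separator + sourceAlias + separator + "internal");
        names.put("config.storage.topic", "mm2-configs" + separator + sourceAlias + separator + "internal");
        return names;
    }

    public static void main(String[] args) {
        // Default "." separator keeps the historical names:
        System.out.println(internalTopicNames("source", "."));
        // {offset.storage.topic=mm2-offsets.source.internal, ...}

        // A custom separator now propagates to the internal topics as well:
        System.out.println(internalTopicNames("source", "_"));
        // {offset.storage.topic=mm2-offsets_source_internal, ...}
    }
}
{code}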




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected

2021-11-15 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-13255:
--

Assignee: Ed Berezitsky

> Mirrormaker config property config.properties.exclude is not working as 
> expected 
> -
>
> Key: KAFKA-13255
> URL: https://issues.apache.org/jira/browse/KAFKA-13255
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0
>Reporter: Anamika Nadkarni
>Assignee: Ed Berezitsky
>Priority: Major
>
> Objective - Use MM2 (Kafka Connect in a distributed cluster) for data migration 
> between a cluster hosted in a private data center and an AWS MSK cluster.
> Steps performed -
>  # Started the kafka-connect service.
>  # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and 
> heartbeat connector). The curl commands used to create the connectors are in the 
> attached file. To exclude certain config properties during topic replication, 
> we are using the 'config.properties.exclude' property in the MM2 source 
> connector.
> Expected -
> The source topic 'dev.portlandDc.anamika.helloMsk' should be created successfully 
> in the destination cluster.
> Actual -
> Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in the destination 
> cluster fails with the following error:
> {code:java}
> [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic 
> dev.portlandDc.anamika.helloMsk. 
> (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
> org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic 
> config name: confluent.value.schema.validation{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected

2021-11-15 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-13255.

Fix Version/s: 3.2.0
   Resolution: Fixed

> Mirrormaker config property config.properties.exclude is not working as 
> expected 
> -
>
> Key: KAFKA-13255
> URL: https://issues.apache.org/jira/browse/KAFKA-13255
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0
>Reporter: Anamika Nadkarni
>Assignee: Ed Berezitsky
>Priority: Major
> Fix For: 3.2.0
>
>
> Objective - Use MM2 (Kafka Connect in a distributed cluster) for data migration 
> between a cluster hosted in a private data center and an AWS MSK cluster.
> Steps performed -
>  # Started the kafka-connect service.
>  # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and 
> heartbeat connector). The curl commands used to create the connectors are in the 
> attached file. To exclude certain config properties during topic replication, 
> we are using the 'config.properties.exclude' property in the MM2 source 
> connector.
> Expected -
> The source topic 'dev.portlandDc.anamika.helloMsk' should be created successfully 
> in the destination cluster.
> Actual -
> Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in the destination 
> cluster fails with the following error:
> {code:java}
> [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic 
> dev.portlandDc.anamika.helloMsk. 
> (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
> org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic 
> config name: confluent.value.schema.validation{code}
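
The fix merged in PR #11401 below applies the configured exclude filter when new topics are created, per the PR title. As a rough illustration of that filtering step (plain Java; the TopicConfigFilter class, the filterTopicConfigs helper, and the confluent.* exclude pattern are hypothetical, not the actual MirrorSourceConnector code):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicConfigFilter {

    // Illustrative version of the exclude step: drop any topic config whose
    // name matches one of the configured exclude patterns before the config
    // map is handed to topic creation on the target cluster.
    static Map<String, String> filterTopicConfigs(Map<String, String> sourceConfigs,
                                                  List<Pattern> excludes) {
        return sourceConfigs.entrySet().stream()
                .filter(e -> excludes.stream().noneMatch(p -> p.matcher(e.getKey()).matches()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        List<Pattern> excludes = List.of(Pattern.compile("confluent\\..*"));
        Map<String, String> sourceConfigs = Map.of(
                "cleanup.policy", "compact",
                "confluent.value.schema.validation", "true"); // unknown to the target cluster
        // Without the filter, topic creation fails with
        // InvalidConfigurationException; with it, the offending key is dropped.
        System.out.println(filterTopicConfigs(sourceConfigs, excludes));
        // -> {cleanup.policy=compact}
    }
}
{code}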



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison merged pull request #11401: KAFKA-13255: use exclude filter for new topics

2021-11-15 Thread GitBox


mimaison merged pull request #11401:
URL: https://github.com/apache/kafka/pull/11401


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



