[jira] [Commented] (KAFKA-13465) when auto topic creation is enabled, the server creates MirrorMaker internal topics unexpectedly

2021-11-18 Thread ZhenChun Pan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446337#comment-17446337
 ] 

ZhenChun Pan commented on KAFKA-13465:
--

I already have a patch for this issue. Could the issue be assigned to me?

>  when auto topic creation is enabled, the server creates MirrorMaker internal 
> topics unexpectedly
> -
>
> Key: KAFKA-13465
> URL: https://issues.apache.org/jira/browse/KAFKA-13465
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0, 2.8.0, 2.7.1, 3.0.0
>Reporter: ZhenChun Pan
>Priority: Major
>
> Hi Team
> Mirror Maker: 2.7.0
> when I enable auto topic creation on both sides: 
> auto.create.topics.enable=true
>  
> sometimes the MirrorMaker internal topics are created by the server with 
> unexpected configurations, and MirrorMaker fails to start.
> ```
> [2021-11-19 18:03:56,707] ERROR [Worker clientId=connect-2, groupId=pr-mm2] 
> Uncaught exception in herder work thread, exiting: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> org.apache.kafka.common.config.ConfigException: Topic 
> 'mm2-configs.pr.internal' supplied via the 'config.storage.topic' property is 
> required to have 'cleanup.policy=compact' to guarantee 
> consistency and durability of connector configurations, but found the topic 
> currently has 'cleanup.policy=delete'. Continuing would likely result in 
> eventually losing connector configurations and problems restarting this 
> Connect cluster in the future. Change the 'config.storage.topic' property in 
> the Connect worker configurations to use a topic with 
> 'cleanup.policy=compact'.
> at 
> org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:420)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore$1.run(KafkaConfigBackingStore.java:501)
> at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:133)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:268)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:130)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:288)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> ```
> I think the solution is to exclude the MirrorMaker internal topics when 
> auto-creating topics (AutoTopicCreationManager.scala).
> With this change, the problem is resolved for me.
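> For illustration, a minimal sketch of the kind of guard described above (shown in Java; 
> the actual AutoTopicCreationManager is Scala, and the exact MM2 topic-name pattern is an 
> assumption based on the "mm2-configs.pr.internal" name in the stack trace):
> {code:java}
> // Illustrative sketch only; not the actual broker code.
> public final class Mm2InternalTopicGuard {
> 
>     // Assumption: MM2 internal topics follow the "mm2-*.internal" naming pattern.
>     public static boolean isMm2InternalTopic(String topic) {
>         return topic.startsWith("mm2-") && topic.endsWith(".internal");
>     }
> 
>     // The broker-side auto-creation path would skip such topics entirely,
>     // leaving MM2 to create them itself with 'cleanup.policy=compact'.
>     public static boolean shouldAutoCreate(String topic) {
>         return !isMm2InternalTopic(topic);
>     }
> }
> {code}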



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-11-18 Thread GitBox


showuon commented on pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#issuecomment-973803318


   @dajac , thanks for your comments. Yes, you are right, I didn't get the if 
condition right. I've updated it to use a `hasGenerationReset` helper method:
   ```java
   private boolean hasGenerationReset(Generation gen) {
       // the member ID might not be reset for an ILLEGAL_GENERATION error, so
       // only check the generation ID and protocol name here
       return gen.generationId != Generation.NO_GENERATION.generationId &&
           gen.protocolName == null;
   }
   ```
   Before this change, we could simply do `if (generation.equals(NO_GENERATION))` 
to check whether the generation object had been reset. Now the check should look at 
the `generationId` and `protocolName` fields only, because the member ID might not 
be reset in some cases.
   
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-11-18 Thread GitBox


showuon edited a comment on pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#issuecomment-973803318


   @dajac , thanks for your comments. Yes, you are right, I didn't get the if 
condition right. I've updated it to use a `hasGenerationReset` helper method:
   ```java
   private boolean hasGenerationReset(Generation gen) {
       // the member ID might not be reset for an ILLEGAL_GENERATION error, so
       // only check the generation ID and protocol name here
       return gen.generationId == Generation.NO_GENERATION.generationId &&
           gen.protocolName == null;
   }
   ```
   Before this change, we could simply do `if (generation.equals(NO_GENERATION))` 
to check whether the generation object had been reset. Now the check should look at 
the `generationId` and `protocolName` fields only, because the member ID might not 
be reset in some cases.
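   To spell out why `equals(NO_GENERATION)` is no longer sufficient, here is a small 
standalone sketch (the field layout is assumed for illustration, not copied from 
`AbstractCoordinator`): after an ILLEGAL_GENERATION reset the member ID is kept, so the 
object is not equal to `NO_GENERATION` even though its generation is effectively reset.
   ```java
   // Standalone sketch, not the actual AbstractCoordinator.Generation class.
   final class GenerationSketch {
       static final GenerationSketch NO_GENERATION = new GenerationSketch(-1, "", null);

       final int generationId;
       final String memberId;      // preserved on ILLEGAL_GENERATION
       final String protocolName;  // null once the generation is reset

       GenerationSketch(final int generationId, final String memberId, final String protocolName) {
           this.generationId = generationId;
           this.memberId = memberId;
           this.protocolName = protocolName;
       }

       boolean hasGenerationReset() {
           // an equals(NO_GENERATION) check would also compare memberId and miss this case
           return generationId == NO_GENERATION.generationId && protocolName == null;
       }
   }
   ```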
   
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-11-18 Thread GitBox


showuon commented on a change in pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#discussion_r752901904



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -443,7 +443,9 @@ boolean joinGroupIfNeeded(final Timer timer) {
 stateSnapshot = this.state;
 }
 
-if (!generationSnapshot.equals(Generation.NO_GENERATION) && 
stateSnapshot == MemberState.STABLE) {
+if ((generationSnapshot.generationId != 
Generation.NO_GENERATION.generationId ||
+
!generationSnapshot.memberId.equals(Generation.NO_GENERATION.memberId)) &&
+stateSnapshot == MemberState.STABLE) {

Review comment:
   You are right! Updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-11-18 Thread GitBox


showuon commented on a change in pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#discussion_r752900442



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -746,8 +748,8 @@ public void handle(SyncGroupResponse syncResponse,
 sensors.syncSensor.record(response.requestLatencyMs());
 
 synchronized (AbstractCoordinator.this) {
-if (!generation.equals(Generation.NO_GENERATION) && 
state == MemberState.COMPLETING_REBALANCE) {
-// check protocol name only if the generation is 
not reset
+if (generation.protocolName != null && state == 
MemberState.COMPLETING_REBALANCE) {
+// check protocol name only if the generation is 
not reset (protocol name is not null)

Review comment:
   You are right, updated and explained below. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13464) SCRAM does not validate client-final-message's nonce

2021-11-18 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446311#comment-17446311
 ] 

Travis Bischel commented on KAFKA-13464:


AFAICT this compromises no integrity. I'm not sure of the purpose of this final 
check. The important part is the client-proof, which hashes the `c-nonce 
s-nonce` into it. Sending the c-nonce s-nonce back in plaintext doesn't seem to 
provide much benefit.
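For reference, a minimal sketch of the RFC 5802 client-proof computation (generic 
SCRAM-SHA-256, not Kafka's ScramSaslServer code) showing how the nonces are already bound 
into the proof: the AuthMessage that gets signed includes client-final-message-without-proof, 
whose r= attribute carries c-nonce + s-nonce, so a tampered nonce changes the proof.
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Sketch of RFC 5802 for SCRAM-SHA-256:
//   StoredKey       := H(ClientKey)
//   AuthMessage     := client-first-bare "," server-first "," client-final-without-proof
//   ClientSignature := HMAC(StoredKey, AuthMessage)
//   ClientProof     := ClientKey XOR ClientSignature
final class ScramProofSketch {

    static byte[] hmacSha256(byte[] key, String data) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
    }

    static byte[] clientProof(byte[] clientKey,
                              String clientFirstBare,
                              String serverFirst,
                              String clientFinalWithoutProof) throws Exception {
        byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
        // clientFinalWithoutProof contains "r=" + cNonce + sNonce, so the nonces feed the proof
        String authMessage = clientFirstBare + "," + serverFirst + "," + clientFinalWithoutProof;
        byte[] clientSignature = hmacSha256(storedKey, authMessage);
        byte[] proof = clientKey.clone();
        for (int i = 0; i < proof.length; i++) {
            proof[i] ^= clientSignature[i];
        }
        return proof;
    }
}
{code}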

> SCRAM does not validate client-final-message's nonce
> 
>
> Key: KAFKA-13464
> URL: https://issues.apache.org/jira/browse/KAFKA-13464
> Project: Kafka
>  Issue Type: Bug
>Reporter: Travis Bischel
>Assignee: Luke Chen
>Priority: Minor
>
> [https://datatracker.ietf.org/doc/html/rfc5802#section-5.1]
> Relevant part, in "r="
>   nonce it initially specified.  The server MUST verify that the
>   nonce sent by the client in the second message is the same as the
>   one sent by the server in its first message.
> [https://github.com/apache/kafka/blob/8a1fcee86e42c8bd1f26309dde8748927109056e/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java#L149-L161]
> The only verification of client-final-message is verifyClientProof:
> [https://github.com/apache/kafka/blob/8a1fcee86e42c8bd1f26309dde8748927109056e/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java#L225-L235]
> This function only looks at the key itself. It does not ensure that the 
> gs2-header is "biws" (base64("n,,")), meaning the user can erroneously 
> specify channel binding. This also does not check that the client's nonce is 
> correct (c-nonce + s-nonce).
>  
> While I'm not 100% sure on what security concerns an invalid nonce could 
> result in _at this stage_ of the auth flow (it's clearer in the first stage 
> w.r.t. replay attacks), it's likely still important to validate.
>  
> I noticed this validation is missing because my own client erroneously 
> replies with only the original c-nonce, not c-nonce s-nonce. The scram flow 
> has always worked, though. Today I changed the client-final-reply to always 
> return nonce "foo", which still successfully talks to Kafka.
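> For illustration, a minimal sketch of the check the RFC asks for (a hypothetical helper, 
> not the actual ScramSaslServer code): verify that the c= attribute is the base64 GS2 header 
> ("biws" for no channel binding) and that the r= attribute equals c-nonce + s-nonce.
> {code:java}
> // Hypothetical validation sketch; attribute parsing is simplified.
> // Real client-final-messages look like "c=biws,r=<c-nonce><s-nonce>,p=<proof>".
> final class ClientFinalMessageCheck {
>     static void validate(String clientFinalMessage, String clientNonce, String serverNonce) {
>         String expectedNonce = clientNonce + serverNonce;
>         for (String attr : clientFinalMessage.split(",")) {
>             if (attr.startsWith("c=") && !"biws".equals(attr.substring(2))) {
>                 throw new IllegalArgumentException("Unexpected gs2-header / channel binding");
>             }
>             if (attr.startsWith("r=") && !expectedNonce.equals(attr.substring(2))) {
>                 throw new IllegalArgumentException("client-final-message nonce does not match");
>             }
>         }
>     }
> }
> {code}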



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on pull request #11509: KAFKA-13370: add unit test for offset-commit metrics

2021-11-18 Thread GitBox


showuon commented on pull request #11509:
URL: https://github.com/apache/kafka/pull/11509#issuecomment-973787826


   Failed tests are unrelated. Thank you.
   ```
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, 
Security=PLAINTEXT
   Build / JDK 8 and Scala 2.12 / kafka.admin.LeaderElectionCommandTest.[1] 
Type=Raft, Name=testAllTopicPartition, Security=PLAINTEXT
   Build / JDK 8 and Scala 2.12 / kafka.admin.LeaderElectionCommandTest.[1] 
Type=Raft, Name=testAllTopicPartition, Security=PLAINTEXT
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13370) Offset commit failure percentage metric is not computed correctly (regression)

2021-11-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446303#comment-17446303
 ] 

Luke Chen commented on KAFKA-13370:
---

[~rhauch] , PR: [https://github.com/apache/kafka/pull/11509] is created to add 
unit tests for offset-commit metrics. Thank you.

> Offset commit failure percentage metric is not computed correctly (regression)
> --
>
> Key: KAFKA-13370
> URL: https://issues.apache.org/jira/browse/KAFKA-13370
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, metrics
>Affects Versions: 2.8.0
> Environment: Confluent Platform Helm Chart (v6.2.0)
>Reporter: Vincent Giroux
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.1.0, 3.0.1, 2.8.2
>
>
> There seems to have been a regression in the way the offset-commit-* metrics 
> are calculated for *source* Kafka Connect connectors since version 2.8.0.
> Before this version, any timeout or interruption while trying to commit 
> offsets for source connectors (e.g. MM2 MirrorSourceConnector) would get 
> correctly flagged as an offset commit failure (i.e. the 
> *offset-commit-failure-percentage* metric would be non-zero). Since 
> version 2.8.0, these errors are counted as successes.
> After digging through the code, the commit where this bug was introduced 
> appears to be this one : 
> [https://github.com/apache/kafka/commit/047ad654da7903f3903760b0e6a6a58648ca7715]
> I believe removing the boolean *success* argument in the *recordCommit* 
> method of the *WorkerTask* class (argument deemed redundant because of the 
> presence of the Throwable *error* argument) and only considering the presence 
> of a non-null error to determine if a commit is a success or failure might be 
> a mistake. This is because in the *commitOffsets* method of the 
> *WorkerSourceTask* class, there are multiple cases where an exception object 
> is either not available or is not passed to the *recordCommitFailure* method, 
> e.g. :
>  * *Timeout #1* : 
> [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L519]
>  
>  * *Timeout #2* : 
> [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L584]
>  
>  * *Interruption* : 
> [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L529]
>  
>  * *Unserializable offset* : 
> [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L562]
>  
>  
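> To make the pattern concrete, here is a simplified, hypothetical sketch (the names are 
> illustrative, not the actual WorkerTask/WorkerSourceTask API): once the boolean *success* 
> flag is gone, only a non-null Throwable counts as a failure, so a timeout path that has no 
> exception object to pass gets recorded as a success.
> {code:java}
> // Hypothetical sketch of the regression described above; not the real Connect classes.
> class CommitMetricsSketch {
>     void recordCommit(long durationMs, Throwable error) {
>         if (error == null) {
>             recordCommitSuccess(durationMs);   // timeouts reported without a Throwable land here
>         } else {
>             recordCommitFailure(durationMs, error);
>         }
>     }
> 
>     void onCommitTimeout(long durationMs) {
>         // The code paths linked above log a timeout but have no exception to hand over,
>         // so the commit counts as a success and offset-commit-failure-percentage stays at 0.
>         recordCommit(durationMs, null);
>     }
> 
>     void recordCommitSuccess(long durationMs) { /* update offset-commit-* metrics */ }
>     void recordCommitFailure(long durationMs, Throwable error) { /* update offset-commit-* metrics */ }
> }
> {code}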



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13458) The Stream is not able to consume from some of the partitions

2021-11-18 Thread Darshan Marathe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446297#comment-17446297
 ] 

Darshan Marathe commented on KAFKA-13458:
-

[~guozhang] Thanks. No, we are not using any third-party client to stream 
messages.
I've read through KAFKA-13375; it does seem like a similar issue.
I want to point out one thing here: there are other consumers consuming from the 
same topic, but they use KafkaConsumer directly and are able to consume 
successfully.
If the issue is on the broker side, why is it failing only for Kafka Streams?

> The Stream is not able to consume from some of the partitions
> -
>
> Key: KAFKA-13458
> URL: https://issues.apache.org/jira/browse/KAFKA-13458
> Project: Kafka
>  Issue Type: Bug
>Reporter: Darshan Marathe
>Priority: Blocker
>
> Hi Team
> Kafka Streams version: 2.6.0
> Some messages are stuck in the following partitions, and the stream is not 
> able to consume them from those partitions.
> We restarted the stream multiple times, but the issue remains the same.
> We have faced the following issue:
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-29], this could be either transactional offsets 
> waiting for completion, or normal offsets waiting for replication after 
> appending to local log
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-0]
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-3]
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-9]
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-14]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13463) Improvement: KafkaConsumer pause(Collection partitions)

2021-11-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446295#comment-17446295
 ] 

RivenSun commented on KAFKA-13463:
--

Hi [~tombentley] 

I recently noticed that you have become a Kafka PMC member.
Congratulations to you, and thank you very much for helping me in Jira in the 
past.

If you are willing to take the time to browse the Jiras I created and give me some 
suggestions, I would be very grateful.

Thanks

> Improvement: KafkaConsumer pause(Collection partitions)
> ---
>
> Key: KAFKA-13463
> URL: https://issues.apache.org/jira/browse/KAFKA-13463
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> h1. 1.Background
> When users call the kafkaConsumer#pause(...) method, they may not realize that 
> the pause may no longer be in effect, and data can be lost.
> For example, the following simple code:
> {code:java}
> while (true) {
> try {
> kafkaConsumer.pause(kafkaConsumer.assignment());
> ConsumerRecords records = 
> kafkaConsumer.poll(Duration.ofSeconds(2));
> if (!records.isEmpty()) {
> log.error("kafka poll for rebalance discard some record!");
> }
> } catch (Exception e) {
> log.error("maintain poll for rebalance with error:{}", 
> e.getMessage(), e);
> }
> }{code}
> Even if you call pause(assignment) before the poll method every time, the 
> poll method may still return messages.
>  
> h1. 2. RootCause:
> In short, during the rebalance of the group, 
> ConsumerCoordinator#invokePartitionsRevoked(...) will clear the paused mark 
> on the partitions previously held by the kafkaConsumer. However, while clearing 
> the paused mark of the partitions, the messages already buffered in memory 
> (Fetcher.completedFetches) for those paused partitions are not cleared, so 
> Fetcher#fetchedRecords() still returns those messages to the caller.
> For a more detailed analysis, if you are interested, you can read Jira 
> https://issues.apache.org/jira/browse/KAFKA-13425 
> Looking forward to your reply.
>  
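> As a possible user-side mitigation (a minimal sketch, assuming the re-pause in 
> onPartitionsAssigned takes effect before poll() returns records already buffered for 
> those partitions), the paused set can be captured in a ConsumerRebalanceListener and 
> re-applied after the rebalance:
> {code:java}
> import java.util.Collection;
> import java.util.HashSet;
> import java.util.List;
> import java.util.Set;
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> 
> public class PausePreservingSubscription {
>     public static void subscribe(KafkaConsumer<String, String> consumer, List<String> topics) {
>         Set<TopicPartition> rememberedPaused = new HashSet<>();
>         consumer.subscribe(topics, new ConsumerRebalanceListener() {
>             @Override
>             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>                 // capture the paused set before the rebalance clears the paused mark
>                 rememberedPaused.addAll(consumer.paused());
>             }
> 
>             @Override
>             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>                 // re-pause whatever we still own and had paused before the rebalance
>                 Set<TopicPartition> toPause = new HashSet<>(rememberedPaused);
>                 toPause.retainAll(partitions);
>                 consumer.pause(toPause);
>             }
>         });
>     }
> }
> {code}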
> h1. 3.Discuss : Can KafkaConsumer support the pause method that is not 
> affected by groupRebalance?
> The KafkaConsumer#pause method actually stated one point at the beginning of 
> its design:
>  * Rebalance does not preserve pause/resume state.
> link:https://issues.apache.org/jira/browse/KAFKA-2350
> Unfortunately, this is not mentioned in the Javadoc of the 
> KafkaConsumer#pause(...) method. At the same time, 
> ConsumerCoordinator#invokePartitionsRevoked did not have any log output when 
> cleaning up the paused mark. I believe that this will cause many users to use 
> the KafkaConsumer#pause(...) method incorrectly.
> But I think it is necessary for KafkaConsumer to provide a pause method that 
> is not affected by groupRebalance.
>  
> h1. 4. Suggestions
> I suggest optimizing the existing pause method from several different 
> perspectives, or providing some new {{pause}} methods; each point below is an 
> independent solution.
> h2. 1)ConsumerCoordinator#invokePartitionsRevoked should also trigger Fetcher 
> to clean up the revokedAndPausedPartitions message in memory when clearing 
> the paused mark
> This can prevent the Fetcher#fetchedRecords() method from mistakenly thinking 
> that revokedAndPausedPartitions is legal and returning messages. There are 
> various checks on the partition in the fetchedRecords method.
> The price of this is that if the user does not call the pause(...) method 
> before calling the poll method next time, a new FetchMessage request may be 
> initiated, which will cause additional network transmission.
>  
> h2. 2)Efforts to maintain the old paused mark on the KafkaConsumer side
> <1>In the ConsumerCoordinator#onJoinPrepare(...) method, record all 
> pausedTopicPartitions from the current assignment of KafkaConsumer;
>  <2> In the ConsumerCoordinator#onJoinComplete(...) method, use 
> pausedTopicPartitions to render the latest assignment and restore the paused 
> marks of the partitions that are still in the latest assignment.
> {*}Note{*}: If the new assignment of kafkaConsumer no longer contains 
> topicPartitions that have been paused before rebalance, the paused mark of 
> these topicPartitions will be lost forever on the kafkaConsumer side, even if 
> in a future rebalance, the kafkaConsumer will hold these partitions again.
> At the end of the Jira KAFKA-13425 I mentioned above, I gave a draft code 
> suggestion on this point
> <3> In fact, for consumers who use the RebalanceProtocol.COOPERATIVE protocol
> For example, consumers who use the currently supported PartitionAssignor: 
> CooperativeStickyAssignor, through code analysis, we can find that the 
> 

[jira] [Commented] (KAFKA-13463) Improvement: KafkaConsumer pause(Collection partitions)

2021-11-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446292#comment-17446292
 ] 

RivenSun commented on KAFKA-13463:
--

Is anyone willing to discuss this?
I have been feeling quite discouraged recently. In addition to this Jira, I have 
created several Jiras, and I haven't received any comments on them for a long 
time.
I also recently sorted out some Jiras that I thought were valuable and started a 
discussion on d...@kafka.apache.org, but it seems that everyone is busy, and I 
have not received any feedback.

Okay...

> Improvement: KafkaConsumer pause(Collection partitions)
> ---
>
> Key: KAFKA-13463
> URL: https://issues.apache.org/jira/browse/KAFKA-13463
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> h1. 1.Background
> When users call the kafkaConsumer#pause(...) method, they may not realize that 
> the pause may no longer be in effect, and data can be lost.
> For example, the following simple code:
> {code:java}
> while (true) {
> try {
> kafkaConsumer.pause(kafkaConsumer.assignment());
> ConsumerRecords records = 
> kafkaConsumer.poll(Duration.ofSeconds(2));
> if (!records.isEmpty()) {
> log.error("kafka poll for rebalance discard some record!");
> }
> } catch (Exception e) {
> log.error("maintain poll for rebalance with error:{}", 
> e.getMessage(), e);
> }
> }{code}
> Even if you call pause(assignment) before the poll method every time, the 
> poll method may still return messages.
>  
> h1. 2. RootCause:
> In short, during the rebalance of the group, 
> ConsumerCoordinator#invokePartitionsRevoked(...) will clear the paused mark 
> on the partitions previously held by the kafkaConsumer. However, while clearing 
> the paused mark of the partitions, the messages already buffered in memory 
> (Fetcher.completedFetches) for those paused partitions are not cleared, so 
> Fetcher#fetchedRecords() still returns those messages to the caller.
> For a more detailed analysis, if you are interested, you can read Jira 
> https://issues.apache.org/jira/browse/KAFKA-13425 
> Looking forward to your reply.
>  
> h1. 3.Discuss : Can KafkaConsumer support the pause method that is not 
> affected by groupRebalance?
> The KafkaConsumer#pause method actually stated one point at the beginning of 
> its design:
>  * Rebalance does not preserve pause/resume state.
> link:https://issues.apache.org/jira/browse/KAFKA-2350
> Unfortunately, this is not mentioned in the Javadoc of the 
> KafkaConsumer#pause(...) method. At the same time, 
> ConsumerCoordinator#invokePartitionsRevoked did not have any log output when 
> cleaning up the paused mark. I believe that this will cause many users to use 
> the KafkaConsumer#pause(...) method incorrectly.
> But I think it is necessary for KafkaConsumer to provide a pause method that 
> is not affected by groupRebalance.
>  
> h1. 4. Suggestions
> I suggest optimizing the existing pause method from several different 
> perspectives, or providing some new {{pause}} methods; each point below is an 
> independent solution.
> h2. 1)ConsumerCoordinator#invokePartitionsRevoked should also trigger Fetcher 
> to clean up the revokedAndPausedPartitions message in memory when clearing 
> the paused mark
> This can prevent the Fetcher#fetchedRecords() method from mistakenly thinking 
> that revokedAndPausedPartitions is legal and returning messages. There are 
> various checks on the partition in the fetchedRecords method.
> The price of this is that if the user does not call the pause(...) method 
> before calling the poll method next time, a new FetchMessage request may be 
> initiated, which will cause additional network transmission.
>  
> h2. 2)Efforts to maintain the old paused mark on the KafkaConsumer side
> <1>In the ConsumerCoordinator#onJoinPrepare(...) method, record all 
> pausedTopicPartitions from the current assignment of KafkaConsumer;
>  <2> In the ConsumerCoordinator#onJoinComplete(...) method, use 
> pausedTopicPartitions to render the latest assignment and restore the paused 
> marks of the partitions that are still in the latest assignment.
> {*}Note{*}: If the new assignment of kafkaConsumer no longer contains 
> topicPartitions that have been paused before rebalance, the paused mark of 
> these topicPartitions will be lost forever on the kafkaConsumer side, even if 
> in a future rebalance, the kafkaConsumer will hold these partitions again.
> At the end of the Jira KAFKA-13425 I mentioned above, I gave a draft code 
> suggestion on this point
> <3> In fact, for consumers who use the RebalanceProtocol.COOPERATIVE protocol
> For example, consumers who use the currently 

[jira] [Commented] (KAFKA-13437) Broker parameter optimization: security.inter.broker.protocol and num.network.threads

2021-11-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446289#comment-17446289
 ] 

RivenSun commented on KAFKA-13437:
--

Is this issue worthless? No one has been willing to give me any response, and I 
feel very frustrated about it.

> Broker parameter optimization: security.inter.broker.protocol and 
> num.network.threads
> -
>
> Key: KAFKA-13437
> URL: https://issues.apache.org/jira/browse/KAFKA-13437
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> h1. 1. security.inter.broker.protocol
> First, see this parameter's documentation:
> {code:java}
> security.inter.broker.protocolSecurity protocol used to communicate between 
> brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. It is an 
> error to set this and inter.broker.listener.name properties at the same time. 
> {code}
> The documentation does not tell us that, after using this configuration, the final 
> value of InterBrokerListenerName will be the same as the value of 
> security.inter.broker.protocol. I originally thought it would find a suitable 
> listenerName from the listener.security.protocol.map configuration.
> The result is that broker startup failed:
>  
> {code:java}
> [2021-11-09 06:28:44,058] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.lang.IllegalArgumentException: requirement failed: 
> inter.broker.listener.name must be a listener name defined in 
> advertised.listeners. The valid options based on currently configured 
> listeners are SASL_PLAINTEXT,INTERNAL_SSL,PLAIN_PLUGIN_SSL
>         at scala.Predef$.require(Predef.scala:337)
>         at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1952)
>         at kafka.server.KafkaConfig.(KafkaConfig.scala:1897)
>         at kafka.server.KafkaConfig.(KafkaConfig.scala:1394)
>         at kafka.Kafka$.buildServer(Kafka.scala:67)
>         at kafka.Kafka$.main(Kafka.scala:87)
>         at kafka.Kafka.main(Kafka.scala)
>  {code}
>  
>  
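> To make this concrete, a small illustrative sketch (the listener names are made up, and 
> the configuration is shown as a Java Properties map rather than an actual server.properties):
> {code:java}
> import java.util.Properties;
> 
> public class InterBrokerListenerExample {
>     public static Properties brokerProps() {
>         Properties props = new Properties();
>         props.put("listeners", "INTERNAL_SSL://:9009,CLIENT_SSL://:9093");
>         props.put("listener.security.protocol.map", "INTERNAL_SSL:SASL_SSL,CLIENT_SSL:SASL_SSL");
> 
>         // Fails: the protocol-based setting resolves the inter-broker listener name to
>         // "SASL_SSL", which is not one of the listener names above, so startup aborts
>         // with the IllegalArgumentException shown in the log.
>         props.put("security.inter.broker.protocol", "SASL_SSL");
> 
>         // Works: name the listener explicitly instead.
>         // props.put("inter.broker.listener.name", "INTERNAL_SSL");
>         return props;
>     }
> }
> {code}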
> h1. 2. num.network.threads
> The network threads configured by this parameter are not shared by all 
> listeners; instead, each listener creates the same number of 
> networkProcessors, which causes the Kafka process to open many unnecessary 
> threads and wastes resources.
> for example:
> listenerNameA: used for communication between brokers
> listenerNameB: used to connect production messages and fetch messages on the 
> client side
> listenerNameC: Used by Kafka operation and maintenance personnel to manage 
> the cluster and send control type requests, such as deleting topics or adding 
> partitions, etc.
> Ideally, the num.network.threads for listenerNameB should be increased, 
> while the network threads of the other two listeners can be appropriately reduced.
>  
> h1. Rootcause:
> 1. See "getInterBrokerListenerNameAndSecurityProtocol" method in 
> KafkaConfig.scala
> {code:java}
> private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, 
> SecurityProtocol) = {
>   Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
> case Some(_) if 
> originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
>   throw new ConfigException(s"Only one of 
> ${KafkaConfig.InterBrokerListenerNameProp} and " +
> s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
> case Some(name) =>
>   val listenerName = ListenerName.normalised(name)
>   val securityProtocol = 
> listenerSecurityProtocolMap.getOrElse(listenerName,
> throw new ConfigException(s"Listener with name ${listenerName.value} 
> defined in " +
>   s"${KafkaConfig.InterBrokerListenerNameProp} not found in 
> ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
>   (listenerName, securityProtocol)
> case None =>
>   val securityProtocol = 
> getSecurityProtocol(getString(KafkaConfig.InterBrokerSecurityProtocolProp),
> KafkaConfig.InterBrokerSecurityProtocolProp)
>   (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
>   }
> } {code}
> ListenerName.forSecurityProtocol(securityProtocol) limits the value of 
> InterBrokerListenerName to the value of securityProtocol.name
> 2. See "addDataPlaneProcessors" method in SocketServer.scala
> In this method, processors of the size of newProcessorsPerListener are 
> created for each EndPoint, the value of newProcessorsPerListener is 
> config.numNetworkThreads
>  
> h1. Suggestion
>  # Optimize the getInterBrokerListenerNameAndSecurityProtocol method.
> Use listenerSecurityProtocolMap to find a suitable listenerName for 
> security.inter.broker.protocol.
> If there are multiple keys in the listenerSecurityProtocolMap with 
> 

[jira] [Commented] (KAFKA-13422) Even if the correct username and password are configured, when ClientBroker or KafkaClient tries to establish a SASL connection to ServerBroker, an exception is thrown

2021-11-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446288#comment-17446288
 ] 

RivenSun commented on KAFKA-13422:
--

Can anyone give me a response? I feel very frustrated about this. Maybe this 
issue is not worth discussing.

> Even if the correct username and password are configured, when ClientBroker 
> or KafkaClient tries to establish a SASL connection to ServerBroker, an 
> exception is thrown: (Authentication failed: Invalid username or password)
> --
>
> Key: KAFKA-13422
> URL: https://issues.apache.org/jira/browse/KAFKA-13422
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.7.1, 3.0.0
>Reporter: RivenSun
>Priority: Major
> Attachments: CustomerAuthCallbackHandler.java, 
> LoginContext_login_debug.png, SaslClientCallbackHandler_handle_debug.png
>
>
>  
> h1. Foreword:
> When deploying a Kafka cluster with a newer version (2.7.1), I encountered an 
> authentication failure for communication between brokers. The problem can also 
> be reproduced in the current latest version, 3.0.0.
> h1. Problem recurring:
> h2. 1)broker Version is 3.0.0
> h3. The content of kafka_server_jaas.conf is exactly the same on each broker, 
> as follows:
>  
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
> h3. broker server.properties:
> One broker's configuration file is provided below; the other brokers' 
> configuration files differ only in the localPublicIp used in 
> advertised.listeners.
>  
> {code:java}
> broker.id=1
> broker.rack=us-east-1a
> advertised.listeners=SASL_PLAINTEXT://localPublicIp:9779,SASL_SSL://localPublicIp:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://localPublicIp:9669
> log.dirs=/asyncmq/kafka/data_1,/asyncmq/kafka/data_2
> zookeeper.connect=***
> listeners=SASL_PLAINTEXT://:9779,SASL_SSL://:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://:9669
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL
> listener.name.plain_plugin_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler
> #ssl config
> ssl.keystore.password=***
> ssl.key.password=***
> ssl.truststore.password=***
> ssl.keystore.location=***
> ssl.truststore.location=***
> ssl.client.auth=none
> ssl.endpoint.identification.algorithm=
> #broker communicate config
> #security.inter.broker.protocol=SASL_PLAINTEXT
> inter.broker.listener.name=INTERNAL_SSL
> sasl.mechanism.inter.broker.protocol=PLAIN
> #sasl authentication config
> sasl.kerberos.service.name=kafka
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
> delegation.token.master.key=***
> delegation.token.expiry.time.ms=8640
> delegation.token.max.lifetime.ms=31536
> {code}
>  
>  
> Then start all brokers at the same time. Each broker actually starts 
> successfully, but when the controller node tries to establish connections to 
> the other brokers, authentication always fails. The connections between brokers 
> cannot be established normally, so the entire Kafka cluster cannot provide 
> external services.
> h3. The server log keeps printing this error repeatedly:
> (the broker's real IP in the log is sensitive, so ** is used instead here)
>  
> {code:java}
> [2021-10-29 14:16:19,831] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Started socket server acceptors and processors 
> (kafka.network.SocketServer)
> [2021-10-29 14:16:19,836] INFO Kafka version: 3.0.0 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka commitId: 8cb0a5e9d3441962 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka startTimeMs: 1635516979831 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,837] INFO [KafkaServer id=3] started 
> (kafka.server.KafkaServer)
> [2021-10-29 14:16:20,249] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with /** (Authentication failed: Invalid 
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 

[GitHub] [kafka] RivenSun2 commented on pull request #11512: MINOR: Modify the Exception type of the testCommitOffsetAsyncNotCoordinator method

2021-11-18 Thread GitBox


RivenSun2 commented on pull request #11512:
URL: https://github.com/apache/kafka/pull/11512#issuecomment-973762702


   @dajac  @hachikuji 
   This commit avoids repeated tests of testCommitOffsetAsyncNotCoordinator and 
testCommitOffsetAsyncCoordinatorNotAvailable.
   Please help review it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on pull request #11511: MINOR: Brokers in KRaft don't need controller listener

2021-11-18 Thread GitBox


jsancio commented on pull request #11511:
URL: https://github.com/apache/kafka/pull/11511#issuecomment-973737297


   @hachikuji I updated the description.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13465) when auto topic creation is enabled, the server creates MirrorMaker internal topics unexpectedly

2021-11-18 Thread ZhenChun Pan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZhenChun Pan updated KAFKA-13465:
-
Description: 
Hi Team

Mirror Maker: 2.7.0

When I enable auto topic creation on both sides: 

auto.create.topics.enable=true

 

sometimes the MirrorMaker internal topics are created by the server with unexpected 
configurations, and MirrorMaker fails to start.

```

[2021-11-19 18:03:56,707] ERROR [Worker clientId=connect-2, groupId=pr-mm2] 
Uncaught exception in herder work thread, exiting: 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.common.config.ConfigException: Topic 'mm2-configs.pr.internal' 
supplied via the 'config.storage.topic' property is required to have 
'cleanup.policy=compact' to guarantee consistency and durability of connector 
configurations, but found the topic currently has 'cleanup.policy=delete'. 
Continuing would likely result in eventually losing connector configurations 
and problems restarting this Connect cluster in the future. Change the 
'config.storage.topic' property in the Connect worker configurations to use a 
topic with 'cleanup.policy=compact'.
at 
org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:420)
at 
org.apache.kafka.connect.storage.KafkaConfigBackingStore$1.run(KafkaConfigBackingStore.java:501)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:133)
at 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:268)
at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:130)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:288)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

```

I think the solution is to exclude the MirrorMaker internal topics when auto-creating 
topics (AutoTopicCreationManager.scala).

With this change, the problem is resolved for me.

  was:
Hi Team

Mirror Maker: 2.7.0

When I enable auto topic creation on both sides: 

auto.create.topics.enable=true

 

sometimes the MirrorMaker internal topics are created by the server with unexpected 
configurations, and MirrorMaker fails to start.

```

[2021-11-19 18:03:56,707] ERROR [Worker clientId=connect-2, groupId=pr-mm2] 
Uncaught exception in herder work thread, exiting: 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.common.config.ConfigException: Topic 'mm2-configs.pr.internal' 
supplied via the 'config.storage.topic' property is required to have 
'cleanup.policy=compact' to guarantee consistency and durability of connector 
configurations, but found the topic currently has 'cleanup.policy=delete'. 
Continuing would likely result in eventually losing connector configurations 
and problems restarting this Connect cluster in the future. Change the 
'config.storage.topic' property in the Connect worker configurations to use a 
topic with 'cleanup.policy=compact'.
at 
org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:420)
at 
org.apache.kafka.connect.storage.KafkaConfigBackingStore$1.run(KafkaConfigBackingStore.java:501)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:133)
at 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:268)
at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:130)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:288)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

```

I think the solution is to exclude the MirrorMaker internal topics when auto-creating 
topics. 

(AutoTopicCreationManager.scala)

With this change, the problem is resolved for me.


>  when auto topic creation is enabled, the server creates MirrorMaker internal 
> topics unexpectedly
> -
>
> Key: KAFKA-13465
> URL: https://issues.apache.org/jira/browse/KAFKA-13465
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0, 2.8.0, 2.7.1, 3.0.0
>Reporter: ZhenChun Pan
>Priority: Major
>
> Hi Team
> Mirror Maker: 2.7.0
> when I enable auto topic creation on both sides: 
> auto.create.topics.enable=true
>  
> 

[jira] [Created] (KAFKA-13465) when auto topic creation is enabled, the server creates MirrorMaker internal topics unexpectedly

2021-11-18 Thread ZhenChun Pan (Jira)
ZhenChun Pan created KAFKA-13465:


 Summary:  when auto topic creation is enabled, the server creates MirrorMaker 
internal topics unexpectedly
 Key: KAFKA-13465
 URL: https://issues.apache.org/jira/browse/KAFKA-13465
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.0.0, 2.7.1, 2.8.0, 2.7.0
Reporter: ZhenChun Pan


Hi Team

Mirror Maker: 2.7.0

When I enable auto topic creation on both sides: 

auto.create.topics.enable=true

 

sometimes the MirrorMaker internal topics are created by the server with unexpected 
configurations, and MirrorMaker fails to start.

```

[2021-11-19 18:03:56,707] ERROR [Worker clientId=connect-2, groupId=pr-mm2] 
Uncaught exception in herder work thread, exiting: 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.common.config.ConfigException: Topic 'mm2-configs.pr.internal' 
supplied via the 'config.storage.topic' property is required to have 
'cleanup.policy=compact' to guarantee consistency and durability of connector 
configurations, but found the topic currently has 'cleanup.policy=delete'. 
Continuing would likely result in eventually losing connector configurations 
and problems restarting this Connect cluster in the future. Change the 
'config.storage.topic' property in the Connect worker configurations to use a 
topic with 'cleanup.policy=compact'.
at 
org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:420)
at 
org.apache.kafka.connect.storage.KafkaConfigBackingStore$1.run(KafkaConfigBackingStore.java:501)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:133)
at 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:268)
at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:130)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:288)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

```

I think the solution is to exclude the MirrorMaker internal topics when auto-creating 
topics. 

(AutoTopicCreationManager.scala)

With this change, the problem is resolved for me.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

2021-11-18 Thread GitBox


wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752834113



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 
 /**
  * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
- * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+ * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
  * it to begin processing the new topology.
  *
  * @throws IllegalArgumentException if this topology name is already in use
  * @throws IllegalStateExceptionif streams has not been started or has 
already shut down
  * @throws TopologyExceptionif this topology subscribes to any 
input topics or pattern already in use
  */
-public void addNamedTopology(final NamedTopology newTopology) {
+public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+log.debug("Adding topology: {}", newTopology.name());
 if (hasStartedOrFinishedShuttingDown()) {
 throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
 } else if (getTopologyByName(newTopology.name()).isPresent()) {
 throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
" as another of the same 
name already exists");
 }
-
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+return new AddNamedTopologyResult(
+
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+);
 }
 
 /**
  * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
- * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+ * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure
  * it stops processing the old topology.
  *
+ * @param topologyToRemove  name of the topology to be removed
+ * @param resetOffsets  whether to reset the committed offsets 
for any source topics
+ *
  * @throws IllegalArgumentException if this topology name cannot be found
  * @throws IllegalStateExceptionif streams has not been started or has 
already shut down
  * @throws TopologyExceptionif this topology subscribes to any 
input topics or pattern already in use
  */
-public void removeNamedTopology(final String topologyToRemove) {
+public RemoveNamedTopologyResult removeNamedTopology(final String 
topologyToRemove, final boolean resetOffsets) {
+log.debug("Removing topology: {}", topologyToRemove);
 if (!isRunningOrRebalancing()) {
 throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
 } else if (!getTopologyByName(topologyToRemove).isPresent()) {
 throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
 }
+final Set partitionsToReset = metadataForLocalThreads()
+.stream()
+.flatMap(t -> {
+final HashSet tasks = new HashSet<>();
+tasks.addAll(t.activeTasks());
+tasks.addAll(t.standbyTasks());
+return tasks.stream();
+})
+.flatMap(t -> t.topicPartitions().stream())
+.filter(t -> 
topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+.collect(Collectors.toSet());
+
+
+
+final KafkaFuture removeTopologyFuture = 
topologyMetadata.unregisterTopology(topologyToRemove);
+
+if (resetOffsets) {
+log.info("partitions to reset: {}", partitionsToReset);
+if (!partitionsToReset.isEmpty()) {
+try {
+removeTopologyFuture.get();

Review comment:
   Yeah, I hadn't thought of multi-node clusters; that will be a problem. 
Calling get() here isn't going to be enough even if we wanted to do that.
   
   We could maybe make it call this for each cluster after it has 
unsubscribed and expect it to fail for all but the last one? We can talk 
tomorrow.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [kafka] ableegoldman commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

2021-11-18 Thread GitBox


ableegoldman commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752831258



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -108,6 +124,48 @@ private void unlock() {
 version.topologyLock.unlock();
 }
 
+public Collection sourceTopologies(final String name) {
+return builders.get(name).sourceTopicCollection();
+}
+
+public boolean needsUpdate(final String threadName) {
+return threadVersions.get(threadName) < topologyVersion();
+}
+
+public void registerThread(final String threadName) {
+threadVersions.put(threadName, 0L);
+}
+
+public void unregisterThread(final String threadName) {
+threadVersions.remove(threadName);
+}
+
+
+public boolean reachedLatestVersion(final String threadName) {
+boolean rebalance = false;
+try {
+lock();
+final Iterator iterator = 
version.activeTopologyWaiters.listIterator();
+TopologyVersionWaiters topologyVersionWaiters;
+threadVersions.put(threadName, topologyVersion());
+while (iterator.hasNext()) {
+topologyVersionWaiters = iterator.next();
+final long verison = topologyVersionWaiters.topologyVersion;
+if (verison <= threadVersions.get(threadName)) {
+if (threadVersions.values().stream().allMatch(t -> t >= 
verison)) {
+topologyVersionWaiters.future.complete(null);
+iterator.remove();
+log.info("thread {} is now on on version {}", 
threadName, topologyVersionWaiters.topologyVersion);
+rebalance = true;

Review comment:
   You mean just always trigger a rebalance? That SGTM 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

2021-11-18 Thread GitBox


ableegoldman commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752831095



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 
 /**
  * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
- * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+ * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
  * it to begin processing the new topology.
  *
  * @throws IllegalArgumentException if this topology name is already in use
  * @throws IllegalStateExceptionif streams has not been started or has 
already shut down
  * @throws TopologyExceptionif this topology subscribes to any 
input topics or pattern already in use
  */
-public void addNamedTopology(final NamedTopology newTopology) {
+public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+log.debug("Adding topology: {}", newTopology.name());
 if (hasStartedOrFinishedShuttingDown()) {
 throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
 } else if (getTopologyByName(newTopology.name()).isPresent()) {
 throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
" as another of the same 
name already exists");
 }
-
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+return new AddNamedTopologyResult(
+
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+);
 }
 
 /**
  * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
- * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+ * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure
  * it stops processing the old topology.
  *
+ * @param topologyToRemove  name of the topology to be removed
+ * @param resetOffsets  whether to reset the committed offsets 
for any source topics
+ *
  * @throws IllegalArgumentException if this topology name cannot be found
  * @throws IllegalStateExceptionif streams has not been started or has 
already shut down
  * @throws TopologyExceptionif this topology subscribes to any 
input topics or pattern already in use
  */
-public void removeNamedTopology(final String topologyToRemove) {
+public RemoveNamedTopologyResult removeNamedTopology(final String 
topologyToRemove, final boolean resetOffsets) {
+log.debug("Removing topology: {}", topologyToRemove);
 if (!isRunningOrRebalancing()) {
 throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
 } else if (!getTopologyByName(topologyToRemove).isPresent()) {
 throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
 }
+final Set partitionsToReset = metadataForLocalThreads()
+.stream()
+.flatMap(t -> {
+final HashSet tasks = new HashSet<>();
+tasks.addAll(t.activeTasks());
+tasks.addAll(t.standbyTasks());
+return tasks.stream();
+})
+.flatMap(t -> t.topicPartitions().stream())
+.filter(t -> 
topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+.collect(Collectors.toSet());
+
+
+
+final KafkaFuture removeTopologyFuture = 
topologyMetadata.unregisterTopology(topologyToRemove);
+
+if (resetOffsets) {
+log.info("partitions to reset: {}", partitionsToReset);
+if (!partitionsToReset.isEmpty()) {
+try {
+removeTopologyFuture.get();

Review comment:
   Hm, I see. Ok that's potentially going to be a bit of an issue with 
multi-node clusters...let's chat about this tomorrow, I have some thoughts on a 
few different approaches we might want to consider here.
   
   That said, we still should not call `get()` here; can we just move the 
offset reset into `TopologyMetadata` and do it after the last thread has 
processed the topology removal? I know it's similar but I really think we 
should make sure to return from `removeNamedTopology()` ASAP so that it's async 
and the 

[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

2021-11-18 Thread GitBox


wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752822683



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -108,6 +124,48 @@ private void unlock() {
 version.topologyLock.unlock();
 }
 
+public Collection sourceTopologies(final String name) {
+return builders.get(name).sourceTopicCollection();
+}
+
+public boolean needsUpdate(final String threadName) {
+return threadVersions.get(threadName) < topologyVersion();
+}
+
+public void registerThread(final String threadName) {
+threadVersions.put(threadName, 0L);
+}
+
+public void unregisterThread(final String threadName) {
+threadVersions.remove(threadName);
+}
+
+
+public boolean reachedLatestVersion(final String threadName) {
+boolean rebalance = false;
+try {
+lock();
+final Iterator iterator = 
version.activeTopologyWaiters.listIterator();
+TopologyVersionWaiters topologyVersionWaiters;
+threadVersions.put(threadName, topologyVersion());
+while (iterator.hasNext()) {
+topologyVersionWaiters = iterator.next();
+final long verison = topologyVersionWaiters.topologyVersion;
+if (verison <= threadVersions.get(threadName)) {
+if (threadVersions.values().stream().allMatch(t -> t >= 
verison)) {
+topologyVersionWaiters.future.complete(null);
+iterator.remove();
+log.info("thread {} is now on on version {}", 
threadName, topologyVersionWaiters.topologyVersion);
+rebalance = true;

Review comment:
   How about we just take the rebalances for now? We can improve this later
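
   For readers of the archive: the `reachedLatestVersion` logic quoted above is essentially a version barrier, where each stream thread records the topology version it has caught up to and a waiter's future completes once every thread has reached that version. A minimal, self-contained sketch of that pattern, simplified and with illustrative names (this is not the actual `TopologyMetadata` code):

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.CopyOnWriteArrayList;
   import java.util.concurrent.atomic.AtomicLong;

   public class TopologyVersionBarrier {
       private static class Waiter {
           final long version;
           final CompletableFuture<Void> future = new CompletableFuture<>();
           Waiter(final long version) { this.version = version; }
       }

       private final AtomicLong latestVersion = new AtomicLong(0L);
       private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();
       private final CopyOnWriteArrayList<Waiter> waiters = new CopyOnWriteArrayList<>();

       // each stream thread registers itself at version 0
       public void registerThread(final String threadName) {
           threadVersions.put(threadName, 0L);
       }

       // called when a topology is added or removed: bump the version and return a future
       // that completes once every registered thread has picked up this version
       public CompletableFuture<Void> bumpVersion() {
           final Waiter waiter = new Waiter(latestVersion.incrementAndGet());
           waiters.add(waiter);
           return waiter.future;
       }

       // called by a stream thread after it has processed the latest topology update
       public void threadCaughtUp(final String threadName) {
           threadVersions.put(threadName, latestVersion.get());
           for (final Waiter w : waiters) {
               if (threadVersions.values().stream().allMatch(v -> v >= w.version)) {
                   w.future.complete(null);
                   waiters.remove(w);  // safe: COW list iterators work on a snapshot
               }
           }
       }
   }
   ```

   The trade-off discussed in the comment above is whether completing a waiter should also trigger a rebalance; the sketch only shows the version bookkeeping.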




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

2021-11-18 Thread GitBox


wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752821836



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 
 /**
  * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
- * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+ * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
  * it to begin processing the new topology.
  *
  * @throws IllegalArgumentException if this topology name is already in use
  * @throws IllegalStateExceptionif streams has not been started or has 
already shut down
  * @throws TopologyExceptionif this topology subscribes to any 
input topics or pattern already in use
  */
-public void addNamedTopology(final NamedTopology newTopology) {
+public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
+log.debug("Adding topology: {}", newTopology.name());
 if (hasStartedOrFinishedShuttingDown()) {
 throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
 } else if (getTopologyByName(newTopology.name()).isPresent()) {
 throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
" as another of the same 
name already exists");
 }
-
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+return new AddNamedTopologyResult(
+
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+);
 }
 
 /**
  * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
- * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+ * running, you should inform all of them by calling {@code 
#removeNamedTopology(String)} on each client to ensure
  * it stops processing the old topology.
  *
+ * @param topologyToRemove  name of the topology to be removed
+ * @param resetOffsets  whether to reset the committed offsets 
for any source topics
+ *
  * @throws IllegalArgumentException if this topology name cannot be found
  * @throws IllegalStateExceptionif streams has not been started or has 
already shut down
  * @throws TopologyExceptionif this topology subscribes to any 
input topics or pattern already in use
  */
-public void removeNamedTopology(final String topologyToRemove) {
+public RemoveNamedTopologyResult removeNamedTopology(final String 
topologyToRemove, final boolean resetOffsets) {
+log.debug("Removing topology: {}", topologyToRemove);
 if (!isRunningOrRebalancing()) {
 throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
 } else if (!getTopologyByName(topologyToRemove).isPresent()) {
 throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
 }
+final Set partitionsToReset = metadataForLocalThreads()
+.stream()
+.flatMap(t -> {
+final HashSet tasks = new HashSet<>();
+tasks.addAll(t.activeTasks());
+tasks.addAll(t.standbyTasks());
+return tasks.stream();
+})
+.flatMap(t -> t.topicPartitions().stream())
+.filter(t -> 
topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+.collect(Collectors.toSet());
+
+
+
+final KafkaFuture removeTopologyFuture = 
topologyMetadata.unregisterTopology(topologyToRemove);
+
+if (resetOffsets) {
+log.info("partitions to reset: {}", partitionsToReset);
+if (!partitionsToReset.isEmpty()) {
+try {
+removeTopologyFuture.get();

Review comment:
   unfortunately if we want to reset the offsets we need to make sure we 
are not subscribed. otherwise we get : 
   
   ```java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting 
offsets of a topic is forbidden while the consumer group is actively subscribed 
to it.
   ```
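
   For context on the constraint above: committed offsets for a topic can only be deleted once the group no longer subscribes to it, so the reset has to happen after the threads have dropped the removed topology. A rough sketch of that ordering using the public Admin API, blocking for clarity even though the thread above discusses doing this asynchronously (the group id, partitions, and future are assumed inputs; this is not the actual Streams implementation):

   ```java
   import java.util.Properties;
   import java.util.Set;
   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.common.KafkaFuture;
   import org.apache.kafka.common.TopicPartition;

   public class OffsetResetAfterRemoval {
       // assumed inputs: the application/group id, the partitions of the removed topology's
       // source topics, and the future that completes once all threads dropped the topology
       static void resetAfterRemoval(final String applicationId,
                                     final Set<TopicPartition> partitionsToReset,
                                     final KafkaFuture<Void> removeTopologyFuture)
               throws ExecutionException, InterruptedException {
           final Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
           try (Admin admin = Admin.create(props)) {
               // 1. wait until no thread is subscribed to the removed topology's topics anymore,
               //    otherwise the broker rejects the call with GroupSubscribedToTopicException
               removeTopologyFuture.get();
               // 2. only then delete the committed offsets for those partitions
               admin.deleteConsumerGroupOffsets(applicationId, partitionsToReset).all().get();
           }
       }
   }
   ```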




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [kafka] ableegoldman commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

2021-11-18 Thread GitBox


ableegoldman commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752809604



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -108,6 +124,48 @@ private void unlock() {
 version.topologyLock.unlock();
 }
 
+public Collection sourceTopologies(final String name) {
+return builders.get(name).sourceTopicCollection();
+}
+
+public boolean needsUpdate(final String threadName) {
+return threadVersions.get(threadName) < topologyVersion();
+}
+
+public void registerThread(final String threadName) {
+threadVersions.put(threadName, 0L);
+}
+
+public void unregisterThread(final String threadName) {
+threadVersions.remove(threadName);
+}
+
+
+public boolean reachedLatestVersion(final String threadName) {
+boolean rebalance = false;
+try {
+lock();
+final Iterator iterator = 
version.activeTopologyWaiters.listIterator();
+TopologyVersionWaiters topologyVersionWaiters;
+threadVersions.put(threadName, topologyVersion());
+while (iterator.hasNext()) {
+topologyVersionWaiters = iterator.next();
+final long verison = topologyVersionWaiters.topologyVersion;
+if (verison <= threadVersions.get(threadName)) {
+if (threadVersions.values().stream().allMatch(t -> t >= 
verison)) {
+topologyVersionWaiters.future.complete(null);
+iterator.remove();
+log.info("thread {} is now on on version {}", 
threadName, topologyVersionWaiters.topologyVersion);

Review comment:
   ```suggestion
   log.info("Thread {} is now on topology version {}", 
threadName, topologyVersionWaiters.topologyVersion);
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -108,6 +124,48 @@ private void unlock() {
 version.topologyLock.unlock();
 }
 
+public Collection sourceTopologies(final String name) {
+return builders.get(name).sourceTopicCollection();
+}
+
+public boolean needsUpdate(final String threadName) {
+return threadVersions.get(threadName) < topologyVersion();
+}
+
+public void registerThread(final String threadName) {
+threadVersions.put(threadName, 0L);
+}
+
+public void unregisterThread(final String threadName) {
+threadVersions.remove(threadName);
+}
+

Review comment:
   super nit: extra line break

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -108,6 +124,48 @@ private void unlock() {
 version.topologyLock.unlock();
 }
 
+public Collection sourceTopologies(final String name) {
+return builders.get(name).sourceTopicCollection();
+}
+
+public boolean needsUpdate(final String threadName) {
+return threadVersions.get(threadName) < topologyVersion();
+}
+
+public void registerThread(final String threadName) {
+threadVersions.put(threadName, 0L);
+}
+
+public void unregisterThread(final String threadName) {
+threadVersions.remove(threadName);
+}
+
+
+public boolean reachedLatestVersion(final String threadName) {
+boolean rebalance = false;
+try {
+lock();
+final Iterator iterator = 
version.activeTopologyWaiters.listIterator();
+TopologyVersionWaiters topologyVersionWaiters;
+threadVersions.put(threadName, topologyVersion());
+while (iterator.hasNext()) {
+topologyVersionWaiters = iterator.next();
+final long verison = topologyVersionWaiters.topologyVersion;

Review comment:
   nit: typo in "verison"; also, since `version` is already taken by the 
`TopologyVersion` field, we should rename either that field or this variable 
(it probably makes sense to rename the field to `topologyVersion`, 
`currentTopologyVersion`, `latestTopologyVersion`, etc.).

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final 
String topologyName) {
 
 /**
  * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
- * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+ * you should inform all of them by calling {@code 
#addNamedTopology(NamedTopology)} on each client in order for
  * it to begin processing the new topology.
  *
  * @throws IllegalArgumentException if this topology name 

[jira] [Assigned] (KAFKA-13464) SCRAM does not validate client-final-message's nonce

2021-11-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-13464:
-

Assignee: Luke Chen

> SCRAM does not validate client-final-message's nonce
> 
>
> Key: KAFKA-13464
> URL: https://issues.apache.org/jira/browse/KAFKA-13464
> Project: Kafka
>  Issue Type: Bug
>Reporter: Travis Bischel
>Assignee: Luke Chen
>Priority: Minor
>
> [https://datatracker.ietf.org/doc/html/rfc5802#section-5.1]
> Relevant part, in "r="
>   nonce it initially specified.  The server MUST verify that the
>   nonce sent by the client in the second message is the same as the
>   one sent by the server in its first message.
> [https://github.com/apache/kafka/blob/8a1fcee86e42c8bd1f26309dde8748927109056e/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java#L149-L161]
> The only verification of client-final-message is verifyClientProof:
> [https://github.com/apache/kafka/blob/8a1fcee86e42c8bd1f26309dde8748927109056e/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java#L225-L235]
> This function only looks at the key itself. It does not ensure that the 
> gs2-header is "biws" (base64("n,,")), meaning the user can erroneously 
> specify channel binding. This also does not check that the client's nonce is 
> correct (c-nonce + s-nonce).
>  
> While I'm not 100% sure on what security concerns an invalid nonce could 
> result in _at this stage_ of the auth flow (it's clearer in the first stage 
> w.r.t. replay attacks), it's likely still important to validate.
>  
> I noticed this validation is missing because my own client erroneously 
> replies with only the original c-nonce, not c-nonce s-nonce. The scram flow 
> has always worked, though. Today I changed the client-final-reply to always 
> return nonce "foo", which still successfully talks to Kafka.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13435) Group won't consume partitions added after static member restart

2021-11-18 Thread Ryan Leslie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446209#comment-17446209
 ] 

Ryan Leslie commented on KAFKA-13435:
-

[~guozhang] Awesome, glad we are more in sync now. :)

I think it would be tough for most users of static membership to pick up a new 
Kafka version that suddenly triggers a rebalance on leader rejoin, since this was 
avoided in the original implementation. This ticket is an edge case, after all. 
My thought was that if {{JoinResponse}} can at least inform the client that it 
is the returning leader, then it still has the opportunity to check the metadata 
for changes against its subscription and trigger the rebalance. It may not be a 
perfect solution, and it still involves some careful refactoring. Hoping David 
has some ideas around this too. I agree with you that it would be ideal if the 
broker controlled these decisions, but that does sound like an even bigger 
change.
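
For anyone who wants to reproduce this locally, a minimal sketch of the steps from the ticket description below, assuming a local broker and a topic named {{ryan_test_topic}} (both placeholders):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberRepro {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "ryan_test");
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1");         // static member
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 1. create a static consumer subscribed to a single topic (it becomes the group leader)
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("ryan_test_topic"));
        consumer.poll(Duration.ofSeconds(5));

        // 2. close it and create a new one with the same group instance id
        consumer.close();
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("ryan_test_topic"));
        consumer.poll(Duration.ofSeconds(5));

        // 3. increase the partition count for the topic
        final Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(adminProps)) {
            admin.createPartitions(
                Collections.singletonMap("ryan_test_topic", NewPartitions.increaseTo(4))).all().get();
        }

        // 4. observe that no rebalance occurs and the new partitions are never assigned
        for (int i = 0; i < 30; i++) {
            consumer.poll(Duration.ofSeconds(1));
            System.out.println("assignment: " + consumer.assignment());
        }
        consumer.close();
    }
}
{code}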

> Group won't consume partitions added after static member restart
> 
>
> Key: KAFKA-13435
> URL: https://issues.apache.org/jira/browse/KAFKA-13435
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: Ryan Leslie
>Assignee: David Jacot
>Priority: Critical
>
> When using consumer groups with static membership, if the consumer marked as 
> leader has restarted, then metadata changes such as partition increase are 
> not triggering expected rebalances.
> To reproduce this issue, simply:
>  # Create a static consumer subscribed to a single topic
>  # Close the consumer and create a new one with the same group instance id
>  # Increase partitions for the topic
>  # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to 
> track metadata and trigger a rebalance if there are changes such as new 
> partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null && 
> !assignmentSnapshot.matches(metadataSnapshot)) {
> ...
> requestRejoinIfNecessary(reason);
> return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the 
> leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. 
> partition changes)
> if (!isLeader)
> assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a 
> rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group 
> should have _isLeader == True_ and be responsible for triggering rebalances 
> on metadata changes.
> However, in the case of static membership, if the leader has been restarted 
> and rejoined the group, the group essentially no longer has a current leader. 
> Even though the metadata changes are fetched, no rebalance will be triggered. 
> That is, _isLeader_ will be false for all members.
> This issue does not resolve until after an actual group change that causes a 
> proper rebalance. In order to safely make a partition increase when using 
> static membership, consumers must be stopped and have timed out, or forcibly 
> removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker. 
> Currently, when a static consumer that is leader is restarted, the 
> coordinator does recognize the change:
> e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member 
> Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test 
> with unknown member id rejoins, assigning new member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-
> 6ebf-47da-95ef-c54fef17ab74, while old member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff
>  will be removed. (
> kafka.coordinator.group.GroupCoordinator){noformat}
> However, it does not attempt to update the leader id since this isn't a new 
> rebalance, and JOIN_GROUP will continue returning the now stale member id as 
> leader:
> 

[jira] [Commented] (KAFKA-13435) Group won't consume partitions added after static member restart

2021-11-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446206#comment-17446206
 ] 

Guozhang Wang commented on KAFKA-13435:
---

[~rleslie] Sorry I misunderstood your scenario, I thought that the member was 
not newly created and hence would still have its {{assignmentSnapshot}} 
locally, now that I re-read the first section of the JIRA description I think I 
get you now.

In this case, at the moment I feel we cannot avoid doing an unnecessary full 
rebalance which triggers perform-assignment.. but in the near future when we 
improve the rebalance protocol, I think it would be better to not having the 
client to remember whether itself is the leader or not, instead, anyone with 
the full subscription information should be able to do the assignment, and for 
anyone (including the leader) to re-join the group, it should be the broker's 
responsibility to decide if a rebalance needs to be triggered or not.

> Group won't consume partitions added after static member restart
> 
>
> Key: KAFKA-13435
> URL: https://issues.apache.org/jira/browse/KAFKA-13435
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: Ryan Leslie
>Assignee: David Jacot
>Priority: Critical
>
> When using consumer groups with static membership, if the consumer marked as 
> leader has restarted, then metadata changes such as partition increase are 
> not triggering expected rebalances.
> To reproduce this issue, simply:
>  # Create a static consumer subscribed to a single topic
>  # Close the consumer and create a new one with the same group instance id
>  # Increase partitions for the topic
>  # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to 
> track metadata and trigger a rebalance if there are changes such as new 
> partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null && 
> !assignmentSnapshot.matches(metadataSnapshot)) {
> ...
> requestRejoinIfNecessary(reason);
> return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the 
> leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. 
> partition changes)
> if (!isLeader)
> assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a 
> rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group 
> should have _isLeader == True_ and be responsible for triggering rebalances 
> on metadata changes.
> However, in the case of static membership, if the leader has been restarted 
> and rejoined the group, the group essentially no longer has a current leader. 
> Even though the metadata changes are fetched, no rebalance will be triggered. 
> That is, _isLeader_ will be false for all members.
> This issue does not resolve until after an actual group change that causes a 
> proper rebalance. In order to safely make a partition increase when using 
> static membership, consumers must be stopped and have timed out, or forcibly 
> removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker. 
> Currently, when a static consumer that is leader is restarted, the 
> coordinator does recognize the change:
> e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member 
> Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test 
> with unknown member id rejoins, assigning new member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-
> 6ebf-47da-95ef-c54fef17ab74, while old member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff
>  will be removed. (
> kafka.coordinator.group.GroupCoordinator){noformat}
> However, it does not attempt to update the leader id since this isn't a new 
> rebalance, and JOIN_GROUP will continue returning the now stale member id 

[jira] [Comment Edited] (KAFKA-13435) Group won't consume partitions added after static member restart

2021-11-18 Thread Ryan Leslie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446202#comment-17446202
 ] 

Ryan Leslie edited comment on KAFKA-13435 at 11/18/21, 11:18 PM:
-

[~dajac] Thank you for picking this up quickly.

[~guozhang] Thanks for reviewing again. I think as {{ConsumerCoordinator}} is 
currently written, at least one member needs to know its the leader and record 
the assignment. Otherwise {{assignmentSnapshot}} is not set and rebalance won't 
trigger when the metadata is refreshed with new partitions added. But I 
understand there may be another way to address this. Let's wait for David and 
see how the PR ends up looking.


was (Author: rleslie):
[~dajac] Thank you for picking this up quickly.

[~guozhang] Thanks for reviewing again. I think as {{ConsumerCoordinator}} is 
currently written, at least one member needs to know its the leader and record 
the assignment. Otherwise {{assignmentSnapshot }}is not set and rebalance won't 
trigger when the metadata is refreshed with new partitions added. But I 
understand there may be another way to address this. Let's wait for David and 
see how the PR ends up looking.

> Group won't consume partitions added after static member restart
> 
>
> Key: KAFKA-13435
> URL: https://issues.apache.org/jira/browse/KAFKA-13435
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: Ryan Leslie
>Assignee: David Jacot
>Priority: Critical
>
> When using consumer groups with static membership, if the consumer marked as 
> leader has restarted, then metadata changes such as partition increase are 
> not triggering expected rebalances.
> To reproduce this issue, simply:
>  # Create a static consumer subscribed to a single topic
>  # Close the consumer and create a new one with the same group instance id
>  # Increase partitions for the topic
>  # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to 
> track metadata and trigger a rebalance if there are changes such as new 
> partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null && 
> !assignmentSnapshot.matches(metadataSnapshot)) {
> ...
> requestRejoinIfNecessary(reason);
> return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the 
> leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. 
> partition changes)
> if (!isLeader)
> assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a 
> rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group 
> should have _isLeader == True_ and be responsible for triggering rebalances 
> on metadata changes.
> However, in the case of static membership, if the leader has been restarted 
> and rejoined the group, the group essentially no longer has a current leader. 
> Even though the metadata changes are fetched, no rebalance will be triggered. 
> That is, _isLeader_ will be false for all members.
> This issue does not resolve until after an actual group change that causes a 
> proper rebalance. In order to safely make a partition increase when using 
> static membership, consumers must be stopped and have timed out, or forcibly 
> removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker. 
> Currently, when a static consumer that is leader is restarted, the 
> coordinator does recognize the change:
> e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member 
> Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test 
> with unknown member id rejoins, assigning new member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-
> 6ebf-47da-95ef-c54fef17ab74, while old member id 
> 

[jira] [Commented] (KAFKA-13435) Group won't consume partitions added after static member restart

2021-11-18 Thread Ryan Leslie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446202#comment-17446202
 ] 

Ryan Leslie commented on KAFKA-13435:
-

[~dajac] Thank you for picking this up quickly.

[~guozhang] Thanks for reviewing again. I think as {{ConsumerCoordinator}} is 
currently written, at least one member needs to know its the leader and record 
the assignment. Otherwise {{assignmentSnapshot }}is not set and rebalance won't 
trigger when the metadata is refreshed with new partitions added. But I 
understand there may be another way to address this. Let's wait for David and 
see how the PR ends up looking.

> Group won't consume partitions added after static member restart
> 
>
> Key: KAFKA-13435
> URL: https://issues.apache.org/jira/browse/KAFKA-13435
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: Ryan Leslie
>Assignee: David Jacot
>Priority: Critical
>
> When using consumer groups with static membership, if the consumer marked as 
> leader has restarted, then metadata changes such as partition increase are 
> not triggering expected rebalances.
> To reproduce this issue, simply:
>  # Create a static consumer subscribed to a single topic
>  # Close the consumer and create a new one with the same group instance id
>  # Increase partitions for the topic
>  # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to 
> track metadata and trigger a rebalance if there are changes such as new 
> partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null && 
> !assignmentSnapshot.matches(metadataSnapshot)) {
> ...
> requestRejoinIfNecessary(reason);
> return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the 
> leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. 
> partition changes)
> if (!isLeader)
> assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a 
> rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group 
> should have _isLeader == True_ and be responsible for triggering rebalances 
> on metadata changes.
> However, in the case of static membership, if the leader has been restarted 
> and rejoined the group, the group essentially no longer has a current leader. 
> Even though the metadata changes are fetched, no rebalance will be triggered. 
> That is, _isLeader_ will be false for all members.
> This issue does not resolve until after an actual group change that causes a 
> proper rebalance. In order to safely make a partition increase when using 
> static membership, consumers must be stopped and have timed out, or forcibly 
> removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker. 
> Currently, when a static consumer that is leader is restarted, the 
> coordinator does recognize the change:
> e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member 
> Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test 
> with unknown member id rejoins, assigning new member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-
> 6ebf-47da-95ef-c54fef17ab74, while old member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff
>  will be removed. (
> kafka.coordinator.group.GroupCoordinator){noformat}
> However, it does not attempt to update the leader id since this isn't a new 
> rebalance, and JOIN_GROUP will continue returning the now stale member id as 
> leader:
> {noformat}
> 2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer 
> instanceId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, 
> clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, 
> groupId=ryan_test] Received successful JoinGroup response: 
> 

[jira] [Comment Edited] (KAFKA-13435) Group won't consume partitions added after static member restart

2021-11-18 Thread Ryan Leslie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446202#comment-17446202
 ] 

Ryan Leslie edited comment on KAFKA-13435 at 11/18/21, 11:17 PM:
-

[~dajac] Thank you for picking this up quickly.

[~guozhang] Thanks for reviewing again. I think as {{ConsumerCoordinator}} is 
currently written, at least one member needs to know its the leader and record 
the assignment. Otherwise {{assignmentSnapshot }}is not set and rebalance won't 
trigger when the metadata is refreshed with new partitions added. But I 
understand there may be another way to address this. Let's wait for David and 
see how the PR ends up looking.


was (Author: rleslie):
[~dajac] Thank you for picking this up quickly.

[~guozhang] Thanks for reviewing again. I think as {{ConsumerCoordinator}} is 
currently written, at least one member needs to know its the leader and record 
the assignment. Otherwise {{assignmentSnapshot }}is not set and rebalance won't 
trigger when the metadata is refreshed with new partitions added. But I 
understand there may be another way to address this. Let's wait for David and 
see how the PR ends up looking.

> Group won't consume partitions added after static member restart
> 
>
> Key: KAFKA-13435
> URL: https://issues.apache.org/jira/browse/KAFKA-13435
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: Ryan Leslie
>Assignee: David Jacot
>Priority: Critical
>
> When using consumer groups with static membership, if the consumer marked as 
> leader has restarted, then metadata changes such as partition increase are 
> not triggering expected rebalances.
> To reproduce this issue, simply:
>  # Create a static consumer subscribed to a single topic
>  # Close the consumer and create a new one with the same group instance id
>  # Increase partitions for the topic
>  # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to 
> track metadata and trigger a rebalance if there are changes such as new 
> partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null && 
> !assignmentSnapshot.matches(metadataSnapshot)) {
> ...
> requestRejoinIfNecessary(reason);
> return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the 
> leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. 
> partition changes)
> if (!isLeader)
> assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a 
> rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group 
> should have _isLeader == True_ and be responsible for triggering rebalances 
> on metadata changes.
> However, in the case of static membership, if the leader has been restarted 
> and rejoined the group, the group essentially no longer has a current leader. 
> Even though the metadata changes are fetched, no rebalance will be triggered. 
> That is, _isLeader_ will be false for all members.
> This issue does not resolve until after an actual group change that causes a 
> proper rebalance. In order to safely make a partition increase when using 
> static membership, consumers must be stopped and have timed out, or forcibly 
> removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker. 
> Currently, when a static consumer that is leader is restarted, the 
> coordinator does recognize the change:
> e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member 
> Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test 
> with unknown member id rejoins, assigning new member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-
> 6ebf-47da-95ef-c54fef17ab74, while old member id 
> 

[jira] [Created] (KAFKA-13464) SCRAM does not validate client-final-message's nonce

2021-11-18 Thread Travis Bischel (Jira)
Travis Bischel created KAFKA-13464:
--

 Summary: SCRAM does not validate client-final-message's nonce
 Key: KAFKA-13464
 URL: https://issues.apache.org/jira/browse/KAFKA-13464
 Project: Kafka
  Issue Type: Bug
Reporter: Travis Bischel


[https://datatracker.ietf.org/doc/html/rfc5802#section-5.1]

Relevant part, in "r="
  nonce it initially specified.  The server MUST verify that the
  nonce sent by the client in the second message is the same as the
  one sent by the server in its first message.
[https://github.com/apache/kafka/blob/8a1fcee86e42c8bd1f26309dde8748927109056e/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java#L149-L161]

The only verification of client-final-message is verifyClientProof:

[https://github.com/apache/kafka/blob/8a1fcee86e42c8bd1f26309dde8748927109056e/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java#L225-L235]

This function only looks at the key itself. It does not ensure that the 
gs2-header is "biws" (base64("n,,")), meaning the user can erroneously specify 
channel binding. This also does not check that the client's nonce is correct 
(c-nonce + s-nonce).

 

While I'm not 100% sure on what security concerns an invalid nonce could result 
in _at this stage_ of the auth flow (it's clearer in the first stage w.r.t. 
replay attacks), it's likely still important to validate.

 

I noticed this validation is missing because my own client erroneously replies 
with only the original c-nonce, not c-nonce s-nonce. The scram flow has always 
worked, though. Today I changed the client-final-reply to always return nonce 
"foo", which still successfully talks to Kafka.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] hachikuji commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

2021-11-18 Thread GitBox


hachikuji commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r752683020



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -638,15 +636,15 @@ public void onFailure(RuntimeException e) {
 /**
  * Return the fetched records, empty the record buffer and update the 
consumed position.
  *
- * NOTE: returning empty records guarantees the consumed position are NOT 
updated.
+ * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the 
consumed position is not updated.

Review comment:
   I was considering whether it would be possible to do something a little 
simpler without caring as much about the case of aborted data. In the future we 
may have more instances of control records which would probably need similar 
logic. The case of an empty committed transaction mentioned in the other 
comment is an example of this.
   
   I think the main thing we're interested in is when the position advances 
(regardless of the reason). We want to ensure that we return from poll() 
whenever this happens. In the code, this happens here: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L744.
 Could we structure this logic so that `Fetch` is tracking all the positions 
which have advanced? Then the condition for returning would only consider 
positions which have advanced.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

2021-11-18 Thread GitBox


hachikuji commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r752683020



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -638,15 +636,15 @@ public void onFailure(RuntimeException e) {
 /**
  * Return the fetched records, empty the record buffer and update the 
consumed position.
  *
- * NOTE: returning empty records guarantees the consumed position are NOT 
updated.
+ * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the 
consumed position is not updated.

Review comment:
   I was considering whether it would be possible to do something a little 
simpler without caring as much about the case of aborted data. In the future we 
may have more instances of control records which would probably need similar 
logic. The case of an empty committed transaction mentioned in the other 
comment is an example of this.
   
   I think the main thing we're interested in is when the position advances. We 
want to ensure that we return from poll() whenever this happens. In the code, 
this happens here: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L744.
 Could we structure this logic so that `Fetch` is tracking all the positions 
which have advanced?
   
   

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -1235,8 +1237,14 @@ public void assign(Collection 
partitions) {
 }
 }
 
-final Map>> records 
= pollForFetches(timer);
-if (!records.isEmpty()) {
+final Fetch fetch = pollForFetches(timer);
+if (!fetch.isEmpty()) {
+if (fetch.records().isEmpty()) {
+assert fetch.containsAborts();

Review comment:
   nit: we do not often use assertions. If you feel the check is 
worthwhile, maybe we can raise an IllegalStateException with a useful error?  
   

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class Fetch {
+private final Map>> records;
+private boolean containsAborts;

Review comment:
   Could we clarify what this means (maybe with a better name)? From the 
implementation, it looks like we set this for either aborted data or markers. I 
also wanted to mention that it is also possible for the position to advance due 
to an empty committed transaction. It wouldn't be a common case, but nothing in 
the protocol forces a producer to write to each partition that is included in 
the transaction.
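
   A rough sketch of the direction suggested here: instead of a boolean about aborted data, the fetch result could simply record whether any partition's consumed position moved, so `poll()` returns whenever progress was made even if no records are handed back. The shape and names below are illustrative, not the final API:

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.common.TopicPartition;

   public class FetchResult<K, V> {
       private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
       // true whenever any partition's consumed position moved: real data, aborted data,
       // control markers, or an empty committed transaction
       private boolean positionAdvanced = false;

       public void recordPositionAdvancement() {
           positionAdvanced = true;
       }

       public void addRecords(final TopicPartition partition, final List<ConsumerRecord<K, V>> recs) {
           records.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(recs);
           if (!recs.isEmpty()) {
               positionAdvanced = true;
           }
       }

       public Map<TopicPartition, List<ConsumerRecord<K, V>>> records() {
           return records;
       }

       // poll() would return to the caller whenever this is false, even if records() is empty
       public boolean isEmpty() {
           return records.isEmpty() && !positionAdvanced;
       }
   }
   ```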




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

2021-11-18 Thread GitBox


wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752681064



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
 // Check if the topology has been updated since we last checked, ie via 
#addNamedTopology or #removeNamedTopology
 private void checkForTopologyUpdates() {
-if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || 
topologyMetadata.isEmpty()) {
-lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-taskManager.handleTopologyUpdates();
-

Review comment:
   I ended up not doing this to avoid exposing locks without need. We did 
get rid of one of the loops so I hope that helps




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

2021-11-18 Thread GitBox


wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752680404



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
 // Check if the topology has been updated since we last checked, ie via 
#addNamedTopology or #removeNamedTopology
 private void checkForTopologyUpdates() {
-if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || 
topologyMetadata.isEmpty()) {
-lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-taskManager.handleTopologyUpdates();
-
+do {
 topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {

Review comment:
   Okay @ableegoldman and I talked offline about this. I think we found a 
good solution




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

2021-11-18 Thread GitBox


wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752680014



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -83,6 +100,7 @@ public TopologyMetadata(final InternalTopologyBuilder 
builder,
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
+getStreamThreadCount = () -> getNumStreamThreads(config);

Review comment:
   removed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #11479: KAFKA-12648: Make changing the named topologies blocking

2021-11-18 Thread GitBox


wcarlson5 commented on pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#issuecomment-973307507


   @guozhangwang  @ableegoldman I think this is ready for another pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

2021-11-18 Thread GitBox


wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752580337



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -121,9 +164,9 @@ public void maybeWaitForNonEmptyTopology(final 
Supplier threadState) {
 if (isEmpty() && threadState.get().isAlive()) {
 try {
 lock();
-while (isEmpty() && threadState.get().isAlive()) {

Review comment:
   It seems that we can either leave it in a loop or expose the lock and 
unlock to the stream thread. We can't just call it raw as spotbugs complains




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13458) The Stream is not able to consume from some of the partitions

2021-11-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446108#comment-17446108
 ] 

Guozhang Wang commented on KAFKA-13458:
---

I think you are hitting some known issues that cause transactions to hang (are 
you using any third-party clients?). You can find a related JIRA ticket here: 
https://issues.apache.org/jira/browse/KAFKA-13375

> The Stream is not able to consume from some of the partitions
> -
>
> Key: KAFKA-13458
> URL: https://issues.apache.org/jira/browse/KAFKA-13458
> Project: Kafka
>  Issue Type: Bug
>Reporter: Darshan Marathe
>Priority: Blocker
>
> Hi Team
> Kafka-stream version: 2.6.0
> some messages are stuck in the following partitions, and the stream is not 
> able to consume them from those partitions.
> We have restarted the stream multiple times, but the issue is still the same.
> We have faced the following issue:
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-29], this could be either transactional offsets 
> waiting for completion, or normal offsets waiting for replication after 
> appending to local log
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-0]
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-3]
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-9]
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-14]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13375) Kafka streams apps w/EOS unable to start at InitProducerId

2021-11-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446107#comment-17446107
 ] 

Guozhang Wang commented on KAFKA-13375:
---

There are actually multiple known issues on the broker side, and one of them is 
https://issues.apache.org/jira/browse/KAFKA-7408. But more frequently we have found 
that a lot of the root causes are on clients (not the Java clients, but e.g. 
third-party or otherwise buggy non-Java clients) that do not strictly follow 
the EOS protocol.

Here's our current thinking on further improving EOS design: The transaction 
protocol relies on well-behaved clients. Before writing transactional data to 
the partition, it is up to the client to ensure that the partition has been 
added to the transaction using the `AddPartitionsToTxn` API. The broker today 
has no way to ensure that the client does this properly which means that a 
buggy client could cause hanging transactions. And the fact that the client has 
to do this in the first place means the client implementation is more complex 
and likely to contain bugs.
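To make that concrete, here is a minimal sketch of the produce loop a well-behaved Java client is expected to run (the bootstrap address, transactional id, and topic below are placeholders); note that the AddPartitionsToTxn step happens inside the producer library before the first transactional append to a partition, which is exactly the step a buggy or non-conforming client implementation can get wrong:

{code:java}
// A minimal sketch of the expected client-side flow; topic, transactional id and
// bootstrap address are placeholders.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class WellBehavedTransactionalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // obtains the PID/epoch and fences older instances of this transactional.id
        try {
            producer.beginTransaction();
            // The producer library (not the application) registers each new partition with the
            // transaction coordinator via AddPartitionsToTxn before appending transactional data.
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // fatal: this producer instance can no longer be used
        } catch (KafkaException e) {
            producer.abortTransaction(); // abort so the transaction does not hang open
        } finally {
            producer.close();
        }
    }
}
{code}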

We can consider moving the responsibility of adding the partition to the 
transaction to the partition leader. When the leader encounters the first 
append of a transaction from a given producer, it can itself send 
`AddPartitionsToTxn` to the transaction coordinator. Notably, the produce 
request already contains the transactionalId, so it is possible to find the 
coordinator efficiently. And the produce data contains the producer epoch, so 
we can still fence at the transaction coordinator. 

> Kafka streams apps w/EOS unable to start at InitProducerId
> --
>
> Key: KAFKA-13375
> URL: https://issues.apache.org/jira/browse/KAFKA-13375
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Lerh Chuan Low
>Priority: Major
>
> Hello, I'm wondering if this is a Kafka bug. Our environment setup is as 
> follows:
> Kafka streams 2.8 - with *EXACTLY_ONCE* turned on (Not *EOS_BETA*, but I 
> don't think the changes introduced in EOS beta should affect this). 
> Transaction timeout = 60s.
>  Kafka broker 2.8. 
> We have this situation where we were doing a rolling restart of the broker to 
> apply some security changes. After we finished, 4 out of some 15 Stream Apps 
> are unable to start. They can never succeed, no matter what we do. 
> They fail with the error:
> {code:java}
>  2021-10-14 07:20:13,548 WARN 
> [srn-rec-feeder-802c18a1-9512-4a2a-8c2e-00e37550199d-StreamThread-3] 
> o.a.k.s.p.i.StreamsProducer stream-thread 
> [srn-rec-feeder-802c18a1-9512-4a2a-8c2e-00e37550199d-StreamThread-3] task 
> [0_6] Timeout exception caught trying to initialize transactions. The broker 
> is either slow or in bad state (like not having enough replicas) in 
> responding to the request, or the connection to broker was interrupted 
> sending the request or receiving the response. Will retry initializing the 
> task in the next loop. Consider overwriting max.block.ms to a larger value to 
> avoid timeout errors{code}
> We found a previous Jira describing the issue here: 
> https://issues.apache.org/jira/browse/KAFKA-8803. It seems like back then 
> what people did was to rolling restart the brokers. We tried that - we 
> targeted the group coordinators for our failing apps, then transaction 
> coordinators, then all of them. It hasn't resolved our issue so far. 
> A few interesting things we've found so far:
>  - What I can see is that all the failing apps only fail on certain 
> partitions. E.g for the app above, only partition 6 never succeeds. Partition 
> 6 shares the same coordinator as some of the other partitions and those work, 
> so it seems like the issue isn't related to broker memory state. 
>  - All the failing apps have a message similar to this 
> {code:java}
> [2021-10-14 00:54:51,569] INFO [Transaction Marker Request Completion Handler 
> 103]: Sending srn-rec-feeder-0_6's transaction marker for partition 
> srn-bot-003-14 has permanently failed with error 
> org.apache.kafka.common.errors.InvalidProducerEpochException with the current 
> coordinator epoch 143; cancel sending any more transaction markers 
> TxnMarkerEntry{producerId=7001, producerEpoch=610, coordinatorEpoch=143, 
> result=ABORT, partitions=[srn-bot-003-14]} to the brokers 
> (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler) 
> {code}
> While we were restarting the brokers. They all failed shortly after. No other 
> consumer groups for the other working partitions/working stream apps logged 
> this message. 
> On digging around in git blame and reading through the source, it looks like 
> this is meant to be benign. 
>  - We tried DEBUG logging for the TransactionCoordinator and 
> TransactionStateManager. We can 

[jira] [Commented] (KAFKA-13435) Group won't consume partitions added after static member restart

2021-11-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17446106#comment-17446106
 ] 

Guozhang Wang commented on KAFKA-13435:
---

[~rleslie] today only the brokers can determine whether a join-group request would 
trigger a rebalance or not. In this specific case, if a join-group request is 
sent with a known instance id and its corresponding old member id is considered 
the leader, the broker's coordinator can still assign a new member id to it and 
set the leader to the new member id; but whether or not it sets the leader ID 
in the response depends on whether there are any metadata changes that really 
do need a rebalance. If there's no need for a rebalance, it would not set the 
leader id at all, i.e. only the broker knows who the current leader is, while 
members do not. I think this is okay since members do not try to remember 
who the current leader is anyway.
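
For readers following along, static membership itself is enabled purely through client configuration; below is a minimal sketch (group id, instance id, topic, and bootstrap address are placeholders) of the kind of consumer involved in this scenario, where restarting the process with the same group.instance.id takes the "known instance id rejoins" path described above:

{code:java}
// A minimal sketch of a static group member; group id, instance id, topic and
// bootstrap address are placeholders.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        // Static membership: reusing the same instance id across restarts lets the broker
        // swap in a new member id for the old one instead of triggering a full rebalance.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "example-instance-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}
{code}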

[~dajac] please let me know if you have a PR ready.

> Group won't consume partitions added after static member restart
> 
>
> Key: KAFKA-13435
> URL: https://issues.apache.org/jira/browse/KAFKA-13435
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: Ryan Leslie
>Assignee: David Jacot
>Priority: Critical
>
> When using consumer groups with static membership, if the consumer marked as 
> leader has restarted, then metadata changes such as partition increase are 
> not triggering expected rebalances.
> To reproduce this issue, simply:
>  # Create a static consumer subscribed to a single topic
>  # Close the consumer and create a new one with the same group instance id
>  # Increase partitions for the topic
>  # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to 
> track metadata and trigger a rebalance if there are changes such as new 
> partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null && 
> !assignmentSnapshot.matches(metadataSnapshot)) {
> ...
> requestRejoinIfNecessary(reason);
> return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the 
> leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e. 
> partition changes)
> if (!isLeader)
> assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a 
> rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group 
> should have _isLeader == True_ and be responsible for triggering rebalances 
> on metadata changes.
> However, in the case of static membership, if the leader has been restarted 
> and rejoined the group, the group essentially no longer has a current leader. 
> Even though the metadata changes are fetched, no rebalance will be triggered. 
> That is, _isLeader_ will be false for all members.
> This issue does not resolve until after an actual group change that causes a 
> proper rebalance. In order to safely make a partition increase when using 
> static membership, consumers must be stopped and have timed out, or forcibly 
> removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker. 
> Currently, when a static consumer that is leader is restarted, the 
> coordinator does recognize the change:
> e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member 
> Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test 
> with unknown member id rejoins, assigning new member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-
> 6ebf-47da-95ef-c54fef17ab74, while old member id 
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff
>  will be removed. (
> kafka.coordinator.group.GroupCoordinator){noformat}
> However, it does not attempt to update the leader id since this isn't a new 
> rebalance, and JOIN_GROUP will continue returning the 

[GitHub] [kafka] guozhangwang commented on a change in pull request #11496: KAFKA-13454: kafka has duplicate configuration information log information printin…

2021-11-18 Thread GitBox


guozhangwang commented on a change in pull request #11496:
URL: https://github.com/apache/kafka/pull/11496#discussion_r752509125



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -291,23 +291,23 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 dynamicDefaultConfigs.clone()
   }
 
-  private[server] def updateBrokerConfig(brokerId: Int, persistentProps: 
Properties): Unit = CoreUtils.inWriteLock(lock) {
+  private[server] def updateBrokerConfig(brokerId: Int, persistentProps: 
Properties, doLog: Boolean = false): Unit = CoreUtils.inWriteLock(lock) {

Review comment:
   I think in order to not modify the behavior in `ConfigHandler`, we need 
to make the default `doLog` true, and then explicitly set it to `false` at 
line 215 above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna merged pull request #11462: MINOR: Set mock correctly in RocksDBMetricsRecorderTest

2021-11-18 Thread GitBox


cadonna merged pull request #11462:
URL: https://github.com/apache/kafka/pull/11462


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11462: MINOR: Set mock correctly in RocksDBMetricsRecorderTest

2021-11-18 Thread GitBox


cadonna commented on pull request #11462:
URL: https://github.com/apache/kafka/pull/11462#issuecomment-973069804


   The failing tests are unrelated to this PR and known to be flaky.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hgeraldino opened a new pull request #11515: KIP-795 Make AbstractCoordinator part of the public API

2021-11-18 Thread GitBox


hgeraldino opened a new pull request #11515:
URL: https://github.com/apache/kafka/pull/11515


   As part of KIP-795, this PR relocates some classes from the 
`org.apache.kafka.clients.consumer.internals` package to the 
`org.apache.kafka.clients.consumer` package, in order to make them part of 
Kafka's public API.
   
   No new functionality has been added or removed. The methods included in the 
new `Coordinator` interface have been made public, and so have methods from 
other internal classes (Heartbeat, ConsumerNetworkClient) that are referenced 
from the newly public classes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

2021-11-18 Thread GitBox


dajac commented on pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#issuecomment-973034180


   @jolshan Let's try to address that flaky test separately as it is not 
related to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

2021-11-18 Thread GitBox


dajac merged pull request #11459:
URL: https://github.com/apache/kafka/pull/11459


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11459: KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they are no longer sent by the controller

2021-11-18 Thread GitBox


dajac commented on pull request #11459:
URL: https://github.com/apache/kafka/pull/11459#issuecomment-973030538


   The flaky tests are unrelated to this PR. Going to merge to trunk and 3.1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-18 Thread GitBox


rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r752346217



##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -297,7 +297,7 @@ object TestUtils extends Logging {
   props.put(KafkaConfig.NodeIdProp, nodeId.toString)
   props.put(KafkaConfig.BrokerIdProp, nodeId.toString)
   props.put(KafkaConfig.AdvertisedListenersProp, listeners)
-  props.put(KafkaConfig.ListenersProp, listeners + 
",CONTROLLER://localhost:0")
+  props.put(KafkaConfig.ListenersProp, listeners)

Review comment:
   This is the same change from 
https://github.com/apache/kafka/pull/11511/files#diff-b8f9f9d1b191457cbdb332a3429f0ad65b50fa4cef5af8562abcfd1f177a2cfeL300.
  Without adding it here we cannot get a clean build in this PR right now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-18 Thread GitBox


rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r752341197



##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -223,31 +220,26 @@ class KafkaConfigTest {
 
 //advertised listener should contain control-plane listener
 val advertisedEndpoints = serverConfig.advertisedListeners
-assertFalse(advertisedEndpoints.filter { endpoint =>
+assertTrue(advertisedEndpoints.exists { endpoint =>
   endpoint.securityProtocol == controlEndpoint.securityProtocol && 
endpoint.listenerName.value().equals(controlEndpoint.listenerName.value())
-}.isEmpty)
+})
 
 // interBrokerListener name should be different from control-plane 
listener name
 val interBrokerListenerName = serverConfig.interBrokerListenerName
 
assertFalse(interBrokerListenerName.value().equals(controlEndpoint.listenerName.value()))
   }
 
   @Test
-  def testControllerListenerName() = {
-val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000,CONTROLLER://localhost:5000")
-props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
"PLAINTEXT:PLAINTEXT,CONTROLPLANE:SSL,CONTROLLER:SASL_SSL")
-props.put(KafkaConfig.AdvertisedListenersProp, 
"PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000")
-props.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLPLANE")

Review comment:
   This controller listener name test included control plane listener 
configurations, and it was declaring the config to be using ZooKeeper instead 
of KRaft.  We now disallow controller.listener.names when using ZooKeeper and 
control.plane.listener.name when using KRaft.  So now this test does what it 
says: it tests controller listener name, and it sets a KRaft config and leaves 
out the control plane listener stuff (which is tested above anyway, in 
`testControlPlaneListenerName()`)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-18 Thread GitBox


rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r752335041



##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -186,34 +186,31 @@ class KafkaConfigTest {
 
 // listeners with duplicate port
 props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://localhost:9091,SSL://localhost:9091")
-var caught = assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
-assertTrue(caught.getMessage.contains("Each listener must have a different 
port"))
+assertBadConfigContainingMessage(props, "Each listener must have a 
different port")
 
 // listeners with duplicate name
 props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
-caught = assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
-assertTrue(caught.getMessage.contains("Each listener must have a different 
name"))
+assertBadConfigContainingMessage(props, "Each listener must have a 
different name")
 
 // advertised listeners can have duplicate ports
 props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
"HOST:SASL_SSL,LB:SASL_SSL")
 props.put(KafkaConfig.InterBrokerListenerNameProp, "HOST")
 props.put(KafkaConfig.ListenersProp, 
"HOST://localhost:9091,LB://localhost:9092")
 props.put(KafkaConfig.AdvertisedListenersProp, 
"HOST://localhost:9091,LB://localhost:9091")
-assertTrue(isValidKafkaConfig(props))
+KafkaConfig.fromProps(props)

Review comment:
   Simply trying to create the config is better because if the config is 
invalid we actually see the exception that caused it, which is much more 
helpful than simply seeing that `true` was expected and we got `false` instead. 
 I made this change in many places below as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-18 Thread GitBox


rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r752331857



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -2031,11 +2088,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable 
meta-address 0.0.0.0. "+
   s"Use a routable IP address.")
 
-// Ensure controller listeners are not in the advertised listeners list
-require(!controllerListeners.exists(advertisedListeners.contains),
-  s"${KafkaConfig.AdvertisedListenersProp} cannot contain any of 
${KafkaConfig.ControllerListenerNamesProp}")
-

Review comment:
   This is now checked above -- this exact check is done when running the 
broker role, but the check for when running just the controller role is 
actually now more constrained: advertised listeners must be empty in that case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-18 Thread GitBox


rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r752327468



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -2011,12 +2012,68 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
 require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
   " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-  s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-

Review comment:
   This check is now handled below




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-18 Thread GitBox


rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r752328227



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -2011,12 +2012,68 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
 require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
   " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-  s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
 val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+if (usesSelfManagedQuorum) {
+  require(controlPlaneListenerName.isEmpty,
+s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")

Review comment:
   Used to be checked in `BrokerServer` -- now we just do it here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position information of records to changelog

2021-11-18 Thread GitBox


vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r752325491



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
##
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.Position;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Category({IntegrationTest.class})
+public class ConsistencyVectorIntegrationTest {
+
+private static final int NUM_BROKERS = 1;
+private static int port = 0;
+private static final String INPUT_TOPIC_NAME = "input-topic";
+private static final String TABLE_NAME = "source-table";
+
+public final EmbeddedKafkaCluster cluster = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+
+@Rule
+public TestName testName = new TestName();
+
+private final List streamsToCleanup = new ArrayList<>();
+private final MockTime mockTime = cluster.time;
+
+@Before
+public void before() throws InterruptedException, IOException {
+cluster.start();
+cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
+}
+
+@After
+public void after() {
+for (final KafkaStreams kafkaStreams : streamsToCleanup) {
+kafkaStreams.close();
+}
+cluster.stop();
+}
+
+@Test
+public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception 
{
+final int batch1NumMessages = 100;
+final int key = 1;
+final Semaphore semaphore = new Semaphore(0);
+
+final StreamsBuilder builder = new StreamsBuilder();
+Objects.requireNonNull(TABLE_NAME, "name cannot be 

[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-18 Thread GitBox


rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r752324936



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -2011,12 +2012,68 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
 require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
   " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-  s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
-
 val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+if (usesSelfManagedQuorum) {
+  require(controlPlaneListenerName.isEmpty,
+s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in 
KRaft mode.")
+  val sourceOfAdvertisedListeners = if 
(getString(KafkaConfig.AdvertisedListenersProp) != null)
+s"${KafkaConfig.AdvertisedListenersProp}"
+  else
+s"${KafkaConfig.ListenersProp}"
+  if (!processRoles.contains(BrokerRole)) {
+// advertised listeners must be empty when not also running the broker 
role
+require(advertisedListeners.isEmpty,
+  sourceOfAdvertisedListeners +
+s" must only contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when 
${KafkaConfig.ProcessRolesProp}=controller")
+  } else {
+// when running broker role advertised listeners cannot contain 
controller listeners
+require(!advertisedListenerNames.exists(aln => 
controllerListenerNames.contains(aln.value())),
+  sourceOfAdvertisedListeners +
+s" must not contain KRaft controller listeners from 
${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} 
contains the broker role")
+  }
+  if (processRoles.contains(ControllerRole)) {
+// has controller role (and optionally broker role as well)
+// controller.listener.names must be non-empty
+// every one must appear in listeners
+// the port appearing in controller.quorum.voters for this node must 
match the port of the first controller listener
+// (we allow other nodes' voter ports to differ to support running 
multiple controllers on the same host)
+require(controllerListeners.nonEmpty,
+  s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+val listenerNameValues = listeners.map(_.listenerName.value).toSet
+require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
+  s"${KafkaConfig.ControllerListenerNamesProp} must only contain 
values appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running the KRaft controller role")
+val addressSpecForThisNode = 
RaftConfig.parseVoterConnections(quorumVoters).get(nodeId)
+addressSpecForThisNode match {
+  case inetAddressSpec: RaftConfig.InetAddressSpec => {
+val quorumVotersPort = inetAddressSpec.address.getPort
+require(controllerListeners.head.port == quorumVotersPort,
+  s"Port in ${KafkaConfig.QuorumVotersProp} for this controller 
node (${KafkaConfig.NodeIdProp}=$nodeId, port=$quorumVotersPort) does not match 
the port for the first controller listener in 
${KafkaConfig.ControllerListenerNamesProp} 
(${controllerListeners.head.listenerName.value()}, 
port=${controllerListeners.head.port})")
+  }
+  case _ =>
+}
+  } else {
+// only broker role
+// controller.listener.names must be non-empty
+// none of them can appear in listeners
+// warn that only the first one is used if there is more than one
+require(controllerListenerNames.exists(_.nonEmpty),
+  s"${KafkaConfig.ControllerListenerNamesProp} must contain at least 
one value when running KRaft with just the broker role")
+if (controllerListenerNames.size > 1) {
+  warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple 
entries; only the first will be used since 
${KafkaConfig.ProcessRolesProp}=broker: $controllerListenerNames")
+}
+require(controllerListeners.isEmpty,
+  s"${KafkaConfig.ControllerListenerNamesProp} must not contain a 
value appearing in the '${KafkaConfig.ListenersProp}' configuration when 
running KRaft with just the broker role")
+  }
+} else {
+  // controller listener names must be empty when not in KRaft mode
+  require(!controllerListenerNames.exists(_.nonEmpty), 

[jira] [Resolved] (KAFKA-13462) KRaft server does not return internal topics on list topics RPC

2021-11-18 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming resolved KAFKA-13462.

Resolution: Invalid

__consumer_offsets will not be created until an offset commit is stored.

> KRaft server does not return internal topics on list topics RPC
> ---
>
> Key: KAFKA-13462
> URL: https://issues.apache.org/jira/browse/KAFKA-13462
> Project: Kafka
>  Issue Type: Bug
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-18 Thread GitBox


rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r752312628



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -2007,8 +2007,13 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   "offsets.commit.required.acks must be greater or equal -1 and less or 
equal to offsets.topic.replication.factor")
 require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
   " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-require(!processRoles.contains(ControllerRole) || 
controllerListeners.nonEmpty,
-  s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the 
server has the controller role")
+if (usesSelfManagedQuorum) {

Review comment:
   I've defined the voter port validation to check that this controller's 
voter node port matches the first controller listener port.  We can't validate 
all controllers' voter ports because we run multiple controllers on the same 
host in tests, and in that case every controller must listen on a different port 
despite using the same security protocol.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: controller.listener.names required in KRaft

2021-11-18 Thread GitBox


rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r752307030



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -404,10 +404,6 @@ class BrokerServer(
 config.numIoThreads, 
s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
 SocketServer.DataPlaneThreadPrefix)
 
-  if (socketServer.controlPlaneRequestChannelOpt.isDefined) {
-throw new RuntimeException(KafkaConfig.ControlPlaneListenerNameProp + 
" is not " +
-  "supported when in KRaft mode.")
-  }

Review comment:
   Removing since it is redundant -- we are now checking this when creating 
a KafkaConfig




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] patrickstuedi opened a new pull request #11514: Tracking latest seen offset in state stores

2021-11-18 Thread GitBox


patrickstuedi opened a new pull request #11514:
URL: https://github.com/apache/kafka/pull/11514


   This PR is leveraging the previously added StateStoreContext::recordMetadata() 
to track the current position seen by a state store. 
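
   As a rough sketch of how a store can fold that metadata into a per-partition position (the names below are illustrative only, not the actual code in this PR): on every live write, the store reads the topic/partition/offset of the record currently being processed and keeps the maximum offset seen per partition.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.streams.processor.StateStoreContext;

   // Illustrative sketch only; not the actual PR code.
   class PositionTracker {
       private final Map<TopicPartition, Long> position = new HashMap<>();

       void onWrite(final StateStoreContext context) {
           // recordMetadata() is only present while an active task is processing a record,
           // so the position is advanced on live writes rather than, e.g., during restoration.
           context.recordMetadata().ifPresent(meta ->
               position.merge(new TopicPartition(meta.topic(), meta.partition()), meta.offset(), Math::max));
       }
   }
   ```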


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #11450: KAFKA-13414: Replace Powermock/EasyMock by Mockito in connect.storage

2021-11-18 Thread GitBox


dengziming commented on a change in pull request #11450:
URL: https://github.com/apache/kafka/pull/11450#discussion_r752292519



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
##
@@ -77,21 +74,17 @@ public void teardown() {
 @Test
 public void testGetSet() throws Exception {
 Callback setCallback = expectSuccessfulSetCallback();
-PowerMock.replayAll();
 
 store.set(firstSet, setCallback).get();
 
 Map values = 
store.get(Arrays.asList(buffer("key"), buffer("bad"))).get();
 assertEquals(buffer("value"), values.get(buffer("key")));
 assertNull(values.get(buffer("bad")));
-
-PowerMock.verifyAll();

Review comment:
   The update LGTM.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vpapavas opened a new pull request #11513: feat: Write offset position information of records to changelog

2021-11-18 Thread GitBox


vpapavas opened a new pull request #11513:
URL: https://github.com/apache/kafka/pull/11513


   As part of the consistency work, a RocksDBStore has a consistency vector 
that contains the latest seen offset of every partition. We need this 
information on standby servers as well. To achieve this, we persist the offset 
alongside each record that is written to the changelog topic. This way, standby 
servers can re-create the consistency vector during restoration.
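
   A rough sketch of the general idea (the exact wire format used by the PR may differ): prefix the serialized changelog value with the source offset, so that a standby can rebuild the per-partition position while replaying the changelog.

   ```java
   import java.nio.ByteBuffer;

   // Illustrative sketch only: one possible encoding of (offset, value) for a changelog record.
   final class OffsetPrefixedValue {
       static byte[] encode(final long offset, final byte[] value) {
           return ByteBuffer.allocate(Long.BYTES + value.length)
               .putLong(offset)
               .put(value)
               .array();
       }

       static long decodeOffset(final byte[] rawValue) {
           return ByteBuffer.wrap(rawValue).getLong();
       }

       static byte[] decodeValue(final byte[] rawValue) {
           final byte[] value = new byte[rawValue.length - Long.BYTES];
           ByteBuffer.wrap(rawValue, Long.BYTES, value.length).get(value);
           return value;
       }
   }
   ```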
   
   There are three tests: an integration test that checks that the offsets are 
written to the changelog and restored on the standby, a unit test for 
restoration, and a unit test for writing to the changelog.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11393: MINOR: Refactor RequestResponseTest

2021-11-18 Thread GitBox


mimaison commented on pull request #11393:
URL: https://github.com/apache/kafka/pull/11393#issuecomment-972862126


   Rebased on trunk.
   @dajac @hachikuji Gentle reminder :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 edited a comment on pull request #11512: MINOR: Modify the Exception type of the testCommitOffsetAsyncNotCoordinator method

2021-11-18 Thread GitBox


RivenSun2 edited a comment on pull request #11512:
URL: https://github.com/apache/kafka/pull/11512#issuecomment-972857020


   Hi @dajac  @hachikuji and @showuon  , please help to review the PR .
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11512: MINOR: Modify the Exception type of the testCommitOffsetAsyncNotCoordinator method

2021-11-18 Thread GitBox


RivenSun2 commented on pull request #11512:
URL: https://github.com/apache/kafka/pull/11512#issuecomment-972857020


   Hi @dajac  @hachikuji , please help to review the PR .
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 opened a new pull request #11512: MINOR: Modify the Exception type of the testCommitOffsetAsyncNotCoordinator method

2021-11-18 Thread GitBox


RivenSun2 opened a new pull request #11512:
URL: https://github.com/apache/kafka/pull/11512


   Modify the Exception type of the testCommitOffsetAsyncNotCoordinator method
   
   Committer Checklist (excluded from commit message)
Verify design and implementation
Verify test coverage and CI build status
Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11450: KAFKA-13414: Replace Powermock/EasyMock by Mockito in connect.storage

2021-11-18 Thread GitBox


mimaison commented on pull request #11450:
URL: https://github.com/apache/kafka/pull/11450#issuecomment-972840100


   @chia7712 @tombentley Can you take a look? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mdedetrich commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2021-11-18 Thread GitBox


mdedetrich commented on pull request #11478:
URL: https://github.com/apache/kafka/pull/11478#issuecomment-972827297


   I have updated the PR to add documentation to 
https://kafka.apache.org/documentation/#brokerconfigs_listeners


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] katheris commented on a change in pull request #11500: KAFKA-13455: Add steps to run Kafka Connect to quickstart

2021-11-18 Thread GitBox


katheris commented on a change in pull request #11500:
URL: https://github.com/apache/kafka/pull/11500#discussion_r752179967



##
File path: docs/quickstart.html
##
@@ -158,15 +158,78 @@ 
 
 
 
-You probably have lots of data in existing systems like relational 
databases or traditional messaging systems,
-along with many applications that already use these systems.
-Kafka Connect allows you to 
continuously ingest
-data from external systems into Kafka, and vice versa.  It is thus 
very easy to integrate existing systems with
-Kafka. To make this process even easier, there are hundreds of 
such connectors readily available.
+Reading data from the console and writing it back to the console 
is a convenient place to start, but you'll probably want

Review comment:
   @mimaison thanks for your review. I think the existing paragraph and the 
new ones cover similar things so I've had a go at combining them. Let me know 
if you think it reads ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] katheris commented on a change in pull request #11500: KAFKA-13455: Add steps to run Kafka Connect to quickstart

2021-11-18 Thread GitBox


katheris commented on a change in pull request #11500:
URL: https://github.com/apache/kafka/pull/11500#discussion_r752179967



##
File path: docs/quickstart.html
##
@@ -158,15 +158,78 @@ 
 
 
 
-You probably have lots of data in existing systems like relational 
databases or traditional messaging systems,
-along with many applications that already use these systems.
-Kafka Connect allows you to 
continuously ingest
-data from external systems into Kafka, and vice versa.  It is thus 
very easy to integrate existing systems with
-Kafka. To make this process even easier, there are hundreds of 
such connectors readily available.
+Reading data from the console and writing it back to the console 
is a convenient place to start, but you'll probably want

Review comment:
   I think the existing paragraph and the new ones cover similar things so 
I've had a go at combining them. Let me know if you think it reads ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mdedetrich commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2021-11-18 Thread GitBox


mdedetrich commented on pull request #11478:
URL: https://github.com/apache/kafka/pull/11478#issuecomment-972784982


   I have updated the PR to add some upgrade notes to `docs/upgrade.html`. I am 
not sure if additional documentation is needed elsewhere (I had a look at 
`docs` in general and couldn't find anything specific enough but I may have 
missed something).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #11467: MINOR: fix java doc in kafkaProducer

2021-11-18 Thread GitBox


mimaison merged pull request #11467:
URL: https://github.com/apache/kafka/pull/11467


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #11500: KAFKA-13455: Add steps to run Kafka Connect to quickstart

2021-11-18 Thread GitBox


mimaison commented on a change in pull request #11500:
URL: https://github.com/apache/kafka/pull/11500#discussion_r752125838



##
File path: docs/quickstart.html
##
@@ -158,15 +158,78 @@ 
 
 
 
-You probably have lots of data in existing systems like relational 
databases or traditional messaging systems,
-along with many applications that already use these systems.
-Kafka Connect allows you to 
continuously ingest
-data from external systems into Kafka, and vice versa.  It is thus 
very easy to integrate existing systems with
-Kafka. To make this process even easier, there are hundreds of 
such connectors readily available.
+Reading data from the console and writing it back to the console 
is a convenient place to start, but you'll probably want

Review comment:
   Should we keep the existing paragraph and add this after it? I think it 
helps introduce Connect.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #11472: TRIVIAL: Remove unused parameters, exceptions, comments, etc.

2021-11-18 Thread GitBox


mimaison merged pull request #11472:
URL: https://github.com/apache/kafka/pull/11472


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-11-18 Thread GitBox


dajac commented on a change in pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#discussion_r752117851



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -746,8 +748,8 @@ public void handle(SyncGroupResponse syncResponse,
 sensors.syncSensor.record(response.requestLatencyMs());
 
 synchronized (AbstractCoordinator.this) {
-if (!generation.equals(Generation.NO_GENERATION) && 
state == MemberState.COMPLETING_REBALANCE) {
-// check protocol name only if the generation is 
not reset
+if (generation.protocolName != null && state == 
MemberState.COMPLETING_REBALANCE) {
+// check protocol name only if the generation is 
not reset (protocol name is not null)

Review comment:
   I think that the intent was to validate `protocolName` only when the 
generation was not reset. It seems that we are changing this here. Why?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-11-18 Thread GitBox


dajac commented on a change in pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#discussion_r752115394



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -443,7 +443,9 @@ boolean joinGroupIfNeeded(final Timer timer) {
 stateSnapshot = this.state;
 }
 
-if (!generationSnapshot.equals(Generation.NO_GENERATION) && 
stateSnapshot == MemberState.STABLE) {
+if ((generationSnapshot.generationId != 
Generation.NO_GENERATION.generationId ||
+
!generationSnapshot.memberId.equals(Generation.NO_GENERATION.memberId)) &&
+stateSnapshot == MemberState.STABLE) {

Review comment:
   Is the `||` in this condition correct? I thought that we would consider 
the rebalance successful only if we have a valid generation and a valid member 
id. Am I missing something?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13463) Improvement: KafkaConsumer pause(Collection partitions)

2021-11-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17445793#comment-17445793
 ] 

RivenSun edited comment on KAFKA-13463 at 11/18/21, 10:06 AM:
--

Supplement to the suggestion:
h2. 6) Strip the paused mark of topicpartition from assignment

<1> The new instance variable TopicPartitionPausedState is used in 
SubscriptionState to store the paused mark of each topicPartition, and the 
paused mark is not stored in the assignment.

 

<2> In my opinion, the pause and resume methods are entirely client-side behavior of the kafkaConsumer, and they should not be affected by groupRebalance. During the groupRebalance process, KafkaConsumer will not silently modify TopicPartitionPausedState; TopicPartitionPausedState can only be modified by the user's pause and resume calls.

At the same time, this allows the user to pause a topicPartition that is not in the assignment, because the paused mark is purely a per-partition setting inside the kafkaConsumer.

In other words, whether or not the consumer has been assigned any topicPartitions, kafkaConsumer can pause any topicPartition, even one that does not exist yet; it may be created (or added via addPartition) in the future.

 

<3> The resume(Collection<TopicPartition> partitions) method cleans up the corresponding paused marks in TopicPartitionPausedState (a rough sketch of this idea follows below).
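
A very rough sketch of what a paused-state holder decoupled from the assignment could look like; every name below (including TopicPartitionPausedState as a standalone class) is illustrative and not an existing Kafka API:

{code:java}
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;

// Sketch only: paused marks live outside the assignment, so replacing the
// assignment during a rebalance would not silently clear user-set pauses.
public class TopicPartitionPausedState {
    private final Set<TopicPartition> paused = ConcurrentHashMap.newKeySet();

    // pause() may reference partitions that are not (yet) assigned or even created
    public void pause(Collection<TopicPartition> partitions) {
        paused.addAll(partitions);
    }

    // only resume() clears the marks; a rebalance would leave them untouched
    public void resume(Collection<TopicPartition> partitions) {
        paused.removeAll(partitions);
    }

    public boolean isPaused(TopicPartition tp) {
        return paused.contains(tp);
    }
}
{code}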


was (Author: rivensun):
Supplement to the suggestion:
h2. 6) Strip the paused mark of topicpartition from assignment

<1> The new instance variable TopicPartitionPausedState is used in 
SubscriptionState to store the paused mark of each topicPartition, and the 
paused mark is not stored in the assignment.

 

<2> In my opinion, the pause and resume methods are entirely the behavior of 
the kafkConsumer client, and it should not be affected by groupRebalance. 
During the groupRebalance process, KafkaConsumer will not silently modify 
TopicPartitionPausedState. TopicPartitionPausedState can only be modified by 
the user's pause and resume.

At the same time, it supports the user to pause a topicPartition that is not in 
the assignment, because the paused mark is only the concept of the partition 
setting of the kafkaConsumer.

In other words, no matter whether the consumer has assigned any topicPartitions 
or not, kafkaConsumer can pause any topicPartition, even if the topicPartition 
has not been existed, it may be created(or by addPartition) in the future.

 

<3> The pause(Collection partitions) method, clean up the 
paused mark in TopicPartitionPausedState

> Improvement: KafkaConsumer pause(Collection partitions)
> ---
>
> Key: KAFKA-13463
> URL: https://issues.apache.org/jira/browse/KAFKA-13463
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> h1. 1.Background
> When users use the kafkaConsumer#pause(...) method, they will maybe ignore: 
> the pause method may no longer work, and data will be lost.
> For example, the following simple code:
> {code:java}
> while (true) {
> try {
> kafkaConsumer.pause(kafkaConsumer.assignment());
> ConsumerRecords records = 
> kafkaConsumer.poll(Duration.ofSeconds(2));
> if (!records.isEmpty()) {
> log.error("kafka poll for rebalance discard some record!");
> }
> } catch (Exception e) {
> log.error("maintain poll for rebalance with error:{}", 
> e.getMessage(), e);
> }
> }{code}
> Even if you call pause(assignment) before the poll method every time, the 
> poll method may still return messages.
>  
> h1. 2. RootCause:
> In short, during the rebalance of the group, 
> ConsumerCoordinator#invokePartitionsRevoked(...) will clear the paused mark 
> on the partitions previously held by kafkaConsumer. However, while clearing 
> the paused mark of partitions, the corresponding message in the memory 
> (Fetcher.completedFetches) of pausedPartitions was not cleared, resulting in 
> Fetcher#fetchedRecords() still fetching the message and returning it to the 
> customer.
> For more detailed analysis, if you are interested, you can read Jira 
> https://issues.apache.org/jira/browse/KAFKA-13425 
> looking forward to your reply.
>  
> h1. 3.Discuss : Can KafkaConsumer support the pause method that is not 
> affected by groupRebalance?
> The KafkaConsumer#pause method actually stated one point at the beginning of 
> its design:
>  * Rebalance does not preserve pause/resume state.
> link:https://issues.apache.org/jira/browse/KAFKA-2350
> Unfortunately, I did not see this from the comments of the 
> KafkaConsumer#pause(...) method. At the same time, 
> ConsumerCoordinator#invokePartitionsRevoked did not have any log output when 
> cleaning up the paused mark. I believe that 

[GitHub] [kafka] dajac commented on pull request #11510: MINOR: Fix `client.quota.callback.class` doc

2021-11-18 Thread GitBox


dajac commented on pull request #11510:
URL: https://github.com/apache/kafka/pull/11510#issuecomment-972717480


   Merged to trunk and 3.1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #11510: MINOR: Fix `client.quota.callback.class` doc

2021-11-18 Thread GitBox


dajac merged pull request #11510:
URL: https://github.com/apache/kafka/pull/11510


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13463) Improvement: KafkaConsumer pause(Collection partitions)

2021-11-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17445793#comment-17445793
 ] 

RivenSun edited comment on KAFKA-13463 at 11/18/21, 10:01 AM:
--

Supplement to the suggestion:
h2. 6) Strip the paused mark of topicpartition from assignment

<1> The new instance variable TopicPartitionPausedState is used in 
SubscriptionState to store the paused mark of each topicPartition, and the 
paused mark is not stored in the assignment.

 

<2> In my opinion, the pause and resume methods are entirely client-side behavior of the kafkaConsumer, and they should not be affected by groupRebalance. During the groupRebalance process, KafkaConsumer will not silently modify TopicPartitionPausedState; TopicPartitionPausedState can only be modified by the user's pause and resume calls.

At the same time, this allows the user to pause a topicPartition that is not in the assignment, because the paused mark is purely a per-partition setting inside the kafkaConsumer.

In other words, whether or not the consumer has been assigned any topicPartitions, kafkaConsumer can pause any topicPartition, even one that does not exist yet; it may be created (or added via addPartition) in the future.

 

<3> The pause(Collection<TopicPartition> partitions) method cleans up the corresponding paused marks in TopicPartitionPausedState


was (Author: rivensun):
Supplement to the suggestion:
h2. 6) Strip the paused mark of topicpartition from assignment

<1> The new instance variable TopicPartitionPausedState is used in 
SubscriptionState to store the paused mark of each topicPartition, and the 
paused mark is not stored in the assignment.

 

<2> In my opinion, the pause and resume methods are entirely the behavior of 
the kafkConsumer client, and it should not be affected by groupRebalance.

At the same time, it supports the user to pause a topicPartition that is not in 
the assignment, because the paused mark is only the concept of the partition 
setting of the kafkaConsumer.

In other words, no matter whether the consumer has assigned any topicPartitions 
or not, kafkaConsumer can pause any topicPartition, even if the topicPartition 
has not been existed, it may be created(or by addPartition) in the future.

 

<3> The pause(Collection partitions) method, clean up the 
paused mark in TopicPartitionPausedState

> Improvement: KafkaConsumer pause(Collection partitions)
> ---
>
> Key: KAFKA-13463
> URL: https://issues.apache.org/jira/browse/KAFKA-13463
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> h1. 1.Background
> When users use the kafkaConsumer#pause(...) method, they will maybe ignore: 
> the pause method may no longer work, and data will be lost.
> For example, the following simple code:
> {code:java}
> while (true) {
> try {
> kafkaConsumer.pause(kafkaConsumer.assignment());
> ConsumerRecords records = 
> kafkaConsumer.poll(Duration.ofSeconds(2));
> if (!records.isEmpty()) {
> log.error("kafka poll for rebalance discard some record!");
> }
> } catch (Exception e) {
> log.error("maintain poll for rebalance with error:{}", 
> e.getMessage(), e);
> }
> }{code}
> Even if you call pause(assignment) before the poll method every time, the 
> poll method may still return messages.
>  
> h1. 2. RootCause:
> In short, during the rebalance of the group, 
> ConsumerCoordinator#invokePartitionsRevoked(...) will clear the paused mark 
> on the partitions previously held by kafkaConsumer. However, while clearing 
> the paused mark of partitions, the corresponding message in the memory 
> (Fetcher.completedFetches) of pausedPartitions was not cleared, resulting in 
> Fetcher#fetchedRecords() still fetching the message and returning it to the 
> customer.
> For more detailed analysis, if you are interested, you can read Jira 
> https://issues.apache.org/jira/browse/KAFKA-13425 
> looking forward to your reply.
>  
> h1. 3.Discuss : Can KafkaConsumer support the pause method that is not 
> affected by groupRebalance?
> The KafkaConsumer#pause method actually stated one point at the beginning of 
> its design:
>  * Rebalance does not preserve pause/resume state.
> link:https://issues.apache.org/jira/browse/KAFKA-2350
> Unfortunately, I did not see this from the comments of the 
> KafkaConsumer#pause(...) method. At the same time, 
> ConsumerCoordinator#invokePartitionsRevoked did not have any log output when 
> cleaning up the paused mark. I believe that this will cause many users to use 
> the KafkaConsumer#pause(...) method incorrectly.
> But I think it is necessary for KafkaConsumer to provide a pause method that 
> is not affected 

[jira] [Comment Edited] (KAFKA-13463) Improvement: KafkaConsumer pause(Collection partitions)

2021-11-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17445793#comment-17445793
 ] 

RivenSun edited comment on KAFKA-13463 at 11/18/21, 9:57 AM:
-

Supplement to the suggestion:
h2. 6) Strip the paused mark of topicpartition from assignment

<1> The new instance variable TopicPartitionPausedState is used in 
SubscriptionState to store the paused mark of each topicPartition, and the 
paused mark is not stored in the assignment.

 

<2> In my opinion, the pause and resume methods are entirely client-side behavior of the kafkaConsumer, and they should not be affected by groupRebalance.

At the same time, this allows the user to pause a topicPartition that is not in the assignment, because the paused mark is purely a per-partition setting inside the kafkaConsumer.

In other words, whether or not the consumer has been assigned any topicPartitions, kafkaConsumer can pause any topicPartition, even one that does not exist yet; it may be created (or added via addPartition) in the future.

 

<3> The pause(Collection<TopicPartition> partitions) method cleans up the corresponding paused marks in TopicPartitionPausedState


was (Author: rivensun):
Supplement to the suggestion:
h2. 6) Strip the paused mark of topicpartition from assignment

<1> The new instance variable TopicPartitionPausedState is used in 
SubscriptionState to store the paused mark of each topicPartition, and the 
paused mark is not stored in the assignment.

 

<2> In my opinion, the pause and resume methods are entirely the behavior of 
the kafkConsumer client, and it should not be affected by groupRebalance.

At the same time, it supports the user to pause a topicPartition that is not in 
the assignment, because the paused mark is only the concept of the partition 
setting of the kafkaConsumer.

In other words, no matter whether the consumer has assigned any topicPartitions 
or not, kafkaConsumer can pause any topicPartition, even if the topicPartition 
has not been created, it may be created in the future.

 

<3> The resume method, clean up the paused mark in TopicPartitionPausedState

> Improvement: KafkaConsumer pause(Collection partitions)
> ---
>
> Key: KAFKA-13463
> URL: https://issues.apache.org/jira/browse/KAFKA-13463
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> h1. 1.Background
> When users use the kafkaConsumer#pause(...) method, they will maybe ignore: 
> the pause method may no longer work, and data will be lost.
> For example, the following simple code:
> {code:java}
> while (true) {
> try {
> kafkaConsumer.pause(kafkaConsumer.assignment());
> ConsumerRecords records = 
> kafkaConsumer.poll(Duration.ofSeconds(2));
> if (!records.isEmpty()) {
> log.error("kafka poll for rebalance discard some record!");
> }
> } catch (Exception e) {
> log.error("maintain poll for rebalance with error:{}", 
> e.getMessage(), e);
> }
> }{code}
> Even if you call pause(assignment) before the poll method every time, the 
> poll method may still return messages.
>  
> h1. 2. RootCause:
> In short, during the rebalance of the group, 
> ConsumerCoordinator#invokePartitionsRevoked(...) will clear the paused mark 
> on the partitions previously held by kafkaConsumer. However, while clearing 
> the paused mark of partitions, the corresponding message in the memory 
> (Fetcher.completedFetches) of pausedPartitions was not cleared, resulting in 
> Fetcher#fetchedRecords() still fetching the message and returning it to the 
> customer.
> For more detailed analysis, if you are interested, you can read Jira 
> https://issues.apache.org/jira/browse/KAFKA-13425 
> looking forward to your reply.
>  
> h1. 3.Discuss : Can KafkaConsumer support the pause method that is not 
> affected by groupRebalance?
> The KafkaConsumer#pause method actually stated one point at the beginning of 
> its design:
>  * Rebalance does not preserve pause/resume state.
> link:https://issues.apache.org/jira/browse/KAFKA-2350
> Unfortunately, I did not see this from the comments of the 
> KafkaConsumer#pause(...) method. At the same time, 
> ConsumerCoordinator#invokePartitionsRevoked did not have any log output when 
> cleaning up the paused mark. I believe that this will cause many users to use 
> the KafkaConsumer#pause(...) method incorrectly.
> But I think it is necessary for KafkaConsumer to provide a pause method that 
> is not affected by groupRebalance.
>  
> h1. 4. Suggestions
> I will optimize the existing pause method from several different 
> perspectives, or provide some new {{pause}} methods, and each point is an 
> independent solution
> h2. 

[jira] [Commented] (KAFKA-13463) Improvement: KafkaConsumer pause(Collection partitions)

2021-11-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17445793#comment-17445793
 ] 

RivenSun commented on KAFKA-13463:
--

Supplement to the suggestion:
h2. 6) Strip the paused mark of topicpartition from assignment

<1> The new instance variable TopicPartitionPausedState is used in 
SubscriptionState to store the paused mark of each topicPartition, and the 
paused mark is not stored in the assignment.

 

<2> In my opinion, the pause and resume methods are entirely client-side behavior of the kafkaConsumer, and they should not be affected by groupRebalance.

At the same time, this allows the user to pause a topicPartition that is not in the assignment, because the paused mark is purely a per-partition setting inside the kafkaConsumer.

In other words, whether or not the consumer has been assigned any topicPartitions, kafkaConsumer can pause any topicPartition, even one that has not been created yet; it may be created in the future.

 

<3> The resume method cleans up the corresponding paused marks in TopicPartitionPausedState

> Improvement: KafkaConsumer pause(Collection partitions)
> ---
>
> Key: KAFKA-13463
> URL: https://issues.apache.org/jira/browse/KAFKA-13463
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> h1. 1.Background
> When users use the kafkaConsumer#pause(...) method, they will maybe ignore: 
> the pause method may no longer work, and data will be lost.
> For example, the following simple code:
> {code:java}
> while (true) {
> try {
> kafkaConsumer.pause(kafkaConsumer.assignment());
> ConsumerRecords records = 
> kafkaConsumer.poll(Duration.ofSeconds(2));
> if (!records.isEmpty()) {
> log.error("kafka poll for rebalance discard some record!");
> }
> } catch (Exception e) {
> log.error("maintain poll for rebalance with error:{}", 
> e.getMessage(), e);
> }
> }{code}
> Even if you call pause(assignment) before the poll method every time, the 
> poll method may still return messages.
>  
> h1. 2. RootCause:
> In short, during the rebalance of the group, 
> ConsumerCoordinator#invokePartitionsRevoked(...) will clear the paused mark 
> on the partitions previously held by kafkaConsumer. However, while clearing 
> the paused mark of partitions, the corresponding message in the memory 
> (Fetcher.completedFetches) of pausedPartitions was not cleared, resulting in 
> Fetcher#fetchedRecords() still fetching the message and returning it to the 
> customer.
> For more detailed analysis, if you are interested, you can read Jira 
> https://issues.apache.org/jira/browse/KAFKA-13425 
> looking forward to your reply.
>  
> h1. 3.Discuss : Can KafkaConsumer support the pause method that is not 
> affected by groupRebalance?
> The KafkaConsumer#pause method actually stated one point at the beginning of 
> its design:
>  * Rebalance does not preserve pause/resume state.
> link:https://issues.apache.org/jira/browse/KAFKA-2350
> Unfortunately, I did not see this from the comments of the 
> KafkaConsumer#pause(...) method. At the same time, 
> ConsumerCoordinator#invokePartitionsRevoked did not have any log output when 
> cleaning up the paused mark. I believe that this will cause many users to use 
> the KafkaConsumer#pause(...) method incorrectly.
> But I think it is necessary for KafkaConsumer to provide a pause method that 
> is not affected by groupRebalance.
>  
> h1. 4. Suggestions
> I will optimize the existing pause method from several different 
> perspectives, or provide some new {{pause}} methods, and each point is an 
> independent solution
> h2. 1)ConsumerCoordinator#invokePartitionsRevoked should also trigger Fetcher 
> to clean up the revokedAndPausedPartitions message in memory when clearing 
> the paused mark
> This can prevent the Fetcher#fetchedRecords() method from mistakenly thinking 
> that revokedAndPausedPartitions is legal and returning messages. There are 
> various checks on the partition in the fetchedRecords method.
> The price of this is that if the user does not call the pause(...) method 
> before calling the poll method next time, a new FetchMessage request may be 
> initiated, which will cause additional network transmission.
>  
> h2. 2)Efforts to maintain the old paused mark on the KafkaConsumer side
> <1>In the ConsumerCoordinator#onJoinPrepare(...) method, record all 
> pausedTopicPartitions from the current assignment of KafkaConsumer;
>  <2> In the ConsumerCoordinator#onJoinComplete(...) method, use 
> pausedTopicPartitions to render the latest assignment and restore the paused 
> marks of the partitions that are still in the latest assignment.
> {*}Note{*}: 

[GitHub] [kafka] zzccctv edited a comment on pull request #11496: KAFKA-13454: kafka has duplicate configuration information log information printin…

2021-11-18 Thread GitBox


zzccctv edited a comment on pull request #11496:
URL: https://github.com/apache/kafka/pull/11496#issuecomment-972630217


   @guozhangwang This suggestion looks very good, but there is a problem. The BrokerConfigHandler class also calls the updateDefaultConfig and updateBrokerConfig functions respectively. If the validateOnly parameter is added to updateCurrentConfig and set to false in the updateDefaultConfig function, it solves the problem of repeatedly printing logs during startup, but during dynamic configuration adjustment it seems that this approach will not work. Can we add a parameter to the processReconfiguration function to control the log printing? I updated this PR; if you agree with this idea, could you help me check it?
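
   Purely as a toy illustration of threading a flag through a config-update path so the applied configuration is logged only once, and not again by a validation-only pass; the class and method names below are made up and this is not Kafka's DynamicBrokerConfig code:

```java
import java.util.Map;

// Toy example: the update path logs only when both "really apply" and "log" are requested.
public class ConfigUpdateSketch {
    static void processReconfiguration(Map<String, String> props, boolean validateOnly, boolean logChanges) {
        // ... validation of props would happen here ...
        if (!validateOnly && logChanges) {
            System.out.println("Applying config: " + props);
        }
    }

    static void updateDefaultConfig(Map<String, String> props) {
        // startup path: validate silently, then apply and log once
        processReconfiguration(props, true, false);
        processReconfiguration(props, false, true);
    }

    static void updateBrokerConfig(Map<String, String> props) {
        // dynamic-update path: apply and log
        processReconfiguration(props, false, true);
    }

    public static void main(String[] args) {
        updateDefaultConfig(Map.of("log.retention.ms", "6"));
        updateBrokerConfig(Map.of("log.retention.ms", "12"));
    }
}
```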


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13458) The Stream is not able to consume from some of the partitions

2021-11-18 Thread Darshan Marathe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17445735#comment-17445735
 ] 

Darshan Marathe commented on KAFKA-13458:
-

Kafka broker Version: 2.6.1

> The Stream is not able to consume from some of the partitions
> -
>
> Key: KAFKA-13458
> URL: https://issues.apache.org/jira/browse/KAFKA-13458
> Project: Kafka
>  Issue Type: Bug
>Reporter: Darshan Marathe
>Priority: Blocker
>
> Hi Team
> Kafka-stream version: 2.6.0
> some messages are stuck in the following partitions, and the stream is not 
> able to consume them from those partitions.
> Restart the stream multiple times, but still issue is same.
> Have faced the following issue,
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-29], this could be either transactional offsets 
> waiting for completion, or normal offsets waiting for replication after 
> appending to local log
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-0]
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-3]
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-9]
> The following partitions still have unstable offsets which are not cleared on 
> the broker side: [TASK_STREAM-14]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] zzccctv edited a comment on pull request #11496: KAFKA-13454: kafka has duplicate configuration information log information printin…

2021-11-18 Thread GitBox


zzccctv edited a comment on pull request #11496:
URL: https://github.com/apache/kafka/pull/11496#issuecomment-972630217


   @guozhangwang This suggestion looks very good, but there is a problem. The BrokerConfigHandler class also calls the updateDefaultConfig and updateBrokerConfig functions respectively. Simply adding the validateOnly parameter to updateCurrentConfig and passing false in the updateDefaultConfig function solves the problem of repeatedly printing logs during startup, but during dynamic configuration adjustment it seems that this approach will not work. Can we add a parameter to the processReconfiguration function to control the log printing? I updated this PR; if you agree with this idea, could you help me check it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] zzccctv edited a comment on pull request #11496: KAFKA-13454: kafka has duplicate configuration information log information printin…

2021-11-18 Thread GitBox


zzccctv edited a comment on pull request #11496:
URL: https://github.com/apache/kafka/pull/11496#issuecomment-972630217


   @guozhangwang This suggestion looks very good, but there is a problem. The BrokerConfigHandler class also calls the updateDefaultConfig and updateBrokerConfig functions respectively. Simply adding the validateOnly parameter to updateCurrentConfig and passing false in the updateDefaultConfig function solves the problem of repeatedly printing logs during startup, but during dynamic configuration adjustment it seems that this approach will not work. Can we add a parameter to the processReconfiguration function to control the log printing? I updated this PR; if you agree with this idea, could you help me check it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] zzccctv commented on pull request #11496: KAFKA-13454: kafka has duplicate configuration information log information printin…

2021-11-18 Thread GitBox


zzccctv commented on pull request #11496:
URL: https://github.com/apache/kafka/pull/11496#issuecomment-972630217


   @guozhangwang This suggestion looks very good, but there is a problem. The BrokerConfigHandler class also calls the updateDefaultConfig and updateBrokerConfig functions respectively. Simply adding the validateOnly parameter to updateCurrentConfig and passing false in the updateDefaultConfig function solves the problem of repeatedly printing logs during startup, but during dynamic configuration adjustment it seems that this approach will not work. Can we add a parameter to the processReconfiguration function to control the log printing?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org