Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]

2024-04-05 Thread via GitHub


brandboat commented on PR #15668:
URL: https://github.com/apache/kafka/pull/15668#issuecomment-2040977883

   > @brandboat nice finding! Should we add the thread prefix to 
https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2487
   to avoid similar issue in the future?
   
   Oh wow ! I didn't notice that we have this check before. Sure ! Already 
added the thread name in this check in commit 
https://github.com/apache/kafka/pull/15668/commits/8daf268f2c99b7a88dde150967c22301c88cc1d6.
 Many thanks !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]

2024-04-05 Thread via GitHub


brandboat commented on code in PR #15659:
URL: https://github.com/apache/kafka/pull/15659#discussion_r1554519247


##
tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java:
##
@@ -654,6 +654,13 @@ static Map> 
getReplicaAssignmentForPartitions(Admi
 res.put(tp, 
info.replicas().stream().map(Node::id).collect(Collectors.toList()));
 })
 );
+
+if (!res.keySet().equals(partitions)) {
+Set missingPartitions = new HashSet<>(partitions);
+missingPartitions.removeAll(res.keySet());
+throw new UnknownTopicOrPartitionException("Unable to find 
partition: " +
+
missingPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
 ")));

Review Comment:
   Though I don't know why do we do this, but follow the pattern is a good 
choice. Thanks for the comment :smiley: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-05 Thread via GitHub


chia7712 commented on code in PR #15656:
URL: https://github.com/apache/kafka/pull/15656#discussion_r1554518847


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
+public class KafkaConfig {

Review Comment:
   not sure whether we need another class to collect those ssl/sasl-related 
configs. If we do need it, could we rename it to `KafkaSecurityConfig` to avoid 
adding other security-unrelated configs in the future. Personally, a fat class 
like `KafkaConfig.scala` is a anti-pattern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16047: Leverage the fenceProducers timeout in the InitProducerId [kafka]

2024-04-05 Thread via GitHub


github-actions[bot] commented on PR #15078:
URL: https://github.com/apache/kafka/pull/15078#issuecomment-2040935039

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Default test name added to tools [kafka]

2024-04-05 Thread via GitHub


chia7712 commented on PR #15666:
URL: https://github.com/apache/kafka/pull/15666#issuecomment-2040912811

   @nizhikov Could you please rebase code to see the display name of tools test?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16472) Integration tests in Java don't really run kraft case

2024-04-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16472.

Resolution: Fixed

> Integration tests in Java don't really run kraft case
> -
>
> Key: KAFKA-16472
> URL: https://issues.apache.org/jira/browse/KAFKA-16472
> Project: Kafka
>  Issue Type: Test
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> Following test cases don't really run kraft case. The reason is that the test 
> info doesn't contain parameter name, so it always returns false in 
> TestInfoUtils#isKRaft.
>  * TopicCommandIntegrationTest
>  * DeleteConsumerGroupsTest
>  * AuthorizerIntegrationTest
>  * DeleteOffsetsConsumerGroupCommandIntegrationTest
>  
> We can add `options.compilerArgs += '-parameters'` after 
> [https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/build.gradle#L264-L273]
>  to fix it.
>  
> Also, we have to add `String quorum` to cases in 
> DeleteOffsetsConsumerGroupCommandIntegrationTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16472) Integration tests in Java don't really run kraft case

2024-04-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16472:
---
Fix Version/s: 3.8.0
   3.7.1

> Integration tests in Java don't really run kraft case
> -
>
> Key: KAFKA-16472
> URL: https://issues.apache.org/jira/browse/KAFKA-16472
> Project: Kafka
>  Issue Type: Test
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> Following test cases don't really run kraft case. The reason is that the test 
> info doesn't contain parameter name, so it always returns false in 
> TestInfoUtils#isKRaft.
>  * TopicCommandIntegrationTest
>  * DeleteConsumerGroupsTest
>  * AuthorizerIntegrationTest
>  * DeleteOffsetsConsumerGroupCommandIntegrationTest
>  
> We can add `options.compilerArgs += '-parameters'` after 
> [https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/build.gradle#L264-L273]
>  to fix it.
>  
> Also, we have to add `String quorum` to cases in 
> DeleteOffsetsConsumerGroupCommandIntegrationTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]

2024-04-05 Thread via GitHub


chia7712 merged PR #15663:
URL: https://github.com/apache/kafka/pull/15663


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]

2024-04-05 Thread via GitHub


chia7712 commented on PR #15663:
URL: https://github.com/apache/kafka/pull/15663#issuecomment-2040889410

   I'm going to merge this PR in order to make other tool tests can run with 
KRaft. We can address all late reviews in other PRs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-05 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1554474003


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-05 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1554466415


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional

Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]

2024-04-05 Thread via GitHub


showuon commented on code in PR #15659:
URL: https://github.com/apache/kafka/pull/15659#discussion_r1554446742


##
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java:
##
@@ -300,6 +300,12 @@ public void testGetReplicaAssignments() throws Exception {
 
 assertEquals(assignments,
 getReplicaAssignmentForPartitions(adminClient, new 
HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0);
+
+assignments.clear();
+
+UnknownTopicOrPartitionException exception = 
assertThrows(UnknownTopicOrPartitionException.class,
+() -> getReplicaAssignmentForPartitions(adminClient, new 
HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("foo", 
10);

Review Comment:
   In `ReassignPartitionsCommand#describeTopics`, we wrap 
`UnknownTopicOrPartitionException` with `ExecutionException`. Maybe we should 
make it consistent by this pattern?



##
tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java:
##
@@ -654,6 +654,13 @@ static Map> 
getReplicaAssignmentForPartitions(Admi
 res.put(tp, 
info.replicas().stream().map(Node::id).collect(Collectors.toList()));
 })
 );
+
+if (!res.keySet().equals(partitions)) {
+Set missingPartitions = new HashSet<>(partitions);
+missingPartitions.removeAll(res.keySet());
+throw new UnknownTopicOrPartitionException("Unable to find 
partition: " +
+
missingPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
 ")));

Review Comment:
   In ReassignPartitionsCommand#describeTopics, we wrap 
UnknownTopicOrPartitionException with ExecutionException. Maybe we should make 
it consistent by this pattern?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]

2024-04-05 Thread via GitHub


showuon commented on code in PR #15659:
URL: https://github.com/apache/kafka/pull/15659#discussion_r1554446742


##
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java:
##
@@ -300,6 +300,12 @@ public void testGetReplicaAssignments() throws Exception {
 
 assertEquals(assignments,
 getReplicaAssignmentForPartitions(adminClient, new 
HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0);
+
+assignments.clear();
+
+UnknownTopicOrPartitionException exception = 
assertThrows(UnknownTopicOrPartitionException.class,
+() -> getReplicaAssignmentForPartitions(adminClient, new 
HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("foo", 
10);

Review Comment:
   In `ReassignPartitionsCommand#describeTopics`, we wrap 
`UnknownTopicOrPartitionException` with `ExecutionException`. Maybe we should 
make it consistent by this pattern?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-04-05 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16474:
--
Labels: kip-848-client-support  (was: )

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-04-05 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16474:
--
Component/s: clients

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-04-05 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16474:
--
Fix Version/s: 3.8.0

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken

2024-04-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834456#comment-17834456
 ] 

Matthias J. Sax commented on KAFKA-16478:
-

\cc [~mimaison] – seems you remove some older release from the download path 
(3.4.1, 3.5.0, 3.5.1, and 3.6.0). Let's check all links for these releases to 
point to "archive".

> Links for Kafka 3.5.2 release are broken
> 
>
> Key: KAFKA-16478
> URL: https://issues.apache.org/jira/browse/KAFKA-16478
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 3.5.2
>Reporter: Philipp Trulson
>Priority: Major
>
> While trying to update our setup, I noticed that the download links for the 
> 3.5.2 links are broken. They all point to a different host and also contain 
> an additional `/kafka` in their URL. Compare:
> not working:
> [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html]
> working:
> [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html]
> [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html]
> This goes for all links in the release - archives, checksums, signatures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-16478) Links for Kafka 3.5.2 release are broken

2024-04-05 Thread Matthias J. Sax (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16478 ]


Matthias J. Sax deleted comment on KAFKA-16478:
-

was (Author: mjsax):
\cc [~omkreddy] [~stanislavkozlovski] as last RMs – did any of you move 
artifacts and forgot to update the links?

> Links for Kafka 3.5.2 release are broken
> 
>
> Key: KAFKA-16478
> URL: https://issues.apache.org/jira/browse/KAFKA-16478
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 3.5.2
>Reporter: Philipp Trulson
>Priority: Major
>
> While trying to update our setup, I noticed that the download links for the 
> 3.5.2 links are broken. They all point to a different host and also contain 
> an additional `/kafka` in their URL. Compare:
> not working:
> [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html]
> working:
> [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html]
> [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html]
> This goes for all links in the release - archives, checksums, signatures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken

2024-04-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834452#comment-17834452
 ] 

Matthias J. Sax commented on KAFKA-16478:
-

\cc [~omkreddy] [~stanislavkozlovski] as last RMs – did any of you move 
artifacts and forgot to update the links?

> Links for Kafka 3.5.2 release are broken
> 
>
> Key: KAFKA-16478
> URL: https://issues.apache.org/jira/browse/KAFKA-16478
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 3.5.2
>Reporter: Philipp Trulson
>Priority: Major
>
> While trying to update our setup, I noticed that the download links for the 
> 3.5.2 links are broken. They all point to a different host and also contain 
> an additional `/kafka` in their URL. Compare:
> not working:
> [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html]
> working:
> [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html]
> [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html]
> This goes for all links in the release - archives, checksums, signatures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken

2024-04-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834451#comment-17834451
 ] 

Matthias J. Sax commented on KAFKA-16478:
-

[~der_eismann] – Thanks for reporting this. Do you want to do a PR to fix it? 
Should be simple? Just need a small update to 
[https://github.com/apache/kafka-site/blob/asf-site/downloads.html] 

> Links for Kafka 3.5.2 release are broken
> 
>
> Key: KAFKA-16478
> URL: https://issues.apache.org/jira/browse/KAFKA-16478
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 3.5.2
>Reporter: Philipp Trulson
>Priority: Major
>
> While trying to update our setup, I noticed that the download links for the 
> 3.5.2 links are broken. They all point to a different host and also contain 
> an additional `/kafka` in their URL. Compare:
> not working:
> [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html]
> working:
> [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html]
> [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html]
> This goes for all links in the release - archives, checksums, signatures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: fix javadoc warnings [kafka]

2024-04-05 Thread via GitHub


gharris1727 commented on PR #15527:
URL: https://github.com/apache/kafka/pull/15527#issuecomment-2040626161

   > I don't think that's a good idea. Usually IDEs help in figuring out the 
usages of fields/classes anyway. Should we remove that tag?
   
   I agree, let's remove that. RemoteStorageMetrics is already referenced by 
MetricsTest, so this back-link isn't adding anything.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix javadoc warnings [kafka]

2024-04-05 Thread via GitHub


gaurav-narula commented on PR #15527:
URL: https://github.com/apache/kafka/pull/15527#issuecomment-2040616724

   @gharris1727 Thanks for pointing that out.
   
   The warning below
   ```
   
kafka/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java:31:
 warning: reference not found: kafka.api.MetricsTest
* @see kafka.api.MetricsTest
   ```
   
   occurs because we're trying to refer to a class in the test compile path 
from production code. I don't think that's a good idea. Usually IDEs help in 
figuring out the usages of fields/classes anyway. Should we remove that tag? 
WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-05 Thread via GitHub


chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1554282432


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -1136,18 +1137,18 @@ class DynamicListenerConfig(server: KafkaBroker) 
extends BrokerReconfigurable wi
 class DynamicProducerStateManagerConfig(val producerStateManagerConfig: 
ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
   def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
 if (producerStateManagerConfig.producerIdExpirationMs != 
newConfig.producerIdExpirationMs) {
-  info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from 
${producerStateManagerConfig.producerIdExpirationMs} to 
${newConfig.producerIdExpirationMs}")
+  info(s"Reconfigure 
${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} from 
${producerStateManagerConfig.producerIdExpirationMs} to 
${newConfig.producerIdExpirationMs}")
   
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs)
 }
 if (producerStateManagerConfig.transactionVerificationEnabled != 
newConfig.transactionPartitionVerificationEnable) {
-  info(s"Reconfigure 
${KafkaConfig.TransactionPartitionVerificationEnableProp} from 
${producerStateManagerConfig.transactionVerificationEnabled} to 
${newConfig.transactionPartitionVerificationEnable}")
+  info(s"Reconfigure 
${TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from 
${producerStateManagerConfig.transactionVerificationEnabled} to 
${newConfig.transactionPartitionVerificationEnable}")
   
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable)
 }
   }
 
   def validateReconfiguration(newConfig: KafkaConfig): Unit = {
 if (newConfig.producerIdExpirationMs < 0)
-  throw new ConfigException(s"${KafkaConfig.ProducerIdExpirationMsProp} 
cannot be less than 0, current value is 
${producerStateManagerConfig.producerIdExpirationMs}, and new value is 
${newConfig.producerIdExpirationMs}")
+  throw new 
ConfigException(s"${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} 
cannot be less than 0, current value is 
${producerStateManagerConfig.producerIdExpirationMs}, and new value is 
${newConfig.producerIdExpirationMs}")

Review Comment:
   maybe `ProducerStateManagerConfig.PRODUCER_ID_EXPIRATION_MS` is more 
suitable in this case because the code is used for logging `Producer` state.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1062,21 +1027,21 @@ object KafkaConfig {
   .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, 
in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc)
 
   /** * Transaction management configuration ***/
-  .define(TransactionalIdExpirationMsProp, INT, 
Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, 
TransactionalIdExpirationMsDoc)
-  .define(TransactionsMaxTimeoutMsProp, INT, 
Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, 
TransactionsMaxTimeoutMsDoc)
-  .define(TransactionsTopicMinISRProp, INT, 
Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, 
TransactionsTopicMinISRDoc)
-  .define(TransactionsLoadBufferSizeProp, INT, 
Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, 
TransactionsLoadBufferSizeDoc)
-  .define(TransactionsTopicReplicationFactorProp, SHORT, 
Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, 
TransactionsTopicReplicationFactorDoc)
-  .define(TransactionsTopicPartitionsProp, INT, 
Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, 
TransactionsTopicPartitionsDoc)
-  .define(TransactionsTopicSegmentBytesProp, INT, 
Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, 
TransactionsTopicSegmentBytesDoc)
-  .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, 
Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, 
TransactionsAbortTimedOutTransactionsIntervalMsDoc)
-  .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, 
INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, 
TransactionsRemoveExpiredTransactionsIntervalMsDoc)
-
-  .define(TransactionPartitionVerificationEnableProp, BOOLEAN, 
Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, 
TransactionPartitionVerificationEnableDoc)
-
-  .define(ProducerIdExpirationMsProp, INT, 
Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+  
.define(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, 
INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, 
TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)

Review Comment:
   Should we do similar refactor for `Defaults`?



-- 
This is an automated message from the Apache Git Service.
To respond to the 

Re: [PR] MINOR: fix javadoc warnings [kafka]

2024-04-05 Thread via GitHub


gharris1727 commented on PR #15527:
URL: https://github.com/apache/kafka/pull/15527#issuecomment-2040586001

   Hi @gaurav-narula I see some other javadoc warnings, do you think we should 
address these?
   
   ```
   > Task :storage:storage-api:javadoc
   
kafka/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java:31:
 warning: reference not found: kafka.api.MetricsTest
* @see kafka.api.MetricsTest
  ^
   1 warning
   
   > Task :streams:javadoc
   
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:52:
 warning: invalid input: '<'
* A {@link QueryableStoreType} that accepts {@link 
ReadOnlyKeyValueStore ReadOnlyKeyValueStore>}.

  ^
   
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:74:
 warning: invalid input: '<'
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore 
ReadOnlyWindowStore>}.

  ^
   
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:52:
 warning: invalid input: '<'
* A {@link QueryableStoreType} that accepts {@link 
ReadOnlyKeyValueStore ReadOnlyKeyValueStore>}.

  ^
   
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:74:
 warning: invalid input: '<'
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore 
ReadOnlyWindowStore>}.

  ^
   
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:52:
 warning: invalid input: '<'
* A {@link QueryableStoreType} that accepts {@link 
ReadOnlyKeyValueStore ReadOnlyKeyValueStore>}.

  ^
   
kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:74:
 warning: invalid input: '<'
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore 
ReadOnlyWindowStore>}.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-05 Thread via GitHub


jsancio opened a new pull request, #15671:
URL: https://github.com/apache/kafka/pull/15671

   DRAFT
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on PR #15613:
URL: https://github.com/apache/kafka/pull/15613#issuecomment-2040495611

   Thanks for the changes @lucasbru, looks good to me overall. This is tidying 
up the whole async commit callbacks execution story. Left some comments, mostly 
minor, and to make sure we're on the same page with the reasoning behind the 
change.
   
   Should we update the PR description to refer not only to the 
`consumer.commitSync()`, but also `consumer.close()`, both being fixed here to 
ensure that previous async commit callbacks are always executed?
   
   Thanks! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]

2024-04-05 Thread via GitHub


gharris1727 commented on code in PR #15642:
URL: https://github.com/apache/kafka/pull/15642#discussion_r1554188423


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java:
##
@@ -69,6 +69,9 @@ public static void setup() {
 Map workerProps = new HashMap<>();
 workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID);
 
+// Work around a circular-dependency in TestPlugins.
+TestPlugins.pluginPath();

Review Comment:
   The circular dependency is still a problem that I haven't resolved, sorry. 
It's fixed in this un-merged PR: #1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554162586


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed.

Review Comment:
   I would extend with something like : ...before the consumer is closed, 
**even when no commit sync is performed as part of the close (due to 
auto-commit disabled, or simply because there no consumed offsets).**
   
   That's the key as I see it, fixed in this PR, and being tested here. If the 
call to consumer.close performs an actual commit sync (needs auto-commit 
enabled and non-empty consumed offsets), then the async callbacks were always 
called I expect. The contract was not being respected in case the commit sync 
did not happen for some of the reasons mentioned above. Agree?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554177925


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed.
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(1, cb.successCount);
+
+// Enforce looking up the coordinator
+consumer.committed(Set(tp, tp2).asJava)

Review Comment:
   I would say we don't need this, because of the successful `assertEquals` 
with call to `committed` above, ln 694. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554165141


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {

Review Comment:
   nit: maybe better name testCommitAsyncCompleted**Before**ConsumerCloses 
(clearer and consistent with the similar one below)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554162586


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed.

Review Comment:
   I would extend with something like : ...before the consumer is closed, 
**even when no commit sync is performed as part of the close (due to 
auto-commit disabled, or simply because there no consumed offsets).**
   
   That's the key as I see it, fixed in this PR, and being tested here. If the 
call to consumer.close performs an actual commit sync (needs auto-commit 
enabled and non-empty consumed offsets), then the async callbacks were always 
called I expect. The contract was not being respected in case the commit sync 
did not happen for some of the reasons mentioned above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554162586


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed.

Review Comment:
   I would extend with something like : ...before the consumer is closed, 
**even when no commit sync is performed as part of the close (due to 
auto-commit disabled, or simply because there no consumed offsets).**
   
   That's the key as I see it, fixed in this PR, and being tested here. If the 
call to consumer.close performs a commit sync (needs auto-commit enabled and 
non-empty consumed offsets), then the async callbacks were always called I 
expect. The contract was not being respected in case the commit sync did not 
happen for some of the reasons mentioned above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Improvements to release.py [kafka]

2024-04-05 Thread via GitHub


chia7712 commented on code in PR #15651:
URL: https://github.com/apache/kafka/pull/15651#discussion_r1554150631


##
release.py:
##
@@ -348,6 +348,9 @@ def command_release_announcement_email():
 
 
 
+An overview of the release and its notable changes can be found in the
+release blog post: 

Review Comment:
   Can we generate the link automatically if the link formate is like 
`https://kafka.apache.org/blog#apache_kafka_362_release_announcement`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]

2024-04-05 Thread via GitHub


chia7712 commented on PR #15663:
URL: https://github.com/apache/kafka/pull/15663#issuecomment-2040402625

   @ijuma Could you please take a look at this PR? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13501: Avoid state restore via rebalance if standbys are enabled [kafka]

2024-04-05 Thread via GitHub


vamossagar12 closed pull request #11592: KAFKA-13501: Avoid state restore via 
rebalance if standbys are enabled
URL: https://github.com/apache/kafka/pull/11592


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10526: leader fsync deferral on write [kafka]

2024-04-05 Thread via GitHub


vamossagar12 closed pull request #10278: KAFKA-10526: leader fsync deferral on 
write
URL: https://github.com/apache/kafka/pull/10278


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka 12373:Improve KafkaRaftClient handling of graceful shutdown [kafka]

2024-04-05 Thread via GitHub


vamossagar12 closed pull request #10468: Kafka 12373:Improve KafkaRaftClient 
handling of graceful shutdown
URL: https://github.com/apache/kafka/pull/10468


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-05 Thread via GitHub


OmniaGM opened a new pull request, #15670:
URL: https://github.com/apache/kafka/pull/15670

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12848: kafka streams jmh benchmarks [kafka]

2024-04-05 Thread via GitHub


vamossagar12 closed pull request #10842: KAFKA-12848: kafka streams jmh 
benchmarks
URL: https://github.com/apache/kafka/pull/10842


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-04-05 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1554016667


##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java:
##
@@ -33,28 +34,36 @@
 public class TimestampedKeyAndJoinSide {
 private final K key;
 private final long timestamp;
-private final boolean leftSide;
+private final JoinSide joinSide;
 
-private TimestampedKeyAndJoinSide(final boolean leftSide, final K key, 
final long timestamp) {
+private TimestampedKeyAndJoinSide(final JoinSide joinSide, final K key, 
final long timestamp) {
 this.key = Objects.requireNonNull(key, "key cannot be null");
-this.leftSide = leftSide;
+this.joinSide = joinSide;
 this.timestamp = timestamp;
 }
 
 /**
- * Create a new {@link TimestampedKeyAndJoinSide} instance if the provide 
{@code key} is not {@code null}.
+ * Create a new {@link TimestampedKeyAndJoinSide} instance if the provided 
{@code key} is not {@code null}.
  *
- * @param leftSide True if the key is part of the left join side; False if 
it is from the right join side
+ * @param joinSide Whether the key is part of the {@link JoinSide#LEFT} 
side; or it is from the {@link JoinSide#RIGHT} side
  * @param key  the key
  * @param   the type of the key
- * @return a new {@link TimestampedKeyAndJoinSide} instance if the provide 
{@code key} is not {@code null}
+ * @return a new {@link TimestampedKeyAndJoinSide} instance if the 
provided {@code key} is not {@code null}
  */
-public static  TimestampedKeyAndJoinSide make(final boolean 
leftSide, final K key, final long timestamp) {
-return new TimestampedKeyAndJoinSide<>(leftSide, key, timestamp);
+public static  TimestampedKeyAndJoinSide make(final JoinSide 
joinSide, final K key, final long timestamp) {

Review Comment:
   Since this is only used in tests now, I think you can remove this and 
replace the test call-sites with the new functions.
   
   Make sure to copy the javadoc to the new signatures too.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.Optional;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamLeftJoin extends KStreamKStreamJoin {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
+
+KStreamKStreamLeftJoin(final String otherWindowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joiner,
+final boolean outer,
+final Optional outerJoinWindowName,
+final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+super(otherWindowName, windows, windows.beforeMs, windows.afterMs, 
joiner, outerJoinWindowName,
+sharedTimeTrackerSupplier, outer);
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamLeftJoinProcessor();
+}
+
+private class KStreamKStreamLeftJoinProcessor extends 
KStreamKStreamJoinProcessor {
+@Override
+public void process(final Record leftRecord) {
+final long inputRecordTimestamp = leftRecord.timestamp();
+final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
+final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
+

Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]

2024-04-05 Thread via GitHub


philipnee commented on PR #15661:
URL: https://github.com/apache/kafka/pull/15661#issuecomment-2040290159

   @cadonna @lucasbru - Is it possible for me to ask for a review on this issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554025678


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed.
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(1, cb.successCount);
+
+// Enforce looking up the coordinator
+consumer.committed(Set(tp, tp2).asJava)
+
+// Try with coordinator known
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(2L))).asJava, cb)
+consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(2L))).asJava)
+assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+assertEquals(2, cb.successCount);
+
+// Try with empty sync commit
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(3L))).asJava, cb)
+consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+assertEquals(3, cb.successCount);

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554024796


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed.
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(1, cb.successCount);
+
+// Enforce looking up the coordinator
+consumer.committed(Set(tp, tp2).asJava)
+
+// Try with coordinator known
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(2L))).asJava, cb)
+consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(2L))).asJava)
+assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+assertEquals(2, cb.successCount);

Review Comment:
   nit: semi-colon



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554024079


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed.
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount);

Review Comment:
   nit: unneeded semi-colon. Java to scala jump tricking us...been there :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554020736


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -229,7 +229,11 @@ private GroupRebalanceConfig 
buildRebalanceConfig(Optional groupInstance
 @AfterEach
 public void teardown() {
 this.metrics.close();
-this.coordinator.close(time.timer(0));
+try {
+this.coordinator.close(time.timer(0));

Review Comment:
   I see, I would say it's fine to throw the error at the coordinator level 
(and live with code like this). 
   
   And actually, the need for this catch is not introduced by this PR as I see 
it. The coordinator close before this PR could throw fenced exception for async 
commits that were waiting for coord and completed 
[here](https://github.com/apache/kafka/blob/fd9c7d2932dee055289b403e37a0bbb631c080a9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L983)
 getting fenced.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1554011665


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {

Review Comment:
   We have a new `PlainTextConsumerCommitTest` for all commit-relates tests. 
These 2 should go there I would say. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-05 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2040270805

   > Regarding the previous failed tests, one possibility is that the data on 
the server passed the retention time and is garbage collected. The default 
retention time is 7 days, which should be long enough. However, since we reuse 
mockTime, if the test runs long, the retention time might still be reached. 
Perhaps we could set 
[log.retention.ms](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms)
 to -1?
   
   ```
   org.opentest4j.AssertionFailedError: expected: <0> but was: <3>
   ```
   
   You really hit the bullseye. I can reproduce the error by doing a little 
sleep before fetching data. Will set `retention.ms` to -1
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543492385


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -984,6 +984,8 @@ public void close(final Timer timer) {
 }
 } finally {
 super.close(timer);
+// Super-class close may wait for more commit callbacks to 
complete.
+invokeCompletedOffsetCommitCallbacks();

Review Comment:
   Agree, there could be async requests, with known coord, not getting a 
response within the above commit sync time, then getting it while the 
super.close waits, so we should trigger the callbacks at this point.  
   
   But this makes me notice, aren't we breaking the `close(Duration)` contract 
here, calling that `super.close(timer)` on the finally clause? Let's say async 
requests that are not getting a response within the timeout in the above while 
loop (so we block for time on the while), then `finally`, the super class 
blocks for that time again 
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1139).
 Am I missing something? (I can file a separate Jira if I'm not missing 
something here)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-05 Thread via GitHub


junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2040231361

   @chia7712 : Thanks for the updated PR. Regarding the previous failed tests, 
one possibility is that the data on the server passed the retention time and is 
garbage collected. The default retention time is 7 days, which should be long 
enough. However, since we reuse mockTime, if the test runs long, the retention 
time might still be reached. Perhaps we could set 
[log.retention.ms](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms)
 to -1?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-04-05 Thread via GitHub


CalvinConfluent commented on PR #15470:
URL: https://github.com/apache/kafka/pull/15470#issuecomment-2040217099

   @mumrah Thanks for the review. Ticket filed.
   https://issues.apache.org/jira/browse/KAFKA-15579


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16479) Add pagination supported describeTopic interface

2024-04-05 Thread Calvin Liu (Jira)
Calvin Liu created KAFKA-16479:
--

 Summary: Add pagination supported  describeTopic interface
 Key: KAFKA-16479
 URL: https://issues.apache.org/jira/browse/KAFKA-16479
 Project: Kafka
  Issue Type: Sub-task
Reporter: Calvin Liu


During the DescribeTopicPartitions API implementations, we found it awkward to 
place the pagination logic within the current admin client describe topic 
interface. So, in order to change the interface, we may need to have a boarder 
discussion like creating a KIP. Or even a step forward, to discuss a general 
client side pagination framework. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15583) High watermark can only advance if ISR size is larger than min ISR

2024-04-05 Thread Calvin Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Calvin Liu resolved KAFKA-15583.

Resolution: Fixed

> High watermark can only advance if ISR size is larger than min ISR
> --
>
> Key: KAFKA-15583
> URL: https://issues.apache.org/jira/browse/KAFKA-15583
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Calvin Liu
>Assignee: Calvin Liu
>Priority: Major
>
> This is the new high watermark advancement requirement.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-04-05 Thread via GitHub


junrao commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1553959430


##
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##
@@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest {
  */
 public static final long EARLIEST_LOCAL_TIMESTAMP = -4L;
 
+public static final long LATEST_TIERED_TIMESTAMP = -5L;

Review Comment:
   Thanks for following up, @clolov !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 
 private boolean invokePendingAsyncCommits(Timer timer) {
-if (inFlightAsyncCommits.get() == 0) {
+if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) 
{

Review Comment:
   This makes sense to me, to fill a gap in the case of commit sync with empty 
offsets, that skips the path of sending an actual request, and that's why it 
looses the guarantee of executing the callbacks as I see it.
   
   This makes the logic consistent with what happens if the commit sync has 
non-empty offsets. In that case, it does execute the callbacks for previous 
async commits that were waiting for coord:  the sync commit would be blocked on 
the same findCoord request (there's always just 1), and the moment the coord is 
found the async is marked as inflight 
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036),
 so it would be considered for callbacks 
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121).
 
   
   Am I getting the reasoning for the change right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 
 private boolean invokePendingAsyncCommits(Timer timer) {
-if (inFlightAsyncCommits.get() == 0) {
+if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) 
{

Review Comment:
   This makes sense to me, to fill a gap in the case of commit sync with empty 
offsets, that skips the path of sending an actual request, and that's why it 
looses the guarantee of executing the callbacks as I see it, right? 
   
   This makes the logic consistent with what happens if the commit sync has 
non-empty offsets. In that case, it does execute the callbacks for previous 
async commits that were waiting for coord:  the sync commit would be blocked on 
the same findCoord request (there's always just 1), and the moment the coord is 
found the async is marked as inflight 
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036),
 so it would be considered for callbacks 
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121).
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 
 private boolean invokePendingAsyncCommits(Timer timer) {
-if (inFlightAsyncCommits.get() == 0) {
+if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) 
{

Review Comment:
   This makes sense to me, to fill a gap in the case of commit sync with empty 
offsets, that skips the path of sending an actual request, and that's why it 
looses the guarantee of executing the callbacks as I see it. 
   
   This makes the logic consistent with what happens if the commit sync has 
non-empty offsets. In that case, it does execute the callbacks for previous 
async commits that were waiting for coord:  the sync commit would be blocked on 
the same findCoord request (there's always just 1), and the moment the coord is 
found the async is marked as inflight 
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036),
 so it would be considered for callbacks 
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121).
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1553899936


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1360,6 +1362,9 @@ public void commitSync(Map offsets, Duration
 Timer requestTimer = time.timer(timeout.toMillis());
 SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, 
requestTimer);
 CompletableFuture commitFuture = commit(syncCommitEvent);
+
+awaitPendingAsyncCommits(requestTimer, false);

Review Comment:
   nit: maybe helpful to reflect in the name that this does execute the 
callbacks (or leave it as it is and then have the one line 1406 that does 
execute the callbacks here, right after) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-04-05 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1553899936


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1360,6 +1362,9 @@ public void commitSync(Map offsets, Duration
 Timer requestTimer = time.timer(timeout.toMillis());
 SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, 
requestTimer);
 CompletableFuture commitFuture = commit(syncCommitEvent);
+
+awaitPendingAsyncCommits(requestTimer, false);

Review Comment:
   nit: maybe helpful to reflect in the name that this does execute the 
callbacks (or leave it as it is and then have the one line that does execute 
the callbacks here, right after) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]

2024-04-05 Thread via GitHub


lianetm commented on PR #15669:
URL: https://github.com/apache/kafka/pull/15669#issuecomment-2039970916

   Hey @lucasbru, could you take a look at this when you have a chance? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]

2024-04-05 Thread via GitHub


lianetm opened a new pull request, #15669:
URL: https://github.com/apache/kafka/pull/15669

   Minor changes for improving the logging and docs related to the auto-commit 
inflight logic, also adding tests to ensure the expected behaviour:
   
   - auto-commit on the interval does not send a request if another one 
inflight, and it sends the next as soon as a response is received (without 
waiting for the full interval again)
   - auto-commit before revocation always send a request (even if another one 
from auto-commit on interval is in-flight), to ensure the latest is committed 
before revoking partitions. 
   
   No changes in logic, just adding tests, docs and minor refactoring. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Increase parallelism for Jenkins [kafka]

2024-04-05 Thread via GitHub


divijvaidya closed pull request #15099: MINOR: Increase parallelism for Jenkins
URL: https://github.com/apache/kafka/pull/15099


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add 3.6.2 to system tests [kafka]

2024-04-05 Thread via GitHub


omkreddy merged PR #15665:
URL: https://github.com/apache/kafka/pull/15665


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add 3.6.2 to system tests [kafka]

2024-04-05 Thread via GitHub


omkreddy commented on PR #15665:
URL: https://github.com/apache/kafka/pull/15665#issuecomment-2039898497

   Thanks for the Review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-04-05 Thread via GitHub


viktorsomogyi commented on PR #15605:
URL: https://github.com/apache/kafka/pull/15605#issuecomment-2039754554

   Aslo, thank you @akatona84 for the contribution, @soarez and @urbandan for 
the reviews. Fixing flaky tests is always very welcomed, keep it up!  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]

2024-04-05 Thread via GitHub


FrankYang0529 commented on PR #15663:
URL: https://github.com/apache/kafka/pull/15663#issuecomment-2039753742

   > > I think the class file size increasing is indeed a direct drawback after 
adding -parameter option because we'll include all the parameters into .class 
files. I'd like to know if there's any other way to fix this? Could we use 
ARGUMENTS instead of ARGUMENTS_WITH_NAMES?
   > 
   > Or we can add the new arg to compileTestJava only to avoid impacting 
production binary
   
   Updated it as `compileTestJava.options.compilerArgs.add "-parameters"`. 
Thanks for the suggestion @chia7712 @showuon .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-04-05 Thread via GitHub


viktorsomogyi merged PR #15605:
URL: https://github.com/apache/kafka/pull/15605


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16478) Links for Kafka 3.5.2 release are broken

2024-04-05 Thread Philipp Trulson (Jira)
Philipp Trulson created KAFKA-16478:
---

 Summary: Links for Kafka 3.5.2 release are broken
 Key: KAFKA-16478
 URL: https://issues.apache.org/jira/browse/KAFKA-16478
 Project: Kafka
  Issue Type: Bug
  Components: website
Affects Versions: 3.5.2
Reporter: Philipp Trulson


While trying to update our setup, I noticed that the download links for the 
3.5.2 links are broken. They all point to a different host and also contain an 
additional `/kafka` in their URL. Compare:

not working:
[https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html]

working:
[https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html]
[https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html]

This goes for all links in the release - archives, checksums, signatures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-05 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1553514811


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+return 
ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+!classicGroup.isInState(DEAD) &&
+
ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) 
&&

Review Comment:
   I think that we have `usesConsumerGroupProtocol()` in the `ClassicGroup` 
class. Could we use it?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {

Review Comment:
   Does it have to be public? Should we add some javadoc?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+return 
ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+!classicGroup.isInState(DEAD) &&
+
ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) 
&&
+classicGroup.size() <= consumerGroupMaxSize;
+}
+
+ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, 
List records) {
+classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+createGroupTombstoneRecords(classicGroup, records);
+ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
classicGroup.groupId(), metrics);
+classicGroup.convertToConsumerGroup(consumerGroup, records, 
metadataImage.topics());

Review Comment:
   I was wondering whether it would make more sense the other way around and 
have something like `ConsumerGroup.fromClassicGroup()`. I guess that it 
does not really matter in the end.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+return 
ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+!classicGroup.isInState(DEAD) &&
+
ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) 
&&
+classicGroup.size() <= consumerGroupMaxSize;
+}

Review Comment:
   I wonder whether we should log something (with the reason) when the upgrade 
is disallowed. Have you considered it?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##
@@ -1300,6 +1341,68 @@ public Map groupAssignment() {
 ));
 }
 
+/**
+ * Convert the current classic group to a consumer group.
+ * Add the records for the conversion.
+ *
+ * @param consumerGroup The converted consumer group.
+ * @param records   The list to which the new records are added.
+ *
+ * @throws GroupIdNotFoundException if any of the group's member doesn't 
support the consumer protocol.
+ */
+public void convertToConsumerGroup(
+ConsumerGroup consumerGroup,
+List records,
+TopicsImage topicsImage
+) throws GroupIdNotFoundException {
+consumerGroup.setGroupEpoch(generationId);
+consumerGroup.setTargetAssignmentEpoch(generationId);
+
+records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
generationId));
+// SubscriptionMetadata will be computed in the following 
consumerGroupHeartbeat
+
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), 
Collections.emptyMap()));
+records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
generationId));
+
+members.forEach((memberId, member) -> {
+ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+Map> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));

Review Comment:
   * `deserializeAssignment` and `deserializeSubscription` could throw an 
`SchemaException` if not mistaken if the bytes are incorrect. We should handle 
those, I suppose.
   * We also discussed offline the need to 

Re: [PR] KAFKA-16383: fix flaky IdentityReplicationIntegrationTest .testReplicateFromLatest [kafka]

2024-04-05 Thread via GitHub


johnnychhsu commented on PR #15556:
URL: https://github.com/apache/kafka/pull/15556#issuecomment-2039670307

   @vamossagar12 thanks for the comment.
   sure! let's wait and monitor more builds 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]

2024-04-05 Thread via GitHub


dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1553499653


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2415,6 +2415,20 @@ private CoordinatorResult 
classicGroupJoinExistingMember(
 return EMPTY_RESULT;
 }
 
+/**
+ * An overload of {@link 
GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+ * timeout operation. It additionally looks up the group by the id and 
checks the group type.
+ * completeClassicGroupJoin will only be called if the group is CLASSIC.
+ */
+private CoordinatorResult completeClassicGroupJoin(String 
groupId) {
+if (containsClassicGroup(groupId)) {
+return 
completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));

Review Comment:
   I am not a fan of this pattern because you effectively have to look up the 
group twice. One option would be to use a try..catch to catch the exception 
thrown by getOrMaybeCreateClassicGroup. Another option would be to 1) do the 
lookup, 2) verify non-null and group type and return if it fails.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2415,6 +2415,20 @@ private CoordinatorResult 
classicGroupJoinExistingMember(
 return EMPTY_RESULT;
 }
 
+/**
+ * An overload of {@link 
GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+ * timeout operation. It additionally looks up the group by the id and 
checks the group type.
+ * completeClassicGroupJoin will only be called if the group is CLASSIC.
+ */
+private CoordinatorResult completeClassicGroupJoin(String 
groupId) {
+if (containsClassicGroup(groupId)) {
+return 
completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));
+} else {
+log.info("Group {} is null or not a classic group, skipping 
rebalance stage.", groupId);

Review Comment:
   I wonder if we could use `debug` here.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2805,31 +2826,36 @@ private CoordinatorResult 
maybeCompleteJoinElseSchedule(
  * Try to complete the join phase of the initial rebalance.
  * Otherwise, extend the rebalance.
  *
- * @param group The group under initial rebalance.
+ * @param groupId The group under initial rebalance.
  *
  * @return The coordinator result that will be appended to the log.
  */
 private CoordinatorResult 
tryCompleteInitialRebalanceElseSchedule(
-ClassicGroup group,
+String groupId,
 int delayMs,
 int remainingMs
 ) {
-if (group.newMemberAdded() && remainingMs != 0) {
-// A new member was added. Extend the delay.
-group.setNewMemberAdded(false);
-int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs, 
remainingMs);
-int newRemainingMs = Math.max(remainingMs - delayMs, 0);
-
-timer.schedule(
-classicGroupJoinKey(group.groupId()),
-newDelayMs,
-TimeUnit.MILLISECONDS,
-false,
-() -> tryCompleteInitialRebalanceElseSchedule(group, 
newDelayMs, newRemainingMs)
-);
+if (containsClassicGroup(groupId)) {

Review Comment:
   ditto.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2533,45 +2547,52 @@ private void schedulePendingSync(ClassicGroup group) {
 group.rebalanceTimeoutMs(),
 TimeUnit.MILLISECONDS,
 false,
-() -> expirePendingSync(group, group.generationId()));
+() -> expirePendingSync(group.groupId(), group.generationId()));
 }
 
 /**
  * Invoked when the heartbeat operation is expired from the timer. 
Possibly remove the member and
  * try complete the join phase.
  *
- * @param group The group.
+ * @param groupId   The group id.
  * @param memberId  The member id.
  *
  * @return The coordinator result that will be appended to the log.
  */
 private CoordinatorResult expireClassicGroupMemberHeartbeat(
-ClassicGroup group,
+String groupId,
 String memberId
 ) {
-if (group.isInState(DEAD)) {
-log.info("Received notification of heartbeat expiration for member 
{} after group {} " +
-"had already been unloaded or deleted.",
-memberId, group.groupId());
-} else if (group.isPendingMember(memberId)) {
-log.info("Pending member {} in group {} has been removed after 
session timeout expiration.",
-memberId, group.groupId());
-
-return removePendingMemberAndUpdateClassicGroup(group, 

Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-05 Thread via GitHub


dajac commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1553481594


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java:
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupMigrationPolicy {
+/** Both upgrade and downgrade are enabled.*/
+BIDIRECTIONAL("bidirectional"),
+
+/** Only upgrade is enabled.*/
+UPGRADE("upgrade"),
+
+/** Only downgrade is enabled.*/
+DOWNGRADE("downgrade"),
+
+/** Neither upgrade nor downgrade is enabled.*/
+DISABLED("disabled");
+
+private final String policy;
+
+ConsumerGroupMigrationPolicy(String config) {
+this.policy = config;
+}

Review Comment:
   nit: We use different names `config` and `policy`. This is confusing. How 
about using `name`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java:
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupMigrationPolicy {
+/** Both upgrade and downgrade are enabled.*/
+BIDIRECTIONAL("bidirectional"),
+
+/** Only upgrade is enabled.*/
+UPGRADE("upgrade"),
+
+/** Only downgrade is enabled.*/
+DOWNGRADE("downgrade"),
+
+/** Neither upgrade nor downgrade is enabled.*/
+DISABLED("disabled");
+
+private final String policy;
+
+ConsumerGroupMigrationPolicy(String config) {
+this.policy = config;
+}
+
+@Override
+public String toString() {
+return policy;
+}
+
+public static String validValuesDescription =
+BIDIRECTIONAL   + ": both upgrade from classic group to consumer group 
and downgrade from consumer group to classic group are enabled" + ", " +
+UPGRADE + ": only upgrade is enabled" + ", " +
+DOWNGRADE   + ": only downgrade is enabled" + ", " +

Review Comment:
   nit: Should we complement with the from...to... like you side for 
BIDIRECTIONAL? 



##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1831,6 +1832,22 @@ class KafkaConfigTest {
 assertTrue(config.isNewGroupCoordinatorEnabled)
   }
 
+  @Test
+  def testGroupProtocolMigrationPolicy(): Unit = {
+val props = new Properties()
+props.putAll(kraftProps())
+
+// Invalid GroupProtocolMigrationPolicy value.
+props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, "foo")
+assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
+
+ConsumerGroupMigrationPolicy.values().foreach { policy =>
+  props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, policy.toString)

Review Comment:
   Is it case sensitive?



##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1831,6 +1832,22 @@ class KafkaConfigTest {
 assertTrue(config.isNewGroupCoordinatorEnabled)
   }
 
+  @Test
+  def testGroupProtocolMigrationPolicy(): Unit = {
+val props = new 

Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-05 Thread via GitHub


dajac commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1553480909


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -677,6 +679,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval 
for registered consumers."
   val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single 
consumer group can accommodate."
   val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full 
class names. The first one in the list is considered as the default assignor to 
be used in the case where the consumer does not specify an assignor."
+  val ConsumerGroupMigrationPolicyDoc = "The config that enables converting 
the classic group using the consumer embedded protocol to the consumer group 
using the consumer group protocol and vice versa. " + 
ConsumerGroupMigrationPolicy.validValuesDescription

Review Comment:
   Thanks. I would rather prefer to keep all the documentation defined here. 
This is what we usually do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-05 Thread via GitHub


dajac commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1553477271


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -248,9 +249,10 @@ object KafkaConfig {
   val ConsumerGroupMaxSessionTimeoutMsProp = 
"group.consumer.max.session.timeout.ms"
   val ConsumerGroupHeartbeatIntervalMsProp = 
"group.consumer.heartbeat.interval.ms"
   val ConsumerGroupMinHeartbeatIntervalMsProp = 
"group.consumer.min.heartbeat.interval.ms"
-  val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"
+  val ConsumerGroupMaxHeartbeatIntervalMsProp = 
"group.consumer.max.heartbeat.interval.ms"
   val ConsumerGroupMaxSizeProp = "group.consumer.max.size"
   val ConsumerGroupAssignorsProp = "group.consumer.assignors"
+  val ConsumerGroupMigrationPolicyProp = "consumer.group.migration.policy"

Review Comment:
   Sorry, I missed this one: `consumer.group` -> `group.consumer`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-8735: Check properties file existence first [kafka]

2024-04-05 Thread via GitHub


qinghui-xu commented on PR #7139:
URL: https://github.com/apache/kafka/pull/7139#issuecomment-2039564348

   For me this should be merged even just for the sake of the codebase sanity.
   I can try to rebase it and resolve the conflict to update the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove redundant check in appendLegacyRecord [kafka]

2024-04-05 Thread via GitHub


chia7712 merged PR #15638:
URL: https://github.com/apache/kafka/pull/15638


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16475) Create unit test for TopicImageNode

2024-04-05 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834261#comment-17834261
 ] 

Johnny Hsu commented on KAFKA-16475:


hi [~cmccabe] 
I am willing to work on this ticket, thanks! 

> Create unit test for TopicImageNode
> ---
>
> Key: KAFKA-16475
> URL: https://issues.apache.org/jira/browse/KAFKA-16475
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16475) Create unit test for TopicImageNode

2024-04-05 Thread Johnny Hsu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnny Hsu reassigned KAFKA-16475:
--

Assignee: Johnny Hsu

> Create unit test for TopicImageNode
> ---
>
> Key: KAFKA-16475
> URL: https://issues.apache.org/jira/browse/KAFKA-16475
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Johnny Hsu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]

2024-04-05 Thread via GitHub


chia7712 commented on PR #15668:
URL: https://github.com/apache/kafka/pull/15668#issuecomment-2039540752

   @brandboat nice finding! Should we add the thread prefix to 
https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2487
 to avoid similar issue in the future?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-04-05 Thread via GitHub


OmniaGM commented on code in PR #15335:
URL: https://github.com/apache/kafka/pull/15335#discussion_r1553431953


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -289,10 +300,11 @@ class Partition(val topicPartition: TopicPartition,
 delayedOperations: DelayedOperations,
 metadataCache: MetadataCache,
 logManager: LogManager,
-alterIsrManager: AlterPartitionManager) extends Logging {
+alterIsrManager: AlterPartitionManager,
+@volatile private var _topicId: Option[Uuid] = None // TODO: 
merge topicPartition and _topicId into TopicIdPartition once TopicId persist in 
most of the code

Review Comment:
   there is a jira that will address this already which is 
[KAFKA-16212](https://issues.apache.org/jira/browse/KAFKA-16212) I'll update 
the comment 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-04-05 Thread via GitHub


OmniaGM commented on code in PR #15335:
URL: https://github.com/apache/kafka/pull/15335#discussion_r1553431953


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -289,10 +300,11 @@ class Partition(val topicPartition: TopicPartition,
 delayedOperations: DelayedOperations,
 metadataCache: MetadataCache,
 logManager: LogManager,
-alterIsrManager: AlterPartitionManager) extends Logging {
+alterIsrManager: AlterPartitionManager,
+@volatile private var _topicId: Option[Uuid] = None // TODO: 
merge topicPartition and _topicId into TopicIdPartition once TopicId persist in 
most of the code

Review Comment:
   there is a jira that will be address this already which is 
[KAFKA-16212](https://issues.apache.org/jira/browse/KAFKA-16212) I'll update 
the comment 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-04-05 Thread via GitHub


OmniaGM commented on code in PR #15335:
URL: https://github.com/apache/kafka/pull/15335#discussion_r1553428780


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -104,9 +104,19 @@ class DelayedOperations(topicPartition: TopicPartition,
 object Partition {
   private val metricsGroup = new KafkaMetricsGroup(classOf[Partition])
 
-  def apply(topicPartition: TopicPartition,
+  def apply(topicIdPartition: TopicIdPartition,
 time: Time,
 replicaManager: ReplicaManager): Partition = {
+Partition(

Review Comment:
   The plan is to use this apply in 
[KAFKA-16212](https://issues.apache.org/jira/browse/KAFKA-16212) which will be 
raised soon



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-04-05 Thread via GitHub


chia7712 commented on code in PR #15335:
URL: https://github.com/apache/kafka/pull/15335#discussion_r1552954260


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -104,9 +104,19 @@ class DelayedOperations(topicPartition: TopicPartition,
 object Partition {
   private val metricsGroup = new KafkaMetricsGroup(classOf[Partition])
 
-  def apply(topicPartition: TopicPartition,
+  def apply(topicIdPartition: TopicIdPartition,
 time: Time,
 replicaManager: ReplicaManager): Partition = {
+Partition(

Review Comment:
   not sure whether we need this new `apply`. No callers have 
`TopicIdPartition` and hence they have to create `TopicIdPartition` to use this 
`apply`



##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -289,10 +300,11 @@ class Partition(val topicPartition: TopicPartition,
 delayedOperations: DelayedOperations,
 metadataCache: MetadataCache,
 logManager: LogManager,
-alterIsrManager: AlterPartitionManager) extends Logging {
+alterIsrManager: AlterPartitionManager,
+@volatile private var _topicId: Option[Uuid] = None // TODO: 
merge topicPartition and _topicId into TopicIdPartition once TopicId persist in 
most of the code

Review Comment:
   Can we add jira link to the comment? The reader can trace the updates easily.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]

2024-04-05 Thread via GitHub


brandboat opened a new pull request, #15668:
URL: https://github.com/apache/kafka/pull/15668

   related to KAFKA-16477, 
   
   After profiling the kafka tests, tons of `client-metrics-reaper` thread not 
cleanup after BrokerServer shutdown.
   The thread `client-metrics-reaper` comes from 
[ClientMetricsManager#expirationTimer](https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java#L115),
 and BrokerServer#shudown doesn't close ClientMetricsManager which let the 
thread still runs in background.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-04-05 Thread via GitHub


OmniaGM commented on PR #15335:
URL: https://github.com/apache/kafka/pull/15335#issuecomment-2039474182

   > @OmniaGM , there is compilation error in jdk8_scala2.12 job. Could you 
have a look?
   > 
   > ```
   > [2024-04-04T09:19:51.266Z] [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4952:125:
 recursive value leaderTopicsDelta needs type
   > [2024-04-04T09:19:51.266Z] [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4953:17:
 value makeLeader is not a member of Any
   > [2024-04-04T09:19:51.266Z] [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4957:7:
 overloaded method value assertTrue with alternatives:
   > [2024-04-04T09:19:51.266Z]   (x$1: java.util.function.BooleanSupplier)Unit 

   > [2024-04-04T09:19:51.266Z]   (x$1: Boolean)Unit
   > [2024-04-04T09:19:51.266Z]  cannot be applied to (Any)
   > ```
   > 
   > 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15335/13/pipeline
   
   Fixed it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16477) Detect thread leaked client-metrics-reaper in tests

2024-04-05 Thread Kuan Po Tseng (Jira)
Kuan Po Tseng created KAFKA-16477:
-

 Summary: Detect thread leaked client-metrics-reaper in tests
 Key: KAFKA-16477
 URL: https://issues.apache.org/jira/browse/KAFKA-16477
 Project: Kafka
  Issue Type: Improvement
Reporter: Kuan Po Tseng
Assignee: Kuan Po Tseng


After profiling the kafka tests, tons of `client-metrics-reaper` thread not 
cleanup after BrokerServer shutdown.

The thread {{client-metrics-reaper}} comes from 
[ClientMetricsManager#expirationTimer|https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java#L115],
 and BrokerServer#shudown doesn't close ClientMetricsManager which let the 
timer thread still runs in background.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-04-05 Thread via GitHub


clolov commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1553357069


##
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##
@@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest {
  */
 public static final long EARLIEST_LOCAL_TIMESTAMP = -4L;
 
+public static final long LATEST_TIERED_TIMESTAMP = -5L;

Review Comment:
   Yup, thanks a lot for bringing this up in the mailing list and here, I will 
open a pull request to amend this miss!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-05 Thread via GitHub


clolov commented on PR #14716:
URL: https://github.com/apache/kafka/pull/14716#issuecomment-2039443091

   Heya @cadonna! I have rebased and hopefully addressed all of the first batch 
of comments. The verifications which are missing are reported as 
unnecessary/uncalled by Mockito, but if you think they are necessary I will 
circle back to check


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-05 Thread via GitHub


clolov commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1553349919


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1652,81 +1598,64 @@ public void shouldReInitializeTopologyWhenResuming() 
throws IOException {
 assertTrue(source1.initialized);
 assertTrue(source2.initialized);
 
-EasyMock.verify(stateManager, recordCollector);
-
-EasyMock.reset(recordCollector);
-EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap());
-EasyMock.replay(recordCollector);
 assertThat("Map did not contain the partition", 
task.highWaterMark().containsKey(partition1));
+
+verify(recordCollector).offsets();
 }
 
 @Test
 public void 
shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
 final Long offset = 543L;
 
-
EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition,
 offset)).anyTimes();
-stateManager.checkpoint();
-EasyMock.expectLastCall().once();
-EasyMock.expect(stateManager.changelogOffsets())
-.andReturn(singletonMap(changelogPartition, 10L)) // restoration 
checkpoint
-.andReturn(singletonMap(changelogPartition, 10L))
-.andReturn(singletonMap(changelogPartition, 20L));
-EasyMock.expectLastCall();
-EasyMock.replay(stateManager, recordCollector);
+
when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, 
offset));
+when(stateManager.changelogOffsets())
+.thenReturn(singletonMap(changelogPartition, 10L)) // restoration 
checkpoint
+.thenReturn(singletonMap(changelogPartition, 10L))
+.thenReturn(singletonMap(changelogPartition, 20L));
 
 task = createStatefulTask(createConfig("100"), true);
 
 task.initializeIfNeeded();
-task.completeRestoration(noOpResetter -> { });
+task.completeRestoration(noOpResetter -> { }); // should checkpoint
 
 task.prepareCommit();
-task.postCommit(true);   // should checkpoint
+task.postCommit(true); // should checkpoint
 
 task.prepareCommit();
-task.postCommit(false);   // should not checkpoint
+task.postCommit(false); // should not checkpoint
 
-EasyMock.verify(stateManager, recordCollector);
 assertThat("Map was empty", task.highWaterMark().size() == 2);
+
+verify(stateManager, times(2)).checkpoint();
 }
 
 @Test
 public void shouldCheckpointOffsetsOnCommitIfSnapshotMuchChanged() {
 final Long offset = 543L;
 
-
EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition,
 offset)).anyTimes();
-stateManager.checkpoint();
-EasyMock.expectLastCall().times(2);
-
EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition));
-EasyMock.expect(stateManager.changelogOffsets())
-.andReturn(singletonMap(changelogPartition, 0L))
-.andReturn(singletonMap(changelogPartition, 10L))
-.andReturn(singletonMap(changelogPartition, 12000L));
-stateManager.registerStore(stateStore, 
stateStore.stateRestoreCallback, null);
-EasyMock.expectLastCall();
-EasyMock.replay(stateManager, recordCollector);
+
when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, 
offset));
+when(stateManager.changelogOffsets())
+.thenReturn(singletonMap(changelogPartition, 0L))
+.thenReturn(singletonMap(changelogPartition, 10L))
+.thenReturn(singletonMap(changelogPartition, 12000L));
 
 task = createStatefulTask(createConfig("100"), true);
 
 task.initializeIfNeeded();
-task.completeRestoration(noOpResetter -> { });
+task.completeRestoration(noOpResetter -> { }); // should checkpoint
 task.prepareCommit();
-task.postCommit(true);
+task.postCommit(true); // should checkpoint
 
 task.prepareCommit();
-task.postCommit(false);
+task.postCommit(false); // should checkpoint since the offset delta is 
greater than the threshold
 
-EasyMock.verify(recordCollector);
 assertThat("Map was empty", task.highWaterMark().size() == 2);
+
+verify(stateManager, times(3)).checkpoint();
 }
 
 @Test
 public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
-
EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition));
-stateManager.registerStore(stateStore, 
stateStore.stateRestoreCallback, null);

Review Comment:
   According to Mockito neither of these were called



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To 

Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-05 Thread via GitHub


clolov commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1553347186


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -312,55 +302,40 @@ public void 
shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
 
 @Test
 public void shouldNotAttemptToLockIfNoStores() {
-stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-EasyMock.replay(stateDirectory);
+stateDirectory = mock(StateDirectory.class);
 
 task = createStatelessTask(createConfig("100"));
 
 task.initializeIfNeeded();
 
 // should fail if lock is called
-EasyMock.verify(stateDirectory);
+verify(stateDirectory, never()).lock(any());
 }
 
 @Test
-public void 
shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws 
IOException {
-final IMocksControl ctrl = EasyMock.createNiceControl();
-final ProcessorStateManager stateManager = 
ctrl.createMock(ProcessorStateManager.class);
-
EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
-stateDirectory = ctrl.createMock(StateDirectory.class);
+public void 
shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() {
+when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+stateDirectory = mock(StateDirectory.class);
 
-stateManager.registerGlobalStateStores(emptyList());
-EasyMock.expectLastCall();
-
-EasyMock.expect(stateManager.taskId()).andReturn(taskId);
-
-EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-EasyMock.expectLastCall();
-
-stateManager.transitionTaskState(SUSPENDED);

Review Comment:
   When I moved these to verifications Mockito claimed they were never called



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16305) Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake

2024-04-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16305.

Fix Version/s: 3.7.1
   Resolution: Fixed

push 
https://github.com/apache/kafka/commit/633d2f139c403cbbe2912d04f823d74c561dab76 
to 3.7 branch

> Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake
> --
>
> Key: KAFKA-16305
> URL: https://issues.apache.org/jira/browse/KAFKA-16305
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Gaurav Narula
>Assignee: Gaurav Narula
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> Kafka allows users to configure custom {{SSLEngine}} via the 
> {{SslEngineFactory}} interface. There have been attempts to use an OpenSSL 
> based {{SSLEngine}} using {{netty-handler}} over the JDK based implementation 
> for performance reasons.
> While trying to use a Netty/Openssl based SSLEngine, we observe that the 
> server hangs while performing the TLS handshake.  We observe the following 
> logs
> {code}
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [INFO ] 
> Selector - [SocketServer listenerType=ZK_BROKER, nodeId=101] 
> XX-Gaurav: calling prepare channelId 127.0.0.1:60045-127.0.0.1:60046-0
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state HANDSHAKE
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake NEED_UNWRAP channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0, appReadBuffer pos 0, netReadBuffer pos 0, 
> netWriteBuffer pos 0
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake handshakeUnwrap channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 doRead true
> 2024-02-26 01:40:00,118 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake post handshakeUnwrap: channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 handshakeStatus NEED_UNWRAP status 
> BUFFER_UNDERFLOW read 0
> 2024-02-26 01:40:00,118 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake post unwrap NEED_UNWRAP channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0, handshakeResult Status = BUFFER_UNDERFLOW 
> 

[jira] [Reopened] (KAFKA-16305) Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake

2024-04-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reopened KAFKA-16305:


reopen for backport to 3.7

> Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake
> --
>
> Key: KAFKA-16305
> URL: https://issues.apache.org/jira/browse/KAFKA-16305
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Gaurav Narula
>Assignee: Gaurav Narula
>Priority: Major
> Fix For: 3.8.0
>
>
> Kafka allows users to configure custom {{SSLEngine}} via the 
> {{SslEngineFactory}} interface. There have been attempts to use an OpenSSL 
> based {{SSLEngine}} using {{netty-handler}} over the JDK based implementation 
> for performance reasons.
> While trying to use a Netty/Openssl based SSLEngine, we observe that the 
> server hangs while performing the TLS handshake.  We observe the following 
> logs
> {code}
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [INFO ] 
> Selector - [SocketServer listenerType=ZK_BROKER, nodeId=101] 
> XX-Gaurav: calling prepare channelId 127.0.0.1:60045-127.0.0.1:60046-0
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 state HANDSHAKE
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake NEED_UNWRAP channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0, appReadBuffer pos 0, netReadBuffer pos 0, 
> netWriteBuffer pos 0
> 2024-02-26 01:40:00,117 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake handshakeUnwrap channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 doRead true
> 2024-02-26 01:40:00,118 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake post handshakeUnwrap: channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0 handshakeStatus NEED_UNWRAP status 
> BUFFER_UNDERFLOW read 0
> 2024-02-26 01:40:00,118 
> data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] 
> SslTransportLayer- [SslTransportLayer 
> channelId=127.0.0.1:60045-127.0.0.1:60046-0 
> key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 
> remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, 
> interestOps=1, readyOps=0] SSLHandshake post unwrap NEED_UNWRAP channelId 
> 127.0.0.1:60045-127.0.0.1:60046-0, handshakeResult Status = BUFFER_UNDERFLOW 
> HandshakeStatus = NEED_UNWRAP bytesConsumed = 0 bytesProduced = 0, 
> appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 0 
> 

[jira] [Commented] (KAFKA-16471) SslTransportLayer may leak SSLEngine resources

2024-04-05 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834229#comment-17834229
 ] 

Chia-Ping Tsai commented on KAFKA-16471:


{quote}
perhaps we backport KAFKA-16305 to 3.7 and leave out 3.6 for now?
{quote}

yep, please backport  KAFKA-16305 to 3.7. And you are right, we can leave out 
3.6 and let 3.6.3 (if users require it) RM (if there is a volunteer) to 
cherry-pick in the future.

> SslTransportLayer may leak SSLEngine resources
> --
>
> Key: KAFKA-16471
> URL: https://issues.apache.org/jira/browse/KAFKA-16471
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Gaurav Narula
>Assignee: Gaurav Narula
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> {{SslTransportLayer}} does not invoke {{SSLEngine::closeInbound}} in 
> {{close()}} after flushing the {{close_notify}} TLS alert.
> While this isn't a problem for the default JDK SSLEngine, it results in 
> resource leak in Netty/OpenSSL based SSLEngine which frees native resources 
> only when {{closeInbound}} is invoked.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-05 Thread via GitHub


clolov commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1553289186


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1866,9 +1810,6 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
 
 @Test
 public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() {
-
EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(partition1));

Review Comment:
   Reported as unnecessary by Mockito



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-05 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2039360963

   Hey Chris, sorry for the long delay on this. I finally got a chance to 
verify the code that you provided and it makes sense. I agree that so far I was 
only thinking about either having 2 separate futures such that one waits for 
the other or trying to chain futures like CompletableFutures. However, the 
version you have provided is pretty straight forward and all the new tests 
passed OOB for me.
   
   Regarding 
   
   > I think the only question left is whether out-of-order writes are possible 
because of how things are chained
   
   I am assuming that for non-exactly-once source tasks, you are referring to 
scenarios when offset flushes are triggered and when flush operations finish 
out of order. I reviewed the code and I can see that this is being checked in 
`handleFinishWrite` which does not complete the flush in case the currently 
completed flush isn't the current one.  For any other erroneous cases, 
`cancelFlush` is being invoked ( as you mentioned).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-05 Thread via GitHub


clolov commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1553287165


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1798,8 +1746,6 @@ public void 
shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
 
 assertThrows(RuntimeException.class, () -> task.suspend());
 task.closeDirty();
-
-EasyMock.verify(stateManager);

Review Comment:
   I have moved
   ```
   doNothing().when(stateManager).close();
   ```
   to
   ```
   verify(stateManager).close();
   ```
   in subsequent commits.
   Everything else is related to empty maps or sets, so I have removed the 
stubbings



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-05 Thread via GitHub


clolov commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1553284545


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -514,8 +496,6 @@ public void 
shouldTransitToRestoringThenRunningAfterCreation() throws IOExceptio
 assertEquals(RUNNING, task.state());
 assertTrue(source1.initialized);
 assertTrue(source2.initialized);
-
-EasyMock.verify(stateDirectory);

Review Comment:
   Yeah, this is where I should have been more explicit:
   ```
   
EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition));
   ```
   is reported as an unnecessary stub and
   ```
   stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, 
null);
   ```
   if verified is reported as not called



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-05 Thread via GitHub


clolov commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1553245804


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -365,11 +356,10 @@ public void 
shouldResetOffsetsToLastCommittedForSpecifiedPartitions() {
 consumer.seek(partition1, 10L);
 consumer.seek(partition2, 15L);
 
+@SuppressWarnings("unchecked")
 final java.util.function.Consumer> resetter =
-EasyMock.mock(java.util.function.Consumer.class);
-resetter.accept(Collections.emptySet());
-EasyMock.expectLastCall();
-EasyMock.replay(resetter);
+mock(java.util.function.Consumer.class);
+doNothing().when(resetter).accept(Collections.emptySet());

Review Comment:
   This is fair, I have added a verification instead since it was expected in 
EasyMock. If you don't think the verification is necessary I can remove it 
completely



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-05 Thread via GitHub


clolov commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1553235267


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1568,10 +1548,8 @@ public void 
shouldNotShareHeadersBetweenPunctuateIterations() {
 
 @Test
 public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() {
-
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-
EasyMock.expect(stateManager.changelogOffsets()).andReturn(emptyMap()).anyTimes();
-
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-EasyMock.replay(stateManager, recordCollector);
+when(stateManager.changelogOffsets()).thenReturn(emptyMap());
+when(recordCollector.offsets()).thenReturn(emptyMap());

Review Comment:
   This is also absolutely correct, these are not needed. I have removed all 
stub-related references to emptyMap



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-05 Thread via GitHub


clolov commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1553224333


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -309,49 +300,49 @@ public void 
shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
 
 @Test
 public void shouldNotAttemptToLockIfNoStores() {
-stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-EasyMock.replay(stateDirectory);
+stateDirectory = mock(StateDirectory.class);
 
 task = createStatelessTask(createConfig("100"));
 
 task.initializeIfNeeded();
 
 // should fail if lock is called
-EasyMock.verify(stateDirectory);
+verify(stateDirectory, never()).lock(any());
 }
 
 @Test
 public void 
shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws 
IOException {
-final IMocksControl ctrl = EasyMock.createStrictControl();
-final ProcessorStateManager stateManager = 
ctrl.createMock(ProcessorStateManager.class);
-
EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
-stateDirectory = ctrl.createMock(StateDirectory.class);
+final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
+when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+stateDirectory = mock(StateDirectory.class);
 
-stateManager.registerGlobalStateStores(emptyList());
-EasyMock.expectLastCall();
+doNothing().when(stateManager).registerGlobalStateStores(emptyList());
 
-EasyMock.expect(stateManager.taskId()).andReturn(taskId);
+when(stateManager.taskId()).thenReturn(taskId);
 
-EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 
-stateManager.close();
-EasyMock.expectLastCall();
+doNothing().when(stateManager).close();
 
 // The `baseDir` will be accessed when attempting to delete the state 
store.
-
EasyMock.expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
-stateDirectory.unlock(taskId);
-EasyMock.expectLastCall();
+doNothing().when(stateDirectory).unlock(taskId);

Review Comment:
   No, it is already verified later on, removed



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -309,49 +300,49 @@ public void 
shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
 
 @Test
 public void shouldNotAttemptToLockIfNoStores() {
-stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-EasyMock.replay(stateDirectory);
+stateDirectory = mock(StateDirectory.class);
 
 task = createStatelessTask(createConfig("100"));
 
 task.initializeIfNeeded();
 
 // should fail if lock is called
-EasyMock.verify(stateDirectory);
+verify(stateDirectory, never()).lock(any());
 }
 
 @Test
 public void 
shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws 
IOException {
-final IMocksControl ctrl = EasyMock.createStrictControl();
-final ProcessorStateManager stateManager = 
ctrl.createMock(ProcessorStateManager.class);
-
EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
-stateDirectory = ctrl.createMock(StateDirectory.class);
+final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
+when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+stateDirectory = mock(StateDirectory.class);
 
-stateManager.registerGlobalStateStores(emptyList());
-EasyMock.expectLastCall();
+doNothing().when(stateManager).registerGlobalStateStores(emptyList());
 
-EasyMock.expect(stateManager.taskId()).andReturn(taskId);
+when(stateManager.taskId()).thenReturn(taskId);
 
-EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 
-stateManager.close();
-EasyMock.expectLastCall();
+doNothing().when(stateManager).close();
 
 // The `baseDir` will be accessed when attempting to delete the state 
store.
-
EasyMock.expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
-stateDirectory.unlock(taskId);
-EasyMock.expectLastCall();
+doNothing().when(stateDirectory).unlock(taskId);
 
-ctrl.checkOrder(true);
-ctrl.replay();
+final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
 task = 

Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-04-05 Thread via GitHub


clolov commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1553225208


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -309,49 +300,49 @@ public void 
shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
 
 @Test
 public void shouldNotAttemptToLockIfNoStores() {
-stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-EasyMock.replay(stateDirectory);
+stateDirectory = mock(StateDirectory.class);
 
 task = createStatelessTask(createConfig("100"));
 
 task.initializeIfNeeded();
 
 // should fail if lock is called
-EasyMock.verify(stateDirectory);
+verify(stateDirectory, never()).lock(any());
 }
 
 @Test
 public void 
shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws 
IOException {
-final IMocksControl ctrl = EasyMock.createStrictControl();
-final ProcessorStateManager stateManager = 
ctrl.createMock(ProcessorStateManager.class);
-
EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
-stateDirectory = ctrl.createMock(StateDirectory.class);
+final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
+when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

Review Comment:
   Nope, removed in a subsequent commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >