Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15645:
URL: https://github.com/apache/kafka/pull/15645#discussion_r1568266095


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.cluster.Broker;
+import kafka.cluster.EndPoint;
+import kafka.server.KafkaConfig;
+import kafka.server.QuorumTestHarness;
+import kafka.zk.AdminZkClient;
+import kafka.zk.BrokerInfo;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.security.PasswordEncoder;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ZooKeeperInternals;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
+public class ConfigCommandIntegrationTest extends QuorumTestHarness {

Review Comment:
   I guess that is why you opened #15729 to fix the `-parameters`. Is it possible to use `ClusterTestExtensions` to rewrite this test?
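   For reference, a rough sketch of what a `ClusterTestExtensions`-based version could look like. This assumes the `kafka.test` annotations and `ClusterInstance` injection available on trunk at the time; the class name, annotation usage, and assertion here are illustrative, not taken from the PR:

```java
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.junit.ClusterTestExtensions;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.junit.jupiter.api.Assertions.assertNotNull;

@ExtendWith(ClusterTestExtensions.class)
public class ConfigCommandIntegrationTestSketch {

    private final ClusterInstance cluster;

    // The extension injects the running cluster, replacing the manual
    // ZK/KRaft plumbing inherited from QuorumTestHarness.
    ConfigCommandIntegrationTestSketch(ClusterInstance cluster) {
        this.cluster = cluster;
    }

    // @ClusterTest spins up a cluster per test; ZK vs. KRaft variants can be
    // requested through the annotation instead of a @ValueSource parameter.
    @ClusterTest
    public void testClusterIsReachable() {
        assertNotNull(cluster.bootstrapServers());
    }
}
```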



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-16 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1568266346


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

Review Comment:
   nit: The `future` string can be replaced with a variable. The regex can also become a variable, to make it more readable.
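   For illustration, a minimal sketch of the suggested extraction (the constant names are hypothetical, not from the PR):

```java
import java.util.regex.Pattern;

final class LogDirPatterns {
    // Suffix appended to a partition directory while it is the "future" replica
    // during an alter-log-dir move (hypothetical constant name).
    static final String FUTURE_DIR_SUFFIX = "future";

    // Same expression as in the diff above, built from the named suffix so the
    // intent is visible at the call site.
    static final Pattern FUTURE_DIR_PATTERN =
            Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-" + FUTURE_DIR_SUFFIX);

    private LogDirPatterns() { }
}
```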






Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


soarez commented on PR #15733:
URL: https://github.com/apache/kafka/pull/15733#issuecomment-2060431133

   Thanks @chia7712 I'll have another look at this later today 





[jira] [Commented] (KAFKA-16572) allow defining number of disks per broker in ClusterTest

2024-04-16 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837974#comment-17837974
 ] 

PoAn Yang commented on KAFKA-16572:
---

Hi [~chia7712], I'm interested in this issue. May I take it? Thank you.

> allow defining number of disks per broker in ClusterTest
> 
>
> Key: KAFKA-16572
> URL: https://issues.apache.org/jira/browse/KAFKA-16572
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> This is a follow-up of KAFKA-16559



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on PR #15733:
URL: https://github.com/apache/kafka/pull/15733#issuecomment-2060429559

   @soarez it would be great to get your reviews before merging it :)





[jira] [Assigned] (KAFKA-16572) allow defining number of disks per broker in ClusterTest

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16572:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> allow defining number of disks per broker in ClusterTest
> 
>
> Key: KAFKA-16572
> URL: https://issues.apache.org/jira/browse/KAFKA-16572
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> This is a follow-up of KAFKA-16559





[jira] [Resolved] (KAFKA-16559) allow defining number of disks per broker in TestKitNodes

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16559.

Fix Version/s: 3.8.0
   Resolution: Fixed

> allow defining number of disks per broker in TestKitNodes
> -
>
> Key: KAFKA-16559
> URL: https://issues.apache.org/jira/browse/KAFKA-16559
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Gaurav Narula
>Priority: Major
> Fix For: 3.8.0
>
>
> from: https://github.com/apache/kafka/pull/15136#discussion_r1565571409
> That allow us to run the reassignment tests.
> Also, we should enhance setNumBrokerNodes 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81)
>  to accept extra argument to define the number of folders (by 
> setLogDirectories)





[jira] [Created] (KAFKA-16572) allow defining number of disks per broker in ClusterTest

2024-04-16 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16572:
--

 Summary: allow defining number of disks per broker in ClusterTest
 Key: KAFKA-16572
 URL: https://issues.apache.org/jira/browse/KAFKA-16572
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


This is a follow-up of KAFKA-16559





Re: [PR] KAFKA-16559: allow defining number of disks per broker in TestKitNodes [kafka]

2024-04-16 Thread via GitHub


chia7712 merged PR #15730:
URL: https://github.com/apache/kafka/pull/15730





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-16 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1568241856


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");

Review Comment:
   > We can use topicPartition, but we use a similar regular expression in other places. Do we also need to update it?
   
   Thanks for pointing these out to me. I was wrong. I tried matching topic names containing `.` or `-`, and they both match correctly. That means we can use this regex directly.
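   For reference, a small standalone sketch (plain JDK regex, not Kafka code) showing that the quoted pattern copes with topic names containing `.` or `-`; the directory names below are made-up examples of the `<topic>-<partition>.<uniqueId>-<suffix>` form:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FutureDirPatternCheck {
    public static void main(String[] args) {
        // Same expression as in the diff above.
        Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");

        // Hypothetical directory names; the topic names deliberately contain '.' and '-'.
        String[] dirs = {
            "my.topic-0.0123456789abcdef0123456789abcdef-future",
            "my-topic-1.0123456789abcdef0123456789abcdef-delete"
        };

        for (String dir : dirs) {
            Matcher m = dirPattern.matcher(dir);
            if (m.matches()) {
                // Backtracking of the greedy (\S+) groups lands on the '-' before the
                // partition id, so the topic name is recovered intact.
                System.out.printf("topic=%s partition=%s suffix=%s%n",
                        m.group(1), m.group(2), m.group(4));
            }
        }
    }
}
```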






Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-16 Thread via GitHub


kamalcph commented on PR #15631:
URL: https://github.com/apache/kafka/pull/15631#issuecomment-2060333550

   @junrao 
   
   Gentle reminder. PTAL.





Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]

2024-04-16 Thread via GitHub


LQXshane commented on code in PR #14446:
URL: https://github.com/apache/kafka/pull/14446#discussion_r1568199337


##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,47 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning in downstream key changing 
operations.
+ *
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used 
with interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in 
their original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.
+ * 
+ *
+ *
+ * This method will overwrite a default behavior as described below.
+ * 
+ * By default, Kafka Streams always automatically repartition the 
records to prepare for a stateful operation,
+ * however, it is not always required when input stream is partitioned 
as intended. As an example,
+ * if a input stream is partitioned by a String key1, calling the 
below function will trigger a repartition:
+ * {@code
+ * KStream inputStream = builder.stream("topic");
+ * stream
+ *   .selectKey( ... => (key1, metric))
+ *   .groupByKey()
+ *   .aggregate()
+ * }
+ * 
+ *
+ * 
+ * You can then overwrite the default behavior by calling this method:
+ * {@code
+ * stream
+ *   .selectKey( ... => (key1, metric))
+ *   .markAsPartitioned()
+ *   .groupByKey()
+ *   .aggregate()
+ * }
+ * 
+ *
+ * @return a new {@code KStream} instance that will not repartition in 
subsequent operations: {@link KStream#selectKey(KeyValueMapper)}, {@link 
KStream#map(KeyValueMapper)}, {@link KStream#flatTransform(TransformerSupplier, 
String...)}.

Review Comment:
   Removed for simplicity



##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,47 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning in downstream key changing 
operations.
+ *
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used 
with interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in 
their original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.
+ * 
+ *
+ *
+ * This method will overwrite a default behavior as described below.
+ * 
+ * By default, Kafka Streams always automatically repartition the 
records to prepare for a stateful operation,
+ * however, it is not always required when input stream is partitioned 
as intended. As an example,
+ * if a input stream is partitioned by a String key1, calling the 
below function will trigger a repartition:
+ * {@code
+ * KStream inputStream = builder.stream("topic");
+ * stream
+ *   .selectKey( ... => (key1, metric))
+ *   .groupByKey()
+ *   .aggregate()
+ * }
+ * 
+ *
+ * 
+ * You can then overwrite the default behavior by calling this method:
+ * {@code
+ * stream
+ *   .selectKey( ... => (key1, metric))
+ *   .markAsPartitioned()
+ *   .groupByKey()
+ *   .aggregate()
+ * }
+ * 
+ *
+ * @return a new {@code KStream} instance that will not repartition in 
subsequent operations: {@link KStream#selectKey(KeyValueMapper)}, {@link 
KStream#map(KeyValueMapper)}, {@link KStream#flatTransform(TransformerSupplier, 
String...)}.

Review Comment:
   Removed for simplicity in the new PR






Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]

2024-04-16 Thread via GitHub


LQXshane commented on code in PR #14446:
URL: https://github.com/apache/kafka/pull/14446#discussion_r1568199214


##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,47 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning in downstream key changing 
operations.
+ *
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used 
with interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in 
their original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.
+ * 
+ *
+ *
+ * This method will overwrite a default behavior as described below.
+ * 
+ * By default, Kafka Streams always automatically repartition the 
records to prepare for a stateful operation,
+ * however, it is not always required when input stream is partitioned 
as intended. As an example,
+ * if a input stream is partitioned by a String key1, calling the 
below function will trigger a repartition:
+ * {@code
+ * KStream inputStream = builder.stream("topic");

Review Comment:
   This HTML tag helps the rendering from what I can tell; without it, the code indentation and whitespace are not preserved.
   
   Here is an image of what I see in my IntelliJ with the `` tag:
   
   https://github.com/apache/kafka/assets/16904776/718a2658-4c54-4a7a-acde-dd0c77140c20
   
   






Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]

2024-04-16 Thread via GitHub


LQXshane commented on code in PR #14446:
URL: https://github.com/apache/kafka/pull/14446#discussion_r1568198123


##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,47 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning in downstream key changing 
operations.

Review Comment:
   Resolved in the new PR.



##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,47 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning in downstream key changing 
operations.
+ *
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used 
with interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in 
their original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.
+ * 

Review Comment:
   Resolved in the new PR.






Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]

2024-04-16 Thread via GitHub


LQXshane commented on PR #14446:
URL: https://github.com/apache/kafka/pull/14446#issuecomment-2060317790

   @mjsax It's Shay. Taking this one up from my personal account. My personal account doesn't have permission to push to the other account; is it okay to create a new PR here: https://github.com/apache/kafka/pull/15740?





[PR] KIP-759 Mark as Partitioned [kafka]

2024-04-16 Thread via GitHub


LQXshane opened a new pull request, #15740:
URL: https://github.com/apache/kafka/pull/15740

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]

2024-04-16 Thread via GitHub


LQXshane commented on code in PR #14446:
URL: https://github.com/apache/kafka/pull/14446#discussion_r1568173605


##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,47 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning in downstream key changing 
operations.
+ *
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used 
with interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in 
their original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.
+ * 

Review Comment:
     wasn't sure as I saw the other javadocs doing it









Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-16 Thread via GitHub


lianetm closed pull request #15738: KAFKA-16566: Fix consumer static membership 
system test with new protocol
URL: https://github.com/apache/kafka/pull/15738





Re: [PR] KAFKA-15466: Add KIP-919 support for some admin APIs [kafka]

2024-04-16 Thread via GitHub


bachmanity1 commented on PR #14399:
URL: https://github.com/apache/kafka/pull/14399#issuecomment-2060259657

   Hi @cmccabe 
   
   Given that this PR is merged now, is it possible to dynamically update 
controller configs using `kafka-configs.sh`? 
   
   I've tested this locally but couldn't make it work.
   
   
![image](https://github.com/apache/kafka/assets/81428651/4cd86184-bd7f-4b4f-8ad7-e57423c3e7d7)
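   For context, a sketch (not from this PR) of the kind of dynamic config update being attempted via the Admin API; whether it works when bootstrapped against the controllers is exactly the open question here, and the `bootstrap.controllers` property name is assumed from KIP-919:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class AlterControllerConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumption: connect directly to the controller quorum instead of brokers.
        props.put("bootstrap.controllers", "localhost:9093");

        try (Admin admin = Admin.create(props)) {
            // Empty resource name = cluster-wide default for dynamic broker configs.
            ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Collections.singletonMap(resource, Collections.singleton(op)))
                 .all()
                 .get();
        }
    }
}
```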
   





Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-16 Thread via GitHub


lianetm commented on PR #15738:
URL: https://github.com/apache/kafka/pull/15738#issuecomment-2060248860

   FYI, I've been getting successful runs with this change:
   running:
   
`TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_fencing_static_consumer"
 bash tests/docker/run_tests.sh`
   results:
   
   > 

   > SESSION REPORT (ALL TESTS)
   > ducktape version: 0.11.4
   > session_id:   2024-04-16--016
   > run time: 27 minutes 15.117 seconds
   > tests run:16
   > passed:   16
   > flaky:0
   > failed:   0
   > ignored:  0
   > 

   
   But I still got less frequent failures after trying it several times, so I'm 
afraid there may be something else (maybe related to sessions remaining open 
between test executions happening too close to each other?). If that's the 
case, I'll probably be adding another change here. 
   
   I'll dig into it and update here.





[jira] [Commented] (KAFKA-16467) Add README to docs folder

2024-04-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837928#comment-17837928
 ] 

ASF GitHub Bot commented on KAFKA-16467:


showuon commented on code in PR #596:
URL: https://github.com/apache/kafka-site/pull/596#discussion_r1568111065


##
README.md:
##
@@ -10,4 +10,32 @@ You can run it with the following command, note that it requires docker:
 
 Then you can open [localhost:8080](http://localhost:8080) on your browser and browse the documentation.
 
-To kill the process, just type ctrl + c
\ No newline at end of file
+To kill the process, just type ctrl + c.
+
+## How to preview the latest documentation changes in Kafka repository?
+
+1. Generating document from kafka repository:
+
+```shell
+# change directory into kafka repository
+cd KAFKA_REPO
+./gradlew clean siteDocTar
+# supposing built with scala 2.13
+tar zxvf core/build/distributions/kafka_2.13-$(./gradlew properties | grep version: | awk '{print $NF}' | head -n 1)-site-docs.tgz
+```
+
+2. Copying the generated documents from Kafka repository into kafka-site, and preview them (note that it requires docker):
+
+```shell
+# change directory into kafka-site repository
+cd KAFKA_SITE_REPO
+# copy the generated documents into dev folder
+rm -rf dev
+mkdir dev
+# change directory into kafka repository
+cp -r KAFKA_REPO/site-docs/* dev

Review Comment:
   I don't think this comment is correct.





> Add README to docs folder
> -
>
> Key: KAFKA-16467
> URL: https://issues.apache.org/jira/browse/KAFKA-16467
> Project: Kafka
>  Issue Type: Improvement
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> We don't have a guide in project root folder or docs folder to show how to 
> run local website. It's good to provide a way to run document with kafka-site 
> repository.
>  
> Option 1: Add links to wiki page 
> [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes]
>  and 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. 
> Option 2: Show how to run the document within container. For example: moving 
> `site-docs` from kafka to kafka-site repository and run `./start-preview.sh`.





Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-16 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1568044239


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
     void handleUncleanBrokerShutdown(int brokerId, List records) {
         replicationControl.handleBrokerUncleanShutdown(brokerId, records);
     }
+
+    void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException {
+        appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(),
+            () -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+    }

Review Comment:
   hmm, I think a better way to think about it is that we want to append the 
min ISR config update atomically with the partition change records. Appending 
the partition change records once the config change is replayed is difficult to 
reason about and possibly incorrect. Thinking a bit more about it, triggering a 
write event from the `replay()` for the config change record means that every 
time we reload the metadata log, we would replay the config change record and 
generate new partition change records.
   
   Perhaps one example to look at is 
`ReplicationControlManager.handleBrokerFenced`. When a broker is fenced, we 
generate a broker registration change record along with the leaderAndIsr 
partition change records. I assume we want to follow a similar model with the 
topic configuration change events.






Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-16 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1568084297


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }
 
+    ControllerResult getPartitionElrUpdatesForConfigChanges(Optional topicName) {
+        if (!isElrEnabled()) return ControllerResult.of(Collections.emptyList(), null);
+
+        List records = new ArrayList<>();
+        if (topicName.isPresent()) {
+            generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records,
+                brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get())));
+        } else {
+            generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records,
+                brokersToElrs.partitionsWithElr());
+        }

Review Comment:
   I haven't been following along too closely. Is my understanding correct that 
we would only expect to generate partition change records that clear the ELR 
when the min ISR config decreases?
   
   When the configured topic ISR increases, it would not be safe to include 
more replicas in the ELR, since they may not have the HWM.
   
   If my understanding is correct, should we have tests to verify that:
   1. When the configured topic ISR decreases, we generate the expected 
partition change record events.
   2. When the configured topic ISR increases, we do not generate any partition 
change record events.






Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-16 Thread via GitHub


lianetm commented on code in PR #15738:
URL: https://github.com/apache/kafka/pull/15738#discussion_r1568061403


##
tests/kafkatest/tests/client/consumer_test.py:
##
@@ -348,13 +348,32 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
         consumer.start()
         self.await_members(consumer, len(consumer.nodes))
 
+        num_rebalances = consumer.num_rebalances()
         conflict_consumer.start()
-        self.await_members(conflict_consumer, num_conflict_consumers)
-        self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
+        if group_protocol == consumer_group.classic_group_protocol:
+            # Classic protocol: conflicting members should join, and the initial ones with conflicting instance id should fail.
+            self.await_members(conflict_consumer, num_conflict_consumers)
+            self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
 
-        wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
+            wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
                timeout_sec=10,
                err_msg="Timed out waiting for the fenced consumers to stop")
+        else:
+            # Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join.
+            self.await_consumed_messages(consumer)
+            assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
+            assert len(consumer.joined_nodes()) == len(consumer.nodes)
+            assert len(conflict_consumer.joined_nodes()) == 0

Review Comment:
   There should be no timing issues as I see it. For `consumer.joined_nodes`, there is a previous `self.await_members` that ensures we wait as long as needed for all the nodes to join. As for `conflict_consumer.joined_nodes()`, it's for nodes that never joined: we're just asserting that, after the non-conflicting members remained without a rebalance and kept consuming (ensuring activity), the conflicting ones did not join. Makes sense?






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-04-16 Thread via GitHub


splett2 commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1568050802


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -108,6 +103,43 @@ object StorageTool extends Logging {
 }
   }
 
+  def metadataVersionValidation(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = {

Review Comment:
   nit: `validateMetadataVersion`






Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-16 Thread via GitHub


showuon commented on code in PR #15719:
URL: https://github.com/apache/kafka/pull/15719#discussion_r1568045423


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -413,7 +413,7 @@ class LogManagerTest {
     assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, "Check we have the expected number of segments.")
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
-    time.sleep(logManager.InitialTaskDelayMs)
+    time.sleep(logManager.initialTaskDelayMs)
     assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 segments")
     time.sleep(log.config.fileDeleteDelayMs + 1)

Review Comment:
   > Huge thanks! I'm a little confused about the comment above; the tests in LogManagerTest themselves verify that tasks like log cleanup and log flush are triggered after sleeping initialTaskDelayMs.
   
   Yes, and currently all of them set `initialTaskDelayMs=LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS`. I'm thinking we could add a test that sets `initialTaskDelayMs` to, say, 1000, and verify the change is adopted by checking that the log cleanup is triggered after we sleep only 1000 ms.






Re: [PR] KAFKA-16571: reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition [kafka]

2024-04-16 Thread via GitHub


jolshan commented on PR #15739:
URL: https://github.com/apache/kafka/pull/15739#issuecomment-2060079351

   Hmmm -- I ran the test and it is not working. I will debug  
   





Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-16 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1568044239


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
     void handleUncleanBrokerShutdown(int brokerId, List records) {
         replicationControl.handleBrokerUncleanShutdown(brokerId, records);
    }
+
+    void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException {
+        appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(),
+            () -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+    }

Review Comment:
   hmm, I think a better way to think about it is that we want to append the 
min ISR config update atomically with the partition change records. Only 
appending the partition change records once the config change is replayed is 
difficult to reason about and possibly incorrect.
   
   Perhaps one example to look at is 
`ReplicationControlManager.handleBrokerFenced`. When a broker is fenced, we 
generate a broker registration change record along with the leaderAndIsr 
partition change records. I assume we want to follow a similar model with the 
topic configuration change events.






[PR] KAFKA-16571: reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition [kafka]

2024-04-16 Thread via GitHub


jolshan opened a new pull request, #15739:
URL: https://github.com/apache/kafka/pull/15739

   Added the check before the reassignment occurs and we start bouncing brokers.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-16 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1568001935


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1388,6 +1393,31 @@ public void commitSync(Map offsets, Duration
 }
 }
 
+private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {

Review Comment:
   nit: consider changing `disableWakeup` to `enableWakeup`. Double-negatives 
add nonzero cognitive overhead.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1388,6 +1393,31 @@ public void commitSync(Map offsets, Duration
 }
 }
 
+private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {
+if (lastPendingAsyncCommit == null) {
+return;
+}
+
+try {
+final CompletableFuture futureToAwait = new 
CompletableFuture<>();
+// We don't want the wake-up trigger to complete our pending async 
commit future,
+// so create new future here. Any errors in the pending async 
commit will be handled
+// by the async commit future / the commit callback - here, we 
just want to wait for it to complete.
+lastPendingAsyncCommit.whenComplete((v, t) -> 
futureToAwait.complete(null));
+if (!disableWakeup) {
+wakeupTrigger.setActiveTask(futureToAwait);
+}
+ConsumerUtils.getResult(futureToAwait, timer);

Review Comment:
   Is it true that the underlying `lastPendingAsyncCommit` `Future` could 
already be completed by this point, right?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();

Review Comment:
   Can this be replaced with a call to 
`testSyncCommitTimesoutAfterIncompleteAsyncCommit()` like the other tests? I 
glanced back and forth a couple of times and didn't see too much difference:
   
   ```suggestion
   final TopicPartition tp = new TopicPartition("foo", 0);
   testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+// Complete async commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());

Review Comment:
   This use of JUnit is just about over my head...
   
   For my own understanding, at which line in this test does the `AsyncCommitEvent` get created and enqueued? I would assume at line 634, right?
   
   It looks like you're able to add the `ArgumentCaptor` _after_ the object pointed at by the argument was created. Is that correct?
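   For what it's worth, a small standalone Mockito sketch (not the Kafka test code) of the behavior in question: the captor can be created after the call, because `verify()` matches against invocations the mock has already recorded.

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.List;
import org.mockito.ArgumentCaptor;

public class CaptorAfterInvocationExample {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> handler = (List<String>) mock(List.class);

        // The invocation happens first, before any captor exists.
        handler.add("async commit event");

        // verify() replays the invocations already recorded on the mock, so the
        // captor created afterwards still captures the argument of that earlier call.
        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
        verify(handler).add(captor.capture());
        System.out.println(captor.getValue()); // prints "async commit event"
    }
}
```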



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1005,6 +1083,43 @@ public void testNoWakeupInCloseCommit() {
 assertFalse(capturedEvent.get().future().isCompletedExceptionally());
 }
 
+@Test
+public 

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-16 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1568011957


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ClientSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+COOPERATIVE_STICKY(new CooperativeStickyAssignor());
+
+private final ConsumerPartitionAssignor assignor;
+
+AssignorType(ConsumerPartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public ConsumerPartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"1000", "1"})
+private int memberCount;
+
+@Param({"10", "50"})
+private int partitionsPerTopicCount;
+
+@Param({"100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "COOPERATIVE_STICKY"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private Map subscriptions 
= new HashMap<>();
+
+private ConsumerPartitionAssignor.GroupSubscription groupSubscription;
+
+private static final int numberOfRacks = 3;
+
+private static final int replicationFactor = 2;

Review Comment:
   For the server-side tests I always used a replication factor of 2, so in order to get the same rack distribution for consistency, I used a replication factor of 2 here too. That being said, I think I'll just hardcode the replication factor here as well.






Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-16 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1568009821


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;

Review Comment:
   done






Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-16 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1568009691


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-16 Thread via GitHub


kirktrue commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1567992301


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -482,6 +482,15 @@ public long nextHeartbeatMs(final long currentTimeMs) {
 return heartbeatTimer.remainingMs();
 }
 
+public void onFailedAttempt(final long currentTimeMs) {
+// Reset timer to allow sending HB after a failure without waiting 
for the interval.
+// After a failure, a next HB may be needed with backoff (ex. 
errors that lead to
+// retries, like coordinator load error), or immediately (ex. 
errors that lead to
+// rejoining, like fencing errors).
+heartbeatTimer.reset(0);
+super.onFailedAttempt(currentTimeMs);

Review Comment:
   Does the decision to reset the heartbeat timer depend on what _type_ of 
error is received?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -380,7 +380,7 @@ private void onErrorResponse(final 
ConsumerGroupHeartbeatResponse response,
 break;
 
 case UNRELEASED_INSTANCE_ID:
-logger.error("GroupHeartbeatRequest failed due to the instance 
id {} was not released: {}",
+logger.error("GroupHeartbeatRequest failed due to unreleased 
instance id {}: {}",

Review Comment:
   QQ: are these logging changes of the 'I'll just clean this up as long as I'm 
in here?' variety, or do they have some bearing on the correctness of the logs?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -231,6 +231,35 @@ public void testTimerNotDue() {
 assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
 }
 
+@Test
+public void testHeartbeatNotSentIfAnotherOneInFlight() {
+mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+// Heartbeat sent (no response received)
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+NetworkClientDelegate.UnsentRequest inflightReq = 
result.unsentRequests.get(0);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a " +
+"previous on in-flight");

Review Comment:
   Super nit-picky, sorry  
   
   ```suggestion
   "previous one in-flight");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-16 Thread via GitHub


kirktrue commented on code in PR #15738:
URL: https://github.com/apache/kafka/pull/15738#discussion_r156796


##
tests/kafkatest/tests/client/consumer_test.py:
##
@@ -348,13 +348,32 @@ def test_fencing_static_consumer(self, 
num_conflict_consumers, fencing_stage, me
 consumer.start()
 self.await_members(consumer, len(consumer.nodes))
 
+num_rebalances = consumer.num_rebalances()
 conflict_consumer.start()
-self.await_members(conflict_consumer, num_conflict_consumers)
-self.await_members(consumer, len(consumer.nodes) - 
num_conflict_consumers)
+if group_protocol == consumer_group.classic_group_protocol:
+# Classic protocol: conflicting members should join, and the 
initial ones with conflicting instance id should fail.
+self.await_members(conflict_consumer, num_conflict_consumers)
+self.await_members(consumer, len(consumer.nodes) - 
num_conflict_consumers)
 
-wait_until(lambda: len(consumer.dead_nodes()) == 
num_conflict_consumers,
+wait_until(lambda: len(consumer.dead_nodes()) == 
num_conflict_consumers,
timeout_sec=10,
err_msg="Timed out waiting for the fenced consumers to 
stop")
+else:
+# Consumer protocol: Existing members should remain active and 
new conflicting ones should not be able to join.
+self.await_consumed_messages(consumer)
+assert num_rebalances == consumer.num_rebalances(), "Static 
consumers attempt to join with instance id in use should not cause a rebalance"
+assert len(consumer.joined_nodes()) == len(consumer.nodes)
+assert len(conflict_consumer.joined_nodes()) == 0

Review Comment:
   Do we anticipate any timing issues here? That is, will `num_rebalances()` 
and `joined_nodes()` be "guaranteed" to return the correct values immediately 
after the call to `await_consumed_messages()` is finished? Or do we want to 
wrap those assertions as `wait_until()`s to give them a few seconds to coalesce 
to the correct value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16571) reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition

2024-04-16 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-16571:
--

Assignee: Justine Olshan

> reassign_partitions_test.bounce_brokers should wait for messages to be sent 
> to every partition
> --
>
> Key: KAFKA-16571
> URL: https://issues.apache.org/jira/browse/KAFKA-16571
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Assignee: Justine Olshan
>Priority: Major
>
> This particular system test tries to bounce brokers while produce is ongoing. 
> The test also has rf=3 and min.isr=3 configured, so if any brokers are 
> bounced before records are produced to every partition, it is possible to run 
> into OutOfOrderSequence exceptions similar to what is described in 
> https://issues.apache.org/jira/browse/KAFKA-14359
> When running the produce_consume_validate for the reassign_partitions_test, 
> instead of waiting for 5 acked messages, we should wait for messages to be 
> acked on the full set of partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful

2024-04-16 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-16570:
---
Description: 
When we want to fence a producer using the admin client, we send an 
InitProducerId request.

There is logic in that API to fence (and abort) any ongoing transactions and 
that is what the API relies on to fence the producer. However, this handling 
also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because we 
want to actually get a new producer ID and want to retry until the ID is 
supplied or we time out.  
[https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
 
[https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
 

In the case of fence producer, we don't retry and instead we have no handling 
for concurrent transactions and log a message about an unexpected error.
[https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
 

This is not unexpected though and the operation was successful. We should just 
swallow this error and treat this as a successful run of the command. 
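
A minimal sketch of that handling, assuming a small helper in the admin-client error path (the class and method names below are placeholders, not the actual FenceProducersHandler code):
{code:java}
import org.apache.kafka.common.protocol.Errors;

// Sketch only: treat CONCURRENT_TRANSACTIONS from InitProducerId as success,
// since it means the coordinator is aborting the ongoing transaction and the
// old producer has effectively been fenced.
final class FenceProducerErrorSketch {
    static boolean fencingSucceeded(Errors error) {
        return error == Errors.NONE || error == Errors.CONCURRENT_TRANSACTIONS;
    }
}
{code}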

  was:
When we want to fence a producer using the admin client, we send an 
InitProducerId request.

There is logic in that API to fence (and abort) any ongoing transactions and 
that is what the API relies on to fence the producer. However, this handling 
also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because we 
want to actually get a new producer ID and want to retry until the the ID is 
supplied or we time out.  
[https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
 



In the case of fence producer, we don't retry and instead we have no handling 
for concurrent transactions and log a message about an unexpected error.
[https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
 

This is not unexpected though and the operation was successful. We should just 
swallow this error and treat this as a successful run of the command. 


> FenceProducers API returns "unexpected error" when successful
> -
>
> Key: KAFKA-16570
> URL: https://issues.apache.org/jira/browse/KAFKA-16570
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> When we want to fence a producer using the admin client, we send an 
> InitProducerId request.
> There is logic in that API to fence (and abort) any ongoing transactions and 
> that is what the API relies on to fence the producer. However, this handling 
> also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because 
> we want to actually get a new producer ID and want to retry until the the ID 
> is supplied or we time out.  
> [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
>  
> [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
>  
> In the case of fence producer, we don't retry and instead we have no handling 
> for concurrent transactions and log a message about an unexpected error.
> [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
>  
> This is not unexpected though and the operation was successful. We should 
> just swallow this error and treat this as a successful run of the command. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful

2024-04-16 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837902#comment-17837902
 ] 

Justine Olshan commented on KAFKA-16570:


cc: [~ChrisEgerton] [~showuon] [~tombentley] as author and reviewers of the 
original change.

> FenceProducers API returns "unexpected error" when successful
> -
>
> Key: KAFKA-16570
> URL: https://issues.apache.org/jira/browse/KAFKA-16570
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> When we want to fence a producer using the admin client, we send an 
> InitProducerId request.
> There is logic in that API to fence (and abort) any ongoing transactions and 
> that is what the API relies on to fence the producer. However, this handling 
> also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because 
> we want to actually get a new producer ID and want to retry until the the ID 
> is supplied or we time out.  
> [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
>  
> [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
>  
> In the case of fence producer, we don't retry and instead we have no handling 
> for concurrent transactions and log a message about an unexpected error.
> [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
>  
> This is not unexpected though and the operation was successful. We should 
> just swallow this error and treat this as a successful run of the command. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16571) reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition

2024-04-16 Thread David Mao (Jira)
David Mao created KAFKA-16571:
-

 Summary: reassign_partitions_test.bounce_brokers should wait for 
messages to be sent to every partition
 Key: KAFKA-16571
 URL: https://issues.apache.org/jira/browse/KAFKA-16571
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao


This particular system test tries to bounce brokers while produce is ongoing. 
The test also has rf=3 and min.isr=3 configured, so if any brokers are bounced 
before records are produced to every partition, it is possible to run into 
OutOfOrderSequence exceptions similar to what is described in 
https://issues.apache.org/jira/browse/KAFKA-14359

When running the produce_consume_validate for the reassign_partitions_test, 
instead of waiting for 5 acked messages, we should wait for messages to be 
acked on the full set of partitions.
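
The test itself is a Python/ducktape system test, but as a rough Java sketch of the condition we want to reach before any broker bounces (topic name and partition count are placeholders):
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

final class AwaitAcksSketch {
    // Block until at least one record has been acked on every partition, so a
    // broker bounce cannot race with the first write to a partition.
    static void awaitAckOnEveryPartition(KafkaProducer<byte[], byte[]> producer,
                                         String topic,
                                         int partitionCount) throws Exception {
        for (int partition = 0; partition < partitionCount; partition++) {
            // send() returns a Future<RecordMetadata>; get() waits for the ack.
            producer.send(new ProducerRecord<>(topic, partition, null, new byte[0])).get();
        }
    }
}
{code}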



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful

2024-04-16 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-16570:
--

 Summary: FenceProducers API returns "unexpected error" when 
successful
 Key: KAFKA-16570
 URL: https://issues.apache.org/jira/browse/KAFKA-16570
 Project: Kafka
  Issue Type: Bug
Reporter: Justine Olshan
Assignee: Justine Olshan


When we want to fence a producer using the admin client, we send an 
InitProducerId request.

There is logic in that API to fence (and abort) any ongoing transactions and 
that is what the API relies on to fence the producer. However, this handling 
also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because we 
want to actually get a new producer ID and want to retry until the ID is 
supplied or we time out.  
[https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
 



In the case of fence producer, we don't retry and instead we have no handling 
for concurrent transactions and log a message about an unexpected error.
[https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
 

This is not unexpected though and the operation was successful. We should just 
swallow this error and treat this as a successful run of the command. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16569) Target Assignment Format Change

2024-04-16 Thread Ritika Reddy (Jira)
Ritika Reddy created KAFKA-16569:


 Summary: Target Assignment Format Change
 Key: KAFKA-16569
 URL: https://issues.apache.org/jira/browse/KAFKA-16569
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ritika Reddy
Assignee: Ritika Reddy


Currently the assignment is stored as Map>, we 
want to change it to a list

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16568) Add JMH Benchmarks for assignor performance testing

2024-04-16 Thread Ritika Reddy (Jira)
Ritika Reddy created KAFKA-16568:


 Summary: Add JMH Benchmarks for assignor performance testing 
 Key: KAFKA-16568
 URL: https://issues.apache.org/jira/browse/KAFKA-16568
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ritika Reddy
Assignee: Ritika Reddy


The 3 benchmarks that are being used to test the performance and efficiency of 
the consumer group rebalance process.
 * Client Assignors (assign method)
 * Server Assignors (assign method)
 * Target Assignment Builder (build method)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-04-16 Thread Francois Visconte (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837892#comment-17837892
 ] 

Francois Visconte commented on KAFKA-16511:
---

Here is, for example, one of the segments and partitions where I had the issue. 

Remaining segment leaked on tiered storage that was never deleted
{code}
partition: 12, offset: 429045, value: 
RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=9481FsNMTqiXLLzIBY03lA}, startOffset=2965469, endOffset=2968297, 
brokerId=10045, maxTimestampMs=1712010650848, eventTimestampMs=1712018260978, 
segmentLeaderEpochs={7=2965469}, segmentSizeInBytes=179013013, 
customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}
partition: 12, offset: 429049, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=9481FsNMTqiXLLzIBY03lA}, customMetadata=Optional.empty, 
state=COPY_SEGMENT_FINISHED, eventTimestampMs=1712018266033, brokerId=10045}
{code}

Last events in the remote log metadata for this partition:
{code}
partition: 12, offset: 427434, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=5_3z-dusQEeSDqNC_E5ifQ}, customMetadata=Optional.empty, 
state=COPY_SEGMENT_FINISHED, eventTimestampMs=1712010391903, brokerId=10041}
partition: 12, offset: 427680, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=58Ht5YJ8SaW3JaI_lDEpsA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712012524972, brokerId=10041}
partition: 12, offset: 427681, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=58Ht5YJ8SaW3JaI_lDEpsA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712012525109, brokerId=10041}
partition: 12, offset: 428017, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=s4RCtworR-2Na8PGY6h2nQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712014486037, brokerId=10045}
partition: 12, offset: 428018, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=s4RCtworR-2Na8PGY6h2nQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712014486168, brokerId=10045}
partition: 12, offset: 428399, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=UWhaLz-GTjqPcVKL8uZOQQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712016766308, brokerId=10045}
partition: 12, offset: 428400, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=UWhaLz-GTjqPcVKL8uZOQQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712016766397, brokerId=10045}
partition: 12, offset: 429045, value: 
RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=9481FsNMTqiXLLzIBY03lA}, startOffset=2965469, endOffset=2968297, 
brokerId=10045, maxTimestampMs=1712010650848, eventTimestampMs=1712018260978, 
segmentLeaderEpochs={7=2965469}, segmentSizeInBytes=179013013, 
customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}
partition: 12, offset: 429049, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=9481FsNMTqiXLLzIBY03lA}, customMetadata=Optional.empty, 
state=COPY_SEGMENT_FINISHED, eventTimestampMs=1712018266033, brokerId=10045}
partition: 12, offset: 429265, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=TM44MmIqSKSBPmXMzjAvxA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712018873277, brokerId=10045}
partition: 12, offset: 429266, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=TM44MmIqSKSBPmXMzjAvxA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712018873411, brokerId=10045}
partition: 12, offset: 429602, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=djqwO5JeS9y2vRua-FBO3A}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712020973535, brokerId=10045}
partition: 12, offset: 429603, value: 

[jira] (KAFKA-16511) Leaking tiered segments

2024-04-16 Thread Francois Visconte (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16511 ]


Francois Visconte deleted comment on KAFKA-16511:
---

was (Author: JIRAUSER288982):
>The issue might be due to the overlapping remote log segments after a new 
>leader gets elected during rolling restart. Would you please upload the past 
>10 segments remote-log-segment metadata events for 
>5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765 partition? Thanks!

Here is 


{code:java}
partition: 27, offset: 400504, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=v8QykWvQQryMxNoS1aHMCQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712045518393, brokerId=10015}
partition: 27, offset: 400505, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=v8QykWvQQryMxNoS1aHMCQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712045518494, brokerId=10015}
partition: 27, offset: 400828, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=6K-bSNgxSnOypIrjl9OiGA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712047438507, brokerId=10015}
partition: 27, offset: 400829, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=6K-bSNgxSnOypIrjl9OiGA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712047438609, brokerId=10015}
partition: 27, offset: 401145, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=ErPfaCVhRiS6EKx7RnL1iQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712049418775, brokerId=10015}
partition: 27, offset: 401146, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=ErPfaCVhRiS6EKx7RnL1iQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712049418894, brokerId=10015}
partition: 27, offset: 401458, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=2yAT3R6tS3umrvU8iuMVfw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712051340571, brokerId=10015}
partition: 27, offset: 401459, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=2yAT3R6tS3umrvU8iuMVfw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712051340719, brokerId=10015}
partition: 27, offset: 401758, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=KLtWI-SWS7eBML87LX63SA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712053320735, brokerId=10015}
partition: 27, offset: 401759, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=KLtWI-SWS7eBML87LX63SA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712053320838, brokerId=10015}
partition: 27, offset: 539496, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=8dP13VDYSaiFlubl9SNBTQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712909568625, brokerId=10015}
partition: 27, offset: 539497, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=8dP13VDYSaiFlubl9SNBTQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712909568846, brokerId=10015}
{code}


> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics that have not been written to for a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> is never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segments, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> 

[jira] (KAFKA-16511) Leaking tiered segments

2024-04-16 Thread Francois Visconte (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16511 ]


Francois Visconte deleted comment on KAFKA-16511:
---

was (Author: JIRAUSER288982):
Here is for example on one of the partition where I have the issue: 

{code}
_zIcSPWGzlqLjUTFfw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712036488994, brokerId=10041}
partition: 12, offset: 432066, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=YTIk_zIcSPWGzlqLjUTFfw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712036489094, brokerId=10041}
partition: 12, offset: 432319, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=8mQ_JJLfTQmkP80MYcN6Ig}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712038114012, brokerId=10041}
partition: 12, offset: 432320, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=8mQ_JJLfTQmkP80MYcN6Ig}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712038114110, brokerId=10041}
partition: 12, offset: 432593, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=oqr_-KDrSIGiGrcLQZhzvA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712039805803, brokerId=10041}
partition: 12, offset: 432594, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=oqr_-KDrSIGiGrcLQZhzvA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712039805901, brokerId=10041}
partition: 12, offset: 432914, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=OSou-FH9S4ioH5LHYUxPMg}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712041682856, brokerId=10041}
partition: 12, offset: 432915, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=OSou-FH9S4ioH5LHYUxPMg}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712041683023, brokerId=10041}
partition: 12, offset: 433251, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=uSPj4yS5RIO1LuBHecl7iQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712043678337, brokerId=10041}
partition: 12, offset: 433252, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=uSPj4yS5RIO1LuBHecl7iQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712043678474, brokerId=10041}
partition: 12, offset: 433577, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=pcCt8vc-TUyRhHOGUJBHXg}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712045569214, brokerId=10041}
partition: 12, offset: 433578, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=pcCt8vc-TUyRhHOGUJBHXg}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712045569333, brokerId=10041}
partition: 12, offset: 433904, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=LFqoPyg0SOiHbscnV3Ahtw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712047491475, brokerId=10041}
partition: 12, offset: 433905, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=LFqoPyg0SOiHbscnV3Ahtw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712047491576, brokerId=10041}
partition: 12, offset: 434229, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=SSyS_hbXSEKD5rgbu1S8ug}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712049443915, brokerId=10041}
partition: 12, offset: 434230, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=SSyS_hbXSEKD5rgbu1S8ug}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712049444048, brokerId=10041}
partition: 12, offset: 434584, value: 

[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-04-16 Thread Francois Visconte (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837891#comment-17837891
 ] 

Francois Visconte commented on KAFKA-16511:
---

>The issue might be due to the overlapping remote log segments after a new 
>leader gets elected during rolling restart. Would you please upload the past 
>10 segments remote-log-segment metadata events for 
>5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765 partition? Thanks!

Here is 


{code:java}
partition: 27, offset: 400504, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=v8QykWvQQryMxNoS1aHMCQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712045518393, brokerId=10015}
partition: 27, offset: 400505, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=v8QykWvQQryMxNoS1aHMCQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712045518494, brokerId=10015}
partition: 27, offset: 400828, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=6K-bSNgxSnOypIrjl9OiGA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712047438507, brokerId=10015}
partition: 27, offset: 400829, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=6K-bSNgxSnOypIrjl9OiGA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712047438609, brokerId=10015}
partition: 27, offset: 401145, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=ErPfaCVhRiS6EKx7RnL1iQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712049418775, brokerId=10015}
partition: 27, offset: 401146, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=ErPfaCVhRiS6EKx7RnL1iQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712049418894, brokerId=10015}
partition: 27, offset: 401458, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=2yAT3R6tS3umrvU8iuMVfw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712051340571, brokerId=10015}
partition: 27, offset: 401459, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=2yAT3R6tS3umrvU8iuMVfw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712051340719, brokerId=10015}
partition: 27, offset: 401758, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=KLtWI-SWS7eBML87LX63SA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712053320735, brokerId=10015}
partition: 27, offset: 401759, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=KLtWI-SWS7eBML87LX63SA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712053320838, brokerId=10015}
partition: 27, offset: 539496, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=8dP13VDYSaiFlubl9SNBTQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712909568625, brokerId=10015}
partition: 27, offset: 539497, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765,
 id=8dP13VDYSaiFlubl9SNBTQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712909568846, brokerId=10015}
{code}


> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics that have not been written to for a few days (having 12h 
> retention) where some data remains on tiered storage (in our case S3) and 
> is never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments:
> When looking at one of the non-leaking segments, I get the following 
> interesting messages:
>  
> {code:java}
> 

[jira] [Assigned] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-16 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-16567:


Assignee: Walter Hernandez

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-16 Thread Walter Hernandez (Jira)
Walter Hernandez created KAFKA-16567:


 Summary: Add New Stream Metrics based on KIP-869
 Key: KAFKA-16567
 URL: https://issues.apache.org/jira/browse/KAFKA-16567
 Project: Kafka
  Issue Type: Task
Reporter: Walter Hernandez
 Fix For: 4.0.0


Add the following metrics to the state updater:
 * restoring-active-tasks: count
 * restoring-standby-tasks: count
 * paused-active-tasks: count
 * paused-standby-tasks: count
 * idle-ratio: percentage
 * restore-ratio: percentage
 * checkpoint-ratio: percentage
 * restore-records-total: count
 * restore-records-rate: rate
 * restore-call-rate: rate
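
As a rough sketch of what registering one of these counts could look like with the common metrics API (the group name and value supplier below are illustrative assumptions, not the actual state-updater code):
{code:java}
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;

final class StateUpdaterMetricsSketch {
    // Sketch only: registers restoring-active-tasks as a gauge.
    static void registerRestoringActiveTasks(Metrics metrics, Supplier<Integer> restoringActiveTasks) {
        MetricName name = metrics.metricName(
            "restoring-active-tasks",
            "stream-state-updater-metrics",   // assumed group name
            "Count of active tasks currently being restored");
        metrics.addMetric(name, (Gauge<Integer>) (config, now) -> restoringActiveTasks.get());
    }
}
{code}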



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-16 Thread via GitHub


lianetm commented on PR #15738:
URL: https://github.com/apache/kafka/pull/15738#issuecomment-2059888686

   Hey @lucasbru, could you take a look at this test fix? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16336) Remove Deprecated metric standby-process-ratio

2024-04-16 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-16336:


Assignee: Walter Hernandez

> Remove Deprecated metric standby-process-ratio
> --
>
> Key: KAFKA-16336
> URL: https://issues.apache.org/jira/browse/KAFKA-16336
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Walter Hernandez
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Metric "standby-process-ratio" was deprecated in 3.5 release via 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16336) Remove Deprecated metric standby-process-ratio

2024-04-16 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-16336:


Assignee: (was: Walter Hernandez)

> Remove Deprecated metric standby-process-ratio
> --
>
> Key: KAFKA-16336
> URL: https://issues.apache.org/jira/browse/KAFKA-16336
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Metric "standby-process-ratio" was deprecated in 3.5 release via 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16511) Leaking tiered segments

2024-04-16 Thread Francois Visconte (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837867#comment-17837867
 ] 

Francois Visconte commented on KAFKA-16511:
---

Here is, for example, one of the partitions where I have the issue: 

{code}
_zIcSPWGzlqLjUTFfw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712036488994, brokerId=10041}
partition: 12, offset: 432066, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=YTIk_zIcSPWGzlqLjUTFfw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712036489094, brokerId=10041}
partition: 12, offset: 432319, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=8mQ_JJLfTQmkP80MYcN6Ig}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712038114012, brokerId=10041}
partition: 12, offset: 432320, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=8mQ_JJLfTQmkP80MYcN6Ig}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712038114110, brokerId=10041}
partition: 12, offset: 432593, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=oqr_-KDrSIGiGrcLQZhzvA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712039805803, brokerId=10041}
partition: 12, offset: 432594, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=oqr_-KDrSIGiGrcLQZhzvA}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712039805901, brokerId=10041}
partition: 12, offset: 432914, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=OSou-FH9S4ioH5LHYUxPMg}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712041682856, brokerId=10041}
partition: 12, offset: 432915, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=OSou-FH9S4ioH5LHYUxPMg}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712041683023, brokerId=10041}
partition: 12, offset: 433251, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=uSPj4yS5RIO1LuBHecl7iQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712043678337, brokerId=10041}
partition: 12, offset: 433252, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=uSPj4yS5RIO1LuBHecl7iQ}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712043678474, brokerId=10041}
partition: 12, offset: 433577, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=pcCt8vc-TUyRhHOGUJBHXg}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712045569214, brokerId=10041}
partition: 12, offset: 433578, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=pcCt8vc-TUyRhHOGUJBHXg}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712045569333, brokerId=10041}
partition: 12, offset: 433904, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=LFqoPyg0SOiHbscnV3Ahtw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712047491475, brokerId=10041}
partition: 12, offset: 433905, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=LFqoPyg0SOiHbscnV3Ahtw}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712047491576, brokerId=10041}
partition: 12, offset: 434229, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=SSyS_hbXSEKD5rgbu1S8ug}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712049443915, brokerId=10041}
partition: 12, offset: 434230, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774,
 id=SSyS_hbXSEKD5rgbu1S8ug}, customMetadata=Optional.empty, 
state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712049444048, brokerId=10041}
partition: 12, offset: 434584, value: 

Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15719:
URL: https://github.com/apache/kafka/pull/15719#discussion_r1567891428


##
core/src/main/java/kafka/server/builders/LogManagerBuilder.java:
##
@@ -55,6 +55,7 @@ public class LogManagerBuilder {
 private Time time = Time.SYSTEM;
 private boolean keepPartitionMetadataFile = true;
 private boolean remoteStorageSystemEnable = false;
+private long initialTaskDelayMs = LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS;

Review Comment:
   please add setter for it
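   
   Something like the following, assuming it follows the fluent style of the other setters in this builder:
   ```java
   public LogManagerBuilder setInitialTaskDelayMs(long initialTaskDelayMs) {
       this.initialTaskDelayMs = initialTaskDelayMs;
       return this;
   }
   ```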



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-16 Thread via GitHub


lianetm opened a new pull request, #15738:
URL: https://github.com/apache/kafka/pull/15738

   Updating consumer system test that was failing with the new protocol, 
related to static membership behaviour. The behaviour regarding static 
consumers that join with conflicting group instance id is slightly different 
between the classic and new consumer protocol, so the expectations in the tests 
needed to be updated.  
   
   If static members join with same instance id:
   
   - Classic protocol: all members join the group with the same group instance 
id, and then the first one will eventually fail (receives a HB error with 
FencedInstanceIdException)
   
   - Consumer protocol: new member with an instance Id already in use is not 
able to join, and first member remains active (new member with same instance Id 
receives an UnreleasedInstanceIdException in the response to the HB to join the 
group)
   
   This PR is keeping the single parametrized test that existed before, given 
that what's being tested and part of the test itself apply to all protocols. 
This is just updating the expectations that are different, based on the 
protocol parameter.
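   
   As a rough Java illustration of the behaviour described above (the system test itself is in Python; the config values and the exception surfaced through `poll()` are assumptions based on this description, not code from the PR):
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
   import org.apache.kafka.common.serialization.ByteArrayDeserializer;

   public class StaticMembershipConflictSketch {
       public static void main(String[] args) {
           // Sketch only: a second static member joining with a group.instance.id
           // that is already in use by an active member.
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "static-group");
           props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1"); // already taken
           props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");      // or "classic"
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

           try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
               consumer.subscribe(Collections.singleton("test-topic"));
               // Consumer protocol: this member is expected to be rejected while the
               // original member stays active.
               // Classic protocol: this member joins, and the original member is the
               // one that eventually fails with a FencedInstanceIdException heartbeat.
               consumer.poll(Duration.ofSeconds(1));
           } catch (UnreleasedInstanceIdException e) {
               // expected only with group.protocol=consumer
           }
       }
   }
   ```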
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1567861049


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class PartitionMetadataFileTest  {
+private final File dir = TestUtils.tempDirectory();
+
+@Test
+public void testSetRecordWithDifferentTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+Uuid differentTopicId = Uuid.randomUuid();
+assertThrows(InconsistentTopicIdException.class, () -> 
partitionMetadataFile.record(differentTopicId));
+}
+
+@Test
+public void testSetRecordWithSameTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+}
+
+@Test
+public void testMaybeFlushWithTopicIdPresent() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+
+assertDoesNotThrow(() -> {
+List lines = Files.readAllLines(file.toPath());
+assertEquals(2, lines.size());
+assertEquals("version: 0", lines.get(0));
+assertEquals("topic_id: " + topicId, lines.get(1));
+});
+}
+
+@Test
+public void testMaybeFlushWithNoTopicIdPresent() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+
+assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+assertEquals(0, file.length());
+}
+
+@Test
+public void testRead() {
+File file = PartitionMetadataFile.newFile(dir);
+LogDirFailureChannel channel = 
Mockito.mock(LogDirFailureChannel.class);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, channel);
+
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));

Review Comment:
   >I used assertDoesNotThrow is because we expect the first time to record the 
topicId shouldn't generate any exception, since there's no dirtyTopicIdOpt
   
   I agree with that. My point was that `assertDoesNotThrow` converts a checked 
exception to an unchecked exception, which is useful when we run asserts in the 
lambda. However, in these test cases we can add the exception to the method 
signature and just let the test fail. 
   
   In short, `assertDoesNotThrow` is a redundant wrapper in these test cases.
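   
   For example, one of the tests above could drop the wrappers like this (just a sketch of the suggested style, reusing the imports already in the test file):
   ```java
   @Test
   public void testMaybeFlushWithTopicIdPresent() throws Exception {
       File file = PartitionMetadataFile.newFile(dir);
       PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null);

       Uuid topicId = Uuid.randomUuid();
       partitionMetadataFile.record(topicId);
       partitionMetadataFile.maybeFlush();

       List<String> lines = Files.readAllLines(file.toPath());
       assertEquals(2, lines.size());
       assertEquals("version: 0", lines.get(0));
       assertEquals("topic_id: " + topicId, lines.get(1));
   }
   ```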



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1567855363


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -391,13 +391,20 @@ void enqueueMetadataChangeEvent(
 
 // Events handled by Migration Driver.
 abstract class MigrationEvent implements EventQueue.Event {
+// Use no-op handler by default because the handleException will be 
overridden if needed
+private Consumer retryHandler = NO_OP_HANDLER;
+
+public void retryHandler(Consumer retryHandler) {
+this.retryHandler = retryHandler;
+}

Review Comment:
   Also, should we call `wakeup` (run the next poll ASAP) rather than 
`scheduleDeferred` if the exception is retryable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1567853719


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -391,13 +391,20 @@ void enqueueMetadataChangeEvent(
 
 // Events handled by Migration Driver.
 abstract class MigrationEvent implements EventQueue.Event {
+// Use no-op handler by default because the handleException will be 
overridden if needed
+private Consumer retryHandler = NO_OP_HANDLER;
+
+public void retryHandler(Consumer retryHandler) {
+this.retryHandler = retryHandler;
+}

Review Comment:
   +1 to this style!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16336) Remove Deprecated metric standby-process-ratio

2024-04-16 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-16336:


Assignee: Walter Hernandez

> Remove Deprecated metric standby-process-ratio
> --
>
> Key: KAFKA-16336
> URL: https://issues.apache.org/jira/browse/KAFKA-16336
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Walter Hernandez
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Metric "standby-process-ratio" was deprecated in 3.5 release via 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-16 Thread via GitHub


junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1567796843


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will 
be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the 
log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset 
metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
_localLogStartOffset) {

Review Comment:
   Thanks for the detailed explanation.
   
   For the `makeLeaders` path, it will call 
`UnifiedLog.convertToOffsetMetadataOrThrow`. Within it, 
`checkLogStartOffset(offset)` shouldn't throw OFFSET_OUT_OF_RANGE since we are 
comparing the offset with logStartOffset. Do you know which part throws 
OFFSET_OUT_OF_RANGE error?
   
   For the follower fetch path, it's bounded by `LogEndOffset`. So it shouldn't 
need to call `UnifiedLog.fetchHighWatermarkMetadata`, right? The regular 
consumer will call `UnifiedLog.fetchHighWatermarkMetadata`.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]

2024-04-16 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1567823511


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"1000", "1"})
+private int memberCount;
+
+@Param({"10", "50"})
+private int partitionsPerTopicCount;
+
+@Param({"100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int numberOfRacks = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec(topicMetadata);
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map createTopicMetadata() {
+Map topicMetadata = new HashMap<>();
+Map> partitionRacks = 

[jira] [Comment Edited] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:59 PM:
-

Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new protocol/consumer. You're on the right page regarding the rest: Streams 
tests haven't been migrated yet because it's not integrated with the new 
protocol. 

Classic protocol refers to the existing group protocol, and Consumer protocol 
refers to the new one introduced with KIP-848 (just using the names proposed to 
be used in the configs to switch between both)


was (Author: JIRAUSER300183):
Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

Classic protocol refers to the existing group protocol, and Consumer protocol 
refers to the new one introduced with KIP-848 (just using the names proposed to 
be used in the configs to switch between both)

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]

2024-04-16 Thread via GitHub


kirktrue opened a new pull request, #15737:
URL: https://github.com/apache/kafka/pull/15737

   Checking that the `TopicPartition` is in assignment before attempting to 
remove it.
   
   Also added some logging and refactoring.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]

2024-04-16 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1567815640


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,259 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"1000", "1"})
+private int memberCount;
+
+@Param({"10", "50"})
+private int partitionsPerTopicCount;
+
+@Param({"1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isRackAware;

Review Comment:
   Okay, let's use the uniform assignor with homogeneous subscriptions and no 
rack awareness.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]

2024-04-16 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1567812184


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;
+
+private PartitionAssignor partitionAssignor;
+
+private final int numberOfRacks = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = new HashMap<>();
+Map> partitionRacks = isRackAware ?
+mkMapOfPartitionRacks(partitionsPerTopicCount) :
+Collections.emptyMap();
+
+for (int i = 1; i <= topicCount; i++) {
+Uuid topicUuid = Uuid.randomUuid();
+String topicName = "topic" + i;
+topicMetadata.put(topicUuid, new TopicMetadata(
+topicUuid, topicName, partitionsPerTopicCount, 
partitionRacks));
+}
+
+addTopicSubscriptions(topicMetadata);
+this.subscribedTopicDescriber = new 
SubscribedTopicMetadata(topicMetadata);
+
+if (isRangeAssignor) {
+this.partitionAssignor = new RangeAssignor();
+} else {
+this.partitionAssignor = new UniformAssignor();
+}
+
+if (isReassignment) {
+GroupAssignment initialAssignment = 
partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+Map members;
+
+members = initialAssignment.members();
+
+// Update the AssignmentSpec with the results from the initial 
assignment.
+Map updatedMembers = new HashMap<>();
+
+members.forEach((memberId, memberAssignment) -> {
+AssignmentMemberSpec memberSpec = 
assignmentSpec.members().get(memberId);
+updatedMembers.put(memberId, new AssignmentMemberSpec(
+memberSpec.instanceId(),
+memberSpec.rackId(),
+memberSpec.subscribedTopicIds(),
+memberAssignment.targetPartitions()
+));
+});
+
+// Add new member to trigger a reassignment.
+Optional rackId = isRackAware ? Optional.of("rack" + 
(memberCount + 1) % numberOfRacks) : Optional.empty();
+
+updatedMembers.put("newMember", new AssignmentMemberSpec(
+Optional.empty(),
+rackId,
+topicMetadata.keySet(),
+Collections.emptyMap()
+ 

[jira] [Commented] (KAFKA-15250) ConsumerNetworkThread is running tight loop

2024-04-16 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837836#comment-17837836
 ] 

Philip Nee commented on KAFKA-15250:


Added a testing branch under my fork: network-thread-mbean-metrics

 

I ran the testAsyncCommit in the integration test (because it actually polls).  
These are the stats:

backoff time avg:0.29459381324082107
(backoff time max,92.0)
poll time avg:9.2161365261735107E18
(poll time max,9.223372036854776E18)

 

The loop doesn't really back off.
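   A tiny illustrative sketch (the names and constants are assumptions, not the actual 
ConsumerNetworkThread code) of the kind of bounded exponential backoff the numbers 
above suggest is missing:

       final class PollBackoff {
           static long pollTimeoutMs(int consecutiveEmptyPolls, long retryBackoffMs, long maxPollTimeoutMs) {
               if (consecutiveEmptyPolls == 0) {
                   return 0L; // work is pending, poll immediately
               }
               // Double the backoff for each consecutive empty poll, capped so the thread still wakes up.
               long backoff = retryBackoffMs * (1L << Math.min(consecutiveEmptyPolls - 1, 10));
               return Math.min(backoff, maxPollTimeoutMs);
           }
       }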

> ConsumerNetworkThread is running tight loop
> ---
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles.  I 
> think we need to reexamine the timeout pass to networkclientDelegate.poll.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16566:
---
Summary: Update consumer static membership fencing system test to support 
new protocol  (was: Update static membership fencing system test to support new 
protocol)

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]

2024-04-16 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1567794140


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/AssignPartitionsMicroBenchmark.java:
##
@@ -0,0 +1,153 @@
+package org.apache.kafka.jmh.group_coordinator;

Review Comment:
   okay yeah I'll do that
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:34 PM:
-

Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

Classic protocol refers to the existing group protocol, and Consumer protocol 
refers to the new one introduced with KIP-848 (just using the names proposed to 
be used in the configs to switch between both)


was (Author: JIRAUSER300183):
Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

> Update static membership fencing system test to support new protocol
> 
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-16 Thread via GitHub


kirktrue commented on PR #15723:
URL: https://github.com/apache/kafka/pull/15723#issuecomment-2059706910

   @lianetm & @lucasbru—I've addressed your comments, and this is ready for 
another review. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-16 Thread via GitHub


kirktrue commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1567791850


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -66,6 +67,7 @@ public RequestState(final LogContext logContext,
  * and the backoff is restored to its minimal configuration.
  */
 public void reset() {
+this.requestInFlight = false;
 this.lastSentMs = -1;

Review Comment:
   I've removed `lastSentMs` as it is no longer needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:31 PM:
-

Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 


was (Author: JIRAUSER300183):
Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that was parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

> Update static membership fencing system test to support new protocol
> 
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:30 PM:
-

Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that was parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 


was (Author: JIRAUSER300183):
Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

> Update static membership fencing system test to support new protocol
> 
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans commented on KAFKA-16566:


Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

> Update static membership fencing system test to support new protocol
> 
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837820#comment-17837820
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16566:


I'm a bit confused – I thought we haven't migrated Streams over to the new 
consumer rebalancing protocol. Or is this referring to something else? What is 
the "classic" vs "consumer" protocol? And when/why did we migrate our system 
tests to using it? Does it have to do with static membership specifically?

 

Sorry for being out of the loop here

> Update static membership fencing system test to support new protocol
> 
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16566:
--

 Summary: Update static membership fencing system test to support 
new protocol
 Key: KAFKA-16566
 URL: https://issues.apache.org/jira/browse/KAFKA-16566
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Lianet Magrans
Assignee: Lianet Magrans
 Fix For: 3.8.0


consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
that verifies the sequence in which static members join a group when using 
conflicting instance id. This behaviour is different in the classic and 
consumer protocol, so the tests should be updated to set the right expectations 
when running with the new consumer protocol. Note that what the tests covers 
(params, setup), apply to both protocols. It is the expected results that are 
not the same. 

When conflicts between static members joining a group:

Classic protocol: all members join the group with the same group instance id, 
and then the first one will eventually receive a HB error with 
FencedInstanceIdException

Consumer protocol: new member with an instance Id already in use is not able to 
join, receiving an UnreleasedInstanceIdException in the response to the HB to 
join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Remove unneeded explicit type arguments [kafka]

2024-04-16 Thread via GitHub


mimaison opened a new pull request, #15736:
URL: https://github.com/apache/kafka/pull/15736

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-16 Thread via GitHub


OmniaGM commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2059643602

   > Let's just let Omnia decides which PR she prefers doing first. Then we can 
all review that PR, and once merged move onto the next one. This would avoid 
unnecessary churn as I expect she's getting tired of rebasing all of these PRs 
every day!
   
   Thanks @mimaison, this is really thoughtful! Breaking KafkaConfig out of core 
is one of the hardest cleanups, especially if we want to move in smaller/medium 
PRs, so I don't see any other way than going through it. I think it might be 
better to take them in order so we can get them merged and keep the rebases 
easier. 
   Let's merge https://github.com/apache/kafka/pull/15684 first, as
   - it is smaller 
   - it is more contained than the log config 
   - group configs are more likely to change as part of KIP-848 and any of 
its follow-ups, which would make the conflicts much harder and riskier to untangle. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16475 add TopicImageNodeTest [kafka]

2024-04-16 Thread via GitHub


cmccabe commented on PR #15735:
URL: https://github.com/apache/kafka/pull/15735#issuecomment-2059637927

   Thanks, @mannoopj. Since this adds some new test cases beyond what was in 
#15720,  can you rebase your PR on trunk?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


mwesterby commented on PR #15733:
URL: https://github.com/apache/kafka/pull/15733#issuecomment-2059603416

   @chia7712 Thanks again for the review. I've implemented all of your 
suggestions. Please let me know if you spot anything else I've missed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


mwesterby commented on code in PR #15733:
URL: https://github.com/apache/kafka/pull/15733#discussion_r1567721679


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -440,8 +440,12 @@ object StorageTool extends Logging {
 "Use --ignore-formatted to ignore this directory and format the 
others.")
 }
 if (!copier.errorLogDirs().isEmpty) {
-  val firstLogDir = copier.errorLogDirs().iterator().next()
-  throw new TerseFailure(s"I/O error trying to read log directory 
$firstLogDir.")
+  copier.errorLogDirs().forEach(errorLogDir => {
+stream.println(s"I/O error trying to read log directory $errorLogDir. 
Ignoring...")
+  })
+  if(metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
+throw new TerseFailure("No available log directories to format.")

Review Comment:
   Discussed below. I've now added the additional check to see if `logDirProps` 
is empty, and only throw this failure when this is also true. This is to 
prevent the format storage script from failing when the only remaining 
directories to be formatted are unavailable, but there is at least one 
directory already formatted, and the script is run with `--ignore-formatted`.
   
   In this scenario, it should succeed as there is at least one successfully 
formatted directory.
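   As a hedged illustration of that decision (the real implementation is the Scala 
StorageTool code above; the names here are made up), the tool should only fail when 
some directories errored, none are left to format, and none was already formatted:

       final class FormatDecision {
           // Fail only if some dirs errored, no empty dirs remain, and nothing is already formatted.
           static boolean shouldFail(boolean hasErrorDirs, boolean hasEmptyDirs, boolean hasFormattedDirs) {
               return hasErrorDirs && !hasEmptyDirs && !hasFormattedDirs;
           }
       }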



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


mwesterby commented on code in PR #15733:
URL: https://github.com/apache/kafka/pull/15733#discussion_r1567721679


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -440,8 +440,12 @@ object StorageTool extends Logging {
 "Use --ignore-formatted to ignore this directory and format the 
others.")
 }
 if (!copier.errorLogDirs().isEmpty) {
-  val firstLogDir = copier.errorLogDirs().iterator().next()
-  throw new TerseFailure(s"I/O error trying to read log directory 
$firstLogDir.")
+  copier.errorLogDirs().forEach(errorLogDir => {
+stream.println(s"I/O error trying to read log directory $errorLogDir. 
Ignoring...")
+  })
+  if(metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
+throw new TerseFailure("No available log directories to format.")

Review Comment:
   Discussed below. I've now added the additional check to see if `logDirProps` 
is empty, and only throw this failure when this is also true. This is to 
prevent the format storage script from failing when the only remaining 
directories to be formatted are unavailable, but there is at least one 
directory already formatted, and the script is run with `--ignore-formatted`.
   
   In this scenario, it should succeed as there is at least one successfully 
formatted directory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16565) IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned

2024-04-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16565:
--
Labels: consumer-threading-refactor kip-848-client-support system-tests  
(was: )

> IncrementalAssignmentConsumerEventHandler throws error when attempting to 
> remove a partition that isn't assigned
> 
>
> Key: KAFKA-16565
> URL: https://issues.apache.org/jira/browse/KAFKA-16565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>
> In {{{}verifiable_consumer.py{}}}, the Incremental
>  
> {code:java}
> def handle_partitions_revoked(self, event):
> self.revoked_count += 1
> self.state = ConsumerState.Rebalancing
> self.position = {}
> for topic_partition in event["partitions"]:
> topic = topic_partition["topic"]
> partition = topic_partition["partition"]
> self.assignment.remove(TopicPartition(topic, partition))
>  {code}
> If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that 
> isn't in the list, an error is thrown. For now, we should first check that 
> the {{TopicPartition}} is in the list, and if not, log a warning or something.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16565) IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned

2024-04-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16565:
-

Assignee: Kirk True

> IncrementalAssignmentConsumerEventHandler throws error when attempting to 
> remove a partition that isn't assigned
> 
>
> Key: KAFKA-16565
> URL: https://issues.apache.org/jira/browse/KAFKA-16565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>
> In {{{}verifiable_consumer.py{}}}, the Incremental
>  
> {code:java}
> def handle_partitions_revoked(self, event):
> self.revoked_count += 1
> self.state = ConsumerState.Rebalancing
> self.position = {}
> for topic_partition in event["partitions"]:
> topic = topic_partition["topic"]
> partition = topic_partition["partition"]
> self.assignment.remove(TopicPartition(topic, partition))
>  {code}
> If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that 
> isn't in the list, an error is thrown. For now, we should first check that 
> the {{TopicPartition}} is in the list, and if not, log a warning or something.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16565) IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned

2024-04-16 Thread Kirk True (Jira)
Kirk True created KAFKA-16565:
-

 Summary: IncrementalAssignmentConsumerEventHandler throws error 
when attempting to remove a partition that isn't assigned
 Key: KAFKA-16565
 URL: https://issues.apache.org/jira/browse/KAFKA-16565
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
 Fix For: 3.8.0


In {{{}verifiable_consumer.py{}}}, the Incremental

 
{code:java}
def handle_partitions_revoked(self, event):
self.revoked_count += 1
self.state = ConsumerState.Rebalancing
self.position = {}
for topic_partition in event["partitions"]:
topic = topic_partition["topic"]
partition = topic_partition["partition"]
self.assignment.remove(TopicPartition(topic, partition))
 {code}
If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that 
isn't in the list, an error is thrown. For now, we should first check that the 
{{TopicPartition}} is in the list, and if not, log a warning or something.
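A Java-flavoured sketch of that guard (the actual change belongs in the Python test 
harness, verifiable_consumer.py; the class and names below are purely illustrative):

{code:java}
import java.util.HashSet;
import java.util.Set;

class AssignmentTracker {
    private final Set<String> assignment = new HashSet<>();

    void onPartitionRevoked(String topicPartition) {
        // Set.remove() reports whether the element was present, so a missing
        // partition produces a warning instead of an error.
        if (!assignment.remove(topicPartition)) {
            System.err.println("WARN: revoked partition not in current assignment: " + topicPartition);
        }
    }
}
{code}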



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


mwesterby commented on code in PR #15733:
URL: https://github.com/apache/kafka/pull/15733#discussion_r1567721679


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -440,8 +440,12 @@ object StorageTool extends Logging {
 "Use --ignore-formatted to ignore this directory and format the 
others.")
 }
 if (!copier.errorLogDirs().isEmpty) {
-  val firstLogDir = copier.errorLogDirs().iterator().next()
-  throw new TerseFailure(s"I/O error trying to read log directory 
$firstLogDir.")
+  copier.errorLogDirs().forEach(errorLogDir => {
+stream.println(s"I/O error trying to read log directory $errorLogDir. 
Ignoring...")
+  })
+  if(metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
+throw new TerseFailure("No available log directories to format.")

Review Comment:
   Discussed below. I've now added an additional check that `logDirProps` is 
empty, so this failure is only thrown when that is also true. This prevents the 
storage format command from failing when at least one directory is already 
formatted, the only other non-formatted directories are unavailable, and it is 
run with `--ignore-formatted`.
   
   In this scenario, it should succeed as there is at least one successfully 
formatted directory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


mwesterby commented on code in PR #15733:
URL: https://github.com/apache/kafka/pull/15733#discussion_r1567721679


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -440,8 +440,12 @@ object StorageTool extends Logging {
 "Use --ignore-formatted to ignore this directory and format the 
others.")
 }
 if (!copier.errorLogDirs().isEmpty) {
-  val firstLogDir = copier.errorLogDirs().iterator().next()
-  throw new TerseFailure(s"I/O error trying to read log directory 
$firstLogDir.")
+  copier.errorLogDirs().forEach(errorLogDir => {
+stream.println(s"I/O error trying to read log directory $errorLogDir. 
Ignoring...")
+  })
+  if(metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
+throw new TerseFailure("No available log directories to format.")

Review Comment:
   Discussed below. I've now added an additional check that `logDirProps` is 
empty, so this failure is only thrown when that is also true. This prevents the 
storage format command from failing when at least one directory is already 
formatted, the only other non-formatted directories are unavailable, and it is 
run with `--ignore-formatted`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-16 Thread via GitHub


kirktrue commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1567722373


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java:
##
@@ -48,4 +50,51 @@ public void testRequestStateSimple() {
 state.reset();
 assertTrue(state.canSendRequest(200));
 }
+
+@Test
+public void testTrackInflightOnSuccessfulAttempt() {
+testTrackInflight(RequestState::onSuccessfulAttempt);
+}
+
+@Test
+public void testTrackInflightOnFailedAttempt() {
+testTrackInflight(RequestState::onFailedAttempt);
+}
+
+/**
+ * In some cases, the network layer is very fast and can send out 
a second request within the same
+ * millisecond timestamp as receiving the first response.
+ *
+ * <p>
+ *
+ * The previous logic for tracking inflight status used timestamps: if the 
timestamp from the last received
+ * response was less than the timestamp from the last sent 
request, we'd interpret that as having an
+ * inflight request. However, this approach would incorrectly return 
false from
+ * {@link RequestState#requestInFlight()} if the two timestamps were 
equal.
+ */

Review Comment:
   I figure the existing PR description covers the basics. I'll just remove the 
test comment wholesale.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


mwesterby commented on code in PR #15733:
URL: https://github.com/apache/kafka/pull/15733#discussion_r1567721679


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -440,8 +440,12 @@ object StorageTool extends Logging {
 "Use --ignore-formatted to ignore this directory and format the 
others.")
 }
 if (!copier.errorLogDirs().isEmpty) {
-  val firstLogDir = copier.errorLogDirs().iterator().next()
-  throw new TerseFailure(s"I/O error trying to read log directory 
$firstLogDir.")
+  copier.errorLogDirs().forEach(errorLogDir => {
+stream.println(s"I/O error trying to read log directory $errorLogDir. 
Ignoring...")
+  })
+  if(metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
+throw new TerseFailure("No available log directories to format.")

Review Comment:
   Discussed below. I've now added an additional check that `logDirProps` is 
empty, so this failure is only thrown when that is also true. This prevents the 
storage format command from failing when at least one directory is already 
formatted, the only other directories are unavailable, and it is run with 
`--ignore-formatted`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16559) allow defining number of disks per broker in TestKitNodes

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16559:
--

Assignee: Gaurav Narula  (was: Chia-Ping Tsai)

> allow defining number of disks per broker in TestKitNodes
> -
>
> Key: KAFKA-16559
> URL: https://issues.apache.org/jira/browse/KAFKA-16559
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Gaurav Narula
>Priority: Major
>
> from: https://github.com/apache/kafka/pull/15136#discussion_r1565571409
> That would allow us to run the reassignment tests.
> Also, we should enhance setNumBrokerNodes 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81)
>  to accept an extra argument to define the number of folders (by 
> setLogDirectories)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16559) allow defining number of disks per broker in TestKitNodes

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16559:
---
Summary: allow defining number of disks per broker in TestKitNodes  (was: 
Support to define the number of data folders in ClusterTest)

> allow defining number of disks per broker in TestKitNodes
> -
>
> Key: KAFKA-16559
> URL: https://issues.apache.org/jira/browse/KAFKA-16559
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> from: https://github.com/apache/kafka/pull/15136#discussion_r1565571409
> That would allow us to run the reassignment tests.
> Also, we should enhance setNumBrokerNodes 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81)
>  to accept an extra argument to define the number of folders (by 
> setLogDirectories)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-16 Thread via GitHub


gaurav-narula commented on code in PR #15730:
URL: https://github.com/apache/kafka/pull/15730#discussion_r1567713128


##
core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class KafkaClusterTestKitTest {
+@Test
+public void testCreateClusterWithNoDisksThrows() {
+try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(

Review Comment:
    Using `assertThrowsExactly` instead to avoid the equality check on 
`RuntimeException.class`
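
   For reference, a minimal sketch of how the test body might read with 
`assertThrowsExactly` (this assumes JUnit Jupiter 5.8+, the static imports from 
`org.junit.jupiter.api.Assertions`, and reuses the builder call and error 
message from the suggestion quoted below; it is illustrative, not the final PR 
code):
   ```java
   // assertThrowsExactly fails the test if a subclass of RuntimeException is
   // thrown, so the separate e.getClass() equality check is no longer needed.
   @Test
   public void testCreateClusterWithNoDisksThrows() {
       RuntimeException e = assertThrowsExactly(RuntimeException.class, () -> new KafkaClusterTestKit.Builder(
               new TestKitNodes.Builder().
                       setBrokerNodes(1, 0).
                       setNumControllerNodes(1).build()));
       assertEquals("Invalid value for disksPerBroker", e.getMessage());
   }
   ```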



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16475: add test for TopicImageNodeTest [kafka]

2024-04-16 Thread via GitHub


cmccabe merged PR #15720:
URL: https://github.com/apache/kafka/pull/15720


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16475: add test for TopicImageNodeTest [kafka]

2024-04-16 Thread via GitHub


cmccabe commented on PR #15720:
URL: https://github.com/apache/kafka/pull/15720#issuecomment-2059575247

   Thanks, @johnnychhsu 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15730:
URL: https://github.com/apache/kafka/pull/15730#discussion_r1567703623


##
core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class KafkaClusterTestKitTest {
+@Test
+public void testCreateClusterWithNoDisksThrows() {
+try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(

Review Comment:
   We can use `assertThrows` to rewrite it. For example:
   ```java
   Exception e = assertThrows(Exception.class, () -> new 
KafkaClusterTestKit.Builder(
   new TestKitNodes.Builder().
   setBrokerNodes(1, 0).
   setNumControllerNodes(1).build()));
   assertEquals(RuntimeException.class, e.getClass());
   assertEquals("Invalid value for disksPerBroker", e.getMessage());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15730:
URL: https://github.com/apache/kafka/pull/15730#discussion_r1567700594


##
core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import org.hamcrest.Matcher;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class KafkaClusterTestKitTest {
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) 
{
+try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+new TestKitNodes.Builder().
+setBrokerNodes(5, 2).
+setCombined(combined).
+setNumControllerNodes(3).build()).build()) {
+
+assertEquals(5, cluster.nodes().brokerNodes().size());
+assertEquals(3, cluster.nodes().controllerNodes().size());
+
+cluster.nodes().brokerNodes().forEach((brokerId, node) -> {
+assertEquals(2, node.logDataDirectories().size());
+Matcher<Iterable<? extends String>> matcher = 
containsInAnyOrder(String.format("broker_%d_data0", brokerId), 
String.format("broker_%d_data1", brokerId));

Review Comment:
   > is there any reason not to use hamcrest in core when we do use it in other 
modules?
   
   I'm curious. Maybe our modules have different tastes of code style :smile:
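
   For comparison, a minimal sketch of the same check without hamcrest, using a 
plain `Set` comparison. This assumes `logDataDirectories()` returns the 
directory paths as strings (as the `Paths`/`Collectors` imports above suggest); 
it is not code from the PR, just an illustration of the non-hamcrest style:
   ```java
   // Order-independent comparison of expected and actual directory names,
   // expressed with a Set instead of a hamcrest matcher.
   Set<String> expected = new HashSet<>(Arrays.asList(
           String.format("broker_%d_data0", brokerId),
           String.format("broker_%d_data1", brokerId)));
   Set<String> actual = node.logDataDirectories().stream()
           .map(dir -> Paths.get(dir).getFileName().toString())
           .collect(Collectors.toSet());
   assertEquals(expected, actual);
   ```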



##
core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import org.hamcrest.Matcher;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class KafkaClusterTestKitTest {
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) 
{
+try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+new TestKitNodes.Builder().
+setBrokerNodes(5, 2).
+setCombined(combined).
+setNumControllerNodes(3).build()).build()) {
+
+assertEquals(5, cluster.nodes().brokerNodes().size());
+assertEquals(3, cluster.nodes().controllerNodes().size());
+
+cluster.nodes().brokerNodes().forEach((brokerId, node) -> {
+assertEquals(2, node.logDataDirectories().size());
+Matcher<Iterable<? extends String>> matcher = 
containsInAnyOrder(String.format("broker_%d_data0", brokerId), 
String.format("broker_%d_data1", brokerId));

Review Comment:
   > is there any reason not to use hamcrest in core when we do use it in other 
modules?
   
   I'm curious. Maybe our modules have different tastes of code style :smile:

Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


mwesterby commented on code in PR #15733:
URL: https://github.com/apache/kafka/pull/15733#discussion_r1567699963


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -192,6 +192,66 @@ Found problem:
 } finally Utils.delete(tempDir)
   }
 
+  private def runFormatCommand(stream: ByteArrayOutputStream, directories: 
Seq[String]): Int = {
+val metaProperties = new MetaProperties.Builder().
+  setVersion(MetaPropertiesVersion.V1).
+  setClusterId("XcZZOzUqS4yHOjhMQB6JLQ").
+  setNodeId(2).
+  build()
+val bootstrapMetadata = 
StorageTool.buildBootstrapMetadata(MetadataVersion.latestTesting(), None, "test 
format command")
+StorageTool.formatCommand(new PrintStream(stream), directories, 
metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), 
ignoreFormatted = false)
+  }
+
+  @Test
+  def testFormatSucceedsIfAllDirectoriesAreAvailable(): Unit = {
+val availableDir1 = TestUtils.tempDir()
+val availableDir2 = TestUtils.tempDir()
+try {
+  val stream = new ByteArrayOutputStream()
+  assertEquals(0, runFormatCommand(stream, Seq(availableDir1.toString, 
availableDir2.toString)))
+  assertTrue(stream.toString().contains("Formatting 
%s".format(availableDir1)))
+  assertTrue(stream.toString().contains("Formatting 
%s".format(availableDir2)))
+} finally {
+  Utils.delete(availableDir1)
+  Utils.delete(availableDir2)
+}
+  }
+
+  @Test
+  def testFormatSucceedsIfAtLeastOneDirectoryIsAvailable(): Unit = {
+val availableDir1 = TestUtils.tempDir()
+val unavailableDir1 = TestUtils.tempFile()
+try {
+  val stream = new ByteArrayOutputStream()
+  assertEquals(0, runFormatCommand(stream, Seq(availableDir1.toString, 
unavailableDir1.toString)))
+  assertTrue(stream.toString().contains("I/O error trying to read log 
directory %s. Ignoring...".format(unavailableDir1)))
+  assertTrue(stream.toString().contains("Formatting 
%s".format(availableDir1)))
+  assertFalse(stream.toString().contains("Formatting 
%s".format(unavailableDir1)))
+} finally {
+  Utils.delete(availableDir1)
+  Utils.delete(unavailableDir1)
+}
+  }
+
+  @Test
+  def testFormatFailsIfAllDirectoriesAreUnavailable(): Unit = {
+val unavailableDir1 = TestUtils.tempFile()
+val unavailableDir2 = TestUtils.tempFile()
+try {
+  val stream = new ByteArrayOutputStream()
+  try {
+assertEquals(1, runFormatCommand(stream, Seq(unavailableDir1.toString, 
unavailableDir2.toString)))
+  } catch {
+case e: TerseFailure => assertEquals("No available log directories to 
format.", e.getMessage)

Review Comment:
   Much cleaner, I'll refactor this assertion too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16555) Consumer's RequestState has incorrect logic to determine if inflight

2024-04-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16555:
--
Issue Type: Bug  (was: Task)

> Consumer's RequestState has incorrect logic to determine if inflight
> 
>
> Key: KAFKA-16555
> URL: https://issues.apache.org/jira/browse/KAFKA-16555
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> When running system tests for the new consumer, I've hit an issue where the 
> {{HeartbeatRequestManager}} is sending out multiple concurrent 
> {{CONSUMER_GROUP_REQUEST}} RPCs. The effect is the coordinator creates 
> multiple members which causes downstream assignment problems.
> Here's the order of events:
> * Time 202: {{HeartbeatRequestManager.poll()}} determines it's OK to send a 
> request. In so doing, it updates the {{RequestState}}'s {{lastSentMs}} to the 
> current timestamp, 202
> * Time 236: the response is received and the response handler is invoked, setting 
> the {{RequestState}}'s {{lastReceivedMs}} to the current timestamp, 236
> * Time 236: {{HeartbeatRequestManager.poll()}} is invoked again, and it sees 
> that it's OK to send a request. It creates another request, once again 
> updating the {{RequestState}}'s {{lastSentMs}} to the current timestamp, 236
> * Time 237: {{HeartbeatRequestManager.poll()}} is invoked again, and 
> ERRONEOUSLY decides it's OK to send another request, despite one already in 
> flight.
> Here's the problem with {{requestInFlight()}}:
> {code:java}
> public boolean requestInFlight() {
> return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs;
> }
> {code}
> In our case, {{lastReceivedMs}} is 236 and {{lastSentMs}} is _also_ 236. So 
> the received timestamp is _equal_ to the sent timestamp, not _less_.
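
One way to make the check robust to same-millisecond events is to track the 
in-flight state with an explicit flag rather than by comparing timestamps. The 
following is only a minimal sketch of that idea, not the actual client code; 
the callback names mirror the description above, and {{onSendAttempt}} is an 
assumed hook invoked when the request is handed to the network layer.

{code:java}
// Minimal sketch: a boolean in-flight flag instead of comparing lastSentMs
// and lastReceivedMs, so a send in the same millisecond as the previous
// response cannot be misread as "no request in flight".
public class RequestStateSketch {
    private long lastSentMs = -1;
    private long lastReceivedMs = -1;
    private boolean requestInFlight = false;

    public void onSendAttempt(long currentTimeMs) {
        this.lastSentMs = currentTimeMs;
        this.requestInFlight = true;      // set when the request is sent
    }

    public void onSuccessfulAttempt(long currentTimeMs) {
        this.lastReceivedMs = currentTimeMs;
        this.requestInFlight = false;     // cleared when a response is received
    }

    public void onFailedAttempt(long currentTimeMs) {
        this.lastReceivedMs = currentTimeMs;
        this.requestInFlight = false;     // cleared on failure as well
    }

    public boolean requestInFlight() {
        return requestInFlight;           // no dependence on millisecond resolution
    }

    public static void main(String[] args) {
        RequestStateSketch state = new RequestStateSketch();
        state.onSendAttempt(202);
        state.onSuccessfulAttempt(236);
        state.onSendAttempt(236);         // second send in the same millisecond
        System.out.println(state.requestInFlight()); // true, unlike the timestamp comparison
    }
}
{code}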



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

