Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-04 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1590215434


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -145,4 +152,21 @@ default Admin createAdminClient() {
     void startBroker(int brokerId);
 
     void waitForReadyBrokers() throws InterruptedException;
+
+    default Set<GroupProtocol> supportedGroupProtocols() {

Review Comment:
   It would be great to return an unmodifiable collection. Also, we need to add a comment to explain the reason for checking the two configs.
   ```java
   default Set<GroupProtocol> supportedGroupProtocols() {
       Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
       supportedGroupProtocols.add(CLASSIC);
       // KafkaConfig#isNewGroupCoordinatorEnabled checks both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
       if (config().serverProperties().getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") ||
               config().serverProperties().getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) {
           supportedGroupProtocols.add(CONSUMER);
       }
       return Collections.unmodifiableSet(supportedGroupProtocols);
   }
   ```






Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-04 Thread via GitHub


chia7712 commented on PR #15766:
URL: https://github.com/apache/kafka/pull/15766#issuecomment-2094642579

   
https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java#L433
   
   @frankvicky Could you change `state.state` to `state.state.name()` for the same output? Also, please fix the build error.
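
For context, a minimal sketch of why the two expressions can print differently, using a hypothetical enum in the usual Java style (this is not the real `ConsumerGroupState`, just an illustration):

```java
// Hypothetical enum; illustrates toString() vs. name(), nothing more.
enum GroupState {
    STABLE("Stable");

    private final String humanReadable;

    GroupState(String humanReadable) {
        this.humanReadable = humanReadable;
    }

    @Override
    public String toString() {
        return humanReadable; // implicit string conversion uses this
    }
}

// String concatenation and String.valueOf(...) go through toString() -> "Stable",
// while name() always returns the constant identifier -> "STABLE".
```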





Re: [PR] KAFKA-14785: Connect offset read REST API [kafka]

2024-05-04 Thread via GitHub


chia7712 commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1590212654


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##
@@ -141,24 +145,13 @@ private static String noClientId() {
     protected KafkaBasedLog<byte[], byte[]> offsetLog;
     // Visible for testing
     final HashMap<ByteBuffer, ByteBuffer> data = new HashMap<>();
+    private final Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>();
+    private Converter keyConverter;
     private final Supplier<TopicAdmin> topicAdminSupplier;
     private final Supplier<String> clientIdBase;
     private SharedTopicAdmin ownTopicAdmin;
     protected boolean exactlyOnce;
 
-    /**
-     * Create an {@link OffsetBackingStore} backed by a Kafka topic. This constructor will cause the
-     * store to instantiate and close its own {@link TopicAdmin} during {@link #configure(WorkerConfig)}
-     * and {@link #stop()}, respectively.
-     *
-     * @deprecated use {@link #KafkaOffsetBackingStore(Supplier, Supplier)} instead
-     */
-    @Deprecated
-    public KafkaOffsetBackingStore() {

Review Comment:
   Sorry for making noise on this PR. Out of curiosity, should we remove the deprecated constructors from `KafkaStatusBackingStore` and `KafkaConfigBackingStore` too? I'm not sure whether those internal classes need the deprecation cycle.
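
For reference, a minimal illustration of the supplier-based pattern that these deprecated no-arg constructors were replaced with (a hypothetical class, not the real Kafka one; the null checks are our own addition):

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical store: the caller supplies the admin client lazily, so the
// store no longer instantiates and closes its own TopicAdmin.
final class SupplierBackedStore<A> {
    private final Supplier<A> adminSupplier;
    private final Supplier<String> clientIdBase;

    SupplierBackedStore(Supplier<A> adminSupplier, Supplier<String> clientIdBase) {
        this.adminSupplier = Objects.requireNonNull(adminSupplier);
        this.clientIdBase = Objects.requireNonNull(clientIdBase);
    }

    A admin() {
        return adminSupplier.get(); // materialized only when first needed
    }
}
```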






Re: [PR] MINOR: remove redundant check in KafkaClusterTestKit [kafka]

2024-05-04 Thread via GitHub


chia7712 merged PR #15858:
URL: https://github.com/apache/kafka/pull/15858





Re: [PR] KAFKA-15739: KRaft support in ResetConsumerGroupOffsetTest [kafka]

2024-05-04 Thread via GitHub


github-actions[bot] commented on PR #14686:
URL: https://github.com/apache/kafka/pull/14686#issuecomment-2094567758

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-04 Thread via GitHub


frankvicky commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1590198922


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,374 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteWithTopicOption(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-        assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdNonExistingGroup(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
 
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DeleteConsumerGroupsTest {
+    private final ClusterInstance cluster;
 
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-        assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-            "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+    public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+        this.cluster = cluster;
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteNonExistingGroup(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
-
-        // note the group to be deleted is a

[jira] [Assigned] (KAFKA-16617) Add KRaft info for the `advertised.listeners` doc description

2024-05-04 Thread Owen C.H. Leung (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Owen C.H. Leung reassigned KAFKA-16617:
---

Assignee: Owen C.H. Leung

> Add KRaft info for the `advertised.listeners` doc description
> -
>
> Key: KAFKA-16617
> URL: https://issues.apache.org/jira/browse/KAFKA-16617
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Owen C.H. Leung
>Priority: Major
>  Labels: newbie, newbie++
>
> Currently, we only mention the ZK handler in the `advertised.listeners` doc 
> description:
> > Listeners to publish to ZooKeeper for clients to use, if different than the 
> > listeners config property.
> We should also add the KRaft handler info in the doc.
> ref: 
> https://kafka.apache.org/documentation/#brokerconfigs_advertised.listeners





[PR] KAFKA-16617 [kafka]

2024-05-04 Thread via GitHub


Owen-CH-Leung opened a new pull request, #15860:
URL: https://github.com/apache/kafka/pull/15860

   As per [KAFKA-16617](https://issues.apache.org/jira/browse/KAFKA-16617), this PR revises the documentation for `advertised.listeners` to include the KRaft handler.
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16526; Add quorum state v1 [kafka]

2024-05-04 Thread via GitHub


jsancio closed pull request #15781: KAFKA-16526; Add quorum state v1
URL: https://github.com/apache/kafka/pull/15781





[PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-04 Thread via GitHub


jsancio opened a new pull request, #15859:
URL: https://github.com/apache/kafka/pull/15859

   DRAFT
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-16624) Don't generate useless PartitionChangeRecord on older MV

2024-05-04 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez resolved KAFKA-16624.
-
Resolution: Fixed

> Don't generate useless PartitionChangeRecord on older MV
> 
>
> Key: KAFKA-16624
> URL: https://issues.apache.org/jira/browse/KAFKA-16624
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Minor
>
> Fix a case where we could generate useless PartitionChangeRecords on metadata 
> versions older than 3.6-IV0. This could happen in the case where we had an 
> ISR with only one broker in it, and we were trying to go down to a fully 
> empty ISR. In this case, PartitionChangeBuilder would block the ISR from 
> going down to fully empty (since that is not valid in these pre-KIP-966 
> metadata versions), but it would still emit the record, even though it had no 
> effect.





Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-05-04 Thread via GitHub


jsancio merged PR #15671:
URL: https://github.com/apache/kafka/pull/15671





Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-04 Thread via GitHub


chia7712 commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1590078150


##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -690,6 +702,10 @@ class FetchSessionCache(private val maxEntries: Int,
 * 2. B is considered "stale" because it has been inactive for a long time, or
 * 3. A contains more partitions than B, and B is not recently created.
 *
+* Prior to KAFKA-9401, the session cache was not sharded and we looked at all

Review Comment:
   These docs are great. Could you please update this as well?
   
   https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/server/KafkaConfig.scala#L182



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -787,9 +803,37 @@ class FetchSessionCache(private val maxEntries: Int,
 }
   }
 }
+object FetchSessionCache {
+  private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache])
+  private val counter = new AtomicLong(0)
+}
+
+class FetchSessionCache(private val cacheShards: Seq[FetchSessionCacheShard]) {
+  // Set up metrics.
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => cacheShards.map(_.size).sum)
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum)
+
+  def getCacheShard(sessionId: Int): FetchSessionCacheShard = {
+    val shard = sessionId / cacheShards.head.sessionIdRange
Review Comment:
   It assumes `cacheShards` is sorted by `shardNum`, right? If so, could you please add a comment for it?
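
For illustration, the invariant such a comment could document, sketched in Java under the assumption that the shard list is ordered by shard number (the real code is Scala; names here are hypothetical):

```java
import java.util.List;

// Assumes shards are sorted by shard number, so shard i owns session ids in
// [i * sessionIdRange, (i + 1) * sessionIdRange).
final class ShardLookup {
    static <S> S shardFor(List<S> shardsSortedByShardNum, int sessionId, int sessionIdRange) {
        return shardsSortedByShardNum.get(sessionId / sessionIdRange);
    }
}
```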



##
core/src/main/scala/kafka/server/FetchSession.scala:
##
@@ -787,9 +803,37 @@ class FetchSessionCache(private val maxEntries: Int,
 }
   }
 }
+object FetchSessionCache {
+  private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache])
+  private val counter = new AtomicLong(0)
+}
+
+class FetchSessionCache(private val cacheShards: Seq[FetchSessionCacheShard]) {
+  // Set up metrics.
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => cacheShards.map(_.size).sum)
+  FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum)
+
+  def getCacheShard(sessionId: Int): FetchSessionCacheShard = {
+    val shard = sessionId / cacheShards.head.sessionIdRange
+    cacheShards(shard)
+  }
+
+  // Returns the shard in round-robin
+  def getNextCacheShard: FetchSessionCacheShard = {
+    val shardNum = (FetchSessionCache.counter.getAndIncrement() % size).toInt
Review Comment:
   As `int` is enough for this case, maybe we can use `AtomicInteger`?
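
A sketch of that suggestion; the overflow note is our own caveat, not from the thread:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinPicker {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // getAndIncrement() eventually wraps to Integer.MIN_VALUE, so mask the
    // sign bit to keep the index non-negative before taking the modulo.
    static int nextShard(int numShards) {
        return (COUNTER.getAndIncrement() & Integer.MAX_VALUE) % numShards;
    }
}
```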






[jira] [Assigned] (KAFKA-16668) Enable to set tags by `ClusterTest`

2024-05-04 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16668:
--

Assignee: Johnny Hsu  (was: Chia-Ping Tsai)

> Enable to set tags by `ClusterTest` 
> 
>
> Key: KAFKA-16668
> URL: https://issues.apache.org/jira/browse/KAFKA-16668
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Johnny Hsu
>Priority: Minor
>
> Currently, the display name can be customized only by `name` 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/annotation/ClusterTest.java#L42).
> However, the "key" is hard-coded to "name=xxx". Also, it is impossible to set 
> more "tags" for the display name. 
> https://github.com/apache/kafka/pull/15766 is an example where we want to add 
> "xxx=bbb" to the display name.





[jira] [Commented] (KAFKA-16668) Enable to set tags by `ClusterTest`

2024-05-04 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843482#comment-17843482
 ] 

Johnny Hsu commented on KAFKA-16668:


Hi [~chia7712],
May I know if you are working on this? If not, I am willing to help. Thanks!

> Enable to set tags by `ClusterTest` 
> 
>
> Key: KAFKA-16668
> URL: https://issues.apache.org/jira/browse/KAFKA-16668
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Currently, the display name can be customized only by `name` 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/annotation/ClusterTest.java#L42).
> However, the "key" is hard-coded to "name=xxx". Also, it is impossible to set 
> more "tags" for the display name. 
> https://github.com/apache/kafka/pull/15766 is an example where we want to add 
> "xxx=bbb" to the display name.





[jira] [Commented] (KAFKA-16668) Enable to set tags by `ClusterTest`

2024-05-04 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843481#comment-17843481
 ] 

Chia-Ping Tsai commented on KAFKA-16668:


Personally, introducing a new annotation "Tag" is a solution, e.g. "Tag[] tags()".
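
A hypothetical sketch of what such an annotation could look like; the names and retention policy are illustrative, not an agreed design:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical key/value tag for test display names.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Tag {
    String key();
    String value();
}

// ClusterTest could then declare `Tag[] tags() default {};`, so a test annotated
// with @Tag(key = "xxx", value = "bbb") contributes "xxx=bbb" to the display name.
```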

> Enable to set tags by `ClusterTest` 
> 
>
> Key: KAFKA-16668
> URL: https://issues.apache.org/jira/browse/KAFKA-16668
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Currently, the display name can be customized only by `name` 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/annotation/ClusterTest.java#L42).
> However, the "key" is hard-coded to "name=xxx". Also, it is impossible to set 
> more "tags" for the display name. 
> https://github.com/apache/kafka/pull/15766 is an example where we want to add 
> "xxx=bbb" to the display name.





Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-04 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1590061307


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,381 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteWithTopicOption(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-        assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdNonExistingGroup(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
 
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DeleteConsumerGroupsTest {
+    private final ClusterInstance cluster;
+    private final Iterable<GroupProtocol> groupProtocols;
 
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-        assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-            "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+    public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+        this.groupProtocols =
[jira] [Created] (KAFKA-16668) Enable to set tags by `ClusterTest`

2024-05-04 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16668:
--

 Summary: Enable to set tags by `ClusterTest` 
 Key: KAFKA-16668
 URL: https://issues.apache.org/jira/browse/KAFKA-16668
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Currently, the display name can be customized only by `name` 
(https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/annotation/ClusterTest.java#L42).
However, the "key" is hard-coded to "name=xxx". Also, it is impossible to set 
more "tags" for the display name. 

https://github.com/apache/kafka/pull/15766 is an example where we want to add 
"xxx=bbb" to the display name.





[jira] [Updated] (KAFKA-16667) KRaftMigrationDriver gets stuck after successive failovers

2024-05-04 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16667:
-
Description: 
This is a continuation of KAFKA-16171.

It turns out that the active KRaftMigrationDriver can get a stale read from ZK 
after becoming the active controller in ZK (i.e., writing to "/controller").

Because ZooKeeper only offers linearizability on writes to a given ZNode, it is 
possible that we get a stale read on the "/migration" ZNode after writing to 
"/controller" (and "/controller_epoch") when becoming active. 

 

The history looks like this:
 # Node B becomes leader in the Raft layer. KRaftLeaderEvents are enqueued on 
all KRaftMigrationDriver
 # Node A writes some state to ZK, updates "/migration", and checks 
"/controller_epoch" in one transaction. This happens before B claims controller 
leadership in ZK. The "/migration" state is updated from X to Y
 # Node B claims leadership by updating "/controller" and "/controller_epoch". 
Leader B reads "/migration" state X
 # Node A tries to write some state, fails on "/controller_epoch" check op.
 # Node A processes new leader and becomes inactive

 

This does not violate consistency guarantees made by ZooKeeper.

 

> Write operations in ZooKeeper are {_}linearizable{_}. In other words, each 
> {{write}} will appear to take effect atomically at some point between when 
> the client issues the request and receives the corresponding response.

and 

> Read operations in ZooKeeper are _not linearizable_ since they can return 
> potentially stale data. This is because a {{read}} in ZooKeeper is not a 
> quorum operation and a server will respond immediately to a client that is 
> performing a {{{}read{}}}.

 

--- 

 

The impact of this stale read is the same as KAFKA-16171. The 
KRaftMigrationDriver never gets past SYNC_KRAFT_TO_ZK because it has a stale 
zkVersion for the "/migration" ZNode. The result is brokers never learn about 
the new controller and cannot update any partition state.

The workaround for this bug is to re-elect the controller by shutting down the 
active KRaft controller. 

This bug was found during a migration where the KRaft controller was rapidly 
failing over due to an excess of metadata. 

  was:
This is a continuation of KAFKA-16171.

It turns out that the active KRaftMigrationDriver can get a stale read from ZK 
after becoming the active controller in ZK (i.e., writing to "/controller").

Because ZooKeeper only offers linearizability on writes to a given ZNode, it is 
possible that we get a stale read on the "/migration" ZNode after writing to 
"/controller" (and "/controller_epoch") when becoming active. 

 

The history looks like this:
 # Node B becomes leader in the Raft layer. KRaftLeaderEvents are enqueued on 
all KRaftMigrationDriver-s
 # Node A writes some state to ZK, updates "/migration", and checks 
"/controller_epoch" in one transaction. This happens before B claims controller 
leadership in ZK. The "/migration" state is updated from X to Y
 # Node B claims leadership by updating "/controller" and "/controller_epoch". 
Leader B reads "/migration" state X
 # Node A tries to write some state, fails on "/controller_epoch" check op.
 # Node A processes new leader and becomes inactive

 

This does not violate consistency guarantees made by ZooKeeper.

 

> Write operations in ZooKeeper are {_}linearizable{_}. In other words, each 
> {{write}} will appear to take effect atomically at some point between when 
> the client issues the request and receives the corresponding response.

and 

> Read operations in ZooKeeper are _not linearizable_ since they can return 
> potentially stale data. This is because a {{read}} in ZooKeeper is not a 
> quorum operation and a server will respond immediately to a client that is 
> performing a {{{}read{}}}.

 

--- 

 

The impact of this stale read is the same as KAFKA-16171. The 
KRaftMigrationDriver never gets past SYNC_KRAFT_TO_ZK because it has a stale 
zkVersion for the "/migration" ZNode. The result is brokers never learn about 
the new controller and cannot update any partition state.

The workaround for this bug is to re-elect the controller by shutting down the 
active KRaft controller. 

This bug was found during a migration where the KRaft controller was rapidly 
failing over due to an excess of metadata. 


> KRaftMigrationDriver gets stuck after successive failovers
> --
>
> Key: KAFKA-16667
> URL: https://issues.apache.org/jira/browse/KAFKA-16667
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, migration
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
>
> This is a continuation of KAFKA-16171.
> It turns out that the active KRaftMigrationDriver can get a stale read from 
> ZK 

[jira] [Assigned] (KAFKA-16667) KRaftMigrationDriver gets stuck after successive failovers

2024-05-04 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reassigned KAFKA-16667:


Assignee: David Arthur

> KRaftMigrationDriver gets stuck after successive failovers
> --
>
> Key: KAFKA-16667
> URL: https://issues.apache.org/jira/browse/KAFKA-16667
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, migration
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
>
> This is a continuation of KAFKA-16171.
> It turns out that the active KRaftMigrationDriver can get a stale read from 
> ZK after becoming the active controller in ZK (i.e., writing to 
> "/controller").
> Because ZooKeeper only offers linearizability on writes to a given ZNode, it 
> is possible that we get a stale read on the "/migration" ZNode after writing 
> to "/controller" (and "/controller_epoch") when becoming active. 
>  
> The history looks like this:
>  # Node B becomes leader in the Raft layer. KRaftLeaderEvents are enqueued on 
> all KRaftMigrationDriver-s
>  # Node A writes some state to ZK, updates "/migration", and checks 
> "/controller_epoch" in one transaction. This happens before B claims 
> controller leadership in ZK. The "/migration" state is updated from X to Y
>  # Node B claims leadership by updating "/controller" and 
> "/controller_epoch". Leader B reads "/migration" state X
>  # Node A tries to write some state, fails on "/controller_epoch" check op.
>  # Node A processes new leader and becomes inactive
>  
> This does not violate consistency guarantees made by ZooKeeper.
>  
> > Write operations in ZooKeeper are {_}linearizable{_}. In other words, each 
> > {{write}} will appear to take effect atomically at some point between when 
> > the client issues the request and receives the corresponding response.
> and 
> > Read operations in ZooKeeper are _not linearizable_ since they can return 
> > potentially stale data. This is because a {{read}} in ZooKeeper is not a 
> > quorum operation and a server will respond immediately to a client that is 
> > performing a {{{}read{}}}.
>  
> --- 
>  
> The impact of this stale read is the same as KAFKA-16171. The 
> KRaftMigrationDriver never gets past SYNC_KRAFT_TO_ZK because it has a stale 
> zkVersion for the "/migration" ZNode. The result is brokers never learn about 
> the new controller and cannot update any partition state.
> The workaround for this bug is to re-elect the controller by shutting down 
> the active KRaft controller. 
> This bug was found during a migration where the KRaft controller was rapidly 
> failing over due to an excess of metadata. 





[jira] [Created] (KAFKA-16667) KRaftMigrationDriver gets stuck after successive failovers

2024-05-04 Thread David Arthur (Jira)
David Arthur created KAFKA-16667:


 Summary: KRaftMigrationDriver gets stuck after successive failovers
 Key: KAFKA-16667
 URL: https://issues.apache.org/jira/browse/KAFKA-16667
 Project: Kafka
  Issue Type: Bug
  Components: controller, migration
Reporter: David Arthur


This is a continuation of KAFKA-16171.

It turns out that the active KRaftMigrationDriver can get a stale read from ZK 
after becoming the active controller in ZK (i.e., writing to "/controller").

Because ZooKeeper only offers linearizability on writes to a given ZNode, it is 
possible that we get a stale read on the "/migration" ZNode after writing to 
"/controller" (and "/controller_epoch") when becoming active. 

 

The history looks like this:
 # Node B becomes leader in the Raft layer. KRaftLeaderEvents are enqueued on 
all KRaftMigrationDriver-s
 # Node A writes some state to ZK, updates "/migration", and checks 
"/controller_epoch" in one transaction. This happens before B claims controller 
leadership in ZK. The "/migration" state is updated from X to Y
 # Node B claims leadership by updating "/controller" and "/controller_epoch". 
Leader B reads "/migration" state X
 # Node A tries to write some state, fails on "/controller_epoch" check op.
 # Node A processes new leader and becomes inactive

 

This does not violate consistency guarantees made by ZooKeeper.

 

> Write operations in ZooKeeper are {_}linearizable{_}. In other words, each 
> {{write}} will appear to take effect atomically at some point between when 
> the client issues the request and receives the corresponding response.

and 

> Read operations in ZooKeeper are _not linearizable_ since they can return 
> potentially stale data. This is because a {{read}} in ZooKeeper is not a 
> quorum operation and a server will respond immediately to a client that is 
> performing a {{{}read{}}}.

 

--- 

 

The impact of this stale read is the same as KAFKA-16171. The 
KRaftMigrationDriver never gets past SYNC_KRAFT_TO_ZK because it has a stale 
zkVersion for the "/migration" ZNode. The result is brokers never learn about 
the new controller and cannot update any partition state.

The workaround for this bug is to re-elect the controller by shutting down the 
active KRaft controller. 

This bug was found during a migration where the KRaft controller was rapidly 
failing over due to an excess of metadata. 





Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-04 Thread via GitHub


chia7712 commented on PR #15766:
URL: https://github.com/apache/kafka/pull/15766#issuecomment-2094325675

   @frankvicky Please avoid "force push". It can eliminate the conversations ...





Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-04 Thread via GitHub


frankvicky commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1589996273


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,381 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteWithTopicOption(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-        assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdNonExistingGroup(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
 
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DeleteConsumerGroupsTest {
+    private final ClusterInstance cluster;
+    private final Iterable<GroupProtocol> groupProtocols;
 
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-        assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-            "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+    public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+        this.groupProtocols =

Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-04 Thread via GitHub


frankvicky commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1589995300


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,381 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteWithTopicOption(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-        assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdNonExistingGroup(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
 
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DeleteConsumerGroupsTest {
+    private final ClusterInstance cluster;
+    private final Iterable<GroupProtocol> groupProtocols;
 
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-        assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-            "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+    public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+        this.groupProtocols =

Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-05-04 Thread via GitHub


AndrewJSchofield commented on code in PR #15844:
URL: https://github.com/apache/kafka/pull/15844#discussion_r1589983081


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1145,14 +1141,42 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetch
             inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
 
         if (dupe.isPresent() || inflight.isPresent()) {
-            log.info("Duplicated OffsetFetchRequest: " + request.requestedPartitions);
-            dupe.orElseGet(inflight::get).chainFuture(request.future);
+            log.info("Duplicate OffsetFetchRequest found for partitions: {}", request.requestedPartitions);
+            OffsetFetchRequestState originalRequest = dupe.orElseGet(inflight::get);
+            originalRequest.chainFuture(request.future);
         } else {
             this.unsentOffsetFetches.add(request);
         }
         return request.future;
     }
 
+    /**
+     * Remove the {@link OffsetFetchRequestState request} from the inflight requests queue iff
+     * both of the following are true:
+     *
+     * <ul>
+     *     <li>The request completed with a null {@link Throwable error}</li>
+     *     <li>The request is not {@link OffsetFetchRequestState#isExpired expired}</li>
+     * </ul>
+     *
+     * <p/>
+     *
+     * In some cases, even though an offset fetch request may complete without an error, technically
+     * the request took longer than the user's provided timeout. In that case, the application thread will
+     * still receive a timeout error, and will shortly try to fetch these offsets again. Keeping the result
+     * of the <em>current</em> attempt will enable the <em>next</em> attempt to use that result and return

Review Comment:
   I think the emphasis might have got a bit mismatched here.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1145,14 +1141,42 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetch
             inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
 
         if (dupe.isPresent() || inflight.isPresent()) {
-            log.info("Duplicated OffsetFetchRequest: " + request.requestedPartitions);
-            dupe.orElseGet(inflight::get).chainFuture(request.future);
+            log.info("Duplicate OffsetFetchRequest found for partitions: {}", request.requestedPartitions);
+            OffsetFetchRequestState originalRequest = dupe.orElseGet(inflight::get);
+            originalRequest.chainFuture(request.future);
        } else {
             this.unsentOffsetFetches.add(request);
         }
         return request.future;
     }
 
+    /**
+     * Remove the {@link OffsetFetchRequestState request} from the inflight requests queue iff
+     * both of the following are true:
+     *
+     * <ul>
+     *     <li>The request completed with a null {@link Throwable error}</li>
Review Comment:
   Isn't this more succinctly described as "The request completed without an 
error"?






[jira] [Commented] (KAFKA-14579) Move DumpLogSegments to tools

2024-05-04 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843422#comment-17843422
 ] 

Chia-Ping Tsai commented on KAFKA-14579:


{quote}
Currently only DumpLogSegments is using Decoder; if it's removed, then Decoder should be safe to deprecate since no one will be using it anymore.
{quote}

According to the Kafka compatibility rules, command line tools belong to the public interfaces. Hence, we need a replacement for `kafka.serializer.Decoder` before we can deprecate it. Also, the replacement should be written in Java and put into the tools-api module.
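
A hypothetical sketch of what such a tools-api replacement could look like; the package, name, and method mirror the old Scala trait, but nothing here is an agreed design:

```java
package org.apache.kafka.tools.api;

// Sketch only: a Java counterpart to the Scala kafka.serializer.Decoder trait.
@FunctionalInterface
public interface Decoder<T> {
    T fromBytes(byte[] bytes);
}
```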

> Move DumpLogSegments to tools
> -
>
> Key: KAFKA-14579
> URL: https://issues.apache.org/jira/browse/KAFKA-14579
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Alexandre Dupriez
>Priority: Major
>






[jira] [Comment Edited] (KAFKA-14579) Move DumpLogSegments to tools

2024-05-04 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843421#comment-17843421
 ] 

Johnny Hsu edited comment on KAFKA-14579 at 5/4/24 10:36 AM:
-

Currently only DumpLogSegments is using Decoder; if it's removed, then Decoder 
should be safe to deprecate since no one will be using it anymore.
I am willing to work on the KIP for this.


was (Author: JIRAUSER304478):
Currently only DumpLogSegments is using Decoder; if it's removed, then Decoder 
should be safe to deprecate since no one will be using it anymore.
I am willing to work on the KIP for this :) 

> Move DumpLogSegments to tools
> -
>
> Key: KAFKA-14579
> URL: https://issues.apache.org/jira/browse/KAFKA-14579
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Alexandre Dupriez
>Priority: Major
>






[jira] [Comment Edited] (KAFKA-14579) Move DumpLogSegments to tools

2024-05-04 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843421#comment-17843421
 ] 

Johnny Hsu edited comment on KAFKA-14579 at 5/4/24 10:36 AM:
-

Currently only DumpLogSegments is using Decoder; if it's removed, then Decoder 
should be safe to deprecate since no one will be using it anymore.
I am willing to work on the KIP for this :) 


was (Author: JIRAUSER304478):
Currently only DumpLogSegments is using Decoder; if it's removed, then Decoder 
should be safe to deprecate since no one will be using it anymore.

> Move DumpLogSegments to tools
> -
>
> Key: KAFKA-14579
> URL: https://issues.apache.org/jira/browse/KAFKA-14579
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Alexandre Dupriez
>Priority: Major
>






Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-04 Thread via GitHub


gaurav-narula commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589956639


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -389,9 +389,17 @@ class BrokerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  val fetchManager = new FetchManager(Time.SYSTEM,
-    new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-      KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+  // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+  // for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
+  val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards
+  val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards)
+    .map(shardNum => new FetchSessionCacheShard(
+      config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards,

Review Comment:
   That's a great point, and it's quite subtle. I reckon this may happen because the cacheShards are picked randomly, and it can be avoided by picking shards in round-robin. I'll make this change along with addressing the other comments.
   
   Some subtle differences cannot be avoided, particularly around eviction. The [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability#KIP227:IntroduceIncrementalFetchRequeststoIncreasePartitionScalability-FetchSessionCaching) considers all existing sessions when choosing a session for eviction, while this change would consider only the existing sessions **within** a shard. I'll update the documentation to call out the difference.






[jira] [Commented] (KAFKA-14579) Move DumpLogSegments to tools

2024-05-04 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843421#comment-17843421
 ] 

Johnny Hsu commented on KAFKA-14579:


Currently only DumpLogSegments is using Decoder; if it's removed, then Decoder 
should be safe to deprecate since no one will be using it anymore.

> Move DumpLogSegments to tools
> -
>
> Key: KAFKA-14579
> URL: https://issues.apache.org/jira/browse/KAFKA-14579
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Alexandre Dupriez
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-04 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1589953917


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##
@@ -173,21 +172,15 @@ protected Serde prepareValueSerdeForStore(final Serde valueSerde, final Se
 protected void initStoreSerde(final ProcessorContext context) {
 final String storeName = name();
 final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
-serdes = new StateSerdes<>(
-changelogTopic,
-prepareKeySerde(keySerde, new SerdeGetter(context)),
-prepareValueSerdeForStore(valueSerde, new SerdeGetter(context))
-);
+serdes = StoreSerdeInitializer.prepareStoreSerde(
+context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore);
 }
 
 protected void initStoreSerde(final StateStoreContext context) {
 final String storeName = name();
 final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
-serdes = new StateSerdes<>(
-changelogTopic,
-prepareKeySerde(keySerde, new SerdeGetter(context)),
-prepareValueSerdeForStore(valueSerde, new SerdeGetter(context))
-);
+serdes = StoreSerdeInitializer.prepareStoreSerde(
+context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore);

Review Comment:
   I added a function parameter, prepareValueSerde, so that the correct
function is used in the TimestampedStore subclasses. These don't use
`WrappingNullableUtils.prepareValueSerde` directly; they override the behavior
of prepareValueSerdeForStore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: remove redundant check in KafkaClusterTestKit [kafka]

2024-05-04 Thread via GitHub


johnnychhsu opened a new pull request, #15858:
URL: https://github.com/apache/kafka/pull/15858

   ## Context
   This check is always false, and in `ControllerServer` it cleanup itself if 
exception happens. Thus, we don't need this check and cleanup here.
   
   ## Solution
   Remove this check
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-04 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1589952122


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,14 @@ public void addChild(final ProcessorNode child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize serdes for sink node %s", name()), e);

Review Comment:
   Good idea. I moved the exception handling into a separate class to share
the code between the store classes, with corresponding unit tests.
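   
   A rough sketch of what that shared helper might look like (the
`StoreSerdeInitializer` name and call shape come from the diff above; the
functional interface, generics, and key-serde handling here are assumptions):
   ```java
   import org.apache.kafka.common.config.ConfigException;
   import org.apache.kafka.common.serialization.Serde;
   import org.apache.kafka.streams.state.StateSerdes;
   
   // Hypothetical shared helper: prepares the store serdes and wraps any
   // ConfigException so the failing store is named in the error message.
   final class StoreSerdeInitializerSketch {
   
       @FunctionalInterface
       interface PrepareValueSerde<V, C> {
           Serde<V> prepare(Serde<V> valueSerde, C context);
       }
   
       static <K, V, C> StateSerdes<K, V> prepareStoreSerde(
               final C context,
               final String storeName,
               final String changelogTopic,
               final Serde<K> keySerde,
               final Serde<V> valueSerde,
               final PrepareValueSerde<V, C> prepareValueSerde) {
           try {
               // Key-serde preparation is elided in this sketch; the real
               // code also prepares the key serde via a SerdeGetter.
               return new StateSerdes<>(
                   changelogTopic,
                   keySerde,
                   prepareValueSerde.prepare(valueSerde, context));
           } catch (final ConfigException e) {
               // Same wrapping pattern as the SinkNode diff, naming the store
               throw new ConfigException(
                   String.format("Failed to initialize serdes for store %s", storeName), e);
           }
       }
   }
   ```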



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12317: fix documentation [kafka]

2024-05-04 Thread via GitHub


florin-akermann commented on PR #15689:
URL: https://github.com/apache/kafka/pull/15689#issuecomment-2094077287

   Thanks @AyoubOm & @mjsax 
   
   > I agree with @AyoubOm that both statements you intend to remove are still correct?
   
   Indeed, yes.
   
   > It seems https://github.com/apache/kafka/pull/14107 remove it incorrectly for inner join instead of left join?
   
   Removed it from the left join and added the old statement back to the inner join.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16174) Flaky test: testDescribeQuorumStatusSuccessful – org.apache.kafka.tools.MetadataQuorumCommandTest

2024-05-04 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843403#comment-17843403
 ] 

Johnny Hsu edited comment on KAFKA-16174 at 5/4/24 8:21 AM:


The exception is thrown from
[https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/server/BrokerServer.scala#L474]

When the cluster starts,
[https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L426]
 tries to init the broker, but it fails to get a response from the controller.


was (Author: JIRAUSER304478):
the exception is from 
[https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/server/BrokerServer.scala#L474]

when the cluster starts, 
[https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L426]
 tries to init broker, but it failed. 

> Flaky test: testDescribeQuorumStatusSuccessful – 
> org.apache.kafka.tools.MetadataQuorumCommandTest
> -
>
> Key: KAFKA-16174
> URL: https://issues.apache.org/jira/browse/KAFKA-16174
> Project: Kafka
>  Issue Type: Test
>Reporter: Apoorv Mittal
>Assignee: Johnny Hsu
>Priority: Major
>  Labels: flaky-test
>
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15190/3/tests/]
>  
> {code:java}
> Errorjava.util.concurrent.ExecutionException: java.lang.RuntimeException: 
> Received a fatal error while waiting for the controller to acknowledge that 
> we are caught upStacktracejava.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Received a fatal error while waiting for the 
> controller to acknowledge that we are caught up at 
> java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:421)  
> at 
> kafka.test.junit.RaftClusterInvocationContext.lambda$getAdditionalExtensions$5(RaftClusterInvocationContext.java:116)
> at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeTestExecutionCallbacks$5(TestMethodTestDescriptor.java:192)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:203)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:203)
>at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeTestExecutionCallbacks(TestMethodTestDescriptor.java:191)
>at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
> at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
>at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16174) Flaky test: testDescribeQuorumStatusSuccessful – org.apache.kafka.tools.MetadataQuorumCommandTest

2024-05-04 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843403#comment-17843403
 ] 

Johnny Hsu commented on KAFKA-16174:


The exception is thrown from
[https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/server/BrokerServer.scala#L474]

When the cluster starts,
[https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L426]
 tries to init the broker, but it fails.

> Flaky test: testDescribeQuorumStatusSuccessful – 
> org.apache.kafka.tools.MetadataQuorumCommandTest
> -
>
> Key: KAFKA-16174
> URL: https://issues.apache.org/jira/browse/KAFKA-16174
> Project: Kafka
>  Issue Type: Test
>Reporter: Apoorv Mittal
>Assignee: Johnny Hsu
>Priority: Major
>  Labels: flaky-test
>
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15190/3/tests/]
>  
> {code:java}
> Errorjava.util.concurrent.ExecutionException: java.lang.RuntimeException: 
> Received a fatal error while waiting for the controller to acknowledge that 
> we are caught upStacktracejava.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Received a fatal error while waiting for the 
> controller to acknowledge that we are caught up at 
> java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:421)  
> at 
> kafka.test.junit.RaftClusterInvocationContext.lambda$getAdditionalExtensions$5(RaftClusterInvocationContext.java:116)
> at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeTestExecutionCallbacks$5(TestMethodTestDescriptor.java:192)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:203)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:203)
>at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeTestExecutionCallbacks(TestMethodTestDescriptor.java:191)
>at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
> at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
>at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226)
> at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-04 Thread via GitHub


showuon commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1589925627


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1245,19 +1260,27 @@ public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata
 return false;
 }
 
-// Segment's first epoch's offset should be more than or equal to the respective leader epoch's offset.
-if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) {
-LOGGER.debug("Segment {} first epoch {} offset is less than leader epoch offset {}.",
-segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
+// Two cases:
+// case-1: When the segment-first-epoch equals the first-epoch in the leader-epoch-lineage,
+// the offset value can lie anywhere between 0 and (next-epoch-start-offset - 1).
+// case-2: When the segment-first-epoch is not equal to the first-epoch in the leader-epoch-lineage,
+// the offset value should be between (current-epoch-start-offset) and (next-epoch-start-offset - 1).
+if (epoch == segmentFirstEpoch && leaderEpochs.lowerKey(epoch) != null && offset < leaderEpochs.get(epoch)) {
+LOGGER.debug("Segment {} first-valid epoch {} offset is less than leader epoch offset {}." +

Review Comment:
   nit: is less than "first" leader epoch offset...
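   
   For illustration, here is the two-case rule with made-up epoch start offsets
(the map mirrors the `leaderEpochs` lookup in the diff; all numbers are
invented):
   ```java
   import java.util.NavigableMap;
   import java.util.TreeMap;
   
   public class EpochRangeExample {
       public static void main(String[] args) {
           // epoch -> start offset of that epoch in the leader epoch lineage
           NavigableMap<Integer, Long> leaderEpochs = new TreeMap<>();
           leaderEpochs.put(0, 0L);    // first epoch in the lineage
           leaderEpochs.put(1, 100L);
           leaderEpochs.put(2, 200L);
   
           // case-1: a segment whose first epoch is 0 (the lineage's first
           // epoch) may start anywhere in [0, 100).
           // case-2: a segment whose first epoch is 1 must start in [100, 200).
           int segmentFirstEpoch = 1;
           long segmentStartOffset = 150L;
           boolean valid = leaderEpochs.lowerKey(segmentFirstEpoch) == null
               || segmentStartOffset >= leaderEpochs.get(segmentFirstEpoch);
           System.out.println(valid); // prints true
       }
   }
   ```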



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2024-05-04 Thread via GitHub


florin-akermann commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1589921037


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -109,108 +111,102 @@ public void init(final ProcessorContext> context) {
 
 @Override
 public void process(final Record> record) {
+// clear cached hash from previous record
+recordHash = null;
 // drop out-of-order records from versioned tables (cf. KIP-914)
 if (useVersionedSemantics && !record.value().isLatest) {
 LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
 droppedRecordsSensor.record();
 return;
 }
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
 
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
 if (record.value().oldValue != null) {
 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);

Review Comment:
   @mjsax omg, thanks for the flag! Looks like @AyoubOm is already addressing
it in https://issues.apache.org/jira/browse/KAFKA-16394? Otherwise I'll
address it.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Window not honored? Why is X record not dropped? [kafka]

2024-05-04 Thread via GitHub


florin-akermann closed pull request #15314: Window not honored? Why is X record 
not dropped?
URL: https://github.com/apache/kafka/pull/15314


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2024-05-04 Thread via GitHub


florin-akermann commented on code in PR #14107:
URL: https://github.com/apache/kafka/pull/14107#discussion_r1589921037


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -109,108 +111,102 @@ public void init(final ProcessorContext> context) {
 
 @Override
 public void process(final Record> record) {
+// clear cached hash from previous record
+recordHash = null;
 // drop out-of-order records from versioned tables (cf. KIP-914)
 if (useVersionedSemantics && !record.value().isLatest) {
 LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
 droppedRecordsSensor.record();
 return;
 }
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
 
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
 if (record.value().oldValue != null) {
 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);

Review Comment:
   @mjsax omg, thanks for the flag! Looks like @AyoubOm is already addressing
it in https://issues.apache.org/jira/browse/KAFKA-16394?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16659: KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER [kafka]

2024-05-04 Thread via GitHub


chia7712 commented on code in PR #15853:
URL: https://github.com/apache/kafka/pull/15853#discussion_r1589913151


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1703,7 +1704,9 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
 new FetchCommittedOffsetsEvent(
 initializingPartitions,
 timer);
+wakeupTrigger.setActiveTask(event.future());
 final Map offsets = applicationEventHandler.addAndGet(event, timer);
+wakeupTrigger.clearTask();

Review Comment:
   Please use try-finally
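   
   i.e. something along these lines (a fragment sketched from the diff above,
not the final code):
   ```java
   wakeupTrigger.setActiveTask(event.future());
   try {
       final Map<TopicPartition, OffsetAndMetadata> offsets =
           applicationEventHandler.addAndGet(event, timer);
       // ... use the fetched offsets as before ...
   } finally {
       // runs on success, timeout, and wakeup alike
       wakeupTrigger.clearTask();
   }
   ```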



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -872,4 +873,38 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 // local metadata. However, it should give up after the user-supplied timeout has past.
 assertThrows(classOf[TimeoutException], () => consumer.position(topicPartition, Duration.ofSeconds(3)))
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  @Timeout(15)
+  def testPositionRespectsWakeup(quorum: String, groupProtocol: String): Unit = {
+val topicPartition = new TopicPartition(topic, 15)
+val consumer = createConsumer()
+consumer.assign(List(topicPartition).asJava)
+
+CompletableFuture.runAsync { () =>
+  TimeUnit.SECONDS.sleep(1)
+  consumer.wakeup()
+}
+
+assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(3)))
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  @Timeout(15)
+  def testPositionWithErrorConnectionRespectsWakeup(quorum: String, groupProtocol: String): Unit = {
+val topicPartition = new TopicPartition(topic, 15)
+val properties = new Properties()
+properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")

Review Comment:
   Please add a comment to say this connection is un-connectable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16659: KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER [kafka]

2024-05-04 Thread via GitHub


FrankYang0529 commented on code in PR #15853:
URL: https://github.com/apache/kafka/pull/15853#discussion_r1589912438


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -906,6 +906,7 @@ public long position(TopicPartition partition, Duration timeout) {
 return position.offset;
 
 updateFetchPositions(timer);

Review Comment:
   I added `FetchCommittedOffsetsEvent` to `wakeupTrigger.setActiveTask`.
Thanks for the review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org