Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2024-04-10 Thread via GitHub


jolshan commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1560454922


##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFile.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+public class PartitionMetadataFile {
+private static final String PARTITION_METADATA_FILE_NAME = 
"partition.metadata";
+static final int CURRENT_VERSION = 0;
+
+public static File newFile(File dir) {
+return new File(dir, PARTITION_METADATA_FILE_NAME);
+}
+
+private final File file;
+private final LogDirFailureChannel logDirFailureChannel;
+
+private final Object lock = new Object();
+private volatile Optional dirtyTopicIdOpt = Optional.empty();
+
+public PartitionMetadataFile(
+final File file,
+final LogDirFailureChannel logDirFailureChannel
+) {
+this.file = file;
+this.logDirFailureChannel = logDirFailureChannel;
+}
+
+/**
+ * Records the topic ID that will be flushed to disk.
+ */
+public void record(Uuid topicId) {
+// Topic IDs should not differ, but we defensively check here to fail 
earlier in the case that the IDs somehow differ.
+dirtyTopicIdOpt.ifPresent(dirtyTopicId -> {
+if (dirtyTopicId != topicId) {

Review Comment:
   I don't think this should be a reference comparison. Perhaps a miss in the 
scala -> java conversion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-10788) Streamlining Tests in CachingInMemoryKeyValueStoreTest

2024-04-10 Thread Shikhar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836006#comment-17836006
 ] 

Shikhar commented on KAFKA-10788:
-

Hi [~sagarrao] 

Is this issue resolved? If not, can I work on this if no one else is working on 
it?

> Streamlining Tests in CachingInMemoryKeyValueStoreTest
> --
>
> Key: KAFKA-10788
> URL: https://issues.apache.org/jira/browse/KAFKA-10788
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Sagar Rao
>Assignee: Rohit Deshpande
>Priority: Major
>  Labels: newbie
>
> While reviewing, kIP-614, it was decided that tests for 
> [CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588]
>  need to be streamlined to use mocked underlyingStore.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh

2024-04-10 Thread HiroArai (Jira)
HiroArai created KAFKA-16510:


 Summary: java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
 Key: KAFKA-16510
 URL: https://issues.apache.org/jira/browse/KAFKA-16510
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 3.4.1
Reporter: HiroArai


kafka-metadata-quorum is not available in SASL_PLAIN.
I got this error, I only use SASL_PLAIN. not use SSL.
I found a person with a similar situation, but he is using mTLS.
https://issues.apache.org/jira/browse/KAFKA-16006

{code:java}
sh-4.2$ /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server :9093 --command-config controller-admin.properties  describe --replication
[2024-04-11 04:12:54,128] ERROR Uncaught exception in thread 
‘kafka-admin-client-thread | adminclient-1': 
(org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
at java.base/java.nio.HeapByteBuffer.(HeapByteBuffer.java:64)
at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
at 
org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:476)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:573)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:251)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435)
at java.base/java.lang.Thread.run(Thread.java:840)
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has 
exited. Call: describeMetadataQuorum
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has 
exited. Call: describeMetadataQuorum
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
org.apache.kafka.tools.MetadataQuorumCommand.handleDescribeReplication(MetadataQuorumCommand.java:158)
at 
org.apache.kafka.tools.MetadataQuorumCommand.execute(MetadataQuorumCommand.java:106)
at 
org.apache.kafka.tools.MetadataQuorumCommand.mainNoExit(MetadataQuorumCommand.java:62)
at 
org.apache.kafka.tools.MetadataQuorumCommand.main(MetadataQuorumCommand.java:57)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient 
thread has exited. Call: describeMetadataQuorum {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16006) mTLS authentication works for kafka-topic.sh but fails for kafka-metadata-quorum.sh

2024-04-10 Thread HiroArai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835989#comment-17835989
 ] 

HiroArai commented on KAFKA-16006:
--

mee too I saw same error in the test.

> mTLS authentication works for kafka-topic.sh but fails for 
> kafka-metadata-quorum.sh
> ---
>
> Key: KAFKA-16006
> URL: https://issues.apache.org/jira/browse/KAFKA-16006
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.4.0
>Reporter: 10011
>Priority: Major
>
> The same client-ssl configuration works for kafka-topics.sh script but failed 
> for kafka-metadata-quorum.sh during authentication. See details below
> {code:java}
> bash-4.2$ ./kafka-topics.sh --bootstrap-server localhost:11005 
> --command-config /config/client-ssl.properties --describe --topic 
> clientmTLSTest
> Topic: clientmTLSTest    TopicId: dg7q11k6R2m2dgDSDGEfXw    PartitionCount: 3 
>    ReplicationFactor: 3    Configs: segment.bytes=1073741824
>     Topic: clientmTLSTest    Partition: 0    Leader: 5    Replicas: 5,6,4    
> Isr: 6,5,4
>     Topic: clientmTLSTest    Partition: 1    Leader: 6    Replicas: 6,4,5    
> Isr: 6,4,5
>     Topic: clientmTLSTest    Partition: 2    Leader: 4    Replicas: 4,5,6    
> Isr: 6,4,5
> bash-4.2$ ./kafka-metadata-quorum.sh --command-config 
> /config/client-ssl.properties --bootstrap-server localhost:11005  describe 
> --status
> [2023-12-13 21:19:55,500] ERROR Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1': 
> (org.apache.kafka.common.utils.KafkaThread)
> java.lang.OutOfMemoryError: Java heap space
>     at java.base/java.nio.HeapByteBuffer.(HeapByteBuffer.java:64)
>     at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
>     at 
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
>     at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
>     at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
>     at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
>     at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
>     at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
>     at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
>     at java.base/java.lang.Thread.run(Thread.java:842)
> org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has 
> exited. Call: listNodes
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has 
> exited. Call: listNodes
>     at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>     at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>     at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>     at 
> org.apache.kafka.tools.MetadataQuorumCommand.handleDescribeStatus(MetadataQuorumCommand.java:167)
>     at 
> org.apache.kafka.tools.MetadataQuorumCommand.execute(MetadataQuorumCommand.java:106)
>     at 
> org.apache.kafka.tools.MetadataQuorumCommand.mainNoExit(MetadataQuorumCommand.java:55)
>     at 
> org.apache.kafka.tools.MetadataQuorumCommand.main(MetadataQuorumCommand.java:50)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient 
> thread has exited. Call: listNodes
> bash-4.2$ tail /logs/kafka/server.log
> [2023-12-13 21:18:17,356] INFO [SocketServer listenerType=BROKER, nodeId=4] 
> Failed authentication with /127.0.0.1 
> (channelId=127.0.0.1:11005-127.0.0.1:42730-794) (SSL handshake failed) 
> (org.apache.kafka.common.network.Selector)
> [2023-12-13 21:19:55,464] INFO [SocketServer listenerType=BROKER, nodeId=4] 
> Failed authentication with /127.0.0.1 
> (channelId=127.0.0.1:11005-127.0.0.1:39594-809) (SSL handshake failed) 
> (org.apache.kafka.common.network.Selector)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add reviewers GitHub action [kafka]

2024-04-10 Thread via GitHub


github-actions[bot] commented on PR #15115:
URL: https://github.com/apache/kafka/pull/15115#issuecomment-2048870549

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16482: Eliminate the IDE warnings of accepting ClusterConfig in BeforeEach [kafka]

2024-04-10 Thread via GitHub


KevinZTW commented on code in PR #15676:
URL: https://github.com/apache/kafka/pull/15676#discussion_r1560373161


##
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala:
##
@@ -19,32 +19,31 @@ package kafka.coordinator.transaction
 
 import kafka.network.SocketServer
 import kafka.server.{IntegrationTestUtils, KafkaConfig}
-import kafka.test.annotation.{AutoStart, ClusterTest, ClusterTests, Type}
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.test.{ClusterConfig, ClusterInstance}
 import org.apache.kafka.common.message.InitProducerIdRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{InitProducerIdRequest, 
InitProducerIdResponse}
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Disabled, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{Disabled, Timeout}
 
 import java.util.stream.{Collectors, IntStream}
 import scala.concurrent.duration.DurationInt
 import scala.jdk.CollectionConverters._
 
+
+@ClusterTestDefaults(serverProperties = Array(
+  new ClusterConfigProperty(key = "transaction.state.log.num.partitions", 
value = "1"),
+  new ClusterConfigProperty(key = "transaction.state.log.replication.factor", 
value = "3")

Review Comment:
   Oh thanks for letting me know! I check the `KafkaConfig` and as you said, it 
would use the `TransactionLogConfig.DEFAULT_REPLICATION_FACTOR` when defining 
the config
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16507) Add raw record into RecordDeserialisationException

2024-04-10 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16507:
--
Component/s: clients

> Add raw record into RecordDeserialisationException
> --
>
> Key: KAFKA-16507
> URL: https://issues.apache.org/jira/browse/KAFKA-16507
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Fred Rouleau
>Assignee: Fred Rouleau
>Priority: Minor
>  Labels: kip
>
> [KIP-334|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793]
>  introduced into the Consumer the RecordDeserializationException with offsets 
> information. That is useful to skip a poison pill but as you do not have 
> access to the Record, it still prevents easy implementation of dead letter 
> queue or simply logging the faulty data.
> Changes are described in 
> [KIP-1036|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16507) Add raw record into RecordDeserialisationException

2024-04-10 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16507:
-

Assignee: Fred Rouleau

> Add raw record into RecordDeserialisationException
> --
>
> Key: KAFKA-16507
> URL: https://issues.apache.org/jira/browse/KAFKA-16507
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Fred Rouleau
>Assignee: Fred Rouleau
>Priority: Minor
>  Labels: kip
>
> [KIP-334|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793]
>  introduced into the Consumer the RecordDeserializationException with offsets 
> information. That is useful to skip a poison pill but as you do not have 
> access to the Record, it still prevents easy implementation of dead letter 
> queue or simply logging the faulty data.
> Changes are described in 
> [KIP-1036|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16507 Add raw record into RecordDeserialisationException [kafka]

2024-04-10 Thread via GitHub


kirktrue commented on code in PR #15691:
URL: https://github.com/apache/kafka/pull/15691#discussion_r1560342179


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##
@@ -326,7 +327,17 @@  ConsumerRecord parseRecord(Deserializers deserializers,
 key, value, headers, leaderEpoch);
 } catch (RuntimeException e) {
 log.error("Deserializers with error: {}", deserializers);
-throw new RecordDeserializationException(partition, 
record.offset(),
+ByteBuffer keyBytes = record.key();
+byte[] key = 
org.apache.kafka.common.utils.Utils.toNullableArray(keyBytes);
+ByteBuffer valueBytes = record.value();
+byte[] value = Utils.toNullableArray(valueBytes);
+Headers headers = new RecordHeaders(record.headers());
+ConsumerRecord consumerRecord = new 
ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(),
+record.timestamp(), timestampType,
+keyBytes == null ? ConsumerRecord.NULL_SIZE : 
keyBytes.remaining(),
+valueBytes == null ? ConsumerRecord.NULL_SIZE : 
valueBytes.remaining(),
+key, value, headers, Optional.empty());

Review Comment:
   Any reason to omit the epoch value? Can we do the same as the happy path?
   
   ```suggestion
   ConsumerRecord consumerRecord = new 
ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(),
   record.timestamp(), timestampType,
   keyBytes == null ? ConsumerRecord.NULL_SIZE : 
keyBytes.remaining(),
   valueBytes == null ? ConsumerRecord.NULL_SIZE : 
valueBytes.remaining(),
   key, value, headers, leaderEpoch);
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##
@@ -326,7 +327,17 @@  ConsumerRecord parseRecord(Deserializers deserializers,
 key, value, headers, leaderEpoch);
 } catch (RuntimeException e) {
 log.error("Deserializers with error: {}", deserializers);
-throw new RecordDeserializationException(partition, 
record.offset(),
+ByteBuffer keyBytes = record.key();
+byte[] key = 
org.apache.kafka.common.utils.Utils.toNullableArray(keyBytes);

Review Comment:
   Minor nit—can we use the unqualified version of `Utils` here as you do a 
couple lines down? That is:
   
   ```suggestion
   byte[] key = Utils.toNullableArray(keyBytes);
   ```



##
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java:
##
@@ -16,29 +16,38 @@
  */
 package org.apache.kafka.common.errors;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
+
 /**
  *  This exception is raised for any error that occurs while deserializing 
records received by the consumer using 
  *  the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class RecordDeserializationException extends SerializationException {
 
-private static final long serialVersionUID = 1L;
+private static final long serialVersionUID = 2L;
 private final TopicPartition partition;
-private final long offset;
+private final ConsumerRecord consumerRecord;
 
-public RecordDeserializationException(TopicPartition partition, long 
offset, String message, Throwable cause) {
+public RecordDeserializationException(TopicPartition partition,
+  ConsumerRecord 
record,
+  String message,
+  Throwable cause) {

Review Comment:
   IIUC, @mjsax mentioned on the mailing list that we need to keep the existing 
constructor signature as is and add an overloaded version that accepts the 
`ConsumerRecord`. Although I'm not sure why... 樂



##
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java:
##
@@ -16,29 +16,38 @@
  */
 package org.apache.kafka.common.errors;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
+

Review Comment:
   In general, we try to avoid unnecessary whitespace changes.
   
   ```suggestion
   ```



##
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java:
##
@@ -16,29 +16,38 @@
  */
 package org.apache.kafka.common.errors;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
+
 /**
  *  This exception is raised for any error that occurs while deserializing 
records received by the consumer using 
  *  the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class 

Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-10 Thread via GitHub


dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1560320708


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -180,12 +192,19 @@ public static class DeadlineAndEpoch {
  */
 private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+/**
+ * Map of protocol names to the number of members that use legacy protocol 
and support them.
+ */
+private final TimelineHashMap 
legacyProtocolMembersSupportedProtocols;

Review Comment:
   In `supportsClassicProtocols(String memberProtocolType, Set 
memberProtocols)`, we need it to check if at least one of the given protocols 
in the `JoinGroupRequest` can be supported if a consumer using the classic 
protocol joins the group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-10 Thread via GitHub


dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1560320708


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -180,12 +192,19 @@ public static class DeadlineAndEpoch {
  */
 private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+/**
+ * Map of protocol names to the number of members that use legacy protocol 
and support them.
+ */
+private final TimelineHashMap 
legacyProtocolMembersSupportedProtocols;

Review Comment:
   We will need it to check if at least one of the given protocols in the 
`JoinGroupRequest` can be supported if a consumer using the classic protocol 
joins the group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-10 Thread via GitHub


dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1560315985


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online upgrade if the Classic Group receives a 
ConsumerGroupHeartbeat request.
+ *
+ * @param classicGroup A ClassicGroup.
+ * @return the boolean indicating whether it's valid to online upgrade the 
classic group.
+ */
+private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+log.debug("Online upgrade is invalid because the consumer group {} 
migration config is {} so online upgrade is not enabled.",
+classicGroup.groupId(), consumerGroupMigrationPolicy);
+return false;
+} else if (classicGroup.isInState(DEAD)) {

Review Comment:
   Yeah it makes sense. Actually I think we don't set classicGroup to dead at 
all. We can delete the else if



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16509: CurrentControllerId metric is unreliable in ZK mode [kafka]

2024-04-10 Thread via GitHub


mumrah commented on code in PR #15695:
URL: https://github.com/apache/kafka/pull/15695#discussion_r1560175751


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -657,15 +657,19 @@ class KafkaServer(
   }
 
   private def createCurrentControllerIdMetric(): Unit = {
-
KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID,
 () => {
-  Option(metadataCache) match {
-case None => -1
-case Some(cache) => cache.getControllerId match {

Review Comment:
   Ah right, makes sense  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16509: CurrentControllerId metric is unreliable in ZK mode [kafka]

2024-04-10 Thread via GitHub


cmccabe commented on code in PR #15695:
URL: https://github.com/apache/kafka/pull/15695#discussion_r1560167894


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -657,15 +657,19 @@ class KafkaServer(
   }
 
   private def createCurrentControllerIdMetric(): Unit = {
-
KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID,
 () => {
-  Option(metadataCache) match {
-case None => -1
-case Some(cache) => cache.getControllerId match {
-  case None => -1
-  case Some(id) => id.id
-}
-  }
-})
+
KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID,
+  () => getCurrentControllerIdFromOldController())
+  }
+
+  /**
+   * Get the current controller ID from the old controller code.
+   * This is the most up-to-date controller ID we can get when in ZK mode.
+   */
+  def getCurrentControllerIdFromOldController(): Int = {
+Option(_kafkaController) match {
+  case None => -1

Review Comment:
   yes, that's right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16509: CurrentControllerId metric is unreliable in ZK mode [kafka]

2024-04-10 Thread via GitHub


cmccabe commented on code in PR #15695:
URL: https://github.com/apache/kafka/pull/15695#discussion_r1560168052


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -657,15 +657,19 @@ class KafkaServer(
   }
 
   private def createCurrentControllerIdMetric(): Unit = {
-
KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID,
 () => {
-  Option(metadataCache) match {
-case None => -1
-case Some(cache) => cache.getControllerId match {

Review Comment:
   It should be OK in hybrid mode since we are still updating the znode in 
hybrid mode. And this comes from there directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16509: CurrentControllerId metric is unreliable in ZK mode [kafka]

2024-04-10 Thread via GitHub


mumrah commented on code in PR #15695:
URL: https://github.com/apache/kafka/pull/15695#discussion_r1560161829


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -657,15 +657,19 @@ class KafkaServer(
   }
 
   private def createCurrentControllerIdMetric(): Unit = {
-
KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID,
 () => {
-  Option(metadataCache) match {
-case None => -1
-case Some(cache) => cache.getControllerId match {
-  case None => -1
-  case Some(id) => id.id
-}
-  }
-})
+
KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID,
+  () => getCurrentControllerIdFromOldController())
+  }
+
+  /**
+   * Get the current controller ID from the old controller code.
+   * This is the most up-to-date controller ID we can get when in ZK mode.
+   */
+  def getCurrentControllerIdFromOldController(): Int = {
+Option(_kafkaController) match {
+  case None => -1

Review Comment:
   Is this just to cover the startup case? (When `_kafkaController` is None)



##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -657,15 +657,19 @@ class KafkaServer(
   }
 
   private def createCurrentControllerIdMetric(): Unit = {
-
KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID,
 () => {
-  Option(metadataCache) match {
-case None => -1
-case Some(cache) => cache.getControllerId match {

Review Comment:
   Will this work in hybrid mode? Don't we still need to read from the metadata 
cache to get the KRaft controller ID once we've entered hybrid mode?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16211: Inconsistent config values in CreateTopicsResult and DescribeConfigsResult [kafka]

2024-04-10 Thread via GitHub


infantlikesprogramming opened a new pull request, #15696:
URL: https://github.com/apache/kafka/pull/15696

   *When creating a topic in KRaft cluster, a config value returned by 
createTopics() is different than what you get from describeConfigs().​*
   
   *My guess from reading the code is that describeTopics() will send the 
request to a specified broker if the config resource is broker 
(ConfigResouce.Type.BROKER). In the case that the config resource is topic 
(ConfigResouce.Type.TOPIC), then a broker will be assigned using 
LeastLoadBrokerOrActiveKController() (in KafkaAdminClient), which in this 
situation will assign the "least loaded" broker. I have tested this and, 
indeed, each time I use describeConfigs() with the ConfigResource's type being 
"TOPIC", a different broker's static configuration may be returned. My question 
is: Is it supposed to be the way describeConfigs() should be used with the 
configResource's type being ConfigResouce.Type.TOPIC? Or even, are we supposed 
to use describeConfig() with configResource's type being 
ConfigResouce.Type.TOPIC instead of strictly with ConfigResouce.Type.BROKER?
   
   I have also added some comments in the code for my understanding of the 
logic of describeConfigs() *
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-10 Thread via GitHub


soarez commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1560139446


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -40,7 +92,72 @@ public interface TierStateMachine {
  */
 PartitionFetchState start(TopicPartition topicPartition,
   PartitionFetchState currentFetchState,
-  PartitionData fetchPartitionData) throws 
Exception;
+  PartitionData fetchPartitionData) throws 
Exception {
+OffsetAndEpoch epochAndLeaderLocalStartOffset = 
leader.fetchEarliestLocalOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+long offsetToFetch;
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+try {
+offsetToFetch = buildRemoteLogAuxState(topicPartition, 
currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, 
fetchPartitionData.logStartOffset());
+} catch (RemoteStorageException e) {
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+throw e;
+}
+
+OffsetAndEpoch fetchLatestOffsetResult = 
leader.fetchLatestOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+long initialLag = leaderEndOffset - offsetToFetch;
+
+return PartitionFetchState.apply(currentFetchState.topicId(), 
offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+Fetching$.MODULE$, 
replicaMgr.localLogOrException(topicPartition).latestEpoch());

Review Comment:
   Should `replicaMgr.futureLogOrException` be used instead, if `useFutureLog`?



##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -175,8 +186,38 @@ private OffsetHolder getEarliestLocalOffset(TopicPartition 
topicPartition) {
 return new 
OffsetHolder(LogFileUtils.offsetFromFileName(firstLogFile.get()), 
partitionFiles);
 }
 
-private List getTopicPartitionFiles(TopicPartition topicPartition) 
{
-File[] files = brokerStorageDirectory.listFiles((dir, name) -> 
name.equals(topicPartition.toString()));
+public boolean isTopicPartitionFileExistInDir(TopicPartition 
topicPartition, File logDir) {

Review Comment:
   Maybe `dirContainsTopicPartition` is a better name for this method?



##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -31,31 +31,36 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public final class BrokerLocalStorage {
 
 private final Integer brokerId;
-private final File brokerStorageDirectory;
+private final Set brokerStorageDirectory;
 private final Integer storageWaitTimeoutSec;
 
 private final int storagePollPeriodSec = 1;
 private final Time time = Time.SYSTEM;
 
 public BrokerLocalStorage(Integer brokerId,
-  String storageDirname,
+  Set storageDirname,
   Integer storageWaitTimeoutSec) {
 this.brokerId = brokerId;
-this.brokerStorageDirectory = new File(storageDirname);
+this.brokerStorageDirectory = 
storageDirname.stream().map(File::new).collect(Collectors.toSet());

Review Comment:
   The names for parameter `storageDirname` and field `brokerStorageDirectory` 
should be pluralized.



##
core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala:
##
@@ -23,20 +23,23 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
 import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
+import org.junit.jupiter.api.extension.ExtensionContext
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, ArgumentsProvider, 
ArgumentsSource}
 
 import scala.collection.Map
 
-class ReplicaFetcherTierStateMachineTest {
+class TierStateMachineTest {
 
-  val truncateOnFetch = true
+  val truncateOnFetch = false

Review Comment:
   This can be removed



##

[jira] [Assigned] (KAFKA-16509) CurrentControllerId metric is unreliable in ZK mode

2024-04-10 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-16509:


Assignee: Colin McCabe

> CurrentControllerId metric is unreliable in ZK mode
> ---
>
> Key: KAFKA-16509
> URL: https://issues.apache.org/jira/browse/KAFKA-16509
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>
> The CurrentControllerId metric added by KIP-1001 is unreliable in ZK mode. 
> Sometimes when there is no active ZK-based controller, it still shows the 
> previous controller ID. Instead, it should show -1 in that situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16509: CurrentControllerId metric is unreliable in ZK mode [kafka]

2024-04-10 Thread via GitHub


cmccabe opened a new pull request, #15695:
URL: https://github.com/apache/kafka/pull/15695

   The CurrentControllerId metric added by KIP-1001 is unreliable in ZK mode. 
Sometimes when there is no active ZK-based controller, it still shows the 
previous controller ID. Instead, it should show -1 in that situation.
   
   This PR fixes that by using the controller ID from the 
KafkaController.scala, which is obtained directly from the controller znode. It 
also adds a new test, ControllerIdMetricTest.scala.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16509) CurrentControllerId metric is unreliable in ZK mode

2024-04-10 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-16509:


 Summary: CurrentControllerId metric is unreliable in ZK mode
 Key: KAFKA-16509
 URL: https://issues.apache.org/jira/browse/KAFKA-16509
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe


The CurrentControllerId metric added by KIP-1001 is unreliable in ZK mode. 
Sometimes when there is no active ZK-based controller, it still shows the 
previous controller ID. Instead, it should show -1 in that situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-10 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1559907295


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -157,8 +157,10 @@ public class AsyncKafkaConsumerTest {
 @AfterEach
 public void resetAll() {
 backgroundEventQueue.clear();
-if (consumer != null) {
+try {
 consumer.close(Duration.ZERO);
+} catch (Exception e) {
+// ignore

Review Comment:
   I'm a little leery about swallowing the exception here. Can we validate the 
exception type is something we expect? e.g.:
   
   ```suggestion
   } catch (Exception e) {
   assertInstanceOf(KafkaException.class, e);
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1388,6 +1393,37 @@ public void commitSync(Map offsets, Duration
 }
 }
 
+private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {
+if (lastPendingAsyncCommit == null) {
+return;
+}
+
+try {
+CompletableFuture futureToAwait;
+if (!disableWakeup) {
+// We don't want the wake-up trigger to complete our pending 
async commit future,
+// so create new future here.
+futureToAwait = new CompletableFuture<>();
+lastPendingAsyncCommit.whenComplete((v, t) -> {
+if (t != null) {
+futureToAwait.completeExceptionally(t);
+} else {
+futureToAwait.complete(v);
+}
+});
+wakeupTrigger.setActiveTask(futureToAwait);
+} else {
+futureToAwait = lastPendingAsyncCommit;
+}
+ConsumerUtils.getResult(futureToAwait, timer);
+lastPendingAsyncCommit = null;
+} finally {
+if (!disableWakeup) wakeupTrigger.clearTask();
+timer.update();
+}

Review Comment:
   Do we want to clear out the `lastPendingAsyncCommit` in the `finally` block:
   
   ```suggestion
   ConsumerUtils.getResult(futureToAwait, timer);
   } finally {
   lastPendingAsyncCommit = null;
   if (!disableWakeup) wakeupTrigger.clearTask();
   timer.update();
   }
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() {
 }
 }
 
-private void maybeInvokeCommitCallbacks() {
-offsetCommitCallbackInvoker.executeCallbacks();
-}
-

Review Comment:
   Any reason we don't want to keep this method abstraction?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16466) QuorumController is swallowing some exception messages

2024-04-10 Thread Ilya Zakharov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835846#comment-17835846
 ] 

Ilya Zakharov commented on KAFKA-16466:
---

[~chiacyu] Hello! Сan you tell me if you're working on this task right now? If 
not, I would like to pick up this task.

> QuorumController is swallowing some exception messages
> --
>
> Key: KAFKA-16466
> URL: https://issues.apache.org/jira/browse/KAFKA-16466
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.7.0
>Reporter: David Arthur
>Assignee: Chia Chuan Yu
>Priority: Major
>  Labels: good-first-issue
> Fix For: 3.8.0, 3.7.1
>
>
> In some cases in QuorumController, we throw exceptions from the control 
> manager methods. Unless these are explicitly caught and handled, they will 
> eventually bubble up to the ControllerReadEvent/ControllerWriteEvent an hit 
> the generic error handler.
> In the generic error handler of QuorumController, we examine the exception to 
> determine if it is a fault or not. In the case where it is not a fault, we 
> log the error like:
> {code:java}
>  log.info("{}: {}", name, failureMessage);
> {code}
> which results in messages like
> {code:java}
> [2024-04-02 16:08:38,078] INFO [QuorumController id=3000] registerBroker: 
> event failed with UnsupportedVersionException in 167 microseconds. 
> (org.apache.kafka.controller.QuorumController:544)
> {code}
> In this case, the exception actually has more details in its own message
> {code:java}
> Unable to register because the broker does not support version 8 of 
> metadata.version. It wants a version between 20 and 20, inclusive.
> {code}
> We should include the exception's message in the log output for non-fault 
> errors as it includes very useful debugging info.
> This was found while writing an integration test for KRaft migration where 
> the brokers and controllers have a mismatched MetadataVersion.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-10 Thread via GitHub


dajac merged PR #15411:
URL: https://github.com/apache/kafka/pull/15411


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16294) Add group protocol migration enabling config

2024-04-10 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16294.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Add group protocol migration enabling config
> 
>
> Key: KAFKA-16294
> URL: https://issues.apache.org/jira/browse/KAFKA-16294
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
> Fix For: 3.8.0
>
>
> The online upgrade is triggered when a consumer group heartbeat request is 
> received in a classic group. The downgrade is triggered when any old protocol 
> request is received in a consumer group. We only accept upgrade/downgrade if 
> the corresponding group migration config policy is enabled.
> This is the first part of the implementation of online group protocol 
> migration, adding the kafka config group protocol migration. The config has 
> four valid values – both(both upgrade and downgrade are allowed), 
> upgrade(only upgrade is allowed), downgrade(only downgrade is allowed) and 
> none(neither is allowed.).
> At present the default value is NONE. When we start enabling the migration, 
> we expect to set BOTH to default so that it's easier to roll back to the old 
> protocol as a quick fix for anything wrong in the new protocol; when using 
> consumer groups becomes default and the migration is near finished, we will 
> set the default policy to UPGRADE to prevent unwanted downgrade causing too 
> frequent migration. DOWNGRADE could be useful for revert or debug purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]

2024-04-10 Thread via GitHub


kirktrue commented on PR #15408:
URL: https://github.com/apache/kafka/pull/15408#issuecomment-2048153273

   @lucasbru—sorry that I've forgotten, but why don't we want to merge this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16433: BeginningAndEndOffsets and OffsetsForTimes should send an event and return empty with zero timeout provided [kafka]

2024-04-10 Thread via GitHub


kirktrue commented on PR #15688:
URL: https://github.com/apache/kafka/pull/15688#issuecomment-2048148062

   @philipnee—sorry I'm late to the party, but why do we submit the event to 
the queue when the timeout is 0?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-04-10 Thread via GitHub


mimaison commented on PR #15516:
URL: https://github.com/apache/kafka/pull/15516#issuecomment-2048128916

   @divijvaidya It seems you've done a bit of work around compression in the 
past. Can you take a look? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7632: Support Compression Level [kafka]

2024-04-10 Thread via GitHub


mimaison commented on PR #10826:
URL: https://github.com/apache/kafka/pull/10826#issuecomment-2048127235

   I opened https://github.com/apache/kafka/pull/15516 to implement this KIP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]

2024-04-10 Thread via GitHub


lucasbru commented on code in PR #15661:
URL: https://github.com/apache/kafka/pull/15661#discussion_r1559790393


##
tests/kafkatest/services/verifiable_consumer.py:
##
@@ -135,6 +135,28 @@ def last_commit(self, tp):
 else:
 return None
 
+# This needs to be used for cooperative and consumer protocol
+class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):

Review Comment:
   Seems to me that it would have been cleaner to have a single implementation 
that treats the value that is passed in `onAssigned` correctly (it is always 
incremental by contract, just happens to be non-incremental in the eager case), 
instead of having two implementations now. But I'll leave it to a follow-up PR 
to clean it up, if you agree, and merge this now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]

2024-04-10 Thread via GitHub


lucasbru merged PR #15661:
URL: https://github.com/apache/kafka/pull/15661


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559774590


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -16,14 +16,132 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
  * The group coordinator configurations.
  */
 public class GroupCoordinatorConfig {
+/** * Group coordinator configuration ***/
+public final static String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = 
"group.min.session.timeout.ms";
+public final static String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum 
allowed session timeout for registered consumers. Shorter timeouts result in 
quicker failure detection at the cost of more frequent consumer heartbeating, 
which can overwhelm broker resources.";
+public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000;
+
+public final static String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = 
"group.max.session.timeout.ms";
+public final static String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum 
allowed session timeout for registered consumers. Longer timeouts give 
consumers more time to process messages in between heartbeats at the cost of a 
longer time to detect failures.";
+public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 180;
+
+public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = 
"group.initial.rebalance.delay.ms";
+public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The 
amount of time the group coordinator will wait for more consumers to join a new 
group before performing the first rebalance. A longer delay means potentially 
fewer rebalances, but increases the time until processing begins.";
+public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000;
+
+public final static String GROUP_MAX_SIZE_CONFIG = "group.max.size";
+public final static String GROUP_MAX_SIZE_DOC = "The maximum number of 
consumers that a single consumer group can accommodate.";
+public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+/** New group coordinator configs */
+public final static String NEW_GROUP_COORDINATOR_ENABLE_CONFIG = 
"group.coordinator.new.enable";
+public final static String NEW_GROUP_COORDINATOR_ENABLE_DOC = "Enable the 
new group coordinator.";
+public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = false;
+
+public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = 
"group.coordinator.rebalance.protocols";
+public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = 
"The list of enabled rebalance protocols. Supported protocols: " + 
Utils.join(Group.GroupType.values(), ",") + ". " +
+"The " + Group.GroupType.CONSUMER + " rebalance protocol is in 
early access and therefore must not be used in production.";
+public static final List 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = 
Collections.singletonList(Group.GroupType.CLASSIC.toString());
+
+public final static String GROUP_COORDINATOR_NUM_THREADS_CONFIG = 
"group.coordinator.threads";
+public final static String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number 
of threads used by the group coordinator.";
+public static final int GROUP_COORDINATOR_NUM_THREADS_DEFAULT = 1;
+
+/** Consumer group configs */
+public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.session.timeout.ms";
+public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC = "The 
timeout to detect client failures when using the consumer group protocol.";
+public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
+
+public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.min.session.timeout.ms";
+public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = 
"The minimum allowed session timeout for registered consumers.";
+public static final int CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 
45000;
+
+public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.max.session.timeout.ms";
+public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = 
"The maximum allowed session timeout for registered consumers.";
+public static final int CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 
6;
+
+public final static String CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = 
"group.consumer.heartbeat.interval.ms";
+public final 

Re: [PR] KAFKA-9914: Fix replication cycle detection [kafka]

2024-04-10 Thread via GitHub


izmal commented on PR #10277:
URL: https://github.com/apache/kafka/pull/10277#issuecomment-2047967071

   I my case reason for replication cycle was:
   `replication.policy.separator = -`
   With another separator works without cycle (e.g '_'). :-)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: LegacyConsumer should always await pending async commits on commitSync and close [kafka]

2024-04-10 Thread via GitHub


lucasbru commented on PR #15693:
URL: https://github.com/apache/kafka/pull/15693#issuecomment-2047947707

   @lianetm You have reviewed those changes already as part of 
https://github.com/apache/kafka/pull/15613 .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on PR #15684:
URL: https://github.com/apache/kafka/pull/15684#issuecomment-2047947749

   ugh my local environment keep the wrong caches because I keep jump between 
branches 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-10 Thread via GitHub


lucasbru commented on PR #15613:
URL: https://github.com/apache/kafka/pull/15613#issuecomment-2047945261

   Hey @lianetm. I split the PR into two, the changes for the legacy consumer 
go into https://github.com/apache/kafka/pull/15693. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16103: LegacyConsumer should always await pending async commits on commitSync and close [kafka]

2024-04-10 Thread via GitHub


lucasbru opened a new pull request, #15693:
URL: https://github.com/apache/kafka/pull/15693

   The javadoc for `KafkaConsumer.commitSync` says:
   
   > Note that asynchronous offset commits sent previously with the 
{https://github.com/link #commitAsync(OffsetCommitCallback)}
   > (or similar) are guaranteed to have their callbacks invoked prior to 
completion of this method.
   
   This is not always true in the legacy consumer, when the set of offsets is 
empty, the execution of the commit callback is not always awaited. There are 
also various races possible that can avoid callback handler execution. 
   
   Similarly, there is code in the legacy consumer to await the completion of 
the commit callback before closing, however, the code doesn't cover all cases 
and the behavior is therefore inconsistent. While the javadoc doesn't 
explicitly promise callback execution, it promises "completing commits", which 
one would reasonably expect to include callback execution. Either way, the 
current behavior of the legacy consumer is inconsistent. 
   
   This change proposed a number of fixes to clean up the callback execution 
guarantees:
   
- We also need to await async commits that are "pending" instead of 
"in-flight", because we do not know the coordinator yet.
- In close, we need do not only execute the commit listeners of "pending" 
commits, but also those of "in-flight" commits.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on PR #15684:
URL: https://github.com/apache/kafka/pull/15684#issuecomment-2047933442

   @OmniaGM it seems there are build error. could you fix them?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559709137


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   I updated the pr now. And rebased from trunk as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559706475


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   sorry I meant `org.apache.kafka.coordinator.group`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559703336


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   > then we can change the constructor of GroupCoordinatorConfig to accept 
KafkaConfig 
   
   that makes sense to me. A unified way to generate those config class can 
make consistent behavior.
   
   > I moved it to kafka.coordinator.group 
   
   I assume the package you mentioned is in core module, but I'm ok with your 
approach since it can avoid rewriting java code back to scala code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16508) Infinte loop if output topic does not exisit

2024-04-10 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16508:
---

 Summary: Infinte loop if output topic does not exisit
 Key: KAFKA-16508
 URL: https://issues.apache.org/jira/browse/KAFKA-16508
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams supports `ProductionExceptionHandler` to drop records on error 
when writing into an output topic.

However, if the output topic does not exist, the corresponding error cannot be 
skipped over because the handler is not called.

The issue is, that the producer internally retires to fetch the output topic 
metadata until it times out, an a `TimeoutException` (which is a 
`RetriableException`) is returned via the registered `Callback`. However, for 
`RetriableException` there is different code path and the 
`ProductionExceptionHandler` is not called.

In general, Kafka Streams correctly tries to handle as many errors a possible 
internally, and a `RetriableError` falls into this category (and thus there is 
no need to call the handler). However, for this particular case, just retrying 
does not solve the issue – it's unclear if throwing a retryable 
`TimeoutException` is actually the right thing to do for the Producer? Also not 
sure what the right way to address this ticket would be (currently, we cannot 
really detect this case, except if we would do some nasty error message String 
comparison what sounds hacky...)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559689777


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   I would suggest that we wait until KafkaConfig is fully migrated out of core 
and then we can change the constructor of `GroupCoordinatorConfig` to accept 
[KafkaConfig](https://issues.apache.org/jira/browse/KAFKA-15853?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=17789703#comment-17789703)
 and it extract any needed grouping out KafkaConfig definition. WDYT? This 
might be the easiest way



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559689777


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   I would suggest that we wait until KafkaConfig is fully migrated out of core 
and then we can change the constructor of `GroupCoordinatorConfig` to accept 
KafkaConfig and it extract any needed grouping out KafkaConfig definition. 
WDYT? This might be the easiest way



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559689777


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   I would suggest that we wait until KafkaConfig is fully migrated out and 
then we can change the constructor of `GroupCoordinatorConfig` to accept 
KafkaConfig and it extract any needed grouping out KafkaConfig definition. 
WDYT? This might be the easiest way



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16507 Add raw record into RecordDeserialisationException [kafka]

2024-04-10 Thread via GitHub


fred-ro opened a new pull request, #15691:
URL: https://github.com/apache/kafka/pull/15691

   Add raw ConsumerRecord data to RecordDeserialisationException to make DLQ  
implementation easier
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16507) Add raw record into RecordDeserialisationException

2024-04-10 Thread Fred Rouleau (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fred Rouleau updated KAFKA-16507:
-
Labels: kip  (was: )

> Add raw record into RecordDeserialisationException
> --
>
> Key: KAFKA-16507
> URL: https://issues.apache.org/jira/browse/KAFKA-16507
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Fred Rouleau
>Priority: Minor
>  Labels: kip
>
> [KIP-334|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793]
>  introduced into the Consumer the RecordDeserializationException with offsets 
> information. That is useful to skip a poison pill but as you do not have 
> access to the Record, it still prevents easy implementation of dead letter 
> queue or simply logging the faulty data.
> Changes are described in 
> [KIP-1036|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16507) Add raw record into RecordDeserialisationException

2024-04-10 Thread Fred Rouleau (Jira)
Fred Rouleau created KAFKA-16507:


 Summary: Add raw record into RecordDeserialisationException
 Key: KAFKA-16507
 URL: https://issues.apache.org/jira/browse/KAFKA-16507
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Fred Rouleau


[KIP-334|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793]
 introduced into the Consumer the RecordDeserializationException with offsets 
information. That is useful to skip a poison pill but as you do not have access 
to the Record, it still prevents easy implementation of dead letter queue or 
simply logging the faulty data.

Changes are described in 
[KIP-1036|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559674776


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   I moved it to `kafka.coordinator.group` for now we can have a followup for
   
[KAFKA-15089](https://github.com/apache/kafka/pull/15684/commits/581e9a88089aa9881ec28de75877350e17711b3e)
 to look into removing this class later. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16505) KIP-1034: Dead letter queue in Kafka Streams

2024-04-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16505:

Component/s: streams

> KIP-1034: Dead letter queue in Kafka Streams
> 
>
> Key: KAFKA-16505
> URL: https://issues.apache.org/jira/browse/KAFKA-16505
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damien Gasparina
>Priority: Major
>
> See KIP: KIP-1034: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16505) KIP-1034: Dead letter queue in Kafka Streams

2024-04-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16505:

Labels: KIP  (was: )

> KIP-1034: Dead letter queue in Kafka Streams
> 
>
> Key: KAFKA-16505
> URL: https://issues.apache.org/jira/browse/KAFKA-16505
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damien Gasparina
>Priority: Major
>  Labels: KIP
>
> See KIP: KIP-1034: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]

2024-04-10 Thread via GitHub


philipnee commented on code in PR #15661:
URL: https://github.com/apache/kafka/pull/15661#discussion_r1559662337


##
tests/kafkatest/services/verifiable_consumer.py:
##
@@ -135,6 +135,28 @@ def last_commit(self, tp):
 else:
 return None
 
+# This needs to be used for cooperative and consumer protocol
+class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):

Review Comment:
   I believe the current listener assumes Eager protocol so it is not making 
the incorrect assumptions.  This (incremental handler) would probably work for 
Eager as well but I thought it would be clearer to distinguish the two.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15309) Add custom error handler to Producer

2024-04-10 Thread Alieh Saeedi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alieh Saeedi reassigned KAFKA-15309:


Assignee: Alieh Saeedi

> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>Reporter: Matthias J. Sax
>Assignee: Alieh Saeedi
>Priority: Major
>  Labels: needs-kip
> Attachments: KafkaProducerReproducer.java, app.log
>
>
> The producer collects multiple records into batches, and a single record 
> specific error might fail the whole batch (eg, `RecordTooLargeException`).
> This ticket suggests to add a per-record error handler, that allows user to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
> Another example for which a production exception handler could be useful, if 
> a user tries to write into a non-existing topic, which returns a retryable 
> error code; with infinite retries the producer would hang retrying forever. A 
> handler could help to break the infinite retry loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559616179


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   > OffsetConfig is only used by Scala code so it will disappear when we 
remove it
   
   or we can move `OffsetConfig` to `kafka.coordinator.group` since it is used 
by `kafka.coordinator.group.GroupCoordinator` and 
`kafka.coordinator.group.GroupMetadataManager`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-04-10 Thread via GitHub


mimaison commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1559584216


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -320,4 +323,21 @@ static void createCompactedTopic(String topicName, short 
partitions, short repli
 static void createSinglePartitionCompactedTopic(String topicName, short 
replicationFactor, Admin admin) {
 createCompactedTopic(topicName, (short) 1, replicationFactor, admin);
 }
+
+static  T adminCall(Callable callable, Supplier errMsg)
+throws ExecutionException, InterruptedException {
+try {
+return callable.call();
+} catch (ExecutionException | InterruptedException e) {
+Throwable cause = e.getCause();
+if (cause instanceof TopicAuthorizationException ||
+cause instanceof ClusterAuthorizationException ||
+cause instanceof GroupAuthorizationException) {
+log.error("Authorization error occurred while trying to " + 
errMsg.get());

Review Comment:
   Could we do something like:
   ```
   log.error(cause.getClass().getSimpleName() + " occurred while trying to " + 
errMsg.get());
   ```
   so the exact exception is printed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16502) Fix flaky EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore

2024-04-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16502:

Component/s: streams
 unit tests

> Fix flaky 
> EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore
> --
>
> Key: KAFKA-16502
> URL: https://issues.apache.org/jira/browse/KAFKA-16502
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Expected ERROR state but driver is on RUNNING ==> expected:  but was: 
> 
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
>   at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
>   at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:350)
>   at 
> app//org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore(EOSUncleanShutdownIntegrationTest.java:169)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at 
> java.base@11.0.16.1/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16502) Fix flaky EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore

2024-04-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16502:

Labels: flaky-test  (was: )

> Fix flaky 
> EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore
> --
>
> Key: KAFKA-16502
> URL: https://issues.apache.org/jira/browse/KAFKA-16502
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Major
>  Labels: flaky-test
>
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Expected ERROR state but driver is on RUNNING ==> expected:  but was: 
> 
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
>   at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
>   at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:350)
>   at 
> app//org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore(EOSUncleanShutdownIntegrationTest.java:169)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at 
> java.base@11.0.16.1/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


dajac commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559600382


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   `OffsetConfig` is only used by Scala code so it will disappear when we 
remove it. The `GroupConfig` in Scala may never be migrated to Java as 
`GroupCoordinatorConfig` already contains everything, I think. I wonder if we 
could actually replace `OffsetConfig` by an interface and make 
`GroupCoordinatorConfig` implements it. Then, we could pass 
`GroupCoordinatorConfig` to the old code too. I am not sure if this is feasible 
though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16336) Remove Deprecated metric standby-process-ratio

2024-04-10 Thread Kiriakos Marantidis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835763#comment-17835763
 ] 

Kiriakos Marantidis commented on KAFKA-16336:
-

Hi [~mjsax] , I would like to work on this issue. I see that no one else is 
working on this. Please, let me know it it's ok if I look at it.

Kind regards, 

Kiriakos 

> Remove Deprecated metric standby-process-ratio
> --
>
> Key: KAFKA-16336
> URL: https://issues.apache.org/jira/browse/KAFKA-16336
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Metric "standby-process-ratio" was deprecated in 3.5 release via 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16506) add the scala version of tool-related classes back to core module to follow KIP-906

2024-04-10 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16506:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> add the scala version of tool-related classes back to core module to follow 
> KIP-906
> ---
>
> Key: KAFKA-16506
> URL: https://issues.apache.org/jira/browse/KAFKA-16506
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Major
>
> According to 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
>  , we have to deprecate the scala version of tool-related classes instead of 
> deleting them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16506) add the scala version of tool-related classes back to core module to follow KIP-906

2024-04-10 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835754#comment-17835754
 ] 

Chia-Ping Tsai commented on KAFKA-16506:


[~yangpoan]thanks for taking over this jira. please help us list all deleted 
tool-related classes. thanks!

> add the scala version of tool-related classes back to core module to follow 
> KIP-906
> ---
>
> Key: KAFKA-16506
> URL: https://issues.apache.org/jira/browse/KAFKA-16506
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> According to 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
>  , we have to deprecate the scala version of tool-related classes instead of 
> deleting them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1559526460


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java:
##
@@ -17,13 +17,12 @@
 package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
 
 import java.util.Set;
 
 public class ProducerStateManagerConfig {
-public static final String PRODUCER_ID_EXPIRATION_MS = 
"producer.id.expiration.ms";
-public static final String TRANSACTION_VERIFICATION_ENABLED = 
"transaction.partition.verification.enable";
-public static final Set RECONFIGURABLE_CONFIGS = 
Utils.mkSet(PRODUCER_ID_EXPIRATION_MS, TRANSACTION_VERIFICATION_ENABLED);
+public static final Set RECONFIGURABLE_CONFIGS = 
Utils.mkSet(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);

Review Comment:
   Do we need to put those in `ProducerStateManagerConfig`? 
`DynamicBrokerConfig` is the class that has power to decide the reconfigurable 
configs 
(https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L1136).
  and so It seems to me those reconfigurable configs should be moved to 
`DynamicBrokerConfig`.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1062,21 +1027,21 @@ object KafkaConfig {
   .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, 
in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc)
 
   /** * Transaction management configuration ***/
-  .define(TransactionalIdExpirationMsProp, INT, 
Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, 
TransactionalIdExpirationMsDoc)
-  .define(TransactionsMaxTimeoutMsProp, INT, 
Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, 
TransactionsMaxTimeoutMsDoc)
-  .define(TransactionsTopicMinISRProp, INT, 
Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, 
TransactionsTopicMinISRDoc)
-  .define(TransactionsLoadBufferSizeProp, INT, 
Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, 
TransactionsLoadBufferSizeDoc)
-  .define(TransactionsTopicReplicationFactorProp, SHORT, 
Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, 
TransactionsTopicReplicationFactorDoc)
-  .define(TransactionsTopicPartitionsProp, INT, 
Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, 
TransactionsTopicPartitionsDoc)
-  .define(TransactionsTopicSegmentBytesProp, INT, 
Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, 
TransactionsTopicSegmentBytesDoc)
-  .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, 
Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, 
TransactionsAbortTimedOutTransactionsIntervalMsDoc)
-  .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, 
INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, 
TransactionsRemoveExpiredTransactionsIntervalMsDoc)
-
-  .define(TransactionPartitionVerificationEnableProp, BOOLEAN, 
Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, 
TransactionPartitionVerificationEnableDoc)
-
-  .define(ProducerIdExpirationMsProp, INT, 
Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+  
.define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, 
INT, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, 
atLeast(1), HIGH, 
TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)

Review Comment:
   `TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT` should be replaced by 
`TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559529499


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   It is also bit odd to have a class  only for the constructor that do nothing 
but grouping. But I can see that split it out might be better as it seems like 
we have `GroupConfig` in scala that will get migrated to java at some point so 
wouldn't make sense to have the `GroupCoordinatorConfig` grow out of hand. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16506) add the scala version of tool-related classes back to core module to follow KIP-906

2024-04-10 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835749#comment-17835749
 ] 

PoAn Yang commented on KAFKA-16506:
---

Hi [~chia7712], I'm interested in this. May I assign to myself? Thank you.

> add the scala version of tool-related classes back to core module to follow 
> KIP-906
> ---
>
> Key: KAFKA-16506
> URL: https://issues.apache.org/jira/browse/KAFKA-16506
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> According to 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
>  , we have to deprecate the scala version of tool-related classes instead of 
> deleting them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 merged PR #15656:
URL: https://github.com/apache/kafka/pull/15656


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync and close should await pending async commits [kafka]

2024-04-10 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1559430603


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -229,7 +229,11 @@ private GroupRebalanceConfig 
buildRebalanceConfig(Optional groupInstance
 @AfterEach
 public void teardown() {
 this.metrics.close();
-this.coordinator.close(time.timer(0));
+try {
+this.coordinator.close(time.timer(0));

Review Comment:
   correct. it was just less likely before.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1360,6 +1362,9 @@ public void commitSync(Map offsets, Duration
 Timer requestTimer = time.timer(timeout.toMillis());
 SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, 
requestTimer);
 CompletableFuture commitFuture = commit(syncCommitEvent);
+
+awaitPendingAsyncCommits(requestTimer, false);

Review Comment:
   Done



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {

Review Comment:
   Done



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: 
String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed.
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount);
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(1, cb.successCount);
+
+// Enforce looking up the coordinator
+consumer.committed(Set(tp, tp2).asJava)
+
+// Try with coordinator known
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(2L))).asJava, cb)
+consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(2L))).asJava)
+assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+assertEquals(2, cb.successCount);
+
+// Try with empty sync commit
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(3L))).asJava, cb)
+consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+assertEquals(3, cb.successCount);

Review Comment:
   Done



##

Re: [PR] MINOR : Replaced the while loop with TestUtils.waitForCondition [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on PR #15678:
URL: https://github.com/apache/kafka/pull/15678#issuecomment-2047578617

   > If the condition doesn't meet within the maxWaitMs, the waitForCondition 
would throw the assertion failure. Do we need another timeout to handle that?
   
   My origin thought was to use Junit 5 timeout, but we can't observe the 
method from the error stack if the error is produced by junit timeout. So +1 to 
current approach


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-10 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2047548232

   Chris, I started changing the tests in alignment with the comments (i.e 
using AtomicBoolean, AtomicReference and removing try-catch block). I noticed 
an interesting issue with 
`testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceedsForTombstoneRecords`
 test. What's happening is that when we do a get on the future returned in this 
case, that doesn't throw an exception. I debugged it and I think the problem is 
because in this case, when the primary store fails, we set the callback to 
error correctly. However, because the secondary store write doesn't fail, when 
it's callback gets invoked from 
[here](https://github.com/apache/kafka/pull/13801/files#diff-0b612a24267f45b927d37b223af3034feebe4363b23b53f5751f1b29e54e2aa7R331),
 eventually the callback's onCompletion sets it to a non-error from 
[here](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L369-L372).
  The net effect is that the .get() call on the future doesn't return an error 
which isn't right. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16506) add the scala version of tool-related class back to core module to follow KIP-906

2024-04-10 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16506:
--

 Summary: add the scala version of tool-related class back to core 
module to follow KIP-906
 Key: KAFKA-16506
 URL: https://issues.apache.org/jira/browse/KAFKA-16506
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai


According to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
 , we have to deprecate the scala version of tool-related classes instead of 
deleting them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16506) add the scala version of tool-related class back to core module to follow KIP-906

2024-04-10 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16506:
--

Assignee: Chia-Ping Tsai

> add the scala version of tool-related class back to core module to follow 
> KIP-906
> -
>
> Key: KAFKA-16506
> URL: https://issues.apache.org/jira/browse/KAFKA-16506
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> According to 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
>  , we have to deprecate the scala version of tool-related classes instead of 
> deleting them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16506) add the scala version of tool-related classes back to core module to follow KIP-906

2024-04-10 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16506:
---
Summary: add the scala version of tool-related classes back to core module 
to follow KIP-906  (was: add the scala version of tool-related class back to 
core module to follow KIP-906)

> add the scala version of tool-related classes back to core module to follow 
> KIP-906
> ---
>
> Key: KAFKA-16506
> URL: https://issues.apache.org/jira/browse/KAFKA-16506
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> According to 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
>  , we have to deprecate the scala version of tool-related classes instead of 
> deleting them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14517) Implement regex subscriptions

2024-04-10 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran reassigned KAFKA-14517:
--

Assignee: Phuc Hong Tran  (was: Jimmy Wang)

> Implement regex subscriptions
> -
>
> Key: KAFKA-14517
> URL: https://issues.apache.org/jira/browse/KAFKA-14517
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559398541


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   it seems to me the package `GroupCoordinatorConfig.OffsetConfig` is a bit 
confused, and move `OffsetConfig` out of `GroupCoordinatorConfig` should be 
fine. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16482: Eliminate the IDE warnings of accepting ClusterConfig in BeforeEach [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on code in PR #15676:
URL: https://github.com/apache/kafka/pull/15676#discussion_r1559385018


##
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala:
##
@@ -19,32 +19,31 @@ package kafka.coordinator.transaction
 
 import kafka.network.SocketServer
 import kafka.server.{IntegrationTestUtils, KafkaConfig}
-import kafka.test.annotation.{AutoStart, ClusterTest, ClusterTests, Type}
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.test.{ClusterConfig, ClusterInstance}
 import org.apache.kafka.common.message.InitProducerIdRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{InitProducerIdRequest, 
InitProducerIdResponse}
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Disabled, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{Disabled, Timeout}
 
 import java.util.stream.{Collectors, IntStream}
 import scala.concurrent.duration.DurationInt
 import scala.jdk.CollectionConverters._
 
+
+@ClusterTestDefaults(serverProperties = Array(
+  new ClusterConfigProperty(key = "transaction.state.log.num.partitions", 
value = "1"),
+  new ClusterConfigProperty(key = "transaction.state.log.replication.factor", 
value = "3")

Review Comment:
   this is equal to the default value 
(https://github.com/apache/kafka/blob/trunk/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java#L23)
 , so maybe we can remove it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-10 Thread via GitHub


nizhikov commented on PR #15645:
URL: https://github.com/apache/kafka/pull/15645#issuecomment-2047311878

   > I mean we can add more tests for zk and kraft in this PR, and it would be 
nice to use new test infra (ClusterTestExtensions) to rewrite it. WDYT?
   
   Agree. I will try to extend coverage of the test shortly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15309) Add custom error handler to Producer

2024-04-10 Thread Alieh Saeedi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alieh Saeedi reassigned KAFKA-15309:


Assignee: (was: Alieh Saeedi)

> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
> Attachments: KafkaProducerReproducer.java, app.log
>
>
> The producer collects multiple records into batches, and a single record 
> specific error might fail the whole batch (eg, `RecordTooLargeException`).
> This ticket suggests to add a per-record error handler, that allows user to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
> Another example for which a production exception handler could be useful, if 
> a user tries to write into a non-existing topic, which returns a retryable 
> error code; with infinite retries the producer would hang retrying forever. A 
> handler could help to break the infinite retry loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-10 Thread via GitHub


showuon commented on PR #15136:
URL: https://github.com/apache/kafka/pull/15136#issuecomment-2047306952

   Will check it again within this week. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on PR #15645:
URL: https://github.com/apache/kafka/pull/15645#issuecomment-2047302613

   > I thought we want to move all code related to the ConfigCommand to java 
and remove it when ZK support will be dropped.
   
   Personally, rewriting a class which will get removed totally is a bit weird. 
However, I understand your purpose and effort of #15417, and hence could we 
make this PR more valuable by increasing its coverage? I mean we can add more 
tests for zk and kraft in this PR, and it would be nice to use new test infra 
(`ClusterTestExtensions`) to rewrite it. WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15309) Add custom error handler to Producer

2024-04-10 Thread Alieh Saeedi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alieh Saeedi reassigned KAFKA-15309:


Assignee: Alieh Saeedi

> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>Reporter: Matthias J. Sax
>Assignee: Alieh Saeedi
>Priority: Major
>  Labels: needs-kip
> Attachments: KafkaProducerReproducer.java, app.log
>
>
> The producer collects multiple records into batches, and a single record 
> specific error might fail the whole batch (eg, `RecordTooLargeException`).
> This ticket suggests to add a per-record error handler, that allows user to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
> Another example for which a production exception handler could be useful, if 
> a user tries to write into a non-existing topic, which returns a retryable 
> error code; with infinite retries the producer would hang retrying forever. A 
> handler could help to break the infinite retry loop.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15568) Use incrementalAlterConfigs to update the dynamic config of broker in ConfigCommand tool

2024-04-10 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-15568.

Resolution: Duplicate

duplicate to KAFKA-16181

> Use incrementalAlterConfigs to update the dynamic config of broker in 
> ConfigCommand tool
> 
>
> Key: KAFKA-15568
> URL: https://issues.apache.org/jira/browse/KAFKA-15568
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Aman Singh
>Assignee: Aman Singh
>Priority: Major
>
> As part of [this 
> KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API]
> incrementalAlterConfigs  API was introduced to change any config dynamically.
> - `kakfa-config.sh (CommandConfig)` still uses `alterConfig`  to update the 
> config.
> - The tool first describes the configs and then replaces all the configs.
> -  We need to remember all the sensitive configs since sensitive fields are 
> not returned by DescribeConfigs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-10 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1559256180


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+

Review Comment:
   If I don't have these 2 lines, the tests become flaky. With the 2 lines 
added, I ran all the tests in WorkerCoordinatorTest 30 times and all the tests 
passes. This is needed because sometimes in the test the connectivity with 
coordinator goes away due to session timeout and a classcast exception gets 
thrown. Adding logs for referecene:
   
   ```
   [2024-04-10 16:35:06,023] INFO Cluster ID: kafka-cluster 
(org.apache.kafka.clients.Metadata:349)
   [2024-04-10 16:35:06,028] DEBUG Sending FindCoordinator request to broker 
localhost:1969 (id: 0 rack: null) 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:904)
   [2024-04-10 16:35:06,029] DEBUG Received FindCoordinator response 
ClientResponse(receivedTimeMs=1712747106023, latencyMs=0, disconnected=false, 
timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, 
apiVersion=4, clientId=mockClientId, correlationId=0, headerVersion=2), 
responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, 
errorMessage='', nodeId=0, host='', port=0, 
coordinators=[Coordinator(key='test-group', nodeId=0, host='localhost', 
port=1969, errorCode=0, errorMessage='NONE')])) 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:917)
   [2024-04-10 16:35:06,029] INFO Discovered group coordinator localhost:1969 
(id: 2147483647 rack: null) 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:936)
   [2024-04-10 16:35:06,030] INFO Rebalance started 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
   [2024-04-10 16:35:06,030] DEBUG Heartbeat thread started 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1481)
   [2024-04-10 16:35:06,030] DEBUG Cooperative rebalance triggered. Keeping 
assignment null until it's explicitly revoked. 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:250)
   [2024-04-10 16:35:06,030] INFO (Re-)joining group 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
   [2024-04-10 16:35:06,031] DEBUG Sending JoinGroup 
(JoinGroupRequestData(groupId='test-group', sessionTimeoutMs=30, 
rebalanceTimeoutMs=60, memberId='', groupInstanceId=null, 
protocolType='connect', protocols=[JoinGroupRequestProtocol(name='compatible', 
metadata=[0, 1, 0, 14, 108, 101, 97, 100, 101, 114, 85, 114, 108, 58, 56, 48, 
56, 51, 0, 0, 0, 0, 0, 0, 0, 4, -1, -1, -1, -1]), 
JoinGroupRequestProtocol(name='default', metadata=[0, 0, 0, 14, 108, 101, 97, 
100, 101, 114, 85, 114, 108, 58, 56, 48, 56, 51, 0, 0, 0, 0, 0, 0, 0, 4])], 
reason='')) to coordinator localhost:1969 (id: 2147483647 rack: null) 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:617)
   [2024-04-10 16:35:06,031] DEBUG Received successful JoinGroup response: 
JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, 
protocolType=null, protocolName='default', leader='leader', 
skipAssignment=false, memberId='member', members=[]) 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:645)
   [2024-04-10 16:35:06,031] DEBUG Enabling heartbeat thread 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1449)
   [2024-04-10 16:35:06,031] INFO Successfully joined group with generation 
Generation{generationId=1, memberId='member', protocol='default'} 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
   [2024-04-10 16:35:06,031] DEBUG Sending follower SyncGroup to coordinator 
localhost:1969 (id: 2147483647 rack: null): 
SyncGroupRequestData(groupId='test-group', generationId=1, memberId='member', 
groupInstanceId=null, protocolType='connect', protocolName='default', 
assignments=[]) 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:759)
   [2024-04-10 16:35:06,032] DEBUG Received successful 

Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-10 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1559253060


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> {
+return null;
+});
+
+long now = time.milliseconds();
+// We keep the heartbeat thread running behind the scenes and poll 
frequently so that eventually
+// the time goes past now + rebalanceTimeoutMs which triggers poll 
timeout expiry.
+TestUtils.waitForCondition(() -> {
+time.sleep(heartbeatIntervalMs - 1);
+return time.milliseconds() > now + rebalanceTimeoutMs;
+}, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for 
rebalance.timeout.ms");
+coordinator.poll(0, () -> {

Review Comment:
   Thanks @showuon for the suggestions. I updated the test now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR : Replaced the while loop with TestUtils.waitForCondition [kafka]

2024-04-10 Thread via GitHub


chiacyu commented on PR #15678:
URL: https://github.com/apache/kafka/pull/15678#issuecomment-2047169130

   If the condition doesn't meet within the maxWaitMs, the waitForCondition 
would throw the assertion failure. Do we need another timeout to handle that?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]

2024-04-10 Thread via GitHub


showuon commented on PR #15522:
URL: https://github.com/apache/kafka/pull/15522#issuecomment-2047096126

   @soarez , oops, there's conflict due to I've just merged another PR. Please 
help resolve it. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-10 Thread via GitHub


showuon merged PR #15557:
URL: https://github.com/apache/kafka/pull/15557


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-10 Thread via GitHub


showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1559170927


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  if (this.promotionStates.containsKey(topicPartition)) {
+val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionStates.get(topicPartition)
+// Revert any reassignments for partitions that did not complete the 
future replica promotion
+if (originalDir.isDefined && topicId.isDefined && 
reassignmentState.maybeInconsistentMetadata) {
+  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () 
=> ())
+}
+this.promotionStates.remove(topicPartition)
+  }

Review Comment:
   Thanks for the explanation!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-10 Thread via GitHub


showuon commented on PR #15557:
URL: https://github.com/apache/kafka/pull/15557#issuecomment-2047083295

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]

2024-04-10 Thread via GitHub


showuon commented on PR #15522:
URL: https://github.com/apache/kafka/pull/15522#issuecomment-2047072476

   Sorry, forgot about this PR. The jdk8 job failed to complete due to infra's 
issue. Re-triggering now: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15522/6/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-04-10 Thread via GitHub


vamossagar12 commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2047030266

   @lucasbru , that test did pass. However, let me try again with the snippet 
you shared above and see if it works. Let me get back to you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16433: BeginningAndEndOffsets and OffsetsForTimes should send an event and return empty with zero timeout provided [kafka]

2024-04-10 Thread via GitHub


lucasbru merged PR #15688:
URL: https://github.com/apache/kafka/pull/15688


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-10 Thread via GitHub


soarez commented on PR #15136:
URL: https://github.com/apache/kafka/pull/15136#issuecomment-2046971782

   @showuon PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16505) KIP-1034: Dead letter queue in Kafka Streams

2024-04-10 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-16505:


 Summary: KIP-1034: Dead letter queue in Kafka Streams
 Key: KAFKA-16505
 URL: https://issues.apache.org/jira/browse/KAFKA-16505
 Project: Kafka
  Issue Type: Improvement
Reporter: Damien Gasparina


See KIP: KIP-1034: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-04-10 Thread via GitHub


lucasbru commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2046963651

   @vamossagar12 did the test you ran pass?
   
   Here is an example how I run parameterized tests using a test suite file:
   ```
   consumer_test:
   - 
tests/kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}
   ```
   
   The change looks fine to me. If you want to be sure that the test set up 
works, you may want to run the parameter combinations and post the results 
here. However, if you have tested one parameter combination successfully, and 
you are confident that the general test setup is working, I am fine with 
merging it like this (please confirm).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]

2024-04-10 Thread via GitHub


soarez commented on PR #15522:
URL: https://github.com/apache/kafka/pull/15522#issuecomment-2046961029

   Failing tests are all unrelated and tracked:
   * 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest."testNoConsumeWithDescribeAclViaAssign(String).quorum=kraft"
 [KAFKA-8250](https://issues.apache.org/jira/browse/KAFKA-8250)
   * kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions [7] Type=ZK, 
MetadataVersion=3.7-IV4, Security=PLAINTEXT 
[KAFKA-15793](https://issues.apache.org/jira/browse/KAFKA-15793) (reopened)
   * 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId
 [KAFKA-15914](https://issues.apache.org/jira/browse/KAFKA-15914)
   * 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks
 [KAFKA-15917](https://issues.apache.org/jira/browse/KAFKA-15917)
   * 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault()
 [KAFKA-15787](https://issues.apache.org/jira/browse/KAFKA-15787)
   * 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
 [KAFKA-15197](https://issues.apache.org/jira/browse/KAFKA-15197)
   * 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicateSourceDefault()
 [KAFKA-15926](https://issues.apache.org/jira/browse/KAFKA-15926)
   * 
org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations() 
[KAFKA-16504](https://issues.apache.org/jira/browse/KAFKA-16504) (new)
   * 
org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful
 [6] Type=Raft-Isolated, MetadataVersion=3.8-IV0, Security=PLAINTEXT 
[KAFKA-16174](https://issues.apache.org/jira/browse/KAFKA-16174)
   
   PTAL @showuon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-8250) Flaky Test DelegationTokenEndToEndAuthorizationTest#testProduceConsumeViaAssign

2024-04-10 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835654#comment-17835654
 ] 

Igor Soarez commented on KAFKA-8250:


This failed again in 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15522/5/tests/]

 
{code:java}
[2024-04-09T20:36:54.043Z] 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaAssign(String)[1]
 failed, log available in 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15522/core/build/reports/testOutput/kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaAssign(String)[1].test.stdout
[2024-04-09T20:36:54.043Z] 
[2024-04-09T20:36:54.043Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> DelegationTokenEndToEndAuthorizationWithOwnerTest > 
testNoConsumeWithDescribeAclViaAssign(String) > 
"testNoConsumeWithDescribeAclViaAssign(String).quorum=kraft" FAILED
[2024-04-09T20:36:54.043Z]     java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication 
failed during authentication due to invalid credentials with SASL mechanism 
SCRAM-SHA-256
[2024-04-09T20:36:54.043Z]         at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
[2024-04-09T20:36:54.043Z]         at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
[2024-04-09T20:36:54.043Z]         at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
[2024-04-09T20:36:54.043Z]         at 
kafka.api.DelegationTokenEndToEndAuthorizationTest.createDelegationTokens(DelegationTokenEndToEndAuthorizationTest.scala:165)
[2024-04-09T20:36:54.043Z]         at 
kafka.api.DelegationTokenEndToEndAuthorizationTest.createDelegationTokens(DelegationTokenEndToEndAuthorizationTest.scala:157)
[2024-04-09T20:36:54.043Z]         at 
kafka.api.DelegationTokenEndToEndAuthorizationTest.configureSecurityAfterServersStart(DelegationTokenEndToEndAuthorizationTest.scala:100)
[2024-04-09T20:36:54.043Z]         at 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.configureSecurityAfterServersStart(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:77)
[2024-04-09T20:36:54.043Z] 
[2024-04-09T20:36:54.043Z]         Caused by:
[2024-04-09T20:36:54.043Z]         
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication 
failed during authentication due to invalid credentials with SASL mechanism 
SCRAM-SHA-256
[2024-04-09T20:36:54.043Z] 
 {code}

> Flaky Test 
> DelegationTokenEndToEndAuthorizationTest#testProduceConsumeViaAssign
> ---
>
> Key: KAFKA-8250
> URL: https://issues.apache.org/jira/browse/KAFKA-8250
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/442/tests]
> {quote}java.lang.AssertionError: Consumed more records than expected 
> expected:<1> but was:<2>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:647)
> at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1288)
> at 
> kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:460)
> at 
> kafka.api.EndToEndAuthorizationTest.testProduceConsumeViaAssign(EndToEndAuthorizationTest.scala:209){quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15793) Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions

2024-04-10 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835653#comment-17835653
 ] 

Igor Soarez edited comment on KAFKA-15793 at 4/10/24 8:56 AM:
--

This has come up again in 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15522/5/tests/]

 
{code:java}
[2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 97 
> DeleteTopicsRequestTest > 
testTopicDeletionClusterHasOfflinePartitions(String) > 
"testTopicDeletionClusterHasOfflinePartitions(String).quorum=zk" STARTED
[2024-04-09T21:06:17.307Z] 
kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7]
 failed, log available in 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15522/core/build/reports/testOutput/kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7].test.stdout
[2024-04-09T21:06:17.307Z] 
[2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZkMigrationIntegrationTest > testMigrateTopicDeletions(ClusterInstance) > 
testMigrateTopicDeletions [7] Type=ZK, MetadataVersion=3.7-IV4, 
Security=PLAINTEXT FAILED
[2024-04-09T21:06:17.307Z]     
org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: 
Unhandled error in MetadataChangeEvent: Check op on KRaft Migration ZNode 
failed. Expected zkVersion = 5. This indicates that another KRaft controller is 
making writes to ZooKeeper.
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2001)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2027)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2052)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2052)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.migration.ZkTopicMigrationClient.$anonfun$updateTopicPartitions$1(ZkTopicMigrationClient.scala:265)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.migration.ZkTopicMigrationClient.updateTopicPartitions(ZkTopicMigrationClient.scala:255)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$20(KRaftMigrationZkWriter.java:334)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:62)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.lambda$run$1(KRaftMigrationDriver.java:532)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:845)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$21(KRaftMigrationZkWriter.java:331)
[2024-04-09T21:06:17.307Z]         at 
java.base@17.0.7/java.util.HashMap.forEach(HashMap.java:1421)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsDelta(KRaftMigrationZkWriter.java:297)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleDelta(KRaftMigrationZkWriter.java:112)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.run(KRaftMigrationDriver.java:531)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
[2024-04-09T21:06:17.307Z]         at 
java.base@17.0.7/java.lang.Thread.run(Thread.java:833)
[2024-04-09T21:06:17.307Z] 
[2024-04-09T21:06:17.307Z]         Caused by:
[2024-04-09T21:06:17.307Z]         java.lang.RuntimeException: Check op on 
KRaft Migration ZNode failed. Expected zkVersion = 5. This indicates that 
another KRaft controller is making 

Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-10 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1559052702


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online upgrade if the Classic Group receives a 
ConsumerGroupHeartbeat request.
+ *
+ * @param classicGroup A ClassicGroup.
+ * @return the boolean indicating whether it's valid to online upgrade the 
classic group.
+ */
+private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+log.debug("Online upgrade is invalid because the consumer group {} 
migration config is {} so online upgrade is not enabled.",
+classicGroup.groupId(), consumerGroupMigrationPolicy);
+return false;
+} else if (classicGroup.isInState(DEAD)) {

Review Comment:
   Could this really happen? I would have thought that it would be 
automatically converted as Dead or equivalent to Empty.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -411,6 +432,20 @@ public int numMembers() {
 return members.size();
 }
 
+/**
+ * @return The number of members that use the legacy protocol.
+ */
+public int numLegacyProtocolMember() {
+return (int) members.values().stream().filter(member -> 
member.useLegacyProtocol()).count();

Review Comment:
   It may be better to maintain this count in the group state instead of having 
to go through all members. Is it possible?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online upgrade if the Classic Group receives a 
ConsumerGroupHeartbeat request.
+ *
+ * @param classicGroup A ClassicGroup.
+ * @return the boolean indicating whether it's valid to online upgrade the 
classic group.
+ */
+private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+log.debug("Online upgrade is invalid because the consumer group {} 
migration config is {} so online upgrade is not enabled.",
+classicGroup.groupId(), consumerGroupMigrationPolicy);
+return false;
+} else if (classicGroup.isInState(DEAD)) {
+log.debug("Online upgrade is invalid because the classic group {} 
is in DEAD state.", classicGroup.groupId());
+return false;
+} else if (!classicGroup.usesConsumerGroupProtocol()) {
+log.debug("Online upgrade is invalid because the classic group {} 
has protocol type {} and doesn't use the consumer group protocol.",

Review Comment:
   nit: `Cannot upgrade classic group {} to consumer group because the group 
does not use the consumer embedded protocol.`



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online upgrade if the Classic Group receives a 
ConsumerGroupHeartbeat request.
+ *
+ * @param classicGroup A ClassicGroup.
+ * @return the boolean indicating whether it's valid to online upgrade the 
classic group.
+ */
+private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+log.debug("Online upgrade is invalid because the consumer group {} 
migration config is {} so online upgrade is not enabled.",
+classicGroup.groupId(), consumerGroupMigrationPolicy);
+return false;
+} else if (classicGroup.isInState(DEAD)) {
+log.debug("Online upgrade is invalid because the classic group {} 
is in DEAD state.", classicGroup.groupId());
+return false;
+} else if (!classicGroup.usesConsumerGroupProtocol()) {
+log.debug("Online upgrade is invalid because the classic group {} 
has protocol type {} and doesn't use the consumer group protocol.",
+classicGroup.groupId(), 
classicGroup.protocolType().orElse(""));
+return false;
+} else if (classicGroup.size() > consumerGroupMaxSize) {
+log.debug("Online upgrade is invalid because the classic group {} 
size {} exceeds the consumer group maximum size {}.",

Review Comment:
   nit: Same idea.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online upgrade if the Classic Group receives a 

Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]

2024-04-10 Thread via GitHub


lucasbru commented on code in PR #15661:
URL: https://github.com/apache/kafka/pull/15661#discussion_r1559094045


##
tests/kafkatest/services/verifiable_consumer.py:
##
@@ -135,6 +135,28 @@ def last_commit(self, tp):
 else:
 return None
 
+# This needs to be used for cooperative and consumer protocol
+class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):

Review Comment:
   Why can we not implement this in the normal ConsumerEventHandler? It seems 
it's making incorrect assumptions about the consumer rebalance listener (since 
previously owned partitions are not guaranteed to be reported in onassinged)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] (KAFKA-15793) Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions

2024-04-10 Thread Igor Soarez (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15793 ]


Igor Soarez deleted comment on KAFKA-15793:
-

was (Author: soarez):
This has come up again:

 
{code:java}
[2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 97 
> DeleteTopicsRequestTest > 
testTopicDeletionClusterHasOfflinePartitions(String) > 
"testTopicDeletionClusterHasOfflinePartitions(String).quorum=zk" STARTED
[2024-04-09T21:06:17.307Z] 
kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7]
 failed, log available in 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15522/core/build/reports/testOutput/kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7].test.stdout
[2024-04-09T21:06:17.307Z] 
[2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZkMigrationIntegrationTest > testMigrateTopicDeletions(ClusterInstance) > 
testMigrateTopicDeletions [7] Type=ZK, MetadataVersion=3.7-IV4, 
Security=PLAINTEXT FAILED
[2024-04-09T21:06:17.307Z]     
org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: 
Unhandled error in MetadataChangeEvent: Check op on KRaft Migration ZNode 
failed. Expected zkVersion = 5. This indicates that another KRaft controller is 
making writes to ZooKeeper.
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2001)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2027)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2052)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2052)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.migration.ZkTopicMigrationClient.$anonfun$updateTopicPartitions$1(ZkTopicMigrationClient.scala:265)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.migration.ZkTopicMigrationClient.updateTopicPartitions(ZkTopicMigrationClient.scala:255)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$20(KRaftMigrationZkWriter.java:334)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:62)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.lambda$run$1(KRaftMigrationDriver.java:532)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:845)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$21(KRaftMigrationZkWriter.java:331)
[2024-04-09T21:06:17.307Z]         at 
java.base@17.0.7/java.util.HashMap.forEach(HashMap.java:1421)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsDelta(KRaftMigrationZkWriter.java:297)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleDelta(KRaftMigrationZkWriter.java:112)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.run(KRaftMigrationDriver.java:531)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
[2024-04-09T21:06:17.307Z]         at 
java.base@17.0.7/java.lang.Thread.run(Thread.java:833)
[2024-04-09T21:06:17.307Z] 
[2024-04-09T21:06:17.307Z]         Caused by:
[2024-04-09T21:06:17.307Z]         java.lang.RuntimeException: Check op on 
KRaft Migration ZNode failed. Expected zkVersion = 5. This indicates that 
another KRaft controller is making writes to ZooKeeper.
[2024-04-09T21:06:17.307Z]             at 
kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2001)
[2024-04-09T21:06:17.307Z]             ... 22 more

[jira] [Reopened] (KAFKA-15793) Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions

2024-04-10 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reopened KAFKA-15793:
-

This has come up again:

 
{code:java}
[2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 97 
> DeleteTopicsRequestTest > 
testTopicDeletionClusterHasOfflinePartitions(String) > 
"testTopicDeletionClusterHasOfflinePartitions(String).quorum=zk" STARTED
[2024-04-09T21:06:17.307Z] 
kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7]
 failed, log available in 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15522/core/build/reports/testOutput/kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7].test.stdout
[2024-04-09T21:06:17.307Z] 
[2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 96 
> ZkMigrationIntegrationTest > testMigrateTopicDeletions(ClusterInstance) > 
testMigrateTopicDeletions [7] Type=ZK, MetadataVersion=3.7-IV4, 
Security=PLAINTEXT FAILED
[2024-04-09T21:06:17.307Z]     
org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: 
Unhandled error in MetadataChangeEvent: Check op on KRaft Migration ZNode 
failed. Expected zkVersion = 5. This indicates that another KRaft controller is 
making writes to ZooKeeper.
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2001)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2027)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2052)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
[2024-04-09T21:06:17.307Z]         at 
app//scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2052)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.migration.ZkTopicMigrationClient.$anonfun$updateTopicPartitions$1(ZkTopicMigrationClient.scala:265)
[2024-04-09T21:06:17.307Z]         at 
app//kafka.zk.migration.ZkTopicMigrationClient.updateTopicPartitions(ZkTopicMigrationClient.scala:255)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$20(KRaftMigrationZkWriter.java:334)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:62)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.lambda$run$1(KRaftMigrationDriver.java:532)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:845)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$21(KRaftMigrationZkWriter.java:331)
[2024-04-09T21:06:17.307Z]         at 
java.base@17.0.7/java.util.HashMap.forEach(HashMap.java:1421)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsDelta(KRaftMigrationZkWriter.java:297)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleDelta(KRaftMigrationZkWriter.java:112)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.run(KRaftMigrationDriver.java:531)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
[2024-04-09T21:06:17.307Z]         at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
[2024-04-09T21:06:17.307Z]         at 
java.base@17.0.7/java.lang.Thread.run(Thread.java:833)
[2024-04-09T21:06:17.307Z] 
[2024-04-09T21:06:17.307Z]         Caused by:
[2024-04-09T21:06:17.307Z]         java.lang.RuntimeException: Check op on 
KRaft Migration ZNode failed. Expected zkVersion = 5. This indicates that 
another KRaft controller is making writes to ZooKeeper.
[2024-04-09T21:06:17.307Z]             at 
kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2001)
[2024-04-09T21:06:17.307Z]             ... 22 

  1   2   >