[jira] [Comment Edited] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
[ https://issues.apache.org/jira/browse/KAFKA-16510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836026#comment-17836026 ]

Zhiyuan Lei edited comment on KAFKA-16510 at 4/11/24 6:48 AM:
--

If you are running this locally, the default heap size of your local JVM is probably too small. The default KAFKA_HEAP_OPTS is 256M. You can try exporting KAFKA_HEAP_OPTS=-Xmx2048M before starting it again.

```
# Memory options
if [ -z "$KAFKA_HEAP_OPTS" ]; then
  KAFKA_HEAP_OPTS="-Xmx256M"
fi
```

> java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
> --
>
> Key: KAFKA-16510
> URL: https://issues.apache.org/jira/browse/KAFKA-16510
> Project: Kafka
> Issue Type: Bug
> Components: tools
> Affects Versions: 3.4.1
> Reporter: Hiro
> Priority: Major
>
> kafka-metadata-quorum.sh does not work with SASL_PLAIN.
> I got the error below. I use only SASL_PLAIN, not SSL.
> I found someone with a similar problem, but they are using mTLS.
> https://issues.apache.org/jira/browse/KAFKA-16006
> {code:java}
> sh-4.2$ /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server <ip>:9093 --command-config controller-admin.properties describe --replication
> [2024-04-11 04:12:54,128] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
> java.lang.OutOfMemoryError: Java heap space
>   at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
>   at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
>   at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>   at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
>   at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:476)
>   at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:573)
>   at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:251)
>   at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
>   at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585)
>   at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504)
>   at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435)
>   at java.base/java.lang.Thread.run(Thread.java:840)
> org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeMetadataQuorum
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeMetadataQuorum
>   at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>   at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>   at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>   at org.apache.kafka.tools.MetadataQuorumCommand.handleDescribeReplication(MetadataQuorumCommand.java:158)
>   at org.apache.kafka.tools.MetadataQuorumCommand.execute(MetadataQuorumCommand.java:106)
>   at org.apache.kafka.tools.MetadataQuorumCommand.mainNoExit(MetadataQuorumCommand.java:62)
>   at org.apache.kafka.tools.MetadataQuorumCommand.main(MetadataQuorumCommand.java:57)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeMetadataQuorum
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
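To check whether an exported KAFKA_HEAP_OPTS value actually reaches a JVM, one option is to print the maximum heap the JVM reports. The class below is only a sketch: `HeapCheck` and `maxHeapMiB` are hypothetical names, not part of Kafka, and the reported figure can differ slightly from the -Xmx value depending on the garbage collector.

```java
// Hypothetical helper, not part of Kafka: reports the maximum heap of the running JVM.
class HeapCheck {

    // Runtime.maxMemory() returns the upper heap limit in bytes; convert it to MiB.
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println("Max heap for this JVM: " + maxHeapMiB() + " MiB");
    }
}
```

Running it once with `java -Xmx256M HeapCheck` and once with `java -Xmx2048M HeapCheck` shows the difference the suggested setting would make.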
Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]
jolshan commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1560454922

## storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFile.java: ##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+public class PartitionMetadataFile {
+    private static final String PARTITION_METADATA_FILE_NAME = "partition.metadata";
+    static final int CURRENT_VERSION = 0;
+
+    public static File newFile(File dir) {
+        return new File(dir, PARTITION_METADATA_FILE_NAME);
+    }
+
+    private final File file;
+    private final LogDirFailureChannel logDirFailureChannel;
+
+    private final Object lock = new Object();
+    private volatile Optional<Uuid> dirtyTopicIdOpt = Optional.empty();
+
+    public PartitionMetadataFile(
+        final File file,
+        final LogDirFailureChannel logDirFailureChannel
+    ) {
+        this.file = file;
+        this.logDirFailureChannel = logDirFailureChannel;
+    }
+
+    /**
+     * Records the topic ID that will be flushed to disk.
+     */
+    public void record(Uuid topicId) {
+        // Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
+        dirtyTopicIdOpt.ifPresent(dirtyTopicId -> {
+            if (dirtyTopicId != topicId) {

Review Comment:
I don't think this should be a reference comparison. Perhaps a miss in the scala -> java conversion.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
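The review point about reference comparison can be illustrated outside Kafka. In Scala, `!=` on objects delegates to `equals`, whereas in Java it compares object identity, so a line carried over unchanged can alter behavior. The sketch below uses `java.util.UUID` as a stand-in for Kafka's `Uuid` (an assumption made so the example is self-contained); `TopicIdComparison` and its methods are hypothetical names.

```java
import java.util.UUID;

// Hypothetical illustration; java.util.UUID stands in for org.apache.kafka.common.Uuid.
class TopicIdComparison {

    // Reference comparison: true only when both variables point at the same object.
    static boolean sameReference(UUID a, UUID b) {
        return a == b;
    }

    // Value comparison: true when both objects represent the same ID.
    static boolean sameValue(UUID a, UUID b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        // Two separately constructed objects that carry the same ID.
        UUID first = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        UUID second = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");

        System.out.println("same reference: " + sameReference(first, second));
        System.out.println("same value:     " + sameValue(first, second));
    }
}
```

With a reference check, two equal IDs that happen to be distinct objects would be reported as different, which is presumably the case the reviewer wants to avoid; `equals` (negated for the mismatch check) compares the values instead.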
[jira] [Commented] (KAFKA-10788) Streamlining Tests in CachingInMemoryKeyValueStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-10788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836006#comment-17836006 ]

Shikhar commented on KAFKA-10788:
-

Hi [~sagarrao], is this issue resolved? If not, can I work on it, provided no one else is already doing so?

> Streamlining Tests in CachingInMemoryKeyValueStoreTest
> --
>
> Key: KAFKA-10788
> URL: https://issues.apache.org/jira/browse/KAFKA-10788
> Project: Kafka
> Issue Type: Improvement
> Components: streams, unit tests
> Reporter: Sagar Rao
> Assignee: Rohit Deshpande
> Priority: Major
> Labels: newbie
>
> While reviewing KIP-614, it was decided that the tests in
> [CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588]
> need to be streamlined to use a mocked underlyingStore.
[jira] [Created] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
HiroArai created KAFKA-16510:

Summary: java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
Key: KAFKA-16510
URL: https://issues.apache.org/jira/browse/KAFKA-16510
Project: Kafka
Issue Type: Bug
Components: tools
Affects Versions: 3.4.1
Reporter: HiroArai

kafka-metadata-quorum.sh does not work with SASL_PLAIN.
I got the error below. I use only SASL_PLAIN, not SSL.
I found someone with a similar problem, but they are using mTLS.
https://issues.apache.org/jira/browse/KAFKA-16006
{code:java}
sh-4.2$ /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server <ip>:9093 --command-config controller-admin.properties describe --replication
[2024-04-11 04:12:54,128] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
  at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
  at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
  at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
  at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
  at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:476)
  at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:573)
  at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:251)
  at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
  at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
  at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585)
  at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504)
  at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435)
  at java.base/java.lang.Thread.run(Thread.java:840)
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeMetadataQuorum
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeMetadataQuorum
  at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
  at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
  at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
  at org.apache.kafka.tools.MetadataQuorumCommand.handleDescribeReplication(MetadataQuorumCommand.java:158)
  at org.apache.kafka.tools.MetadataQuorumCommand.execute(MetadataQuorumCommand.java:106)
  at org.apache.kafka.tools.MetadataQuorumCommand.mainNoExit(MetadataQuorumCommand.java:62)
  at org.apache.kafka.tools.MetadataQuorumCommand.main(MetadataQuorumCommand.java:57)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeMetadataQuorum
{code}
[jira] [Commented] (KAFKA-16006) mTLS authentication works for kafka-topic.sh but fails for kafka-metadata-quorum.sh
[ https://issues.apache.org/jira/browse/KAFKA-16006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835989#comment-17835989 ]

HiroArai commented on KAFKA-16006:
--

Me too. I saw the same error in my test.

> mTLS authentication works for kafka-topic.sh but fails for kafka-metadata-quorum.sh
> ---
>
> Key: KAFKA-16006
> URL: https://issues.apache.org/jira/browse/KAFKA-16006
> Project: Kafka
> Issue Type: Bug
> Components: tools
> Affects Versions: 3.4.0
> Reporter: 10011
> Priority: Major
>
> The same client-ssl configuration works for the kafka-topics.sh script but fails
> for kafka-metadata-quorum.sh during authentication. See details below.
> {code:java}
> bash-4.2$ ./kafka-topics.sh --bootstrap-server localhost:11005 --command-config /config/client-ssl.properties --describe --topic clientmTLSTest
> Topic: clientmTLSTest TopicId: dg7q11k6R2m2dgDSDGEfXw PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
>   Topic: clientmTLSTest Partition: 0 Leader: 5 Replicas: 5,6,4 Isr: 6,5,4
>   Topic: clientmTLSTest Partition: 1 Leader: 6 Replicas: 6,4,5 Isr: 6,4,5
>   Topic: clientmTLSTest Partition: 2 Leader: 4 Replicas: 4,5,6 Isr: 6,4,5
> bash-4.2$ ./kafka-metadata-quorum.sh --command-config /config/client-ssl.properties --bootstrap-server localhost:11005 describe --status
> [2023-12-13 21:19:55,500] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
> java.lang.OutOfMemoryError: Java heap space
>   at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
>   at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
>   at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>   at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
>   at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
>   at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
>   at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
>   at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
>   at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
>   at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
>   at java.base/java.lang.Thread.run(Thread.java:842)
> org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listNodes
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listNodes
>   at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>   at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>   at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>   at org.apache.kafka.tools.MetadataQuorumCommand.handleDescribeStatus(MetadataQuorumCommand.java:167)
>   at org.apache.kafka.tools.MetadataQuorumCommand.execute(MetadataQuorumCommand.java:106)
>   at org.apache.kafka.tools.MetadataQuorumCommand.mainNoExit(MetadataQuorumCommand.java:55)
>   at org.apache.kafka.tools.MetadataQuorumCommand.main(MetadataQuorumCommand.java:50)
> Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listNodes
> bash-4.2$ tail /logs/kafka/server.log
> [2023-12-13 21:18:17,356] INFO [SocketServer listenerType=BROKER, nodeId=4] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:11005-127.0.0.1:42730-794) (SSL handshake failed) (org.apache.kafka.common.network.Selector)
> [2023-12-13 21:19:55,464] INFO [SocketServer listenerType=BROKER, nodeId=4] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:11005-127.0.0.1:39594-809) (SSL handshake failed) (org.apache.kafka.common.network.Selector)
> {code}
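Neither ticket establishes a root cause, so the following is only one possible reading of the traces, not a confirmed diagnosis. Both failures happen while a response buffer is being allocated in NetworkReceive.readFrom, and Kafka responses start with a 4-byte big-endian size. If a client that is not speaking TLS reads bytes sent by a TLS listener, those first four bytes are not a size at all. The sketch below (hypothetical class `SizePrefixSketch`, not Kafka code) shows the arithmetic for the first four bytes of a TLS 1.2 alert record, chosen here as an assumed example of such a reply.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration, not Kafka code: what happens when non-Kafka bytes
// are interpreted as a 4-byte big-endian size prefix.
class SizePrefixSketch {

    // ByteBuffer reads big-endian by default, matching a network-order size field.
    static int readSizePrefix(byte[] firstFourBytes) {
        return ByteBuffer.wrap(firstFourBytes).getInt();
    }

    public static void main(String[] args) {
        // Assumed example input: start of a TLS 1.2 alert record
        // (content type 0x15, version 0x03 0x03, high byte of the length 0x00).
        byte[] tlsAlertHeader = {0x15, 0x03, 0x03, 0x00};

        int claimedSize = readSizePrefix(tlsAlertHeader);
        long defaultHeapBytes = 256L * 1024 * 1024; // the -Xmx256M default quoted in KAFKA-16510

        System.out.println("claimed size in bytes: " + claimedSize);
        System.out.println("larger than a 256M heap: " + (claimedSize > defaultHeapBytes));
    }
}
```

Under that assumption the client would try to allocate a buffer larger than the default heap, in which case checking that the client's security protocol matches the listener would be worth doing alongside raising the heap size.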
Re: [PR] MINOR: Add reviewers GitHub action [kafka]
github-actions[bot] commented on PR #15115:
URL: https://github.com/apache/kafka/pull/15115#issuecomment-2048870549

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16482: Eliminate the IDE warnings of accepting ClusterConfig in BeforeEach [kafka]
KevinZTW commented on code in PR #15676:
URL: https://github.com/apache/kafka/pull/15676#discussion_r1560373161

## core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala: ##
@@ -19,32 +19,31 @@ package kafka.coordinator.transaction
 import kafka.network.SocketServer
 import kafka.server.{IntegrationTestUtils, KafkaConfig}
-import kafka.test.annotation.{AutoStart, ClusterTest, ClusterTests, Type}
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.test.{ClusterConfig, ClusterInstance}
 import org.apache.kafka.common.message.InitProducerIdRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Disabled, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{Disabled, Timeout}
 import java.util.stream.{Collectors, IntStream}
 import scala.concurrent.duration.DurationInt
 import scala.jdk.CollectionConverters._
+
+@ClusterTestDefaults(serverProperties = Array(
+  new ClusterConfigProperty(key = "transaction.state.log.num.partitions", value = "1"),
+  new ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3")

Review Comment:
Oh, thanks for letting me know! I checked `KafkaConfig` and, as you said, it uses `TransactionLogConfig.DEFAULT_REPLICATION_FACTOR` when defining the config.
[jira] [Updated] (KAFKA-16507) Add raw record into RecordDeserialisationException
[ https://issues.apache.org/jira/browse/KAFKA-16507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16507:
--
Component/s: clients

> Add raw record into RecordDeserialisationException
> --
>
> Key: KAFKA-16507
> URL: https://issues.apache.org/jira/browse/KAFKA-16507
> Project: Kafka
> Issue Type: Improvement
> Components: clients, consumer
> Reporter: Fred Rouleau
> Assignee: Fred Rouleau
> Priority: Minor
> Labels: kip
>
> [KIP-334|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793]
> introduced into the Consumer the RecordDeserializationException with offsets
> information. That is useful to skip a poison pill but as you do not have
> access to the Record, it still prevents easy implementation of dead letter
> queue or simply logging the faulty data.
> Changes are described in
> [KIP-1036|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception]
[jira] [Assigned] (KAFKA-16507) Add raw record into RecordDeserialisationException
[ https://issues.apache.org/jira/browse/KAFKA-16507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-16507:
-
Assignee: Fred Rouleau

> Add raw record into RecordDeserialisationException
> --
>
> Key: KAFKA-16507
> URL: https://issues.apache.org/jira/browse/KAFKA-16507
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Fred Rouleau
> Assignee: Fred Rouleau
> Priority: Minor
> Labels: kip
>
> [KIP-334|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793]
> introduced into the Consumer the RecordDeserializationException with offsets
> information. That is useful to skip a poison pill but as you do not have
> access to the Record, it still prevents easy implementation of dead letter
> queue or simply logging the faulty data.
> Changes are described in
> [KIP-1036|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception]
Re: [PR] KAFKA-16507 Add raw record into RecordDeserialisationException [kafka]
kirktrue commented on code in PR #15691:
URL: https://github.com/apache/kafka/pull/15691#discussion_r1560342179

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ##
@@ -326,7 +327,17 @@ ConsumerRecord parseRecord(Deserializers deserializers,
                     key, value, headers, leaderEpoch);
         } catch (RuntimeException e) {
             log.error("Deserializers with error: {}", deserializers);
-            throw new RecordDeserializationException(partition, record.offset(),
+            ByteBuffer keyBytes = record.key();
+            byte[] key = org.apache.kafka.common.utils.Utils.toNullableArray(keyBytes);
+            ByteBuffer valueBytes = record.value();
+            byte[] value = Utils.toNullableArray(valueBytes);
+            Headers headers = new RecordHeaders(record.headers());
+            ConsumerRecord consumerRecord = new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(),
+                    record.timestamp(), timestampType,
+                    keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
+                    valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
+                    key, value, headers, Optional.empty());

Review Comment:
Any reason to omit the epoch value? Can we do the same as the happy path?

```suggestion
            ConsumerRecord consumerRecord = new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(),
                    record.timestamp(), timestampType,
                    keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
                    valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
                    key, value, headers, leaderEpoch);
```

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ##
@@ -326,7 +327,17 @@ ConsumerRecord parseRecord(Deserializers deserializers,
                     key, value, headers, leaderEpoch);
         } catch (RuntimeException e) {
             log.error("Deserializers with error: {}", deserializers);
-            throw new RecordDeserializationException(partition, record.offset(),
+            ByteBuffer keyBytes = record.key();
+            byte[] key = org.apache.kafka.common.utils.Utils.toNullableArray(keyBytes);

Review Comment:
Minor nit: can we use the unqualified version of `Utils` here as you do a couple lines down? That is:

```suggestion
            byte[] key = Utils.toNullableArray(keyBytes);
```

## clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java: ##
@@ -16,29 +16,38 @@
  */
 package org.apache.kafka.common.errors;

+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;

+
 /**
  * This exception is raised for any error that occurs while deserializing records received by the consumer using
  * the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class RecordDeserializationException extends SerializationException {

-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final TopicPartition partition;
-    private final long offset;
+    private final ConsumerRecord consumerRecord;

-    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
+    public RecordDeserializationException(TopicPartition partition,
+                                          ConsumerRecord record,
+                                          String message,
+                                          Throwable cause) {

Review Comment:
IIUC, @mjsax mentioned on the mailing list that we need to keep the existing constructor signature as is and add an overloaded version that accepts the `ConsumerRecord`. Although I'm not sure why... 🤔

## clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java: ##
@@ -16,29 +16,38 @@
  */
 package org.apache.kafka.common.errors;

+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;

+

Review Comment:
In general, we try to avoid unnecessary whitespace changes.

```suggestion
```

## clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java: ##
@@ -16,29 +16,38 @@
  */
 package org.apache.kafka.common.errors;

+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;

+
 /**
  * This exception is raised for any error that occurs while deserializing records received by the consumer using
  * the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class RecordDeserializationE
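The review does not say why the existing constructor has to stay. A common reason, offered here as an assumption rather than something stated in the thread, is compatibility: code that already constructs the exception with the old argument list would otherwise stop compiling or linking. A minimal sketch of the "keep the old signature, add an overload" pattern follows. `DeserializationFailure` and its fields are hypothetical and much simpler than Kafka's `RecordDeserializationException`.

```java
// Hypothetical exception type illustrating an added overload; not Kafka's class.
class DeserializationFailure extends RuntimeException {
    private final String topicPartition;
    private final long offset;
    private final byte[] rawValue; // null when created through the legacy constructor

    // Existing signature, kept unchanged so current callers are unaffected.
    DeserializationFailure(String topicPartition, long offset, String message, Throwable cause) {
        this(topicPartition, offset, null, message, cause);
    }

    // New overload that also carries the raw bytes of the failed record.
    DeserializationFailure(String topicPartition, long offset, byte[] rawValue,
                           String message, Throwable cause) {
        super(message, cause);
        this.topicPartition = topicPartition;
        this.offset = offset;
        this.rawValue = rawValue;
    }

    String topicPartition() { return topicPartition; }
    long offset() { return offset; }
    byte[] rawValue() { return rawValue; }

    public static void main(String[] args) {
        // Old call sites keep working and simply carry no raw data.
        DeserializationFailure legacy =
            new DeserializationFailure("orders-0", 42L, "bad record", null);
        // New call sites can attach the raw bytes.
        DeserializationFailure extended =
            new DeserializationFailure("orders-0", 42L, new byte[] {1, 2, 3}, "bad record", null);

        System.out.println("legacy has raw value: " + (legacy.rawValue() != null));
        System.out.println("extended raw length:  " + extended.rawValue().length);
    }
}
```

The legacy constructor delegates to the new one, so both paths initialize the same fields and accessors such as `offset()` behave identically for old and new callers.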
Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1560320708

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ##
@@ -180,12 +192,19 @@ public static class DeadlineAndEpoch {
      */
     private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;

+    /**
+     * Map of protocol names to the number of members that use legacy protocol and support them.
+     */
+    private final TimelineHashMap<String, Integer> legacyProtocolMembersSupportedProtocols;

Review Comment:
In `supportsClassicProtocols(String memberProtocolType, Set<String> memberProtocols)`, we need it to check whether at least one of the given protocols in the `JoinGroupRequest` can be supported if a consumer using the classic protocol joins the group.
Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1560320708

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ##
@@ -180,12 +192,19 @@ public static class DeadlineAndEpoch {
      */
     private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;

+    /**
+     * Map of protocol names to the number of members that use legacy protocol and support them.
+     */
+    private final TimelineHashMap<String, Integer> legacyProtocolMembersSupportedProtocols;

Review Comment:
We will need it to check whether at least one of the given protocols in the `JoinGroupRequest` can be supported if a consumer using the classic protocol joins the group.
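One way to read the comment above: the map counts, per protocol name, how many legacy members support it, so a joining member is compatible if at least one of its protocols is supported by every legacy member. The sketch below uses a plain `HashMap` instead of `TimelineHashMap`, and `LegacyProtocolSupport`, `addLegacyMember` and `supportsAny` are hypothetical names. The "supported by every member" rule is an assumption about the intended check, not something the PR excerpt confirms.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not the PR's code: per-protocol support counts for legacy members.
class LegacyProtocolSupport {
    private final Map<String, Integer> supportCounts = new HashMap<>();
    private int legacyMemberCount = 0;

    // Register a legacy member and bump the count of each protocol it supports.
    void addLegacyMember(Set<String> supportedProtocols) {
        legacyMemberCount++;
        for (String protocol : supportedProtocols) {
            supportCounts.merge(protocol, 1, Integer::sum);
        }
    }

    // True if at least one candidate protocol is supported by every legacy member.
    // With no legacy members, any non-empty candidate set passes (count 0 == 0).
    boolean supportsAny(Set<String> candidateProtocols) {
        for (String protocol : candidateProtocols) {
            if (supportCounts.getOrDefault(protocol, 0) == legacyMemberCount) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        LegacyProtocolSupport group = new LegacyProtocolSupport();
        group.addLegacyMember(Set.of("range", "roundrobin"));
        group.addLegacyMember(Set.of("range"));

        System.out.println("range accepted:      " + group.supportsAny(Set.of("range")));
        System.out.println("roundrobin accepted: " + group.supportsAny(Set.of("roundrobin")));
    }
}
```

Keeping counts rather than per-member sets makes the check a single map lookup per candidate protocol, which may be why the field is described as a map of names to counts.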
Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1560315985

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
         }
     }

+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.",
+                classicGroup.groupId(), consumerGroupMigrationPolicy);
+            return false;
+        } else if (classicGroup.isInState(DEAD)) {

Review Comment:
Yeah it makes sense. Actually I think we don't set classicGroup to dead at all. We can delete the else if
Re: [PR] KAFKA-16509: CurrentControllerId metric is unreliable in ZK mode [kafka]
mumrah commented on code in PR #15695: URL: https://github.com/apache/kafka/pull/15695#discussion_r1560175751 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -657,15 +657,19 @@ class KafkaServer( } private def createCurrentControllerIdMetric(): Unit = { - KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID, () => { - Option(metadataCache) match { -case None => -1 -case Some(cache) => cache.getControllerId match { Review Comment: Ah right, makes sense 👍
Re: [PR] KAFKA-16509: CurrentControllerId metric is unreliable in ZK mode [kafka]
cmccabe commented on code in PR #15695: URL: https://github.com/apache/kafka/pull/15695#discussion_r1560167894 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -657,15 +657,19 @@ class KafkaServer( } private def createCurrentControllerIdMetric(): Unit = { - KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID, () => { - Option(metadataCache) match { -case None => -1 -case Some(cache) => cache.getControllerId match { - case None => -1 - case Some(id) => id.id -} - } -}) + KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID, + () => getCurrentControllerIdFromOldController()) + } + + /** + * Get the current controller ID from the old controller code. + * This is the most up-to-date controller ID we can get when in ZK mode. + */ + def getCurrentControllerIdFromOldController(): Int = { +Option(_kafkaController) match { + case None => -1 Review Comment: yes, that's right.
Re: [PR] KAFKA-16509: CurrentControllerId metric is unreliable in ZK mode [kafka]
cmccabe commented on code in PR #15695: URL: https://github.com/apache/kafka/pull/15695#discussion_r1560168052 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -657,15 +657,19 @@ class KafkaServer( } private def createCurrentControllerIdMetric(): Unit = { - KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID, () => { - Option(metadataCache) match { -case None => -1 -case Some(cache) => cache.getControllerId match { Review Comment: It should be OK in hybrid mode since we are still updating the znode in hybrid mode. And this comes from there directly.
Re: [PR] KAFKA-16509: CurrentControllerId metric is unreliable in ZK mode [kafka]
mumrah commented on code in PR #15695: URL: https://github.com/apache/kafka/pull/15695#discussion_r1560161829 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -657,15 +657,19 @@ class KafkaServer( } private def createCurrentControllerIdMetric(): Unit = { - KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID, () => { - Option(metadataCache) match { -case None => -1 -case Some(cache) => cache.getControllerId match { - case None => -1 - case Some(id) => id.id -} - } -}) + KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID, + () => getCurrentControllerIdFromOldController()) + } + + /** + * Get the current controller ID from the old controller code. + * This is the most up-to-date controller ID we can get when in ZK mode. + */ + def getCurrentControllerIdFromOldController(): Int = { +Option(_kafkaController) match { + case None => -1 Review Comment: Is this just to cover the startup case? (When `_kafkaController` is None) ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -657,15 +657,19 @@ class KafkaServer( } private def createCurrentControllerIdMetric(): Unit = { - KafkaYammerMetrics.defaultRegistry().newGauge(MetadataLoaderMetrics.CURRENT_CONTROLLER_ID, () => { - Option(metadataCache) match { -case None => -1 -case Some(cache) => cache.getControllerId match { Review Comment: Will this work in hybrid mode? Don't we still need to read from the metadata cache to get the KRaft controller ID once we've entered hybrid mode?
[PR] KAFKA-16211: Inconsistent config values in CreateTopicsResult and DescribeConfigsResult [kafka]
infantlikesprogramming opened a new pull request, #15696: URL: https://github.com/apache/kafka/pull/15696 *When creating a topic in a KRaft cluster, a config value returned by createTopics() differs from the one returned by describeConfigs().* *My guess from reading the code is that describeConfigs() sends the request to the specified broker if the config resource is a broker (ConfigResource.Type.BROKER). If the config resource is a topic (ConfigResource.Type.TOPIC), a broker is chosen using LeastLoadedBrokerOrActiveKController() (in KafkaAdminClient), which in this situation picks the "least loaded" broker. I have tested this and, indeed, each time I call describeConfigs() with a ConfigResource of type TOPIC, a different broker's static configuration may be returned. My question is: is this how describeConfigs() is meant to be used with ConfigResource.Type.TOPIC? Or should describeConfigs() be used strictly with ConfigResource.Type.BROKER rather than with ConfigResource.Type.TOPIC? I have also added some comments in the code to record my understanding of the logic of describeConfigs().* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]
soarez commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1560139446 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -40,7 +92,72 @@ public interface TierStateMachine { */ PartitionFetchState start(TopicPartition topicPartition, PartitionFetchState currentFetchState, - PartitionData fetchPartitionData) throws Exception; + PartitionData fetchPartitionData) throws Exception { +OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +int epoch = epochAndLeaderLocalStartOffset.leaderEpoch(); +long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset(); + +long offsetToFetch; + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark(); + +try { +offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset()); +} catch (RemoteStorageException e) { + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark(); +throw e; +} + +OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +long leaderEndOffset = fetchLatestOffsetResult.offset(); + +long initialLag = leaderEndOffset - offsetToFetch; + +return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(), +Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch()); Review Comment: Should `replicaMgr.futureLogOrException` be used instead, if `useFutureLog`? 
## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ## @@ -175,8 +186,38 @@ private OffsetHolder getEarliestLocalOffset(TopicPartition topicPartition) { return new OffsetHolder(LogFileUtils.offsetFromFileName(firstLogFile.get()), partitionFiles); } -private List getTopicPartitionFiles(TopicPartition topicPartition) { -File[] files = brokerStorageDirectory.listFiles((dir, name) -> name.equals(topicPartition.toString())); +public boolean isTopicPartitionFileExistInDir(TopicPartition topicPartition, File logDir) { Review Comment: Maybe `dirContainsTopicPartition` is a better name for this method? ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ## @@ -31,31 +31,36 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; public final class BrokerLocalStorage { private final Integer brokerId; -private final File brokerStorageDirectory; +private final Set brokerStorageDirectory; private final Integer storageWaitTimeoutSec; private final int storagePollPeriodSec = 1; private final Time time = Time.SYSTEM; public BrokerLocalStorage(Integer brokerId, - String storageDirname, + Set storageDirname, Integer storageWaitTimeoutSec) { this.brokerId = brokerId; -this.brokerStorageDirectory = new File(storageDirname); +this.brokerStorageDirectory = storageDirname.stream().map(File::new).collect(Collectors.toSet()); Review Comment: The names for parameter `storageDirname` and field `brokerStorageDirectory` should be pluralized. 
## core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala: ## @@ -23,20 +23,23 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.record._ import org.apache.kafka.common.{TopicPartition, Uuid} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch} +import org.junit.jupiter.api.extension.ExtensionContext +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.{Arguments, ArgumentsProvider, ArgumentsSource} import scala.collection.Map -class ReplicaFetcherTierStateMachineTest { +class TierStateMachineTest { - val truncateOnFetch = true + val truncateOnFetch = false Review Comment: This can be removed ## core/src/test/scala/unit/kafka/server/T
[jira] [Assigned] (KAFKA-16509) CurrentControllerId metric is unreliable in ZK mode
[ https://issues.apache.org/jira/browse/KAFKA-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-16509: Assignee: Colin McCabe > CurrentControllerId metric is unreliable in ZK mode > --- > > Key: KAFKA-16509 > URL: https://issues.apache.org/jira/browse/KAFKA-16509 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > > The CurrentControllerId metric added by KIP-1001 is unreliable in ZK mode. > Sometimes when there is no active ZK-based controller, it still shows the > previous controller ID. Instead, it should show -1 in that situation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16509: CurrentControllerId metric is unreliable in ZK mode [kafka]
cmccabe opened a new pull request, #15695: URL: https://github.com/apache/kafka/pull/15695 The CurrentControllerId metric added by KIP-1001 is unreliable in ZK mode. Sometimes when there is no active ZK-based controller, it still shows the previous controller ID. Instead, it should show -1 in that situation. This PR fixes that by using the controller ID from the KafkaController.scala, which is obtained directly from the controller znode. It also adds a new test, ControllerIdMetricTest.scala.
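The intended gauge behaviour (report -1 whenever no active controller is known, including before startup has wired in a controller) can be sketched in plain Java. This is only an illustration of the contract described above: `ControllerIdGauge`, `setSource`, and `value` are hypothetical names, and the real change lives in the Scala `KafkaServer` class.

```java
import java.util.OptionalInt;
import java.util.function.Supplier;

// Hypothetical stand-in for the metric's gauge; not Kafka code.
class ControllerIdGauge {
    static final int NO_CONTROLLER = -1;

    // Null until startup has created the controller component.
    private volatile Supplier<OptionalInt> controllerIdSource;

    void setSource(Supplier<OptionalInt> source) {
        this.controllerIdSource = source;
    }

    int value() {
        Supplier<OptionalInt> source = controllerIdSource;
        if (source == null) {
            return NO_CONTROLLER; // startup case: nothing to ask yet
        }
        // Ask the live source on every read, so a stale ID is never reported.
        return source.get().orElse(NO_CONTROLLER);
    }
}
```

Reading from the source on each call, rather than caching the last seen ID, is what avoids showing the previous controller after it has gone away.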
[jira] [Created] (KAFKA-16509) CurrentControllerId metric is unreliable in ZK mode
Colin McCabe created KAFKA-16509: Summary: CurrentControllerId metric is unreliable in ZK mode Key: KAFKA-16509 URL: https://issues.apache.org/jira/browse/KAFKA-16509 Project: Kafka Issue Type: Bug Reporter: Colin McCabe The CurrentControllerId metric added by KIP-1001 is unreliable in ZK mode. Sometimes when there is no active ZK-based controller, it still shows the previous controller ID. Instead, it should show -1 in that situation.
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1559907295 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -157,8 +157,10 @@ public class AsyncKafkaConsumerTest { @AfterEach public void resetAll() { backgroundEventQueue.clear(); -if (consumer != null) { +try { consumer.close(Duration.ZERO); +} catch (Exception e) { +// ignore Review Comment: I'm a little leery about swallowing the exception here. Can we validate the exception type is something we expect? e.g.: ```suggestion } catch (Exception e) { assertInstanceOf(KafkaException.class, e); ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1388,6 +1393,37 @@ public void commitSync(Map offsets, Duration } } +private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { +if (lastPendingAsyncCommit == null) { +return; +} + +try { +CompletableFuture futureToAwait; +if (!disableWakeup) { +// We don't want the wake-up trigger to complete our pending async commit future, +// so create new future here. 
+futureToAwait = new CompletableFuture<>(); +lastPendingAsyncCommit.whenComplete((v, t) -> { +if (t != null) { +futureToAwait.completeExceptionally(t); +} else { +futureToAwait.complete(v); +} +}); +wakeupTrigger.setActiveTask(futureToAwait); +} else { +futureToAwait = lastPendingAsyncCommit; +} +ConsumerUtils.getResult(futureToAwait, timer); +lastPendingAsyncCommit = null; +} finally { +if (!disableWakeup) wakeupTrigger.clearTask(); +timer.update(); +} Review Comment: Do we want to clear out the `lastPendingAsyncCommit` in the `finally` block: ```suggestion ConsumerUtils.getResult(futureToAwait, timer); } finally { lastPendingAsyncCommit = null; if (!disableWakeup) wakeupTrigger.clearTask(); timer.update(); } ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() { } } -private void maybeInvokeCommitCallbacks() { -offsetCommitCallbackInvoker.executeCallbacks(); -} - Review Comment: Any reason we don't want to keep this method abstraction?
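The wrapper-future pattern in the diff above can be isolated as a small, self-contained sketch. `FutureMirror.mirror` is a hypothetical helper name; the point it illustrates is that completing the copy (for example, from a wake-up) leaves the original pending commit future untouched.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper illustrating the pattern; not part of the consumer code.
class FutureMirror {
    // Returns a new future that follows the outcome of `original`.
    // Completing the returned future directly does not affect `original`.
    static <T> CompletableFuture<T> mirror(CompletableFuture<T> original) {
        CompletableFuture<T> copy = new CompletableFuture<>();
        original.whenComplete((value, error) -> {
            if (error != null) {
                copy.completeExceptionally(error);
            } else {
                copy.complete(value);
            }
        });
        return copy;
    }
}
```

If the copy has already been completed by someone else when the original finishes, the later `complete` or `completeExceptionally` call on the copy is simply a no-op.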
[jira] [Commented] (KAFKA-16466) QuorumController is swallowing some exception messages
[ https://issues.apache.org/jira/browse/KAFKA-16466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835846#comment-17835846 ] Ilya Zakharov commented on KAFKA-16466: --- [~chiacyu] Hello! Can you tell me if you're working on this task right now? If not, I would like to pick up this task. > QuorumController is swallowing some exception messages > -- > > Key: KAFKA-16466 > URL: https://issues.apache.org/jira/browse/KAFKA-16466 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 3.7.0 >Reporter: David Arthur >Assignee: Chia Chuan Yu >Priority: Major > Labels: good-first-issue > Fix For: 3.8.0, 3.7.1 > > > In some cases in QuorumController, we throw exceptions from the control > manager methods. Unless these are explicitly caught and handled, they will > eventually bubble up to the ControllerReadEvent/ControllerWriteEvent and hit > the generic error handler. > In the generic error handler of QuorumController, we examine the exception to > determine if it is a fault or not. In the case where it is not a fault, we > log the error like: > {code:java} > log.info("{}: {}", name, failureMessage); > {code} > which results in messages like > {code:java} > [2024-04-02 16:08:38,078] INFO [QuorumController id=3000] registerBroker: > event failed with UnsupportedVersionException in 167 microseconds. > (org.apache.kafka.controller.QuorumController:544) > {code} > In this case, the exception actually has more details in its own message > {code:java} > Unable to register because the broker does not support version 8 of > metadata.version. It wants a version between 20 and 20, inclusive. > {code} > We should include the exception's message in the log output for non-fault > errors as it includes very useful debugging info. > This was found while writing an integration test for KRaft migration where > the brokers and controllers have a mismatched MetadataVersion. 
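One way to read the request in this issue is sketched below: build the non-fault log line so that it carries the exception's own message when there is one. The class `FailureMessages`, the method `describe`, and the exact wording of the suffix are assumptions made for illustration, not the fix that was eventually merged.

```java
// Hypothetical formatter; names and wording are illustrative only.
class FailureMessages {
    static String describe(String eventName, Throwable exception, long elapsedMicros) {
        String base = eventName + ": event failed with "
            + exception.getClass().getSimpleName()
            + " in " + elapsedMicros + " microseconds.";
        String detail = exception.getMessage();
        // Append the exception's message only when it adds information.
        if (detail == null || detail.isEmpty()) {
            return base;
        }
        return base + " Exception message: " + detail;
    }
}
```

With this shape, a failure such as the `registerBroker` example in the issue would also log the "Unable to register because..." text rather than only the exception class name.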
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac merged PR #15411: URL: https://github.com/apache/kafka/pull/15411
[jira] [Resolved] (KAFKA-16294) Add group protocol migration enabling config
[ https://issues.apache.org/jira/browse/KAFKA-16294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16294. - Fix Version/s: 3.8.0 Resolution: Fixed > Add group protocol migration enabling config > > > Key: KAFKA-16294 > URL: https://issues.apache.org/jira/browse/KAFKA-16294 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongnuo Lyu >Assignee: Dongnuo Lyu >Priority: Major > Fix For: 3.8.0 > > > The online upgrade is triggered when a consumer group heartbeat request is > received in a classic group. The downgrade is triggered when any old protocol > request is received in a consumer group. We only accept an upgrade/downgrade if > the corresponding group migration config policy is enabled. > This is the first part of the implementation of online group protocol > migration, adding the Kafka config for group protocol migration. The config has > four valid values: both (both upgrade and downgrade are allowed), > upgrade (only upgrade is allowed), downgrade (only downgrade is allowed), and > none (neither is allowed). > At present the default value is NONE. When we start enabling the migration, > we expect to make BOTH the default so that it's easier to roll back to the old > protocol as a quick fix for anything wrong in the new protocol; when using > consumer groups becomes the default and the migration is nearly finished, we will > set the default policy to UPGRADE to prevent unwanted downgrades causing too > frequent migration. DOWNGRADE could be useful for revert or debug purposes.
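The four policy values described in this issue map naturally onto an enum with two flags. The sketch below uses the value names from the issue text; the enum name `GroupMigrationPolicy`, the `parse` helper, and the accessor names other than `isUpgradeEnabled` (which appears in the PR #15662 diff earlier in this digest) are hypothetical, and the merged Kafka code may name things differently.

```java
import java.util.Locale;

// Illustrative enum based on the issue description; not the Kafka class.
enum GroupMigrationPolicy {
    BOTH(true, true),
    UPGRADE(true, false),
    DOWNGRADE(false, true),
    NONE(false, false);

    private final boolean upgradeEnabled;
    private final boolean downgradeEnabled;

    GroupMigrationPolicy(boolean upgradeEnabled, boolean downgradeEnabled) {
        this.upgradeEnabled = upgradeEnabled;
        this.downgradeEnabled = downgradeEnabled;
    }

    boolean isUpgradeEnabled() {
        return upgradeEnabled;
    }

    boolean isDowngradeEnabled() {
        return downgradeEnabled;
    }

    // Case-insensitive parsing of a config string such as "upgrade".
    static GroupMigrationPolicy parse(String name) {
        return valueOf(name.trim().toUpperCase(Locale.ROOT));
    }
}
```

A coordinator could then gate each conversion on a single check, for example rejecting an online upgrade whenever `isUpgradeEnabled()` is false.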
Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]
kirktrue commented on PR #15408: URL: https://github.com/apache/kafka/pull/15408#issuecomment-2048153273 @lucasbru, sorry that I've forgotten, but why don't we want to merge this?
Re: [PR] KAFKA-16433: BeginningAndEndOffsets and OffsetsForTimes should send an event and return empty with zero timeout provided [kafka]
kirktrue commented on PR #15688: URL: https://github.com/apache/kafka/pull/15688#issuecomment-2048148062 @philipnee, sorry I'm late to the party, but why do we submit the event to the queue when the timeout is 0?
Re: [PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison commented on PR #15516: URL: https://github.com/apache/kafka/pull/15516#issuecomment-2048128916 @divijvaidya It seems you've done a bit of work around compression in the past. Can you take a look? Thanks
Re: [PR] KAFKA-7632: Support Compression Level [kafka]
mimaison commented on PR #10826: URL: https://github.com/apache/kafka/pull/10826#issuecomment-2048127235 I opened https://github.com/apache/kafka/pull/15516 to implement this KIP.
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
lucasbru commented on code in PR #15661: URL: https://github.com/apache/kafka/pull/15661#discussion_r1559790393 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -135,6 +135,28 @@ def last_commit(self, tp): else: return None +# This needs to be used for cooperative and consumer protocol +class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): Review Comment: Seems to me that it would have been cleaner to have a single implementation that treats the value that is passed in `onAssigned` correctly (it is always incremental by contract, just happens to be non-incremental in the eager case), instead of having two implementations now. But I'll leave it to a follow-up PR to clean it up, if you agree, and merge this now.
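The single-implementation idea raised in this review can be sketched as a tracker that always treats assignment callbacks as deltas. The sketch is in Java rather than the Python of the system test, `AssignmentTracker` and its methods are hypothetical, and it assumes (as the comment suggests) that in the eager case everything is revoked before the next assignment arrives.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical tracker for illustration; not the system-test handler.
class AssignmentTracker {
    private final Set<String> owned = new TreeSet<>();

    // Always add: the callback argument is treated as newly added partitions.
    void onAssigned(Collection<String> partitions) {
        owned.addAll(partitions);
    }

    void onRevoked(Collection<String> partitions) {
        owned.removeAll(partitions);
    }

    // Defensive copy so callers cannot mutate the tracker's state.
    Set<String> owned() {
        return new TreeSet<>(owned);
    }
}
```

Under that assumption, the same two methods yield the correct owned set for both eager and cooperative rebalances, which is why a second handler class may not be needed.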
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
lucasbru merged PR #15661: URL: https://github.com/apache/kafka/pull/15661
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
chia7712 commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1559774590 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ## @@ -16,14 +16,132 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import java.util.Arrays; +import java.util.Collections; import java.util.List; /** * The group coordinator configurations. */ public class GroupCoordinatorConfig { +/** * Group coordinator configuration ***/ +public final static String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.min.session.timeout.ms"; +public final static String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed session timeout for registered consumers. Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources."; +public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000; + +public final static String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.max.session.timeout.ms"; +public final static String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures."; +public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 1800000; + +public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.initial.rebalance.delay.ms"; +public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. 
A longer delay means potentially fewer rebalances, but increases the time until processing begins."; +public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000; + +public final static String GROUP_MAX_SIZE_CONFIG = "group.max.size"; +public final static String GROUP_MAX_SIZE_DOC = "The maximum number of consumers that a single consumer group can accommodate."; +public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE; + +/** New group coordinator configs */ +public final static String NEW_GROUP_COORDINATOR_ENABLE_CONFIG = "group.coordinator.new.enable"; +public final static String NEW_GROUP_COORDINATOR_ENABLE_DOC = "Enable the new group coordinator."; +public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = false; + +public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols"; +public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + Utils.join(Group.GroupType.values(), ",") + ". 
" + +"The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production."; +public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString()); + +public final static String GROUP_COORDINATOR_NUM_THREADS_CONFIG = "group.coordinator.threads"; +public final static String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number of threads used by the group coordinator."; +public static final int GROUP_COORDINATOR_NUM_THREADS_DEFAULT = 1; + +/** Consumer group configs */ +public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.consumer.session.timeout.ms"; +public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the consumer group protocol."; +public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000; + +public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.consumer.min.session.timeout.ms"; +public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed session timeout for registered consumers."; +public static final int CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 45000; + +public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.consumer.max.session.timeout.ms"; +public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed session timeout for registered consumers."; +public static final int CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 60000; + +public final static String CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = "group.consumer.heartbeat.interval.ms"; +public final static
Re: [PR] KAFKA-9914: Fix replication cycle detection [kafka]
izmal commented on PR #10277: URL: https://github.com/apache/kafka/pull/10277#issuecomment-2047967071 In my case, the cause of the replication cycle was `replication.policy.separator = -`. With a different separator (e.g. '_'), replication works without a cycle. :-)
Re: [PR] KAFKA-16103: LegacyConsumer should always await pending async commits on commitSync and close [kafka]
lucasbru commented on PR #15693: URL: https://github.com/apache/kafka/pull/15693#issuecomment-2047947707 @lianetm You have reviewed those changes already as part of https://github.com/apache/kafka/pull/15613 .
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
OmniaGM commented on PR #15684: URL: https://github.com/apache/kafka/pull/15684#issuecomment-2047947749 Ugh, my local environment keeps stale caches because I keep jumping between branches.
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
lucasbru commented on PR #15613: URL: https://github.com/apache/kafka/pull/15613#issuecomment-2047945261 Hey @lianetm. I split the PR into two, the changes for the legacy consumer go into https://github.com/apache/kafka/pull/15693.
[PR] KAFKA-16103: LegacyConsumer should always await pending async commits on commitSync and close [kafka]
lucasbru opened a new pull request, #15693: URL: https://github.com/apache/kafka/pull/15693

The javadoc for `KafkaConsumer.commitSync` says:

> Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
> (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.

This is not always true in the legacy consumer: when the set of offsets is empty, the execution of the commit callback is not always awaited. There are also various possible races that can prevent callback handler execution.

Similarly, there is code in the legacy consumer to await the completion of the commit callback before closing; however, the code doesn't cover all cases, and the behavior is therefore inconsistent. While the javadoc doesn't explicitly promise callback execution, it promises "completing commits", which one would reasonably expect to include callback execution. Either way, the current behavior of the legacy consumer is inconsistent.

This change proposes a number of fixes to clean up the callback execution guarantees:

- We also need to await async commits that are "pending" rather than "in-flight" because we do not know the coordinator yet.
- In close, we need to execute not only the commit listeners of "pending" commits, but also those of "in-flight" commits.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
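The two fixes listed in the PR description can be pictured with a toy model. This is only a sketch under stated assumptions, not the legacy consumer's actual code: `CommitCallbackSketch`, its two queues, and the `coordinatorKnown` flag are hypothetical names, and real commits complete asynchronously over the network rather than being drained from in-memory queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: hypothetical names, not Kafka's consumer internals.
final class CommitCallbackSketch {
    // Async commits that could not be sent yet because the coordinator is unknown.
    private final Queue<Runnable> pending = new ArrayDeque<>();
    // Async commits whose request was sent but whose callback has not run yet.
    private final Queue<Runnable> inFlight = new ArrayDeque<>();
    // Records the order in which callbacks and calls complete.
    final List<String> log = new ArrayList<>();

    void commitAsync(String name, boolean coordinatorKnown) {
        Runnable callback = () -> log.add("callback:" + name);
        if (coordinatorKnown) {
            inFlight.add(callback);
        } else {
            pending.add(callback);
        }
    }

    // commitSync only returns after every earlier async callback has run.
    void commitSync() {
        awaitAll();
        log.add("commitSync done");
    }

    // close gives the same guarantee for both kinds of outstanding commits.
    void close() {
        awaitAll();
        log.add("closed");
    }

    private void awaitAll() {
        drain(pending);
        drain(inFlight);
    }

    private static void drain(Queue<Runnable> queue) {
        Runnable callback;
        while ((callback = queue.poll()) != null) {
            callback.run();
        }
    }

    public static void main(String[] args) {
        CommitCallbackSketch consumer = new CommitCallbackSketch();
        consumer.commitAsync("a", false); // pending: coordinator unknown
        consumer.commitAsync("b", true);  // in-flight
        consumer.commitSync();
        System.out.println(consumer.log);
    }
}
```

The point of the model is only that both queues are drained on both paths; which queue a real commit sits in, and how the wait is bounded by a timeout, is left out.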
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
chia7712 commented on PR #15684: URL: https://github.com/apache/kafka/pull/15684#issuecomment-2047933442 @OmniaGM it seems there are build errors. Could you fix them?
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
OmniaGM commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1559709137 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,16 +20,51 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { Review Comment: I updated the PR now and rebased from trunk as well.
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
OmniaGM commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1559706475 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,16 +20,51 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { Review Comment: sorry I meant `org.apache.kafka.coordinator.group`
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
chia7712 commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1559703336 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,16 +20,51 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { Review Comment:
> then we can change the constructor of GroupCoordinatorConfig to accept KafkaConfig

That makes sense to me. A unified way to generate those config classes can ensure consistent behavior.

> I moved it to kafka.coordinator.group

I assume the package you mentioned is in the core module, but I'm OK with your approach since it avoids rewriting Java code back to Scala code.
[jira] [Created] (KAFKA-16508) Infinte loop if output topic does not exisit
Matthias J. Sax created KAFKA-16508:
---
Summary: Infinte loop if output topic does not exisit
Key: KAFKA-16508
URL: https://issues.apache.org/jira/browse/KAFKA-16508
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Matthias J. Sax

Kafka Streams supports `ProductionExceptionHandler` to drop records on error when writing into an output topic. However, if the output topic does not exist, the corresponding error cannot be skipped over because the handler is not called.

The issue is that the producer internally retries fetching the output topic metadata until it times out, and a `TimeoutException` (which is a `RetriableException`) is returned via the registered `Callback`. However, for `RetriableException` there is a different code path, and the `ProductionExceptionHandler` is not called.

In general, Kafka Streams correctly tries to handle as many errors as possible internally, and a `RetriableException` falls into this category (and thus there is no need to call the handler). However, for this particular case, just retrying does not solve the issue. It's unclear whether throwing a retryable `TimeoutException` is actually the right thing to do for the Producer.

It is also not clear what the right way to address this ticket would be (currently, we cannot really detect this case, except by doing some nasty error message String comparison, which sounds hacky...)

-- This message was sent by Atlassian Jira (v8.20.10#820010)
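The branching described in this ticket can be sketched in isolation. The following is a toy model under stated assumptions, not the Kafka Streams implementation: `RetriableStandIn`, `TimeoutStandIn`, `RecordTooLargeStandIn`, the `Outcome` enum, and `onSendError` are all hypothetical stand-ins, and what the retriable path does afterwards is deliberately left out.

```java
import java.util.function.Predicate;

// Toy model only: stand-in types, not the Kafka Streams classes.
final class SendErrorDispatchSketch {
    // Stand-in for a retriable exception hierarchy.
    static class RetriableStandIn extends RuntimeException {
        RetriableStandIn(String message) { super(message); }
    }

    // Stand-in for the timeout reported when topic metadata never arrives.
    static class TimeoutStandIn extends RetriableStandIn {
        TimeoutStandIn(String message) { super(message); }
    }

    // Stand-in for a non-retriable, record-specific error.
    static class RecordTooLargeStandIn extends RuntimeException {
        RecordTooLargeStandIn(String message) { super(message); }
    }

    enum Outcome { RETRIABLE_PATH_HANDLER_SKIPPED, HANDLER_SAID_CONTINUE, HANDLER_SAID_FAIL }

    // The handler is modeled as a predicate: true means "drop the record and continue".
    static Outcome onSendError(RuntimeException error, Predicate<RuntimeException> handlerSaysContinue) {
        if (error instanceof RetriableStandIn) {
            // Separate code path: the handler is never consulted here.
            return Outcome.RETRIABLE_PATH_HANDLER_SKIPPED;
        }
        return handlerSaysContinue.test(error)
                ? Outcome.HANDLER_SAID_CONTINUE
                : Outcome.HANDLER_SAID_FAIL;
    }

    public static void main(String[] args) {
        Predicate<RuntimeException> alwaysContinue = e -> true;
        System.out.println(onSendError(new TimeoutStandIn("metadata not available"), alwaysContinue));
        System.out.println(onSendError(new RecordTooLargeStandIn("record too big"), alwaysContinue));
    }
}
```

Even with a handler that always opts to continue, the timeout case never reaches it, which is why a missing output topic cannot be skipped in this model.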
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
OmniaGM commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1559689777 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,16 +20,51 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { Review Comment: I would suggest that we wait until KafkaConfig is fully migrated out of core; then we can change the constructor of `GroupCoordinatorConfig` to accept [KafkaConfig](https://issues.apache.org/jira/browse/KAFKA-15853?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17789703#comment-17789703) and have it extract any needed grouping from the KafkaConfig definition. WDYT? This might be the easiest way.
[PR] KAFKA-16507 Add raw record into RecordDeserialisationException [kafka]
fred-ro opened a new pull request, #15691: URL: https://github.com/apache/kafka/pull/15691 Add raw ConsumerRecord data to RecordDeserialisationException to make DLQ implementation easier *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-16507) Add raw record into RecordDeserialisationException
[ https://issues.apache.org/jira/browse/KAFKA-16507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fred Rouleau updated KAFKA-16507: - Labels: kip (was: ) > Add raw record into RecordDeserialisationException > -- > > Key: KAFKA-16507 > URL: https://issues.apache.org/jira/browse/KAFKA-16507 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Fred Rouleau >Priority: Minor > Labels: kip > > [KIP-334|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793] > introduced into the Consumer the RecordDeserializationException with offsets > information. That is useful to skip a poison pill but as you do not have > access to the Record, it still prevents easy implementation of dead letter > queue or simply logging the faulty data. > Changes are described in > [KIP-1036|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16507) Add raw record into RecordDeserialisationException
Fred Rouleau created KAFKA-16507: Summary: Add raw record into RecordDeserialisationException Key: KAFKA-16507 URL: https://issues.apache.org/jira/browse/KAFKA-16507 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Fred Rouleau [KIP-334|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793] introduced into the Consumer the RecordDeserializationException with offsets information. That is useful to skip a poison pill but as you do not have access to the Record, it still prevents easy implementation of dead letter queue or simply logging the faulty data. Changes are described in [KIP-1036|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
OmniaGM commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1559674776 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,16 +20,51 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { Review Comment: I moved it to `kafka.coordinator.group` for now; we can have a follow-up for [KAFKA-15089](https://github.com/apache/kafka/pull/15684/commits/581e9a88089aa9881ec28de75877350e17711b3e) to look into removing this class later.
[jira] [Updated] (KAFKA-16505) KIP-1034: Dead letter queue in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-16505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16505: Component/s: streams > KIP-1034: Dead letter queue in Kafka Streams > > > Key: KAFKA-16505 > URL: https://issues.apache.org/jira/browse/KAFKA-16505 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Damien Gasparina >Priority: Major > > See KIP: KIP-1034: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16505) KIP-1034: Dead letter queue in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-16505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16505: Labels: KIP (was: ) > KIP-1034: Dead letter queue in Kafka Streams > > > Key: KAFKA-16505 > URL: https://issues.apache.org/jira/browse/KAFKA-16505 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Damien Gasparina >Priority: Major > Labels: KIP > > See KIP: KIP-1034: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
philipnee commented on code in PR #15661: URL: https://github.com/apache/kafka/pull/15661#discussion_r1559662337 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -135,6 +135,28 @@ def last_commit(self, tp): else: return None +# This needs to be used for cooperative and consumer protocol +class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): Review Comment: I believe the current listener assumes Eager protocol so it is not making the incorrect assumptions. This (incremental handler) would probably work for Eager as well but I thought it would be clearer to distinguish the two.
[jira] [Assigned] (KAFKA-15309) Add custom error handler to Producer
[ https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alieh Saeedi reassigned KAFKA-15309: Assignee: Alieh Saeedi > Add custom error handler to Producer > > > Key: KAFKA-15309 > URL: https://issues.apache.org/jira/browse/KAFKA-15309 > Project: Kafka > Issue Type: New Feature > Components: producer >Reporter: Matthias J. Sax >Assignee: Alieh Saeedi >Priority: Major > Labels: needs-kip > Attachments: KafkaProducerReproducer.java, app.log > > > The producer collects multiple records into batches, and a single record > specific error might fail the whole batch (eg, `RecordTooLargeException`). > This ticket suggests to add a per-record error handler, that allows user to > opt into skipping bad records without failing the whole batch (similar to > Kafka Streams `ProductionExceptionHandler`). > The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused > https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket. > Another example for which a production exception handler could be useful, if > a user tries to write into a non-existing topic, which returns a retryable > error code; with infinite retries the producer would hang retrying forever. A > handler could help to break the infinite retry loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
chia7712 commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1559616179 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,16 +20,51 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { Review Comment:
> OffsetConfig is only used by Scala code so it will disappear when we remove it

Or we can move `OffsetConfig` to `kafka.coordinator.group` since it is used by `kafka.coordinator.group.GroupCoordinator` and `kafka.coordinator.group.GroupMetadataManager`.
Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]
mimaison commented on code in PR #15558: URL: https://github.com/apache/kafka/pull/15558#discussion_r1559584216 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ##
@@ -320,4 +323,21 @@ static void createCompactedTopic(String topicName, short partitions, short repli
     static void createSinglePartitionCompactedTopic(String topicName, short replicationFactor, Admin admin) {
         createCompactedTopic(topicName, (short) 1, replicationFactor, admin);
     }
+
+    static <T> T adminCall(Callable<T> callable, Supplier<String> errMsg)
+            throws ExecutionException, InterruptedException {
+        try {
+            return callable.call();
+        } catch (ExecutionException | InterruptedException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof TopicAuthorizationException ||
+                    cause instanceof ClusterAuthorizationException ||
+                    cause instanceof GroupAuthorizationException) {
+                log.error("Authorization error occurred while trying to " + errMsg.get());

Review Comment: Could we do something like:
```
log.error(cause.getClass().getSimpleName() + " occurred while trying to " + errMsg.get());
```
so the exact exception is printed.
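To make the review suggestion concrete, here is a self-contained sketch of the wrapper with the exception's simple class name in the message. It rests on several assumptions: `AuthorizationStandIn` and `TopicAuthorizationStandIn` are hypothetical stand-ins for Kafka's authorization exceptions, `describe` is a helper added only for illustration, the message goes to `System.err` instead of a logger, and the trailing catch-all is just one way to make the method compile against `Callable`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

// Sketch only: stand-in exception types, not Kafka's.
final class AdminCallSketch {
    static class AuthorizationStandIn extends RuntimeException {
        AuthorizationStandIn(String message) { super(message); }
    }

    static class TopicAuthorizationStandIn extends AuthorizationStandIn {
        TopicAuthorizationStandIn(String message) { super(message); }
    }

    // Builds the message proposed in the review: the concrete exception name comes first.
    static String describe(Throwable cause, Supplier<String> errMsg) {
        return cause.getClass().getSimpleName() + " occurred while trying to " + errMsg.get();
    }

    static <T> T adminCall(Callable<T> callable, Supplier<String> errMsg)
            throws ExecutionException, InterruptedException {
        try {
            return callable.call();
        } catch (ExecutionException | InterruptedException e) {
            Throwable cause = e.getCause();
            // instanceof is false for a null cause, so no separate null check is needed.
            if (cause instanceof AuthorizationStandIn) {
                System.err.println(describe(cause, errMsg));
            }
            throw e;
        } catch (Exception e) {
            // Callable.call() may throw any Exception; wrap whatever is left.
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            adminCall(
                () -> { throw new ExecutionException(new TopicAuthorizationStandIn("denied")); },
                () -> "list topics");
        } catch (ExecutionException e) {
            System.out.println("rethrown with cause: " + e.getCause().getMessage());
        }
    }
}
```

Using `getSimpleName()` keeps the log line short while still telling the operator which of the authorization failures occurred.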
[jira] [Updated] (KAFKA-16502) Fix flaky EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore
[ https://issues.apache.org/jira/browse/KAFKA-16502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16502: Component/s: streams unit tests > Fix flaky > EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore > -- > > Key: KAFKA-16502 > URL: https://issues.apache.org/jira/browse/KAFKA-16502 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Priority: Major > > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Expected ERROR state but driver is on RUNNING ==> expected: but was: > > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:350) > at > app//org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore(EOSUncleanShutdownIntegrationTest.java:169) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) > at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > at > app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > at > java.base@11.0.16.1/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16502) Fix flaky EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore
[ https://issues.apache.org/jira/browse/KAFKA-16502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16502: Labels: flaky-test (was: ) > Fix flaky > EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore > -- > > Key: KAFKA-16502 > URL: https://issues.apache.org/jira/browse/KAFKA-16502 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Priority: Major > Labels: flaky-test > > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Expected ERROR state but driver is on RUNNING ==> expected: but was: > > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:350) > at > app//org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore(EOSUncleanShutdownIntegrationTest.java:169) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) > at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > at > app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > at > java.base@11.0.16.1/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
dajac commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1559600382 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,16 +20,51 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { Review Comment: `OffsetConfig` is only used by Scala code so it will disappear when we remove it. The `GroupConfig` in Scala may never be migrated to Java as `GroupCoordinatorConfig` already contains everything, I think. I wonder if we could actually replace `OffsetConfig` with an interface and make `GroupCoordinatorConfig` implement it. Then, we could pass `GroupCoordinatorConfig` to the old code too. I am not sure if this is feasible though.
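The interface idea floated in this comment might look roughly like the following. This is a sketch of the shape only, under loud assumptions: every type and accessor name here (`OffsetSettingsSketch`, `GroupCoordinatorConfigSketch`, `LegacyOffsetUserSketch`, `maxMetadataSize`, `offsetsRetentionMs`) is hypothetical and chosen for illustration, and the real classes carry many more settings.

```java
// Hypothetical interface exposing only what the old code would read.
interface OffsetSettingsSketch {
    int maxMetadataSize();
    long offsetsRetentionMs();
}

// Stand-in for the new config class; it simply implements the interface.
final class GroupCoordinatorConfigSketch implements OffsetSettingsSketch {
    private final int offsetMetadataMaxSize;
    private final long offsetsRetentionMs;

    GroupCoordinatorConfigSketch(int offsetMetadataMaxSize, long offsetsRetentionMs) {
        this.offsetMetadataMaxSize = offsetMetadataMaxSize;
        this.offsetsRetentionMs = offsetsRetentionMs;
    }

    @Override
    public int maxMetadataSize() {
        return offsetMetadataMaxSize;
    }

    @Override
    public long offsetsRetentionMs() {
        return offsetsRetentionMs;
    }
}

// Stand-in for the old code: it depends on the interface, not on a concrete config class.
final class LegacyOffsetUserSketch {
    private final OffsetSettingsSketch settings;

    LegacyOffsetUserSketch(OffsetSettingsSketch settings) {
        this.settings = settings;
    }

    boolean metadataFits(String metadata) {
        return metadata.length() <= settings.maxMetadataSize();
    }

    public static void main(String[] args) {
        LegacyOffsetUserSketch legacy =
            new LegacyOffsetUserSketch(new GroupCoordinatorConfigSketch(4096, 604_800_000L));
        System.out.println(legacy.metadataFits("some commit metadata"));
    }
}
```

With this shape the old code no longer needs a separate grouping class, since the one config object satisfies both consumers; whether that is feasible in the real code base is exactly the open question in the comment.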
[jira] [Commented] (KAFKA-16336) Remove Deprecated metric standby-process-ratio
[ https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835763#comment-17835763 ] Kiriakos Marantidis commented on KAFKA-16336: - Hi [~mjsax], I would like to work on this issue. I see that no one else is working on this. Please let me know if it's OK if I look at it. Kind regards, Kiriakos > Remove Deprecated metric standby-process-ratio > -- > > Key: KAFKA-16336 > URL: https://issues.apache.org/jira/browse/KAFKA-16336 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > Metric "standby-process-ratio" was deprecated in 3.5 release via > https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility
[jira] [Assigned] (KAFKA-16506) add the scala version of tool-related classes back to core module to follow KIP-906
[ https://issues.apache.org/jira/browse/KAFKA-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16506: -- Assignee: PoAn Yang (was: Chia-Ping Tsai) > add the scala version of tool-related classes back to core module to follow > KIP-906 > --- > > Key: KAFKA-16506 > URL: https://issues.apache.org/jira/browse/KAFKA-16506 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Major > > According to > https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines > , we have to deprecate the scala version of tool-related classes instead of > deleting them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16506) add the scala version of tool-related classes back to core module to follow KIP-906
[ https://issues.apache.org/jira/browse/KAFKA-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835754#comment-17835754 ] Chia-Ping Tsai commented on KAFKA-16506: [~yangpoan] thanks for taking over this Jira. Please help us list all deleted tool-related classes. Thanks! > add the scala version of tool-related classes back to core module to follow > KIP-906 > --- > > Key: KAFKA-16506 > URL: https://issues.apache.org/jira/browse/KAFKA-16506 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > According to > https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines > , we have to deprecate the scala version of tool-related classes instead of > deleting them.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1559526460 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java: ##
@@ -17,13 +17,12 @@
 package org.apache.kafka.storage.internals.log;

 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;

 import java.util.Set;

 public class ProducerStateManagerConfig {
-    public static final String PRODUCER_ID_EXPIRATION_MS = "producer.id.expiration.ms";
-    public static final String TRANSACTION_VERIFICATION_ENABLED = "transaction.partition.verification.enable";
-    public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(PRODUCER_ID_EXPIRATION_MS, TRANSACTION_VERIFICATION_ENABLED);
+    public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);

Review Comment: Do we need to put those in `ProducerStateManagerConfig`? `DynamicBrokerConfig` is the class that has the power to decide the reconfigurable configs (https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L1136), so it seems to me those reconfigurable configs should be moved to `DynamicBrokerConfig`.
## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1062,21 +1027,21 @@ object KafkaConfig { .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc) /** * Transaction management configuration ***/ - .define(TransactionalIdExpirationMsProp, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionalIdExpirationMsDoc) - .define(TransactionsMaxTimeoutMsProp, INT, Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc) - .define(TransactionsTopicMinISRProp, INT, Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc) - .define(TransactionsLoadBufferSizeProp, INT, Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc) - .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc) - .define(TransactionsTopicPartitionsProp, INT, Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, TransactionsTopicPartitionsDoc) - .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc) - .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc) - .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc) - - .define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, TransactionPartitionVerificationEnableDoc) - - .define(ProducerIdExpirationMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc) + 
.define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC) Review Comment: `TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT` should be replaced by `TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
OmniaGM commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1559529499 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,16 +20,51 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { Review Comment: It is also a bit odd to have a class whose constructor does nothing but group values. But I can see that splitting it out might be better: we have `GroupConfig` in Scala that will be migrated to Java at some point, so it wouldn't make sense to let `GroupCoordinatorConfig` grow out of hand.
[jira] [Commented] (KAFKA-16506) add the scala version of tool-related classes back to core module to follow KIP-906
[ https://issues.apache.org/jira/browse/KAFKA-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835749#comment-17835749 ] PoAn Yang commented on KAFKA-16506: --- Hi [~chia7712], I'm interested in this. May I assign it to myself? Thank you. > add the scala version of tool-related classes back to core module to follow > KIP-906 > --- > > Key: KAFKA-16506 > URL: https://issues.apache.org/jira/browse/KAFKA-16506 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > According to > https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines > , we have to deprecate the scala version of tool-related classes instead of > deleting them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]
chia7712 merged PR #15656: URL: https://github.com/apache/kafka/pull/15656
Re: [PR] KAFKA-16103: commitSync and close should await pending async commits [kafka]
lucasbru commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1559430603 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -229,7 +229,11 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional groupInstance @AfterEach public void teardown() { this.metrics.close(); -this.coordinator.close(time.timer(0)); +try { +this.coordinator.close(time.timer(0)); Review Comment: correct. it was just less likely before. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1360,6 +1362,9 @@ public void commitSync(Map offsets, Duration Timer requestTimer = time.timer(timeout.toMillis()); SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer); CompletableFuture commitFuture = commit(syncCommitEvent); + +awaitPendingAsyncCommits(requestTimer, false); Review Comment: Done ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { Review Comment: Done ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before 
the consumer +// is closed. +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount); + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(1, cb.successCount); + +// Enforce looking up the coordinator +consumer.committed(Set(tp, tp2).asJava) + +// Try with coordinator known +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb) +consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava) +assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset) +assertEquals(2, cb.successCount); + +// Try with empty sync commit +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset) +assertEquals(3, cb.successCount); Review Comment: Done ## core/src/test/scala/integrati
Re: [PR] MINOR : Replaced the while loop with TestUtils.waitForCondition [kafka]
chia7712 commented on PR #15678: URL: https://github.com/apache/kafka/pull/15678#issuecomment-2047578617 > If the condition isn't met within maxWaitMs, waitForCondition throws an assertion failure. Do we need another timeout to handle that? My original thought was to use a JUnit 5 timeout, but we can't see the method in the error stack trace if the error is produced by the JUnit timeout. So +1 to the current approach.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-2047548232 Chris, I started changing the tests in line with the comments (i.e. using AtomicBoolean and AtomicReference, and removing the try-catch block). I noticed an interesting issue with the `testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceedsForTombstoneRecords` test: when we call get on the future returned in this case, it doesn't throw an exception. I debugged it, and I think the problem is that when the primary store fails, we correctly set the callback to error. However, because the secondary store write doesn't fail, when its callback gets invoked from [here](https://github.com/apache/kafka/pull/13801/files#diff-0b612a24267f45b927d37b223af3034feebe4363b23b53f5751f1b29e54e2aa7R331), the callback's onCompletion eventually sets it to a non-error from [here](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L369-L372). The net effect is that the .get() call on the future doesn't return an error, which isn't right.
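The failure mode described in this comment, where a later successful completion overwrites an earlier failure on a shared callback, can be sketched in isolation. This is only an illustration under stated assumptions, not the Connect code: `LastWriteWinsCallback` and `StickyErrorCallback` are hypothetical names, and the only assumption is that both store writes report to the same callback object.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a primary and a secondary write both report to one shared callback.
final class SharedCallbackSketch {

    /** Records whatever the most recent completion reported, so a later success hides an earlier error. */
    static final class LastWriteWinsCallback {
        private final AtomicReference<Throwable> error = new AtomicReference<>();

        void onCompletion(Throwable t) {
            error.set(t); // a null (success) overwrites a previously stored failure
        }

        Throwable error() {
            return error.get();
        }
    }

    /** Keeps the first failure it sees; later successes cannot clear it. */
    static final class StickyErrorCallback {
        private final AtomicReference<Throwable> error = new AtomicReference<>();

        void onCompletion(Throwable t) {
            if (t != null) {
                error.compareAndSet(null, t); // only the first failure is stored
            }
        }

        Throwable error() {
            return error.get();
        }
    }

    public static void main(String[] args) {
        Throwable primaryFailure = new RuntimeException("primary store write failed");

        LastWriteWinsCallback lossy = new LastWriteWinsCallback();
        lossy.onCompletion(primaryFailure); // primary store fails
        lossy.onCompletion(null);           // secondary store succeeds afterwards
        System.out.println("last-write-wins error: " + lossy.error());

        StickyErrorCallback sticky = new StickyErrorCallback();
        sticky.onCompletion(primaryFailure);
        sticky.onCompletion(null);
        System.out.println("sticky error: " + sticky.error());
    }
}
```

Whether a "first error wins" rule is the right fix for the PR is a separate question; the sketch only shows why the order of the two completions matters.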
[jira] [Created] (KAFKA-16506) add the scala version of tool-related class back to core module to follow KIP-906
Chia-Ping Tsai created KAFKA-16506: -- Summary: add the scala version of tool-related class back to core module to follow KIP-906 Key: KAFKA-16506 URL: https://issues.apache.org/jira/browse/KAFKA-16506 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai According to https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines , we have to deprecate the scala version of tool-related classes instead of deleting them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16506) add the scala version of tool-related class back to core module to follow KIP-906
[ https://issues.apache.org/jira/browse/KAFKA-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16506: -- Assignee: Chia-Ping Tsai > add the scala version of tool-related class back to core module to follow > KIP-906 > - > > Key: KAFKA-16506 > URL: https://issues.apache.org/jira/browse/KAFKA-16506 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > According to > https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines > , we have to deprecate the scala version of tool-related classes instead of > deleting them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16506) add the scala version of tool-related classes back to core module to follow KIP-906
[ https://issues.apache.org/jira/browse/KAFKA-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16506: --- Summary: add the scala version of tool-related classes back to core module to follow KIP-906 (was: add the scala version of tool-related class back to core module to follow KIP-906) > add the scala version of tool-related classes back to core module to follow > KIP-906 > --- > > Key: KAFKA-16506 > URL: https://issues.apache.org/jira/browse/KAFKA-16506 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > According to > https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines > , we have to deprecate the scala version of tool-related classes instead of > deleting them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14517) Implement regex subscriptions
[ https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Phuc Hong Tran reassigned KAFKA-14517: -- Assignee: Phuc Hong Tran (was: Jimmy Wang) > Implement regex subscriptions > - > > Key: KAFKA-14517 > URL: https://issues.apache.org/jira/browse/KAFKA-14517 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848-preview > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
chia7712 commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1559398541 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,16 +20,51 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { Review Comment: It seems to me the package `GroupCoordinatorConfig.OffsetConfig` is a bit confusing, and moving `OffsetConfig` out of `GroupCoordinatorConfig` should be fine. WDYT?
Re: [PR] KAFKA-16482: Eliminate the IDE warnings of accepting ClusterConfig in BeforeEach [kafka]
chia7712 commented on code in PR #15676: URL: https://github.com/apache/kafka/pull/15676#discussion_r1559385018 ## core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala: ## @@ -19,32 +19,31 @@ package kafka.coordinator.transaction import kafka.network.SocketServer import kafka.server.{IntegrationTestUtils, KafkaConfig} -import kafka.test.annotation.{AutoStart, ClusterTest, ClusterTests, Type} +import kafka.test.ClusterInstance +import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions -import kafka.test.{ClusterConfig, ClusterInstance} import org.apache.kafka.common.message.InitProducerIdRequestData import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse} import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.{BeforeEach, Disabled, Timeout} import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.{Disabled, Timeout} import java.util.stream.{Collectors, IntStream} import scala.concurrent.duration.DurationInt import scala.jdk.CollectionConverters._ + +@ClusterTestDefaults(serverProperties = Array( + new ClusterConfigProperty(key = "transaction.state.log.num.partitions", value = "1"), + new ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3") Review Comment: This is equal to the default value (https://github.com/apache/kafka/blob/trunk/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java#L23), so maybe we can remove it?
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
nizhikov commented on PR #15645: URL: https://github.com/apache/kafka/pull/15645#issuecomment-2047311878 > I mean we can add more tests for zk and kraft in this PR, and it would be nice to use new test infra (ClusterTestExtensions) to rewrite it. WDYT? Agree. I will try to extend coverage of the test shortly.
[jira] [Assigned] (KAFKA-15309) Add custom error handler to Producer
[ https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alieh Saeedi reassigned KAFKA-15309: Assignee: (was: Alieh Saeedi) > Add custom error handler to Producer > > > Key: KAFKA-15309 > URL: https://issues.apache.org/jira/browse/KAFKA-15309 > Project: Kafka > Issue Type: New Feature > Components: producer >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > Attachments: KafkaProducerReproducer.java, app.log > > > The producer collects multiple records into batches, and a single record > specific error might fail the whole batch (eg, `RecordTooLargeException`). > This ticket suggests to add a per-record error handler, that allows user to > opt into skipping bad records without failing the whole batch (similar to > Kafka Streams `ProductionExceptionHandler`). > The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused > https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket. > Another example for which a production exception handler could be useful, if > a user tries to write into a non-existing topic, which returns a retryable > error code; with infinite retries the producer would hang retrying forever. A > handler could help to break the infinite retry loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
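To make the proposal in this ticket concrete, here is a minimal sketch of what a per-record error handler could look like. The ticket is labelled needs-kip, so no such producer API is defined here: `RecordErrorHandler`, `Response`, and `sendBatch` are hypothetical names invented for illustration, and string length stands in for serialized record size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a per-record error handler; none of these names are Kafka APIs.
final class PerRecordErrorHandlerSketch {

    /** What the handler asks the sender to do with a bad record. */
    enum Response { FAIL, SKIP }

    /** Called once per record that cannot be sent. */
    interface RecordErrorHandler {
        Response handle(String record, Exception error);
    }

    /**
     * Simulates sending a batch. A record longer than maxRecordChars is treated as "too large":
     * the handler decides whether the whole batch fails or only that record is dropped.
     */
    static List<String> sendBatch(List<String> records, int maxRecordChars, RecordErrorHandler handler) {
        List<String> accepted = new ArrayList<>();
        for (String record : records) {
            if (record.length() > maxRecordChars) {
                Exception error = new IllegalArgumentException("record too large: " + record.length());
                if (handler.handle(record, error) == Response.FAIL) {
                    throw new IllegalStateException("batch failed", error);
                }
                continue; // SKIP: drop only this record and keep the rest of the batch
            }
            accepted.add(record);
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "bbbbbb", "cc");
        List<String> sent = sendBatch(batch, 3, (record, error) -> Response.SKIP);
        System.out.println("sent with SKIP handler: " + sent);
    }
}
```

A handler that always returns `FAIL` reproduces the behaviour the ticket describes as the status quo, where one oversized record fails the whole batch.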
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
showuon commented on PR #15136: URL: https://github.com/apache/kafka/pull/15136#issuecomment-2047306952 Will check it again within this week. Thanks.
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
chia7712 commented on PR #15645: URL: https://github.com/apache/kafka/pull/15645#issuecomment-2047302613 > I thought we want to move all code related to the ConfigCommand to java and remove it when ZK support will be dropped. Personally, I find rewriting a class that will be removed entirely a bit weird. However, I understand the purpose and effort behind #15417, so could we make this PR more valuable by increasing its coverage? I mean we can add more tests for zk and kraft in this PR, and it would be nice to use the new test infra (`ClusterTestExtensions`) to rewrite it. WDYT?
[jira] [Assigned] (KAFKA-15309) Add custom error handler to Producer
[ https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alieh Saeedi reassigned KAFKA-15309: Assignee: Alieh Saeedi > Add custom error handler to Producer > > > Key: KAFKA-15309 > URL: https://issues.apache.org/jira/browse/KAFKA-15309 > Project: Kafka > Issue Type: New Feature > Components: producer >Reporter: Matthias J. Sax >Assignee: Alieh Saeedi >Priority: Major > Labels: needs-kip > Attachments: KafkaProducerReproducer.java, app.log > > > The producer collects multiple records into batches, and a single record > specific error might fail the whole batch (eg, `RecordTooLargeException`). > This ticket suggests to add a per-record error handler, that allows user to > opt into skipping bad records without failing the whole batch (similar to > Kafka Streams `ProductionExceptionHandler`). > The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused > https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket. > Another example for which a production exception handler could be useful, if > a user tries to write into a non-existing topic, which returns a retryable > error code; with infinite retries the producer would hang retrying forever. A > handler could help to break the infinite retry loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15568) Use incrementalAlterConfigs to update the dynamic config of broker in ConfigCommand tool
[ https://issues.apache.org/jira/browse/KAFKA-15568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-15568. Resolution: Duplicate Duplicate of KAFKA-16181 > Use incrementalAlterConfigs to update the dynamic config of broker in > ConfigCommand tool > > > Key: KAFKA-15568 > URL: https://issues.apache.org/jira/browse/KAFKA-15568 > Project: Kafka > Issue Type: Improvement >Reporter: Aman Singh >Assignee: Aman Singh >Priority: Major > > As part of [this > KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API] > incrementalAlterConfigs API was introduced to change any config dynamically. > - `kafka-configs.sh (ConfigCommand)` still uses `alterConfigs` to update the > config. > - The tool first describes the configs and then replaces all the configs. > - We need to remember all the sensitive configs since sensitive fields are > not returned by DescribeConfigs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
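The difference between the two update styles listed in this ticket can be sketched with plain maps. This is a simplified model under stated assumptions, not the Admin client API: `replaceAll`, `incremental`, and `describe` are hypothetical helpers, and the two config keys are used only as sample data.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "describe then replace everything" versus an incremental update.
final class AlterConfigsSemanticsSketch {

    /** Replace-all semantics: the submitted map becomes the entire dynamic config. */
    static Map<String, String> replaceAll(Map<String, String> submitted) {
        return new HashMap<>(submitted);
    }

    /** Incremental semantics: only the listed keys change; a null value deletes the key. */
    static Map<String, String> incremental(Map<String, String> current, Map<String, String> changes) {
        Map<String, String> result = new HashMap<>(current);
        for (Map.Entry<String, String> change : changes.entrySet()) {
            if (change.getValue() == null) {
                result.remove(change.getKey());
            } else {
                result.put(change.getKey(), change.getValue());
            }
        }
        return result;
    }

    /** Models a describe call that withholds the value of a sensitive key. */
    static Map<String, String> describe(Map<String, String> current, String sensitiveKey) {
        Map<String, String> visible = new HashMap<>(current);
        visible.remove(sensitiveKey);
        return visible;
    }

    public static void main(String[] args) {
        Map<String, String> current = new HashMap<>();
        current.put("log.cleaner.threads", "1");
        current.put("ssl.keystore.password", "secret");

        // Describe, change one key, then replace everything: the sensitive key is lost.
        Map<String, String> described = describe(current, "ssl.keystore.password");
        described.put("log.cleaner.threads", "2");
        System.out.println("replace-all keys: " + replaceAll(described).keySet());

        // Incremental update: only the named key changes, the sensitive key survives.
        Map<String, String> changes = new HashMap<>();
        changes.put("log.cleaner.threads", "2");
        System.out.println("incremental keys: " + incremental(current, changes).keySet());
    }
}
```

In this model the replace-all path drops the sensitive entry unless the caller supplies it again, which is the bookkeeping burden the last bullet of the ticket points at.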
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1559256180 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { + +when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + Review Comment: If I don't have these 2 lines, the tests become flaky. With the 2 lines added, I ran all the tests in WorkerCoordinatorTest 30 times and all of them passed. This is needed because the connection to the coordinator is sometimes lost during the test due to a session timeout, and a ClassCastException gets thrown. 
Adding logs for reference: ``` [2024-04-10 16:35:06,023] INFO Cluster ID: kafka-cluster (org.apache.kafka.clients.Metadata:349) [2024-04-10 16:35:06,028] DEBUG Sending FindCoordinator request to broker localhost:1969 (id: 0 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:904) [2024-04-10 16:35:06,029] DEBUG Received FindCoordinator response ClientResponse(receivedTimeMs=1712747106023, latencyMs=0, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=mockClientId, correlationId=0, headerVersion=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='test-group', nodeId=0, host='localhost', port=1969, errorCode=0, errorMessage='NONE')])) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:917) [2024-04-10 16:35:06,029] INFO Discovered group coordinator localhost:1969 (id: 2147483647 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:936) [2024-04-10 16:35:06,030] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242) [2024-04-10 16:35:06,030] DEBUG Heartbeat thread started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1481) [2024-04-10 16:35:06,030] DEBUG Cooperative rebalance triggered. Keeping assignment null until it's explicitly revoked. 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:250) [2024-04-10 16:35:06,030] INFO (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604) [2024-04-10 16:35:06,031] DEBUG Sending JoinGroup (JoinGroupRequestData(groupId='test-group', sessionTimeoutMs=30, rebalanceTimeoutMs=60, memberId='', groupInstanceId=null, protocolType='connect', protocols=[JoinGroupRequestProtocol(name='compatible', metadata=[0, 1, 0, 14, 108, 101, 97, 100, 101, 114, 85, 114, 108, 58, 56, 48, 56, 51, 0, 0, 0, 0, 0, 0, 0, 4, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='default', metadata=[0, 0, 0, 14, 108, 101, 97, 100, 101, 114, 85, 114, 108, 58, 56, 48, 56, 51, 0, 0, 0, 0, 0, 0, 0, 4])], reason='')) to coordinator localhost:1969 (id: 2147483647 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:617) [2024-04-10 16:35:06,031] DEBUG Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolType=null, protocolName='default', leader='leader', skipAssignment=false, memberId='member', members=[]) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:645) [2024-04-10 16:35:06,031] DEBUG Enabling heartbeat thread (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1449) [2024-04-10 16:35:06,031] INFO Successfully joined group with generation Generation{generationId=1, memberId='member', protocol='default'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665) [2024-04-10 16:35:06,031] DEBUG Sending follower SyncGroup to coordinator localhost:1969 (id: 2147483647 rack: null): SyncGroupRequestData(groupId='test-group', generationId=1, memberId='member', groupInstanceId=null, protocolType='connect', protocolName='default', assignments=[]) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:759) [2024-04-10 16:35:06,032] DEBUG Received successful SyncGr
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1559253060 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { + +when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + +try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) { +coordinator.ensureActiveGroup(); +coordinator.poll(0, () -> { +return null; +}); + +long now = time.milliseconds(); +// We keep the heartbeat thread running behind the scenes and poll frequently so that eventually +// the time goes past now + rebalanceTimeoutMs which triggers poll timeout expiry. 
+TestUtils.waitForCondition(() -> { +time.sleep(heartbeatIntervalMs - 1); +return time.milliseconds() > now + rebalanceTimeoutMs; +}, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for rebalance.timeout.ms"); +coordinator.poll(0, () -> { Review Comment: Thanks @showuon for the suggestions. I updated the test now.
Re: [PR] MINOR : Replaced the while loop with TestUtils.waitForCondition [kafka]
chiacyu commented on PR #15678: URL: https://github.com/apache/kafka/pull/15678#issuecomment-2047169130 If the condition isn't met within maxWaitMs, waitForCondition throws an assertion failure. Do we need another timeout to handle that?
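The behaviour discussed here, a polling helper that fails with an assertion error once the wait budget is exhausted, can be sketched as follows. This is a hypothetical stand-in written for illustration and is not the implementation of Kafka's `TestUtils.waitForCondition`; the 10 ms poll interval is an arbitrary assumption.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a wait-for-condition helper; not Kafka's TestUtils.
final class WaitForConditionSketch {

    /**
     * Polls the condition until it returns true or maxWaitMs elapses.
     * On timeout it throws AssertionError from this frame, so the calling test method
     * appears in the stack trace together with the supplied message.
     */
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String message) {
        long deadlineNanos = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + message);
            }
            try {
                Thread.sleep(10L); // assumed poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                throw new AssertionError("Interrupted while waiting: " + message, e);
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        waitForCondition(() -> System.currentTimeMillis() - start >= 30, 1_000, "30 ms should elapse");
        System.out.println("condition met");
    }
}
```

Because the helper enforces its own deadline, wrapping the call in a second, outer timeout would only matter if the condition itself blocked.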
Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]
showuon commented on PR #15522: URL: https://github.com/apache/kafka/pull/15522#issuecomment-2047096126 @soarez, oops, there's a conflict because I've just merged another PR. Please help resolve it. Thanks.
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon merged PR #15557: URL: https://github.com/apache/kafka/pull/15557
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1559170927

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
## @@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      if (this.promotionStates.containsKey(topicPartition)) {
+        val PromotionState(reassignmentState, topicId, originalDir) = this.promotionStates.get(topicPartition)
+        // Revert any reassignments for partitions that did not complete the future replica promotion
+        if (originalDir.isDefined && topicId.isDefined && reassignmentState.maybeInconsistentMetadata) {
+          directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () => ())
+        }
+        this.promotionStates.remove(topicPartition)
+      }

Review Comment: Thanks for the explanation!
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on PR #15557: URL: https://github.com/apache/kafka/pull/15557#issuecomment-2047083295

Failed tests are unrelated.
Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]
showuon commented on PR #15522: URL: https://github.com/apache/kafka/pull/15522#issuecomment-2047072476

Sorry, I forgot about this PR. The jdk8 job failed to complete due to an infra issue. Re-triggering now: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15522/6/
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
vamossagar12 commented on PR #15594: URL: https://github.com/apache/kafka/pull/15594#issuecomment-2047030266

@lucasbru , that test did pass. However, let me try again with the snippet you shared above and see if it works. Let me get back to you.
Re: [PR] KAFKA-16433: BeginningAndEndOffsets and OffsetsForTimes should send an event and return empty with zero timeout provided [kafka]
lucasbru merged PR #15688: URL: https://github.com/apache/kafka/pull/15688
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
soarez commented on PR #15136: URL: https://github.com/apache/kafka/pull/15136#issuecomment-2046971782

@showuon PTAL
[jira] [Created] (KAFKA-16505) KIP-1034: Dead letter queue in Kafka Streams
Damien Gasparina created KAFKA-16505: Summary: KIP-1034: Dead letter queue in Kafka Streams Key: KAFKA-16505 URL: https://issues.apache.org/jira/browse/KAFKA-16505 Project: Kafka Issue Type: Improvement Reporter: Damien Gasparina See KIP: KIP-1034: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
lucasbru commented on PR #15594: URL: https://github.com/apache/kafka/pull/15594#issuecomment-2046963651

@vamossagar12 did the test you ran pass? Here is an example of how I run parameterized tests using a test suite file:
```
consumer_test:
  - tests/kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}
```
The change looks fine to me. If you want to be sure that the test setup works, you may want to run the parameter combinations and post the results here. However, if you have tested one parameter combination successfully and you are confident that the general test setup is working, I am fine with merging it like this (please confirm).
Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]
soarez commented on PR #15522: URL: https://github.com/apache/kafka/pull/15522#issuecomment-2046961029

Failing tests are all unrelated and tracked:
* kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest."testNoConsumeWithDescribeAclViaAssign(String).quorum=kraft" [KAFKA-8250](https://issues.apache.org/jira/browse/KAFKA-8250)
* kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions [7] Type=ZK, MetadataVersion=3.7-IV4, Security=PLAINTEXT [KAFKA-15793](https://issues.apache.org/jira/browse/KAFKA-15793) (reopened)
* org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId [KAFKA-15914](https://issues.apache.org/jira/browse/KAFKA-15914)
* org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks [KAFKA-15917](https://issues.apache.org/jira/browse/KAFKA-15917)
* org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault() [KAFKA-15787](https://issues.apache.org/jira/browse/KAFKA-15787)
* org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() [KAFKA-15197](https://issues.apache.org/jira/browse/KAFKA-15197)
* org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicateSourceDefault() [KAFKA-15926](https://issues.apache.org/jira/browse/KAFKA-15926)
* org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations() [KAFKA-16504](https://issues.apache.org/jira/browse/KAFKA-16504) (new)
* org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful [6] Type=Raft-Isolated, MetadataVersion=3.8-IV0, Security=PLAINTEXT [KAFKA-16174](https://issues.apache.org/jira/browse/KAFKA-16174)

PTAL @showuon
[jira] [Commented] (KAFKA-8250) Flaky Test DelegationTokenEndToEndAuthorizationTest#testProduceConsumeViaAssign
[ https://issues.apache.org/jira/browse/KAFKA-8250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835654#comment-17835654 ] Igor Soarez commented on KAFKA-8250: This failed again in [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15522/5/tests/] {code:java} [2024-04-09T20:36:54.043Z] kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaAssign(String)[1] failed, log available in /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15522/core/build/reports/testOutput/kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaAssign(String)[1].test.stdout [2024-04-09T20:36:54.043Z] [2024-04-09T20:36:54.043Z] Gradle Test Run :core:test > Gradle Test Executor 96 > DelegationTokenEndToEndAuthorizationWithOwnerTest > testNoConsumeWithDescribeAclViaAssign(String) > "testNoConsumeWithDescribeAclViaAssign(String).quorum=kraft" FAILED [2024-04-09T20:36:54.043Z] java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256 [2024-04-09T20:36:54.043Z] at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) [2024-04-09T20:36:54.043Z] at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) [2024-04-09T20:36:54.043Z] at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) [2024-04-09T20:36:54.043Z] at kafka.api.DelegationTokenEndToEndAuthorizationTest.createDelegationTokens(DelegationTokenEndToEndAuthorizationTest.scala:165) [2024-04-09T20:36:54.043Z] at kafka.api.DelegationTokenEndToEndAuthorizationTest.createDelegationTokens(DelegationTokenEndToEndAuthorizationTest.scala:157) [2024-04-09T20:36:54.043Z] at 
kafka.api.DelegationTokenEndToEndAuthorizationTest.configureSecurityAfterServersStart(DelegationTokenEndToEndAuthorizationTest.scala:100) [2024-04-09T20:36:54.043Z] at kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.configureSecurityAfterServersStart(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:77) [2024-04-09T20:36:54.043Z] [2024-04-09T20:36:54.043Z] Caused by: [2024-04-09T20:36:54.043Z] org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256 [2024-04-09T20:36:54.043Z] {code} > Flaky Test > DelegationTokenEndToEndAuthorizationTest#testProduceConsumeViaAssign > --- > > Key: KAFKA-8250 > URL: https://issues.apache.org/jira/browse/KAFKA-8250 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/442/tests] > {quote}java.lang.AssertionError: Consumed more records than expected > expected:<1> but was:<2> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1288) > at > kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:460) > at > kafka.api.EndToEndAuthorizationTest.testProduceConsumeViaAssign(EndToEndAuthorizationTest.scala:209){quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15793) Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions
[ https://issues.apache.org/jira/browse/KAFKA-15793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835653#comment-17835653 ] Igor Soarez edited comment on KAFKA-15793 at 4/10/24 8:56 AM: -- This has come up again in [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15522/5/tests/] {code:java} [2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 97 > DeleteTopicsRequestTest > testTopicDeletionClusterHasOfflinePartitions(String) > "testTopicDeletionClusterHasOfflinePartitions(String).quorum=zk" STARTED [2024-04-09T21:06:17.307Z] kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7] failed, log available in /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15522/core/build/reports/testOutput/kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7].test.stdout [2024-04-09T21:06:17.307Z] [2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 96 > ZkMigrationIntegrationTest > testMigrateTopicDeletions(ClusterInstance) > testMigrateTopicDeletions [7] Type=ZK, MetadataVersion=3.7-IV4, Security=PLAINTEXT FAILED [2024-04-09T21:06:17.307Z] org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: Unhandled error in MetadataChangeEvent: Check op on KRaft Migration ZNode failed. Expected zkVersion = 5. This indicates that another KRaft controller is making writes to ZooKeeper. 
[2024-04-09T21:06:17.307Z] at app//kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2001) [2024-04-09T21:06:17.307Z] at app//kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2027) [2024-04-09T21:06:17.307Z] at app//kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2052) [2024-04-09T21:06:17.307Z] at app//scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100) [2024-04-09T21:06:17.307Z] at app//scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87) [2024-04-09T21:06:17.307Z] at app//scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43) [2024-04-09T21:06:17.307Z] at app//kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2052) [2024-04-09T21:06:17.307Z] at app//kafka.zk.migration.ZkTopicMigrationClient.$anonfun$updateTopicPartitions$1(ZkTopicMigrationClient.scala:265) [2024-04-09T21:06:17.307Z] at app//kafka.zk.migration.ZkTopicMigrationClient.updateTopicPartitions(ZkTopicMigrationClient.scala:255) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$20(KRaftMigrationZkWriter.java:334) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:62) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.lambda$run$1(KRaftMigrationDriver.java:532) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:845) [2024-04-09T21:06:17.307Z] at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$21(KRaftMigrationZkWriter.java:331) [2024-04-09T21:06:17.307Z] at java.base@17.0.7/java.util.HashMap.forEach(HashMap.java:1421) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsDelta(KRaftMigrationZkWriter.java:297) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleDelta(KRaftMigrationZkWriter.java:112) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.run(KRaftMigrationDriver.java:531) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) [2024-04-09T21:06:17.307Z] at java.base@17.0.7/java.lang.Thread.run(Thread.java:833) [2024-04-09T21:06:17.307Z] [2024-04-09T21:06:17.307Z] Caused by: [2024-04-09T21:06:17.307Z] java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. Expected zkVersion = 5. This indicates that another KRaft controll
Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15662: URL: https://github.com/apache/kafka/pull/15662#discussion_r1559052702 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +776,58 @@ public ClassicGroup classicGroup( } } +/** + * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request. + * + * @param classicGroup A ClassicGroup. + * @return the boolean indicating whether it's valid to online upgrade the classic group. + */ +private boolean validateOnlineUpgrade(ClassicGroup classicGroup) { +if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) { +log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.", +classicGroup.groupId(), consumerGroupMigrationPolicy); +return false; +} else if (classicGroup.isInState(DEAD)) { Review Comment: Could this really happen? I would have thought that it would be automatically converted as Dead or equivalent to Empty. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -411,6 +432,20 @@ public int numMembers() { return members.size(); } +/** + * @return The number of members that use the legacy protocol. + */ +public int numLegacyProtocolMember() { +return (int) members.values().stream().filter(member -> member.useLegacyProtocol()).count(); Review Comment: It may be better to maintain this count in the group state instead of having to go through all members. Is it possible? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +776,58 @@ public ClassicGroup classicGroup( } } +/** + * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request. + * + * @param classicGroup A ClassicGroup. + * @return the boolean indicating whether it's valid to online upgrade the classic group. 
+ */ +private boolean validateOnlineUpgrade(ClassicGroup classicGroup) { +if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) { +log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.", +classicGroup.groupId(), consumerGroupMigrationPolicy); +return false; +} else if (classicGroup.isInState(DEAD)) { +log.debug("Online upgrade is invalid because the classic group {} is in DEAD state.", classicGroup.groupId()); +return false; +} else if (!classicGroup.usesConsumerGroupProtocol()) { +log.debug("Online upgrade is invalid because the classic group {} has protocol type {} and doesn't use the consumer group protocol.", Review Comment: nit: `Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.` ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +776,58 @@ public ClassicGroup classicGroup( } } +/** + * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request. + * + * @param classicGroup A ClassicGroup. + * @return the boolean indicating whether it's valid to online upgrade the classic group. 
+ */ +private boolean validateOnlineUpgrade(ClassicGroup classicGroup) { +if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) { +log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.", +classicGroup.groupId(), consumerGroupMigrationPolicy); +return false; +} else if (classicGroup.isInState(DEAD)) { +log.debug("Online upgrade is invalid because the classic group {} is in DEAD state.", classicGroup.groupId()); +return false; +} else if (!classicGroup.usesConsumerGroupProtocol()) { +log.debug("Online upgrade is invalid because the classic group {} has protocol type {} and doesn't use the consumer group protocol.", +classicGroup.groupId(), classicGroup.protocolType().orElse("")); +return false; +} else if (classicGroup.size() > consumerGroupMaxSize) { +log.debug("Online upgrade is invalid because the classic group {} size {} exceeds the consumer group maximum size {}.", Review Comment: nit: Same idea. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +776,58 @@ public ClassicGroup classicGroup( } } +/** + * Validates the online upgrade if the Classic Group receives a Consu
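As an illustration of the review suggestion above on `numLegacyProtocolMember()` (keeping the count in the group state rather than streaming over all members on every call), here is a minimal sketch. The class, its fields and its method names are hypothetical and heavily simplified; the real `ConsumerGroup` tracks far more per member and may need the counter to follow the same state-management rules as its other fields.

```java
import java.util.HashMap;
import java.util.Map;

public class LegacyMemberCountSketch {

    // Hypothetical, simplified state: member id -> whether it uses the legacy protocol.
    private final Map<String, Boolean> legacyByMemberId = new HashMap<>();

    // Counter maintained on every membership change, so reads are O(1).
    private int numLegacyProtocolMembers = 0;

    // Adds a member or replaces an existing one, adjusting the counter for both
    // the old and the new value of the flag.
    public void updateMember(String memberId, boolean usesLegacyProtocol) {
        Boolean previous = legacyByMemberId.put(memberId, usesLegacyProtocol);
        if (previous != null && previous) {
            numLegacyProtocolMembers--;
        }
        if (usesLegacyProtocol) {
            numLegacyProtocolMembers++;
        }
    }

    // Removes a member if present and adjusts the counter accordingly.
    public void removeMember(String memberId) {
        Boolean previous = legacyByMemberId.remove(memberId);
        if (previous != null && previous) {
            numLegacyProtocolMembers--;
        }
    }

    public int numLegacyProtocolMembers() {
        return numLegacyProtocolMembers;
    }

    public static void main(String[] args) {
        LegacyMemberCountSketch group = new LegacyMemberCountSketch();
        group.updateMember("a", true);
        group.updateMember("b", false);
        group.updateMember("a", false); // member "a" switches protocol
        System.out.println(group.numLegacyProtocolMembers());
    }
}
```

The trade-off is that every code path that adds, replaces or removes a member has to go through these two methods, otherwise the counter drifts from the map.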
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
lucasbru commented on code in PR #15661: URL: https://github.com/apache/kafka/pull/15661#discussion_r1559094045

## tests/kafkatest/services/verifiable_consumer.py:
## @@ -135,6 +135,28 @@ def last_commit(self, tp):
         else:
             return None

+# This needs to be used for cooperative and consumer protocol
+class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):

Review Comment: Why can we not implement this in the normal ConsumerEventHandler? It seems to be making incorrect assumptions about the consumer rebalance listener (since previously owned partitions are not guaranteed to be reported in onPartitionsAssigned).
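The point raised in the review comment above can be sketched as a single tracker that treats every assignment callback as a delta. This is an illustrative Java model, not the Python `ConsumerEventHandler` from `verifiable_consumer.py`; the class name, the method names and the use of plain strings for partitions are assumptions. It also assumes that a non-incremental rebalance revokes all owned partitions before reporting the new assignment, in which case the same logic covers both modes.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class AssignmentTrackerSketch {

    // Partitions currently believed to be owned, e.g. "topic-0".
    private final Set<String> owned = new TreeSet<>();

    // Treats the callback as a delta: previously owned partitions that are not
    // reported again stay in the set instead of being dropped.
    public void onPartitionsAssigned(Collection<String> added) {
        owned.addAll(added);
    }

    // Removes only the partitions explicitly reported as revoked.
    public void onPartitionsRevoked(Collection<String> revoked) {
        owned.removeAll(revoked);
    }

    public Set<String> owned() {
        return Collections.unmodifiableSet(owned);
    }

    public static void main(String[] args) {
        AssignmentTrackerSketch tracker = new AssignmentTrackerSketch();
        tracker.onPartitionsAssigned(List.of("t-0", "t-1"));
        // Incremental rebalance: only the changes are reported.
        tracker.onPartitionsRevoked(List.of("t-1"));
        tracker.onPartitionsAssigned(List.of("t-2"));
        System.out.println(tracker.owned());
    }
}
```

A handler that instead replaced its whole set on each assignment callback would lose `t-0` in the sequence above, which is the kind of incorrect assumption the comment points at.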