[GitHub] [kafka] showuon commented on pull request #14287: [Minor] Check the existence of AppInfo for the given ID before creating a new mbean of the same name
showuon commented on PR #14287:
URL: https://github.com/apache/kafka/pull/14287#issuecomment-1718864019

Failed tests are unrelated:
```
Build / JDK 20 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
Build / JDK 20 and Scala 2.13 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft
Build / JDK 20 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
Build / JDK 17 and Scala 2.13 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft
Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
Build / JDK 11 and Scala 2.13 / kafka.api.SaslScramSslEndToEndAuthorizationTest.testAuthentications(String).quorum=zk
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] philipnee closed pull request #14207: KIP-848 Heartbeat request manager implementation
philipnee closed pull request #14207: KIP-848 Heartbeat request manager implementation
URL: https://github.com/apache/kafka/pull/14207
[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14384: KAFKA-15415 On producer-batch retry, skip-backoff on a new leader
AndrewJSchofield commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1324599448

## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ##
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }

+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",

Review Comment:
There are 4 separate calls to `attempts()` (or the equivalent of `attempts.get()`) in this method. Capture the value with a single call and use it instead.
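To illustrate the suggestion above, here is a minimal sketch of reading the attempt counter once and reusing the local value. It is not the PR's code: the class `LeaderChangeTracker`, the method `recordRetry()`, the local `attemptsSoFar`, the `-1` sentinel, and the exact change-detection rule are all assumptions made for illustration. Only the field names `currentLeaderEpoch` and `leaderChangedAttempts` and the signature `hasLeaderChanged(int)` come from the quoted diff.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the retry bookkeeping discussed in the review; not the PR's implementation.
final class LeaderChangeTracker {
    static final int UNKNOWN_LEADER_EPOCH = -1; // assumed sentinel for this sketch

    private final AtomicInteger attempts = new AtomicInteger(0);
    private int currentLeaderEpoch = UNKNOWN_LEADER_EPOCH;
    private int leaderChangedAttempts = -1;

    // Hypothetical helper: called when the batch is re-enqueued for a retry.
    void recordRetry() {
        attempts.incrementAndGet();
    }

    boolean hasLeaderChanged(int latestLeaderEpoch) {
        // Read the counter once so that every check below sees the same value,
        // even if another thread bumps the counter in the meantime.
        final int attemptsSoFar = attempts.get();
        boolean leaderChanged = false;
        if (attemptsSoFar >= 1) {
            if (currentLeaderEpoch != latestLeaderEpoch) {
                // A different epoch was observed during this retry attempt.
                leaderChangedAttempts = attemptsSoFar;
                leaderChanged = true;
            } else {
                // Same epoch: report a change only within the attempt that detected it.
                leaderChanged = leaderChangedAttempts == attemptsSoFar;
            }
        }
        currentLeaderEpoch = latestLeaderEpoch;
        return leaderChanged;
    }

    public static void main(String[] args) {
        LeaderChangeTracker tracker = new LeaderChangeTracker();
        System.out.println(tracker.hasLeaderChanged(100)); // first attempt, prints false
        tracker.recordRetry();
        System.out.println(tracker.hasLeaderChanged(101)); // retry with a new epoch, prints true
    }
}
```

Besides saving repeated atomic reads, a single snapshot keeps the trace message and the comparisons consistent with each other.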
[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14384: KAFKA-15415 On producer-batch retry, skip-backoff on a new leader
AndrewJSchofield commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1324641156

## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ##
@@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError() throws Exception {
         txnManager.beginTransaction();
     }

+    @Test
+    public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception {
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        try {
+            // SETUP
+            String metricGrpName = "producer-metrics-test-stats-1";
+            long totalSize = 1024 * 1024;
+            BufferPool pool = new BufferPool(totalSize, batchSize, metrics, time,
+                metricGrpName);
+            long retryBackoffMaxMs = 100L;
+            // lingerMs is 0 to send batch as soon as any records are available on it.
+            this.accumulator = new RecordAccumulator(logContext, batchSize,
+                CompressionType.NONE, 0, 10L, retryBackoffMaxMs,
+                DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, null, pool);
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false,
+                MAX_REQUEST_SIZE, ACKS_ALL,
+                10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null,
+                apiVersions);
+            // Update metadata with leader-epochs.
+            int tp0LeaderEpoch = 100;
+            int epoch = tp0LeaderEpoch;
+            this.client.updateMetadata(
+                RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2),
+                    tp -> {
+                        if (tp0.equals(tp)) {
+                            return epoch;
+                        } else if (tp1.equals(tp)) {
+                            return 0;
+                        } else {
+                            throw new RuntimeException("unexpected tp " + tp);
+                        }
+                    }));
+
+            // Produce batch, it returns with a retry-able error like NOT_LEADER_OR_FOLLOWER, scheduled for retry.
+            // This triggers a metadata-request, that discovers a new-leader for tp0.
+            Future futureIsProduced = appendToAccumulator(tp0, 0L, "key", "value");
+            sender.runOnce(); // connect
+            sender.runOnce(); // send produce request
+            assertEquals(1, client.inFlightRequestCount(),
+                "We should have a single produce request in flight.");
+            assertEquals(1, sender.inFlightBatches(tp0).size());
+            assertTrue(client.hasInFlightRequests());
+            client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_OR_FOLLOWER, 0));
+            sender.runOnce(); // receive produce response, batch scheduled for retry
+            assertTrue(!futureIsProduced.isDone(), "Produce request is yet not done.");
+
+            // TEST that as new-leader(with epochA) is discovered, the batch is retried immediately i.e. skips any backoff period.
+            // Update leader epoch for tp0
+            log.info("Test that to a new-leader, batch is retried immediately.");
+            int newEpoch = ++tp0LeaderEpoch;
+            this.client.updateMetadata(
+                RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2),
+                    tp -> {
+                        if (tp0.equals(tp)) {
+                            return newEpoch;
+                        } else if (tp1.equals(tp)) {
+                            return 0;
+                        } else {
+                            throw new RuntimeException("unexpected tp " + tp);
+                        }
+                    }));
+            sender.runOnce(); // send produce request, immediately.
+            assertEquals(1, sender.inFlightBatches(tp0).size());
+            assertTrue(client.hasInFlightRequests());
+            client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_OR_FOLLOWER, 0));

Review Comment:
Should there not be testing that uses the new `CurrentLeader` field in v9 of the ProduceResponse given that this is part of KIP-951 which this PR is implementing?
[GitHub] [kafka] showuon commented on a diff in pull request #14382: KAFKA-15442: add a section in doc for tiered storage
showuon commented on code in PR #14382:
URL: https://github.com/apache/kafka/pull/14382#discussion_r1325394656

## docs/ops.html: ##
@@ -3859,6 +3859,98 @@ Finalizing the migration
 # Other configs ...
+
+6.11 Tiered Storage
+
+Tiered Storage Overview
+
+Kafka data is mostly consumed in a streaming fashion using tail reads. Tail reads leverage OS's page cache to serve the data instead of disk reads.
+  Older data is typically read from the disk for backfill or failure recovery purposes and is infrequent.
+
+In the tiered storage approach, Kafka cluster is configured with two tiers of storage - local and remote.
+  The local tier is the same as the current Kafka that uses the local disks on the Kafka brokers to store the log segments.
+  The new remote tier uses external storage systems, such as HDFS or S3, to store the completed log segments.
+  Please check KIP-405 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage) for more information.
+
+Note: Tiered storage is considered as an early access feature, and is not recommended for use in production environments
+
+Configuration
+
+Broker Configurations
+
+By default, Kafka server will not enable tiered storage feature. remote.log.storage.system.enable
+  is the property to control whether to enable tiered storage functionality in a broker or not. Setting it to "true" to enable this feature.
+
+RemoteStorageManager is an interface to provide the lifecycle of remote log segments and indexes. Kafka server
+  doesn't provide out-of-the-box implementation of RemoteStorageManager. Configuring remote.log.storage.manager.class.name
+  and remote.log.storage.manager.class.path to specify the implementation of RemoteStorageManager.
+
+RemoteLogMetadataManager is an interface to provide the lifecycle of metadata about remote log segments with strongly consistent semantics.
+  By default, Kafka provides an implementation with storage as an internal topic. This implementation can be changed by configuring
+  remote.log.metadata.manager.class.name and remote.log.metadata.manager.class.path.
+  When adopting the default kafka internal topic based implementation, remote.log.metadata.manager.listener.name
+  is a mandatory property to specify which listener the clients created by the default RemoteLogMetadataManager implementation.
+
+Topic Configurations
+
+After correctly configuring broker side configurations for tiered storage feature, there are still configurations in topic level needed to be set.
+  remote.storage.enable is the switch to determine if this topic want to use tiered storage or not. By default it is set as false.
+  After enabling remote.storage.enable property, the next thing to consider is the log retention.
+  When tiered storage is enabled in the topic, there will be 2 additional log retention configuration to set:
+
+  local.retention.ms
+  retention.ms
+  local.retention.bytes
+  retention.bytes
+
+  The configuration prefixed with local are to specify the time/size the "local" log file can accept before moving to remote storage, and then get deleted.
+  If unset, The value in retention.ms and retention.bytes will be used.
+
+Configurations Example
+
+Here is a sample configuration to enable tiered storage feature in broker side:
+
+# Sample Zookeeper/Kraft broker server.properties listening on PLAINTEXT://:9092
+remote.log.storage.system.enable=true
+# Please provide the implementation for remoteStorageManager. This is the mandatory configuration for tiered storage.
+# remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager
+# Using the "PLAINTEXT" listener for the clients in RemoteLogMetadataManager to talk to the brokers.
+remote.log.metadata.manager.listener.name=PLAINTEXT
+
+After broker is started, creating a topic with tiered storage enabled, and a small log time retention value to try this feature:
+bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server localhost:9092 --config remote.storage.enable=true --config local.retention.ms=1000
+
+Then, after the active segment is rolled, the old segment should be moved to the remote storage and get deleted.
+
+Limitation

Review Comment:
Good suggestion. Updated. Thanks.
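The retention fallback described in the quoted docs ("If unset, The value in retention.ms and retention.bytes will be used") can be sketched as below. This is only an illustration of the documented rule, not how the broker resolves it internally: the class `LocalRetentionSketch`, the method `effective`, and the `-1` default are hypothetical, and the topic configuration is modelled as a plain string map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the documented fallback; not Kafka's internal logic.
final class LocalRetentionSketch {

    // Returns the value of localKey if set, else the value of fallbackKey, else defaultValue.
    static long effective(Map<String, String> topicConfig, String localKey,
                          String fallbackKey, long defaultValue) {
        String value = topicConfig.get(localKey);
        if (value == null) {
            value = topicConfig.get(fallbackKey);
        }
        return value == null ? defaultValue : Long.parseLong(value);
    }

    public static void main(String[] args) {
        Map<String, String> topicConfig = new HashMap<>();
        topicConfig.put("remote.storage.enable", "true");
        topicConfig.put("retention.ms", "604800000");

        // local.retention.ms is unset, so retention.ms applies: prints 604800000
        System.out.println(effective(topicConfig, "local.retention.ms", "retention.ms", -1L));

        // With an explicit local value, as in the kafka-topics.sh example: prints 1000
        topicConfig.put("local.retention.ms", "1000");
        System.out.println(effective(topicConfig, "local.retention.ms", "retention.ms", -1L));
    }
}
```

The same helper covers the size-based pair by passing `local.retention.bytes` and `retention.bytes`.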
[GitHub] [kafka] satishd commented on a diff in pull request #14382: KAFKA-15442: add a section in doc for tiered storage
satishd commented on code in PR #14382:
URL: https://github.com/apache/kafka/pull/14382#discussion_r1325382836

## docs/ops.html: ##
@@ -3859,6 +3859,98 @@ Finalizing the migration
 # Other configs ...
+
+6.11 Tiered Storage
+
+Tiered Storage Overview
+
+Kafka data is mostly consumed in a streaming fashion using tail reads. Tail reads leverage OS's page cache to serve the data instead of disk reads.
+  Older data is typically read from the disk for backfill or failure recovery purposes and is infrequent.
+
+In the tiered storage approach, Kafka cluster is configured with two tiers of storage - local and remote.
+  The local tier is the same as the current Kafka that uses the local disks on the Kafka brokers to store the log segments.
+  The new remote tier uses external storage systems, such as HDFS or S3, to store the completed log segments.
+  Please check KIP-405 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage) for more information.
+
+Note: Tiered storage is considered as an early access feature, and is not recommended for use in production environments
+
+Configuration
+
+Broker Configurations
+
+By default, Kafka server will not enable tiered storage feature. remote.log.storage.system.enable
+  is the property to control whether to enable tiered storage functionality in a broker or not. Setting it to "true" to enable this feature.
+
+RemoteStorageManager is an interface to provide the lifecycle of remote log segments and indexes. Kafka server
+  doesn't provide out-of-the-box implementation of RemoteStorageManager. Configuring remote.log.storage.manager.class.name
+  and remote.log.storage.manager.class.path to specify the implementation of RemoteStorageManager.
+
+RemoteLogMetadataManager is an interface to provide the lifecycle of metadata about remote log segments with strongly consistent semantics.
+  By default, Kafka provides an implementation with storage as an internal topic. This implementation can be changed by configuring
+  remote.log.metadata.manager.class.name and remote.log.metadata.manager.class.path.
+  When adopting the default kafka internal topic based implementation, remote.log.metadata.manager.listener.name
+  is a mandatory property to specify which listener the clients created by the default RemoteLogMetadataManager implementation.
+
+Topic Configurations
+
+After correctly configuring broker side configurations for tiered storage feature, there are still configurations in topic level needed to be set.
+  remote.storage.enable is the switch to determine if this topic want to use tiered storage or not. By default it is set as false.
+  After enabling remote.storage.enable property, the next thing to consider is the log retention.
+  When tiered storage is enabled in the topic, there will be 2 additional log retention configuration to set:
+
+  local.retention.ms
+  retention.ms
+  local.retention.bytes
+  retention.bytes
+
+  The configuration prefixed with local are to specify the time/size the "local" log file can accept before moving to remote storage, and then get deleted.
+  If unset, The value in retention.ms and retention.bytes will be used.
+
+Configurations Example
+
+Here is a sample configuration to enable tiered storage feature in broker side:
+
+# Sample Zookeeper/Kraft broker server.properties listening on PLAINTEXT://:9092
+remote.log.storage.system.enable=true
+# Please provide the implementation for remoteStorageManager. This is the mandatory configuration for tiered storage.
+# remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager
+# Using the "PLAINTEXT" listener for the clients in RemoteLogMetadataManager to talk to the brokers.
+remote.log.metadata.manager.listener.name=PLAINTEXT
+
+After broker is started, creating a topic with tiered storage enabled, and a small log time retention value to try this feature:
+bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server localhost:9092 --config remote.storage.enable=true --config local.retention.ms=1000
+
+Then, after the active segment is rolled, the old segment should be moved to the remote storage and get deleted.
+
+Limitation

Review Comment:
Please change `Limitation` -> `Limitations`
Both in the text and the links.
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325383326

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+    private final Time time;
+    private final Logger logger;
+
+    private final int rebalanceTimeoutMs;
+
+    private final CoordinatorRequestManager coordinatorRequestManager;
+    private final SubscriptionState subscriptions;
+    private final HeartbeatRequestState heartbeatRequestState;
+    private final MembershipManager membershipManager;
+    private final ErrorEventHandler nonRetriableErrorHandler;
+
+    public HeartbeatRequestManager(
+        final Time time,
+        final LogContext logContext,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.time = time;
+        this.logger = logContext.logger(HeartbeatRequestManager.class);
+        this.subscriptions = subscriptions;
+        this.membershipManager = membershipManager;
+        this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+        this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+        long heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+        long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs,
+            retryBackoffMaxMs, rebalanceTimeoutMs);
+    }
+
+    // Visible for testing
+    HeartbeatRequestManager(
+        final Time time,
+        final LogContext logContext,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final HeartbeatRequestState heartbeatRequestState,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        this.time = time;
+        this.logger = logContext.logger(this.getClass());
+        this.subscriptions = subscriptions;
+        this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.heartbeatRequestState = heartbeatRequestState;
+        this.membershipManager = membershipManager;
+        this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+        if (!coordinatorRequestManager.coordinator().isPresent() || notInGroup()) {
+            return new NetworkClientDelegate.PollResult(
+                Long.MAX_VALUE, Collections.emptyList());
+
+        }
+
+        if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+            return new NetworkClientDelegate.PollResult(
+                heartbeat
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325382420

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1325381139 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); + +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +heartbeat
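Editorial sketch, not part of the PR: the quoted poll() is cut off in this archive, but the visible part skips sending while heartbeatRequestState.canSendRequest(currentTimeMs) is false. A minimal interval gate along those lines might look like the following. HeartbeatGateSketch and all of its methods are hypothetical names, and retry backoff is deliberately left out, so this should not be read as Kafka's HeartbeatRequestState.

```java
// Hypothetical interval gate; an illustration only, not Kafka's HeartbeatRequestState.
final class HeartbeatGateSketch {
    private final long intervalMs;
    private long lastSentMs = -1; // -1 means nothing has been sent yet

    HeartbeatGateSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // A heartbeat may go out immediately the first time, then once per interval.
    boolean canSend(long nowMs) {
        return lastSentMs < 0 || nowMs - lastSentMs >= intervalMs;
    }

    // How long a poll loop could wait before the next heartbeat is due.
    long timeToNextMs(long nowMs) {
        return canSend(nowMs) ? 0 : intervalMs - (nowMs - lastSentMs);
    }

    // Record the send time so the next interval starts from here.
    void onSend(long nowMs) {
        lastSentMs = nowMs;
    }
}
```

A poll loop built on such a gate could return "nothing to send, check again in timeToNextMs" instead of busy-waiting.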
[GitHub] [kafka] showuon commented on pull request #14382: KAFKA-15442: add a section in doc for tiered storage
showuon commented on PR #14382: URL: https://github.com/apache/kafka/pull/14382#issuecomment-1718792709 @satishd, in the latest commit, I added: 1. a note saying this is an early access feature and should not be used in production environments; 2. a limitation section describing the limitations in v3.6.0; 3. a notable change entry for v3.6.0. The screenshot in the PR description is updated. Please take another look. Thanks.
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1325368170 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time, logContext); CoordinatorRequestManager coordinatorRequestManager = null; CommitRequestManager commitRequestManager = null; +HeartbeatRequestManager heartbeatRequestManager = null; +MembershipManager membershipManaber = null; +// TODO: consolidate groupState and memberState if (groupState.groupId != null) { coordinatorRequestManager = new CoordinatorRequestManager( -this.time, -logContext, -retryBackoffMs, -retryBackoffMaxMs, -this.errorEventHandler, -groupState.groupId); +this.time, +logContext, +retryBackoffMs, +retryBackoffMaxMs, +this.errorEventHandler, +groupState.groupId); Review Comment: We should submit a patch to combine groupState into memberState. @lianetm
[GitHub] [kafka] philipnee commented on pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on PR #14386: URL: https://github.com/apache/kafka/pull/14386#issuecomment-1718779561 Hey @junrao - Addressed your comments. I'm holding off on your request for `Could we change toString to include the fields in the parent class?` as I think it would be easier if I do that consistently for all other CompletableApplicationEvents. WDYT?
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325364414 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memoized by topic name. If all topics are requested, then {@code null} is used as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +topic, +retryBackoffMs, +retryB
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325355774 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets + * partitionsFor Review Comment: As well as listTopics() - this is addressed in a separate PR. Would you be ok if we split this into multiple PRs? If you think we should remove the documentation because it is inaccurate, I'm willing to do so. The PR's commit is c71a18c95937dd18171f60afb4fd263ea39e9a1b - I anticipate @kirktrue will pick this into trunk soon.
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325353270 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -203,16 +204,17 @@ public static class UnsentRequest { private Timer timer; public UnsentRequest(final AbstractRequest.Builder requestBuilder, final Optional node) { -this(requestBuilder, node, new FutureCompletionHandler()); +Objects.requireNonNull(requestBuilder); +this.requestBuilder = requestBuilder; +this.node = node; +this.handler = new FutureCompletionHandler(); } public UnsentRequest(final AbstractRequest.Builder requestBuilder, final Optional node, - final FutureCompletionHandler handler) { -Objects.requireNonNull(requestBuilder); -this.requestBuilder = requestBuilder; -this.node = node; -this.handler = handler; + final BiConsumer callback) { Review Comment: I think it is, according to my IDE. It appears to be used in CommitRequestManager (lines 219 and 256) and TopicMetadataRequestManager (line 145).
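Editorial sketch, not part of the PR: the diff above moves the null check into the single-argument constructor and adds an overload that takes a BiConsumer callback. One way such a pair of constructors could be wired to a future is outlined here. UnsentRequestSketch is a hypothetical class, and String stands in for both the request builder and the response type, so none of this is the actual NetworkClientDelegate code.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical, simplified analogue of an unsent request; String replaces the real
// builder and response types.
final class UnsentRequestSketch {
    private final String requestBuilder;
    private final CompletableFuture<String> future = new CompletableFuture<>();

    UnsentRequestSketch(String requestBuilder) {
        // Fail fast at construction instead of when the request is eventually sent.
        this.requestBuilder = Objects.requireNonNull(requestBuilder, "requestBuilder");
    }

    UnsentRequestSketch(String requestBuilder, BiConsumer<String, Throwable> callback) {
        this(requestBuilder);
        // The callback receives either a response or an error once the future settles.
        future.whenComplete(callback);
    }

    String requestBuilder() {
        return requestBuilder;
    }

    void onComplete(String response) {
        future.complete(response);
    }

    void onFailure(Throwable error) {
        future.completeExceptionally(error);
    }
}
```

Delegating from the callback overload to the basic constructor keeps the null check in one place, which is the property the reviewer's question about constructor usage touches on.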
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325352296 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -381,41 +377,41 @@ public String toString() { } /** - * This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}. + * This is used to stage the unsent {@link OffsetCommitRequest} and {@link OffsetFetchRequestState}. * unsentOffsetCommits holds the offset commit requests that have not been sent out * unsentOffsetFetches holds the offset fetch requests that have not been sent out - * inflightOffsetFetches holds the offset fetch requests that have been sent out but incompleted. - * + * inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed. + * * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests. */ class PendingRequests { // Queue is used to ensure the sequence of commit -Queue unsentOffsetCommits = new LinkedList<>(); +Queue unsentOffsetCommits = new LinkedList<>(); List unsentOffsetFetches = new ArrayList<>(); List inflightOffsetFetches = new ArrayList<>(); -public boolean hasUnsentRequests() { +boolean hasUnsentRequests() { Review Comment: Sorry - leaving this package-private, as it is used in one of the tests to verify that there are no more unsent requests.
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325349813 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -106,6 +104,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); } +pendingRequests.inflightOffsetFetches.forEach(System.out::println); Review Comment: 🤦
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325349677 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java: ## @@ -19,22 +19,23 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -public class CommitApplicationEvent extends ApplicationEvent { -final private CompletableFuture future; +public class CommitApplicationEvent extends CompletableApplicationEvent { final private Map offsets; public CommitApplicationEvent(final Map offsets) { super(Type.COMMIT); -this.offsets = offsets; -Optional exception = isValid(offsets); -if (exception.isPresent()) { -throw new RuntimeException(exception.get()); +this.offsets = Collections.unmodifiableMap(offsets); + +for (OffsetAndMetadata offsetAndMetadata : offsets.values()) { Review Comment: There are quite a few events out there that would need to be refactored if we do it this way. Would it be ok with you to skip this for now? I will post a MINOR patch to address it after this is closed.
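Editorial sketch, not part of the PR: the diff above wraps the offsets in Collections.unmodifiableMap and then loops over the values, presumably to validate them; the loop body is not visible in this archive. A hedged illustration of validating in the constructor and exposing a read-only map follows. CommitEventSketch is a hypothetical class, String and Long stand in for TopicPartition and OffsetAndMetadata, and the "offset must be non-negative" rule is an assumption made for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical event type; String keys and Long offsets replace the Kafka types.
final class CommitEventSketch {
    private final Map<String, Long> offsets;

    CommitEventSketch(Map<String, Long> offsets) {
        // Validate before storing, so an invalid event is never constructed.
        // The non-negative rule is an assumption for this sketch.
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            if (entry.getValue() == null || entry.getValue() < 0) {
                throw new IllegalArgumentException(
                        "Invalid offset for " + entry.getKey() + ": " + entry.getValue());
            }
        }
        // unmodifiableMap alone is a read-only view of the caller's map; copying
        // first also shields the event from later changes made by the caller.
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
    }

    Map<String, Long> offsets() {
        return offsets;
    }
}
```

Note that the quoted diff does not copy the map; the defensive copy here is an extra design choice of the sketch, not something the PR is known to do.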
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325345190 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memoized by topic name. If all topics are requested, then {@code null} is used as the key. + * Once a request is completed successfully, its corresponding entry is removed. 
+ * + */ + +public class TopicMetadataRequestManager implements RequestManager { +private final boolean allowAutoTopicCreation; +private final Map, TopicMetadataRequestState> inflightRequests; +private final long retryBackoffMs; +private final long retryBackoffMaxMs; +private final Logger log; +private final LogContext logContext; + +public TopicMetadataRequestManager(final LogContext logContext, final ConsumerConfig config) { +this.logContext = logContext; +this.log = logContext.logger(this.getClass()); +this.inflightRequests = new HashMap<>(); +this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); +} + +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +List requests = inflightRequests.values().stream() +.map(req -> req.send(currentTimeMs)) +.filter(Optional::isPresent) +.map(Optional::get) +.collect(Collectors.toList()); +return requests.isEmpty() ? +new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : +new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); +} + +/** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ +public CompletableFuture>> requestTopicMetadata(final Optional topic) { +if (inflightRequests.containsKey(topic)) { +return inflightRequests.get(topic).future; +} + +TopicMetadataRequestState newRequest = new TopicMetadataRequestState( +logContext, +topic, +retryBackoffMs, +retryB
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325342579 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets + * partitionsFor + * + * + * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to + * prevent sending it without backing off from previous attempts. + * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests. + * The {@code inflightRequests} are memoized by topic name. If all topics are requested, then {@code null} is used as the key. Review Comment: Thanks - apologies for neglecting to update the doc.
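Editorial sketch, not part of the PR: the quoted Javadoc says in-flight requests are memoized by topic with null as the "all topics" key, while the quoted requestTopicMetadata takes an Optional topic and uses it directly as the map key, which is the doc mismatch acknowledged above. A minimal illustration of deduplicating in-flight requests keyed by Optional follows. InflightTopicRequests is a hypothetical class, List<String> stands in for the real result type, and removing the entry on any completion (not only on success) is a simplification.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the manager's bookkeeping; assumes a single thread.
final class InflightTopicRequests {
    // Optional.empty() is the key for "all topics"; HashMap relies on
    // Optional's equals/hashCode, so no null key is needed.
    private final Map<Optional<String>, CompletableFuture<List<String>>> inflight = new HashMap<>();

    // Returns the existing future when the same topic is already in flight.
    CompletableFuture<List<String>> request(Optional<String> topic) {
        CompletableFuture<List<String>> existing = inflight.get(topic);
        if (existing != null) {
            return existing;
        }
        CompletableFuture<List<String>> created = new CompletableFuture<>();
        inflight.put(topic, created);
        // Drop the entry once the request settles, so a later call starts fresh.
        created.whenComplete((result, error) -> inflight.remove(topic, created));
        return created;
    }

    int size() {
        return inflight.size();
    }
}
```

Callers asking for the same topic while a request is pending share one future, and a request for all topics is tracked separately under Optional.empty().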
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325341358 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets Review Comment: 🤦 - listTopics() instead.
[jira] [Updated] (KAFKA-15245) Improve Tiered Storage Metrics
[ https://issues.apache.org/jira/browse/KAFKA-15245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15245: - Fix Version/s: 3.6.0 (was: 3.7.0) > Improve Tiered Storage Metrics > -- > > Key: KAFKA-15245 > URL: https://issues.apache.org/jira/browse/KAFKA-15245 > Project: Kafka > Issue Type: Improvement >Reporter: Abhijeet Kumar >Assignee: Abhijeet Kumar >Priority: Major > Fix For: 3.6.0 > > > Rename existing tiered storage metrics to remove ambiguity -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15245) Improve Tiered Storage Metrics
[ https://issues.apache.org/jira/browse/KAFKA-15245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15245: - Description: Rename existing tiered storage metrics to remove ambiguity (was: Rename existing tiered storage metrics to remove ambiguity and add metrics for the RemoteIndexCache.) > Improve Tiered Storage Metrics > -- > > Key: KAFKA-15245 > URL: https://issues.apache.org/jira/browse/KAFKA-15245 > Project: Kafka > Issue Type: Improvement >Reporter: Abhijeet Kumar >Assignee: Abhijeet Kumar >Priority: Major > Fix For: 3.7.0 > > > Rename existing tiered storage metrics to remove ambiguity -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15245) Improve Tiered Storage Metrics
[ https://issues.apache.org/jira/browse/KAFKA-15245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764959#comment-17764959 ] Kamal Chandraprakash commented on KAFKA-15245: -- The KAFKA-15094 ticket has already been filed to track the RemoteIndexCache stats metrics. > Improve Tiered Storage Metrics > -- > > Key: KAFKA-15245 > URL: https://issues.apache.org/jira/browse/KAFKA-15245 > Project: Kafka > Issue Type: Improvement >Reporter: Abhijeet Kumar >Assignee: Abhijeet Kumar >Priority: Major > Fix For: 3.7.0 > > > Rename existing tiered storage metrics to remove ambiguity -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15453) Enable `testFencingOnTransactionExpiration` in TransactionsWithTieredStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-15453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15453: - Fix Version/s: 3.6.0 (was: 3.7.0) > Enable `testFencingOnTransactionExpiration` in TransactionsWithTieredStoreTest > -- > > Key: KAFKA-15453 > URL: https://issues.apache.org/jira/browse/KAFKA-15453 > Project: Kafka > Issue Type: Test > Components: core >Reporter: Satish Duggana >Assignee: Luke Chen >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15160) Message bytes duplication in Kafka headers when compression is enabled
[ https://issues.apache.org/jira/browse/KAFKA-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764956#comment-17764956 ] Phuc Hong Tran edited comment on KAFKA-15160 at 9/14/23 3:32 AM: - [~vikashmishra0808] How did you set up ConsumerRecord and @KafkaListener in your codebase? Did the duplication happen for every ConsumerRecord? Also, can you attach the heap dump? Thanks was (Author: JIRAUSER301295): [~vikashmishra0808] How did you set up ConsumerRecord and @KafkaListener in your codebase? Did the duplication happen for every ConsumerRecord? > Message bytes duplication in Kafka headers when compression is enabled > -- > > Key: KAFKA-15160 > URL: https://issues.apache.org/jira/browse/KAFKA-15160 > Project: Kafka > Issue Type: Bug > Components: clients, compression, consumer >Affects Versions: 3.2.3, 3.3.2 >Reporter: Vikash Mishra >Assignee: Phuc Hong Tran >Priority: Critical > Attachments: java heap dump.png, wireshark-min.png > > > I created a Spring Kafka consumer using @KafkaListener. > While testing it, I encountered a scenario where, when data is compressed (with any > compression, snappy or gzip) and consumed by the consumer, the > heap dump shows a " byte" occupying the same amount of memory as the > message value. > This behavior is seen only when compressed data is consumed, > not with uncompressed data. > I captured Kafka's messages with Wireshark, which shows the proper > size of data incoming from the Kafka server and no extra bytes in headers. So this > is definitely something in the Kafka client. Spring doesn't do anything about > compression; the whole functionality is handled internally in the Kafka client > library. > Attached are screenshots of the heap dump and Wireshark. > This seems like a critical issue, as the message size in memory almost > doubles, impacting consumer memory and performance. 
It feels as if > the actual message value is copied to the headers somewhere. > *To Reproduce* > # Produce compressed data on any topic. > # Create a simple consumer consuming from the above-created topic. > # Capture a heap dump. > *Expected behavior* > Headers should not show bytes consuming memory equivalent to the value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15160) Message bytes duplication in Kafka headers when compression is enabled
[ https://issues.apache.org/jira/browse/KAFKA-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764956#comment-17764956 ] Phuc Hong Tran commented on KAFKA-15160: [~vikashmishra0808] How did you set up ConsumerRecord and @KafkaListener in your codebase? Did the duplication happen for every ConsumerRecord? > Message bytes duplication in Kafka headers when compression is enabled > -- > > Key: KAFKA-15160 > URL: https://issues.apache.org/jira/browse/KAFKA-15160 > Project: Kafka > Issue Type: Bug > Components: clients, compression, consumer >Affects Versions: 3.2.3, 3.3.2 >Reporter: Vikash Mishra >Assignee: Phuc Hong Tran >Priority: Critical > Attachments: java heap dump.png, wireshark-min.png > > > I created a Spring Kafka consumer using @KafkaListener. > While testing it, I encountered a scenario where, when data is compressed (with any > compression, snappy or gzip) and consumed by the consumer, the > heap dump shows a " byte" occupying the same amount of memory as the > message value. > This behavior is seen only when compressed data is consumed, > not with uncompressed data. > I captured Kafka's messages with Wireshark, which shows the proper > size of data incoming from the Kafka server and no extra bytes in headers. So this > is definitely something in the Kafka client. Spring doesn't do anything about > compression; the whole functionality is handled internally in the Kafka client > library. > Attached are screenshots of the heap dump and Wireshark. > This seems like a critical issue, as the message size in memory almost > doubles, impacting consumer memory and performance. It feels as if > the actual message value is copied to the headers somewhere. > *To Reproduce* > # Produce compressed data on any topic. > # Create a simple consumer consuming from the above-created topic. > # Capture a heap dump. 
> *Expected behavior* > Headers should not show bytes consuming memory equivalent to the value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
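For anyone trying to follow the reproduction steps in KAFKA-15160, the client settings could look roughly like the sketch below. It only builds the `Properties` objects; the broker address and the group id are placeholders rather than values from the report, and the class name `CompressionReproConfig` is made up for illustration. The config keys and serializer class names are the standard Kafka client ones.

```java
import java.util.Properties;

// Sketch only: client settings for the "To Reproduce" steps.
// Broker address and group id are placeholders, not taken from the ticket.
final class CompressionReproConfig {

    // Step 1: producer that writes compressed batches ("gzip" or "snappy").
    static Properties producerProps(String compressionType) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("compression.type", compressionType);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    // Step 2: plain consumer reading the same topic from the beginning.
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", groupId);
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }
}
```

These objects would be passed to `new KafkaProducer<>(props)` and `new KafkaConsumer<>(props)`. For step 3, one option is `jmap -dump:live,format=b,file=heap.hprof <pid>` against the consumer JVM; running once with `compression.type` set to `none` gives the uncompressed baseline the report compares against.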
[GitHub] [kafka] showuon commented on pull request #14381: KAFKA-14912:Add a dynamic config for remote index cache size
showuon commented on PR #14381: URL: https://github.com/apache/kafka/pull/14381#issuecomment-1718627959 @satishd, not sure if you want to have another look. If not, I'll merge it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #14381: KAFKA-14912:Add a dynamic config for remote index cache size
showuon commented on PR #14381: URL: https://github.com/apache/kafka/pull/14381#issuecomment-1718627359 Failed tests are unrelated: ``` Build / JDK 8 and Scala 2.12 / kafka.network.SocketServerTest.testSaslReauthenticationFailureNoKip152SaslAuthenticate() Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord() Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord() Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() Build / JDK 20 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOffsetTranslationBehindReplicationFlow() Build / JDK 20 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector Build / JDK 20 and Scala 2.13 / kafka.api.SslAdminIntegrationTest.testSynchronousAuthorizerAclUpdatesBlockRequestThreads() Build / JDK 20 and Scala 2.13 / kafka.api.UserQuotaTest.testThrottledProducerConsumer(String).quorum=kraft Build / JDK 20 and Scala 2.13 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota() Build / JDK 20 and Scala 2.13 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector Build / JDK 11 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails(String).quorum=kraft Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft ``` -- This is an 
automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #14347: KAFKA-15439: Transactions test with tiered storage
showuon commented on PR #14347: URL: https://github.com/apache/kafka/pull/14347#issuecomment-1718626560 Backported to 3.6 branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #14347: KAFKA-15439: Transactions test with tiered storage
showuon merged PR #14347: URL: https://github.com/apache/kafka/pull/14347 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #14347: KAFKA-15439: Transactions test with tiered storage
showuon commented on PR #14347: URL: https://github.com/apache/kafka/pull/14347#issuecomment-1718624599 Failed tests are unrelated: ``` Build / JDK 20 and Scala 2.13 / org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailurePositiveDelayTest.testInvalidPasswordSaslPlain() Build / JDK 20 and Scala 2.13 / org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest.shouldAssignRandomInput[enableRackAwareTaskAssignor=true] Build / JDK 11 and Scala 2.13 / org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest.testFencingOnSend(String).quorum=zk Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated() Build / JDK 8 and Scala 2.12 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserPasses(String).quorum=kraft Build / JDK 17 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor() Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteConsumerGroupsTest.testDeleteCmdNonEmptyGroup(String).quorum=kraft ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15466) Add KIP-919 support for some admin APIs
[ https://issues.apache.org/jira/browse/KAFKA-15466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15466: - Description: Add KIP-919 support for kafka-features.sh, metadata-quorum.sh, and kafka-cluster.sh. > Add KIP-919 support for some admin APIs > --- > > Key: KAFKA-15466 > URL: https://issues.apache.org/jira/browse/KAFKA-15466 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > > Add KIP-919 support for kafka-features.sh, metadata-quorum.sh, and > kafka-cluster.sh. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15466) Add KIP-919 support for some admin APIs
[ https://issues.apache.org/jira/browse/KAFKA-15466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15466: - Summary: Add KIP-919 support for some admin APIs (was: Add KIP-919 support to kafka-features.sh, kafka-metadata-quorum.sh, kafka-cluster.sh) > Add KIP-919 support for some admin APIs > --- > > Key: KAFKA-15466 > URL: https://issues.apache.org/jira/browse/KAFKA-15466 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15466) Add KIP-919 support to kafka-features.sh, kafka-metadata-quorum.sh, kafka-cluster.sh
Colin McCabe created KAFKA-15466: Summary: Add KIP-919 support to kafka-features.sh, kafka-metadata-quorum.sh, kafka-cluster.sh Key: KAFKA-15466 URL: https://issues.apache.org/jira/browse/KAFKA-15466 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
philipnee commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325175686 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -381,41 +377,41 @@ public String toString() { } /** - * This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}. + * This is used to stage the unsent {@link OffsetCommitRequest} and {@link OffsetFetchRequestState}. * unsentOffsetCommits holds the offset commit requests that have not been sent out * unsentOffsetFetches holds the offset fetch requests that have not been sent out - * inflightOffsetFetches holds the offset fetch requests that have been sent out but incompleted. - * + * inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed. + * * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests. */ class PendingRequests { // Queue is used to ensure the sequence of commit -Queue unsentOffsetCommits = new LinkedList<>(); +Queue unsentOffsetCommits = new LinkedList<>(); List unsentOffsetFetches = new ArrayList<>(); List inflightOffsetFetches = new ArrayList<>(); -public boolean hasUnsentRequests() { +boolean hasUnsentRequests() { Review Comment: Sorry - I left out commit. Fixing those obvious mistakes right away. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
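The javadoc quoted in this diff describes two behaviours of `PendingRequests`: a queue keeps offset commits in order, and `addOffsetFetchRequest` dedupes identical fetch requests. A minimal sketch of that idea follows. It is a simplified stand-in, not the code from the PR: the `FetchRequest` type, the use of `String` for partitions and payloads, and the equality-on-partition-set rule are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Simplified stand-in, not the real CommitRequestManager.PendingRequests.
final class PendingRequestsSketch {

    // Hypothetical request type: requested partitions plus the caller's future.
    static final class FetchRequest {
        final Set<String> partitions;
        final CompletableFuture<String> future = new CompletableFuture<>();

        FetchRequest(Set<String> partitions) {
            this.partitions = partitions;
        }
    }

    // FIFO queue so commits are sent in the order they were added.
    final Queue<String> unsentOffsetCommits = new LinkedList<>();
    final List<FetchRequest> unsentOffsetFetches = new ArrayList<>();
    final List<FetchRequest> inflightOffsetFetches = new ArrayList<>();

    boolean hasUnsentRequests() {
        return !unsentOffsetCommits.isEmpty() || !unsentOffsetFetches.isEmpty();
    }

    // Returns the future of an equivalent pending request if one exists,
    // otherwise stages a new request. "Equivalent" here means same partition set.
    CompletableFuture<String> addOffsetFetchRequest(Set<String> partitions) {
        Optional<FetchRequest> dup = findDuplicate(unsentOffsetFetches, partitions);
        if (!dup.isPresent()) {
            dup = findDuplicate(inflightOffsetFetches, partitions);
        }
        if (dup.isPresent()) {
            return dup.get().future;
        }
        FetchRequest request = new FetchRequest(partitions);
        unsentOffsetFetches.add(request);
        return request.future;
    }

    private static Optional<FetchRequest> findDuplicate(List<FetchRequest> requests,
                                                        Set<String> partitions) {
        return requests.stream()
                .filter(r -> r.partitions.equals(partitions))
                .findFirst();
    }
}
```

Handing back the existing future means two callers asking for the same partitions share one network round trip, which appears to be the point of the dedupe described in the javadoc.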
[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager
kirktrue commented on code in PR #14359: URL: https://github.com/apache/kafka/pull/14359#discussion_r1325174944 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeStatusDetector.java: ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.NetworkClientUtils; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.utils.Time; + +/** + * Use {@code NodeStatusDetector} to determine the status of a given broker {@link Node}. It's also + * possible to check for previous authentication errors if the node isn't available. + * + * @see ConsumerNetworkClient + * @see NetworkClientDelegate + */ +public interface NodeStatusDetector { + +/** + * Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in + * reconnect backoff window following the disconnect). 
+ * + * @param node {@link Node} to check for availability + * @see NetworkClientUtils#isUnavailable(KafkaClient, Node, Time) + */ +boolean isUnavailable(Node node); + +/** + * Checks for an authentication error on a given node and throws the exception if it exists. + * + * @param node {@link Node} to check for a previous {@link AuthenticationException}; if found it is thrown + * @see NetworkClientUtils#maybeThrowAuthFailure(KafkaClient, Node) + */ +void maybeThrowAuthFailure(Node node); + +/** + * Initiate a connection if currently possible. This is only really useful for resetting Review Comment: Per the above comments, I've removed `NodeStatusDetector` for now, so this should be moot for the time being. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager
kirktrue commented on code in PR #14359: URL: https://github.com/apache/kafka/pull/14359#discussion_r1325174579 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -91,23 +105,70 @@ public void onFailure(RuntimeException e) { } }; -final RequestFuture future = client.send(fetchTarget, request); +final RequestFuture future = nodeStatusDetector.send(fetchTarget, request); future.addListener(listener); } return fetchRequestMap.size(); } -public void close(final Timer timer) { -if (!isClosed.compareAndSet(false, true)) { -log.info("Fetcher {} is already closed.", this); -return; +public Fetch collectFetch() { +return fetchCollector.collectFetch(fetchBuffer); +} + +protected void maybeCloseFetchSessions(final Timer timer) { Review Comment: Yes, it is a bit obtuse. My apologies. There are two reasons that `maybeCloseFetchSessions` exists in `Fetcher`: 1. The forthcoming `FetchRequestManager` doesn't close sessions in the same way, so it doesn't make sense to have the method in the shared `AbstractFetch` class. 2. `FetchRequestManager` (as with the other `RequestManager`s) doesn't use the `ConsumerNetworkClient` but uses the `NetworkClientDelegate`. The code in `AbstractFetch` that is shared by both `Fetcher` and—soon—`FetchRequestManager` needed a way to call some of the methods that `ConsumerNetworkClient` and `NetworkClientDelegate` have in common. Thus the `NodeStatusDetector` interface was created. Because the `FetchRequestManager` isn't in this PR, it's understandably unclear the reason for the current structure. For now I've made the following changes: 1. I moved `maybeCloseFetchSessions` to `AbstractFetch` 2. I left `AbstractFetch` with a reference to a `ConsumerNetworkClient` 3. I've removed the `NodeStatusDetector` As needed, the above may be changed in forthcoming PRs. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
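The motivation given above for `NodeStatusDetector` (shared fetch code that must work against either network client) can be illustrated with a small sketch. Everything below is hypothetical: the `*Sketch` types, the use of a `String` node id, and the manual clock are stand-ins, and only the "unavailable while in the reconnect backoff window" rule is taken from the quoted javadoc.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, trimmed-down version of the idea; none of these are Kafka classes.
interface NodeStatusDetectorSketch {
    // True while the node is disconnected and still inside its reconnect backoff window.
    boolean isUnavailable(String nodeId);
}

// Stand-in for a network client that tracks reconnect backoff per node.
final class BackoffTrackingClientSketch implements NodeStatusDetectorSketch {
    private final Map<String, Long> backoffUntilMs = new HashMap<>();
    private long nowMs = 0L; // manual clock, to keep the sketch deterministic

    void disconnected(String nodeId, long backoffMs) {
        backoffUntilMs.put(nodeId, nowMs + backoffMs);
    }

    void advance(long ms) {
        nowMs += ms;
    }

    @Override
    public boolean isUnavailable(String nodeId) {
        Long until = backoffUntilMs.get(nodeId);
        return until != null && nowMs < until;
    }
}

// Shared fetch logic depends only on the interface, not on a concrete client.
final class SharedFetchLogicSketch {
    private final NodeStatusDetectorSketch detector;

    SharedFetchLogicSketch(NodeStatusDetectorSketch detector) {
        this.detector = detector;
    }

    boolean canFetchFrom(String nodeId) {
        return !detector.isUnavailable(nodeId);
    }
}
```

With this shape, a second client type would only need to implement the interface for the shared logic to keep working unchanged.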
[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager
kirktrue commented on code in PR #14359: URL: https://github.com/apache/kafka/pull/14359#discussion_r1325171010 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -63,6 +68,31 @@ public NetworkClientDelegate( this.unsentRequests = new ArrayDeque<>(); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.tryConnectNodes = new HashSet<>(); +} + +@Override +public boolean isUnavailable(Node node) { +return NetworkClientUtils.isUnavailable(client, node, time); +} + +@Override +public void maybeThrowAuthFailure(Node node) { +NetworkClientUtils.maybeThrowAuthFailure(client, node); +} + +@Override +public void tryConnect(Node node) { Review Comment: Yes, `tryConnect` and `maybeTryConnect` were intended to be used together. The reason the latter wasn't in the interface was the difference in _when_ we perform network I/O. In the existing `Consumer`, we perform it immediately, whereas in the refactored `Consumer`, we delay that I/O until the last step. So the intent was for the new `Consumer` to queue up the connection attempts and perform them in one step in `maybeTryConnect` vs. immediately when `tryConnect` was called. That said, a) it's confusing, and b) it's not needed in this particular PR, so I've removed the whole `NodeStatusDetector` for now. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
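The queue-then-flush pattern described for `tryConnect` and `maybeTryConnect` could be sketched as follows. This is an assumption-laden illustration, not the removed code: the class name, the `String` node ids, and the `connectAttempts` list (which stands in for real network I/O) are invented; only the `tryConnectNodes` set name comes from the quoted diff.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of deferring connection attempts until a single I/O step.
final class DeferredConnectSketch {
    // Nodes waiting for a connection attempt; a set, so repeats collapse.
    private final Set<String> tryConnectNodes = new HashSet<>();
    // Stand-in for network I/O: records each attempt that was actually made.
    private final List<String> connectAttempts = new ArrayList<>();

    // Called from request-building code: only records the intent.
    void tryConnect(String nodeId) {
        tryConnectNodes.add(nodeId);
    }

    // Called once at the end of the poll step: performs all queued attempts.
    void maybeTryConnect() {
        for (String nodeId : tryConnectNodes) {
            connectAttempts.add(nodeId); // a real client would initiate the connection here
        }
        tryConnectNodes.clear();
    }

    List<String> connectAttempts() {
        return connectAttempts;
    }
}
```

The older consumer path would effectively do the work of `maybeTryConnect` inside `tryConnect` itself, which is the timing difference the comment above points to.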
[GitHub] [kafka] junrao commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
junrao commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1325040397 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -91,7 +89,7 @@ public CommitRequestManager( } /** - * Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} request if there's any. The function will + * Poll for the {@link OffsetFetchRequest} and {@link org.apache.kafka.common.requests.OffsetCommitRequest} request if there's any. The function will Review Comment: Do we need to spell out the package name for OffsetCommitRequest? It doesn't seem this is done consistently. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * + * Manages the state of topic metadata requests. This manager returns a + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} when a request is ready to + * be sent. Specifically, this manager handles the following user API calls: + * + * + * listOffsets + * partitionsFor Review Comment: It doesn't seem that we have wired `TopicMetadataRequestManager` to the `partitionsFor` call? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -381,41 +377,41 @@ public String toString() { } /** - * This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}. + * This is used to stage the unsent {@link OffsetCommitRequest} and {@link OffsetFetchRequestState}. 
* unsentOffsetCommits holds the offset commit requests that have not been sent out * unsentOffsetFetches holds the offset fetch requests that have not been sent out - * inflightOffsetFetches holds the offset fetch requests that have been sent out but incompleted. - * + * inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed. + * * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests. */ class PendingRequests { // Queue is used to ensure the sequence of commit -Queue unsentOffsetCommits = new LinkedList<>(); +Queue unsentOffsetCommits = new LinkedList<>(); List unsentOffsetFetches = new ArrayList<>(); List inflightOffsetFetches = new ArrayList<>(); -public boolean hasUnsentRequests() { +boolean hasUnsentRequests() { Review Comment: Could this be private? Ditto below. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java: ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the Li
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1325138384 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java: ## @@ -79,6 +79,9 @@ public int hashCode() { @Override public String toString() { -return String.format("Assignor selection {type:%s, name:%s}", type, serverAssignor); +return "AssignorSelection{" + Review Comment: just to create a coherent format
[GitHub] [kafka] satishd commented on a diff in pull request #14382: KAFKA-15442: add a section in doc for tiered storage
satishd commented on code in PR #14382: URL: https://github.com/apache/kafka/pull/14382#discussion_r1324263153 ## docs/ops.html: ## @@ -3859,6 +3859,80 @@ Finalizing the migration # Other configs ... + +6.11 Tiered Storage + +Tiered Storage overview + +Kafka data is mostly consumed in a streaming fashion using tail reads. Tail reads leverage OS's page cache to serve the data instead of disk reads. + Older data is typically read from the disk for backfill or failure recovery purposes and is infrequent. + +In the tiered storage approach, Kafka cluster is configured with two tiers of storage - local and remote. + The local tier is the same as the current Kafka that uses the local disks on the Kafka brokers to store the log segments. + The new remote tier uses external storage systems, such as HDFS or S3, to store the completed log segments. + +Configuration + +broker configurations + +By default, Kafka server will not enable tiered storage feature. remote.log.storage.system.enable + is the property to control whether to enable tiered storage functionality in a broker or not. Setting it to "true" to enable this feature. + + +RemoteStorageManager is an interface to provide the lifecycle of remote log segments and indexes. Kafka server + doesn't provide out-of-the-box implementation of RemoteStorageManager. Configuring remote.log.storage.manager.class.name Review Comment: 1. `Configuring` -> `Configure` 2. remote.log.storage.manager.class.name and remote.log.storage.manager.class.path instead of remote.log.storage.manager.class.name or remote.log.storage.manager.class.path -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15459) Convert coordinator retriable errors to a known producer response error.
[ https://issues.apache.org/jira/browse/KAFKA-15459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-15459. Resolution: Fixed > Convert coordinator retriable errors to a known producer response error. > > > Key: KAFKA-15459 > URL: https://issues.apache.org/jira/browse/KAFKA-15459 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.6.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > Fix For: 3.6.0 > > > KIP-890 Part 1 tries to address hanging transactions on old clients. Thus, > the produce version can not be bumped and no new errors can be added. > Currently we use the java client's notion of retriable and abortable errors > -- retriable errors are defined as such by extending the retriable error > class, fatal errors are defined explicitly, and abortable errors are the > remaining. However, many other clients treat non specified errors as fatal > and that means many retriable errors kill the application. This is not ideal. > While reviewing [https://github.com/apache/kafka/pull/14370] I added some of > the documentation for the returned errors in the produce response as well. > There were concerns about the new errors: > * {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} > * {@link Errors#COORDINATOR_NOT_AVAILABLE} > * {@link Errors#INVALID_TXN_STATE} > * {@link Errors#INVALID_PRODUCER_ID_MAPPING} > * {@link Errors#CONCURRENT_TRANSACTIONS} > The coordinator load, not available, and concurrent transactions errors > should be retriable. > The invalid txn state and pid mapping errors should be abortable. > This is how older java clients handle the errors, but it is unclear how other > clients handle them. It seems that rdkafka (for example) treats the abortable > errors as fatal instead. The coordinator errors are retriable but not the > concurrent transactions error. Generally anything not specified otherwise is > fatal. 
> It seems acceptable for the abortable errors to be fatal on some clients > since the error is likely on a zombie producer or in a state that may be > harder to recover from. However, for the retriable errors, we can return > NOT_ENOUGH_REPLICAS which is a known retriable response. We can use the > produce api's response string to specify the real cause of the error for > debugging. > There were trade-offs between making the older clients work and for clarity > in errors. This seems to be the best compromise. -- This message was sent by Atlassian Jira (v8.20.10#820010)
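The conversion described in KAFKA-15459 can be illustrated with a minimal, self-contained sketch. It is not the code merged in the PR: `ErrorCode`, `Result`, and `toProduceResponseError` are hypothetical names, the enum is a local stand-in for a few values of `org.apache.kafka.common.protocol.Errors`, and the message text is made up for illustration. It assumes only what the ticket states: the three retriable coordinator errors are returned as NOT_ENOUGH_REPLICAS, with the real cause carried in the response string, while the abortable errors are passed through unchanged.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical illustration of the error conversion described in the ticket.
final class ProduceErrorMapping {

    // Local stand-in for a handful of Kafka error codes (assumption, not the real enum).
    enum ErrorCode {
        NONE,
        COORDINATOR_LOAD_IN_PROGRESS,
        COORDINATOR_NOT_AVAILABLE,
        CONCURRENT_TRANSACTIONS,
        INVALID_TXN_STATE,
        INVALID_PRODUCER_ID_MAPPING,
        NOT_ENOUGH_REPLICAS
    }

    // Errors the ticket says should be retriable for the producer.
    private static final Set<ErrorCode> RETRIABLE = EnumSet.of(
            ErrorCode.COORDINATOR_LOAD_IN_PROGRESS,
            ErrorCode.COORDINATOR_NOT_AVAILABLE,
            ErrorCode.CONCURRENT_TRANSACTIONS);

    // Error code plus an optional free-form message for debugging.
    static final class Result {
        final ErrorCode error;
        final String message;

        Result(ErrorCode error, String message) {
            this.error = error;
            this.message = message;
        }
    }

    // Converts retriable coordinator errors to a code that older clients already retry on.
    static Result toProduceResponseError(ErrorCode original) {
        if (RETRIABLE.contains(original)) {
            // The message wording here is illustrative only.
            return new Result(ErrorCode.NOT_ENOUGH_REPLICAS, "Underlying error: " + original);
        }
        // Abortable and all other errors are returned unchanged.
        return new Result(original, null);
    }
}
```

Under these assumptions, a client that treats unknown errors as fatal still sees a code it knows how to retry, and the original cause remains visible in the message.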
[GitHub] [kafka] jolshan merged pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
jolshan merged PR #14378: URL: https://github.com/apache/kafka/pull/14378
[GitHub] [kafka] jolshan commented on pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
jolshan commented on PR #14378: URL: https://github.com/apache/kafka/pull/14378#issuecomment-1718329367 Test failures look unrelated. I synced with Jason offline and he said we can proceed with the merge.
[GitHub] [kafka] jeffkbkim opened a new pull request, #14387: [DRAFT] KAFKA-14519: New coordinator metrics
jeffkbkim opened a new pull request, #14387: URL: https://github.com/apache/kafka/pull/14387
[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager
kirktrue commented on code in PR #14359: URL: https://github.com/apache/kafka/pull/14359#discussion_r1325061757 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -256,138 +258,23 @@ protected FetchRequest.Builder createFetchRequest(final Node fetchTarget, } /** - * Return the fetched records, empty the record buffer and update the consumed position. - * - * + * Return the list of fetchable partitions, which are the set of partitions to which we are subscribed, + * but excluding any partitions for which we still have buffered data. The idea is that since the user + * has yet to process the data for the partition that has already been fetch, we should not go send for more data Review Comment: Fixed. Also made this method return a `Set` which is more appropriate.
[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager
kirktrue commented on code in PR #14359: URL: https://github.com/apache/kafka/pull/14359#discussion_r1325058583 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.TopicPartition; + +/** + * {@code FetchUtils} provides a place for disparate parts of the fetch logic to live. + */ +public class FetchUtils { + +/** + * Performs two combined actions based on the state related to the {@link TopicPartition}: + * + * + * + * Invokes {@link ConsumerMetadata#requestUpdate(boolean)} to signal that the metadata is incorrect and + * needs to be updated + * + * + * Invokes {@link SubscriptionState#clearPreferredReadReplica(TopicPartition)} to clear out any read replica + * information that may be present. + * + * + * + * This utility method should be invoked if the client detects (or is told by a node in the broker) that an + * attempt was made to fetch from a node that isn't the leader or preferred replica. 
+ * + * @param metadata {@link ConsumerMetadata} for which to request an update + * @param subscriptions {@link SubscriptionState} to clear any internal read replica node + * @param topicPartition {@link TopicPartition} for which this state change is related + */ +static void requestMetadataUpdate(final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final TopicPartition topicPartition) { +metadata.requestUpdate(true); Review Comment: You're right. I double-checked the (very helpful) comments in `requestUpdate()` and I agree, so I set it to `false`.
[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager
kirktrue commented on code in PR #14359: URL: https://github.com/apache/kafka/pull/14359#discussion_r1325056350 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.IdempotentCloser; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.io.Closeable; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Predicate; + +/** + * {@code FetchBuffer} buffers up the results from the broker responses as they are received. It is essentially a + * wrapper around a {@link java.util.Queue} of {@link CompletedFetch}. + * + * + * + * Note: this class is not thread-safe and is intended to only be used from a single thread. + */ +public class FetchBuffer implements Closeable { + +private final Logger log; +private final ConcurrentLinkedQueue completedFetches; Review Comment: I hadn't addressed it because I wasn't 100% sure if that was accurate. 
I temporarily added some assertion checking to verify that and re-ran the unit tests, and they passed, so I'll take that as sufficient confirmation. I've updated the class-level documentation to include that fact. LMK if you're OK with the comments. Thanks.
[jira] [Commented] (KAFKA-15465) MM2 not working when its internal topics are pre-created on a cluster that disallows topic creation
[ https://issues.apache.org/jira/browse/KAFKA-15465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764867#comment-17764867 ] Ahmed HIBOT commented on KAFKA-15465: - [~omnia_h_ibrahim], Can you please take a look and confirm my findings. Also, I will be happy to provide the fix but I can't assign myself to the bug. > MM2 not working when its internal topics are pre-created on a cluster that > disallows topic creation > --- > > Key: KAFKA-15465 > URL: https://issues.apache.org/jira/browse/KAFKA-15465 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.4.1 >Reporter: Ahmed HIBOT >Priority: Major > > h1. Replication steps > * Setup a source kafka cluster (alias SOURCE) which doesn't allow topic > creation to MM2 (therefore it doesn't allow the creation of internal topics) > * Create MM2 internal topics in the source kafka cluster > * Setup a target kafka cluster (alias TARGET) > * Enable one way replication SOURCE->TARGET > MM2 will attempt to create or find its internal topics on the source cluster > but it will fail with the following stack trace > {code:java} > {"log_timestamp": "2023-09-13T09:39:25.612+", "log_level": "ERROR", > "process_id": 1, "process_name": "mirror-maker", "thread_id": 1, > "thread_name": "Scheduler for MirrorSourceConnector-creating upstream > offset-syncs topic", "action_name": > "org.apache.kafka.connect.mirror.Scheduler", "log_message": "Scheduler for > MirrorSourceConnector caught exception in scheduled task: creating upstream > offset-syncs topic"} > org.apache.kafka.connect.errors.ConnectException: Error while attempting to > create/find topic 'mm2-offset-syncs.TARGET.internal' > at > org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:155) > at > org.apache.kafka.connect.mirror.MirrorUtils.createSinglePartitionCompactedTopic(MirrorUtils.java:161) > at > 
org.apache.kafka.connect.mirror.MirrorSourceConnector.createOffsetSyncsTopic(MirrorSourceConnector.java:328) > at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93) > at > org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112) > at > org.apache.kafka.connect.mirror.Scheduler.lambda$execute$2(Scheduler.java:63) > [...] > Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TopicAuthorizationException: Authorization > failed. > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:124) > ... 11 more > Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: > Authorization failed. {code} > > h1. Root cause analysis > The changes introduced by KAFKA-13401 in > [{{{}MirrorUtils{}}}|https://github.com/apache/kafka/pull/12577/files#diff-fa8f595307a4ade20cc22253a7721828e3b55c96f778e9c4842c978801e0a1a4] > are supposed to follow the same logic as > [{{{}TopicAdmin{}}}|https://github.com/apache/kafka/blob/a7e865c0a756504cc7ae6f4eb0772caddc53/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L423] > according to the contributor's > [comment|https://github.com/apache/kafka/pull/12577#discussion_r991566108] > {{TopicAdmin.createOrFindTopics(...)}} and > {{MirrorUtils.createCompactedTopic(...)}} aren't aligned in terms of allowed > exceptions > ||Exception||TopicAdmin||MirrorUtils|| > |TopicExistsException|OK|OK| > |UnsupportedVersionException|OK|_KO_| > |ClusterAuthorizationException|OK|_KO_| > |TopicAuthorizationException|OK|_KO_| > -- This message was sent by Atlassian Jira (v8.20.10#820010)
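The mismatch in the table above can be sketched as a single tolerance check. This is only one possible way to align the two code paths, not the actual fix: the exception classes below are local stand-ins that borrow the names of the Kafka exceptions, and `isTolerated` and `createOrFind` are hypothetical helpers. The sketch assumes, as the table states, that all four exceptions should mean "the topic could not be created here, so fall back to finding it".

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration of tolerating the same exceptions on both code paths.
final class CreateOrFindTopic {

    // Local stand-ins named after the Kafka exceptions listed in the table (assumption).
    static class TopicExistsException extends RuntimeException { }
    static class UnsupportedVersionException extends RuntimeException { }
    static class ClusterAuthorizationException extends RuntimeException { }
    static class TopicAuthorizationException extends RuntimeException { }

    // True if the failure cause should be treated as "skip creation" rather than as fatal.
    static boolean isTolerated(Throwable cause) {
        return cause instanceof TopicExistsException
                || cause instanceof UnsupportedVersionException
                || cause instanceof ClusterAuthorizationException
                || cause instanceof TopicAuthorizationException;
    }

    // Runs the creation call; returns true if it succeeded and false if creation
    // was skipped because of a tolerated error. Any other failure is rethrown.
    static boolean createOrFind(Callable<Void> createCall) throws Exception {
        try {
            createCall.call();
            return true;
        } catch (ExecutionException e) {
            if (isTolerated(e.getCause())) {
                return false;
            }
            throw e;
        }
    }
}
```

With such a check, a pre-created `mm2-offset-syncs.TARGET.internal` topic on a cluster that denies topic creation would lead to a skipped creation instead of a `ConnectException`.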
[jira] [Updated] (KAFKA-15465) MM2 not working when its internal topics are pre-created on a cluster that disallows topic creation
[ https://issues.apache.org/jira/browse/KAFKA-15465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed HIBOT updated KAFKA-15465: Description: h1. Replication steps * Setup a source kafka cluster (alias SOURCE) which doesn't allow topic creation to MM2 (therefore it doesn't allow the creation of internal topics) * Create MM2 internal topics in the source kafka cluster * Setup a target kafka cluster (alias TARGET) * Enable one way replication SOURCE->TARGET MM2 will attempt to create or find its internal topics on the source cluster but it will fail with the following stack trace {code:java} {"log_timestamp": "2023-09-13T09:39:25.612+", "log_level": "ERROR", "process_id": 1, "process_name": "mirror-maker", "thread_id": 1, "thread_name": "Scheduler for MirrorSourceConnector-creating upstream offset-syncs topic", "action_name": "org.apache.kafka.connect.mirror.Scheduler", "log_message": "Scheduler for MirrorSourceConnector caught exception in scheduled task: creating upstream offset-syncs topic"} org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic 'mm2-offset-syncs.TARGET.internal' at org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:155) at org.apache.kafka.connect.mirror.MirrorUtils.createSinglePartitionCompactedTopic(MirrorUtils.java:161) at org.apache.kafka.connect.mirror.MirrorSourceConnector.createOffsetSyncsTopic(MirrorSourceConnector.java:328) at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93) at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112) at org.apache.kafka.connect.mirror.Scheduler.lambda$execute$2(Scheduler.java:63) [...] Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed. 
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) at org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:124) ... 11 more Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed. {code} h1. Root cause analysis The changes introduced by KAFKA-13401 in [{{{}MirrorUtils{}}}|https://github.com/apache/kafka/pull/12577/files#diff-fa8f595307a4ade20cc22253a7721828e3b55c96f778e9c4842c978801e0a1a4] are supposed to follow the same logic as [{{{}TopicAdmin{}}}|https://github.com/apache/kafka/blob/a7e865c0a756504cc7ae6f4eb0772caddc53/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L423] according to the contributor's [comment|https://github.com/apache/kafka/pull/12577#discussion_r991566108] {{TopicAdmin.createOrFindTopics(...)}} and {{MirrorUtils.createCompactedTopic(...)}} aren't aligned in terms of allowed exceptions ||Exception||TopicAdmin||MirrorUtils|| |TopicExistsException|OK|OK| |UnsupportedVersionException|OK|_KO_| |ClusterAuthorizationException|OK|_KO_| |TopicAuthorizationException|OK|_KO_| > MM2 not working when its internal topics are pre-created on a cluster that > disallows topic creation > --- > > Key: KAFKA-15465 > URL: https://issues.apache.org/jira/browse/KAFKA-15465 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.4.1 >Reporter: Ahmed HIBOT >Priority: Major > > h1. 
Replication steps > * Setup a source kafka cluster (alias SOURCE) which doesn't allow topic > creation to MM2 (therefore it doesn't allow the creation of internal topics) > * Create MM2 internal topics in the source kafka cluster > * Setup a target kafka cluster (alias TARGET) > * Enable one way replication SOURCE->TARGET > MM2 will attempt to create or find its internal topics on the source cluster > but it will fail with the following stack trace > {code:java} > {"log_timestamp": "2023-09-13T09:39:25.612+", "log_level": "ERROR", > "process_id": 1, "process_name": "mirror-maker", "thread_id": 1, > "thread_name": "Scheduler for MirrorSourceConnector-creating upstream > offset-syncs topic", "action_name": > "org.apache.kafka.connect.mirror.Scheduler", "log_message": "Scheduler for > MirrorSourceConnector caught exception in scheduled task: creating upstream > offset-syncs topic"} > org.apache.kafka.connect.errors.ConnectException: Error while attempting to > create/find topic 'mm2-offset-syncs.TARGET.internal' > at > org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:155) > at > org.apach
[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager
kirktrue commented on code in PR #14359: URL: https://github.com/apache/kafka/pull/14359#discussion_r1325038436 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.IdempotentCloser; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.io.Closeable; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Predicate; + +/** + * {@code FetchBuffer} buffers up the results from the broker responses as they are received. It is essentially a + * wrapper around a {@link java.util.Queue} of {@link CompletedFetch}. + * + * + * + * Note: this class is not thread-safe and is intended to only be used from a single thread. 
+ */ +public class FetchBuffer implements Closeable { + +private final Logger log; +private final ConcurrentLinkedQueue completedFetches; +private final IdempotentCloser idempotentCloser = new IdempotentCloser(); + +private CompletedFetch nextInLineFetch; + +public FetchBuffer(final LogContext logContext) { +this.log = logContext.logger(FetchBuffer.class); +this.completedFetches = new ConcurrentLinkedQueue<>(); +} + +/** + * Returns {@code true} if there are no completed fetches pending to return to the user. + * + * @return {@code true} if the buffer is empty, {@code false} otherwise + */ +boolean isEmpty() { +return completedFetches.isEmpty(); +} + +/** + * Return whether we have any completed fetches pending return to the user. This method is thread-safe. Has + * visibility for testing. + * + * @return {@code true} if there are completed fetches that match the {@link Predicate}, {@code false} otherwise + */ +boolean hasCompletedFetches(Predicate predicate) { +return completedFetches.stream().anyMatch(predicate); +} + +void add(CompletedFetch completedFetch) { +completedFetches.add(completedFetch); +} + +void addAll(Collection completedFetches) { +this.completedFetches.addAll(completedFetches); +} + +CompletedFetch nextInLineFetch() { +return nextInLineFetch; +} + +void setNextInLineFetch(CompletedFetch completedFetch) { +this.nextInLineFetch = completedFetch; +} + +CompletedFetch peek() { +return completedFetches.peek(); +} + +CompletedFetch poll() { +return completedFetches.poll(); +} + +/** + * Updates the buffer to retain only the fetch data that corresponds to the given partitions. Any previously + * {@link CompletedFetch fetched data} is removed if its partition is not in the given set of partitions. 
+ * + * @param partitions {@link Set} of {@link TopicPartition}s for which any buffered data should be kept + */ +void retainAll(final Set partitions) { +completedFetches.removeIf(cf -> maybeDrain(partitions, cf)); + +if (maybeDrain(partitions, nextInLineFetch)) +nextInLineFetch = null; +} + +boolean maybeDrain(final Set partitions, final CompletedFetch completedFetch) { +if (completedFetch != null && !partitions.contains(completedFetch.partition)) { +log.debug("Removing {} from buffered fetch data as it is not in the set of partitions to retain ({})", completedFetch.partition, partitions); +completedFetch.drain(); +return true; +} else { +return false; +} +} + +Set partitions() { Review Comment: Done.
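The `retainAll`/`maybeDrain` pattern in the diff above can be tried out in isolation with a simplified sketch. It assumes nothing about the real `CompletedFetch` or `TopicPartition` classes: `RetainingBuffer` and `Entry` are hypothetical stand-ins, partitions are plain strings, and an `ArrayDeque` replaces the queue used in the PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified model of the buffer's retainAll logic.
final class RetainingBuffer {

    // Stand-in for a completed fetch: one partition name plus a drained flag.
    static final class Entry {
        final String partition;
        boolean drained;

        Entry(String partition) {
            this.partition = partition;
        }

        void drain() {
            drained = true;
        }
    }

    private final Queue<Entry> completed = new ArrayDeque<>();
    private Entry nextInLine;

    void add(Entry entry) {
        completed.add(entry);
    }

    void setNextInLine(Entry entry) {
        nextInLine = entry;
    }

    Entry nextInLine() {
        return nextInLine;
    }

    int size() {
        return completed.size();
    }

    // Keeps only entries whose partition is in the given set; everything else is drained and dropped.
    void retainAll(Set<String> partitions) {
        completed.removeIf(entry -> maybeDrain(partitions, entry));
        if (maybeDrain(partitions, nextInLine)) {
            nextInLine = null;
        }
    }

    // Drains the entry and reports true when its partition is not retained; null entries are ignored.
    private boolean maybeDrain(Set<String> partitions, Entry entry) {
        if (entry != null && !partitions.contains(entry.partition)) {
            entry.drain();
            return true;
        }
        return false;
    }
}
```

Using the same predicate for both the queue and the next-in-line entry keeps the two removal paths consistent, which is the point of factoring out `maybeDrain`.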
[jira] [Created] (KAFKA-15465) MM2 not working when its internal topics are pre-created on a cluster that disallows topic creation
Ahmed HIBOT created KAFKA-15465: --- Summary: MM2 not working when its internal topics are pre-created on a cluster that disallows topic creation Key: KAFKA-15465 URL: https://issues.apache.org/jira/browse/KAFKA-15465 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 3.4.1 Reporter: Ahmed HIBOT
[GitHub] [kafka] mumrah commented on a diff in pull request #14376: KAFKA-15458: Fully resolve endpoint information before registering controllers
mumrah commented on code in PR #14376: URL: https://github.com/apache/kafka/pull/14376#discussion_r1324939918 ## metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java: ## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.ControllerRegistrationRequestData; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RegisterControllerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + + +/** + * ListenerInfo contains information about the listeners of either a controller or a broker. + * ListenerInfo objects are immutable; they cannot be modified once created. The intention is + * that you store either controller listeners or broker listeners here, but not both. 
On a + * combined KRaft node, which has both broker and controller roles, you would have two + * separate ListenerInfo objects to represent the listeners of each role. + * + * Listener information is stored in a linked hash map. This maintains ordering while still + * allowing the traditional O(1) hash map access. By convention, the first listener is special, + * corresponding to either the inter-controller listener or the inter-broker listener. + * This is the only listener that other nodes will attempt to use to communicate with this node. + * + * You may wonder why nodes support multiple listeners, given that inter-cluster communication only + * ever uses the first one. Well, one reason is that external clients may wish to use the additional + * listeners. It is a good practice to separate external and internal traffic. In some cases, + * external traffic may be encrypted while internal traffic is not. (Although other admins may wish + * to encrypt everything.) Another reason is that supporting multiple listeners allows us to change + * the effective inter-cluster listener via a roll. During such a roll, half of the brokers + * (or controllers) might be using one listener, while the other half use another. This lets us, + * for example, transition from using a PLAINTEXT inter broker listener to using an SSL one without + * taking any downtime. + * + * The ListenerInfo class is intended to handle translating endpoint information between various + * different data structures, and also to handle the two big gotchas of Kafka endpoints. + * + * The first gotcha is that the hostname will be null or blank if we are listening on 0.0.0.0. + * The withWildcardHostnamesResolved function creates a ListenerInfo object where all such hostnames + * are replaced by specific hostnames. (It's not perfect because we have to choose a single hostname + * out of multiple possibilities. 
In production scenarios it would be better to set the desired + * hostname explicitly in the configuration rather than binding to 0.0.0.0.) + * + * The second gotcha is that if someone configures an ephemeral port (aka port 0), we need to fill + * The withEphemeralPortsCorrected resolves this by filling in the missing information for ephemeral Review Comment: seems some words are missing here? ## metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java: ## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writin
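The Javadoc quoted above describes an immutable, insertion-ordered listener map whose first entry is special, plus two "resolve" helpers. A minimal sketch of that idea follows. It is not the class from the PR: `ListenerSketch`, its nested `Endpoint`, and both method signatures are assumptions made for illustration, and only the method names are borrowed from the quoted comment.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the idea described in the Javadoc; not the PR's ListenerInfo.
final class ListenerSketch {
    /** Minimal endpoint: a null or blank host means a wildcard bind, port 0 means ephemeral. */
    static final class Endpoint {
        final String host;
        final int port;

        Endpoint(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    private final Map<String, Endpoint> listeners;

    ListenerSketch(Map<String, Endpoint> listeners) {
        // Copy into a LinkedHashMap to keep insertion order, then freeze the copy.
        this.listeners = Collections.unmodifiableMap(new LinkedHashMap<>(listeners));
    }

    /** By convention the first listener is the inter-broker or inter-controller one. */
    String firstListenerName() {
        // Throws NoSuchElementException if no listeners were configured.
        return listeners.keySet().iterator().next();
    }

    Endpoint get(String name) {
        return listeners.get(name);
    }

    /** Returns a new object in which null or blank hostnames are replaced by resolvedHost. */
    ListenerSketch withWildcardHostnamesResolved(String resolvedHost) {
        Map<String, Endpoint> out = new LinkedHashMap<>();
        for (Map.Entry<String, Endpoint> e : listeners.entrySet()) {
            Endpoint ep = e.getValue();
            boolean wildcard = ep.host == null || ep.host.trim().isEmpty();
            out.put(e.getKey(), wildcard ? new Endpoint(resolvedHost, ep.port) : ep);
        }
        return new ListenerSketch(out);
    }

    /** Returns a new object in which port 0 is replaced by the port reported by boundPort. */
    ListenerSketch withEphemeralPortsCorrected(Function<String, Integer> boundPort) {
        Map<String, Endpoint> out = new LinkedHashMap<>();
        for (Map.Entry<String, Endpoint> e : listeners.entrySet()) {
            Endpoint ep = e.getValue();
            out.put(e.getKey(), ep.port == 0 ? new Endpoint(ep.host, boundPort.apply(e.getKey())) : ep);
        }
        return new ListenerSketch(out);
    }
}
```

Because every helper returns a fresh object, callers can chain the two corrections without mutating the configuration they started from.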
[GitHub] [kafka] philipnee opened a new pull request, #14386: KAFKA-14960: TopicMetadata request manager
philipnee opened a new pull request, #14386: URL: https://github.com/apache/kafka/pull/14386 TopicMetadataRequestManager is responsible for sending topic metadata requests. The manager manages API requests and builds each request accordingly. Topic metadata requests for the same topic are chained to avoid sending the same request repeatedly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
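One way to read "requests for the same topic are chained" is that concurrent callers share a single in-flight future per topic. The sketch below illustrates that reading only. `InflightTopicRequests`, its methods, and the use of a `String` as the metadata payload are all hypothetical and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of de-duplicating in-flight requests by topic.
final class InflightTopicRequests {
    private final Map<String, CompletableFuture<String>> inflight = new HashMap<>();
    private int requestsSent = 0;

    /** Returns the pending future for the topic, creating (and counting) one only if none exists. */
    synchronized CompletableFuture<String> request(String topic) {
        CompletableFuture<String> existing = inflight.get(topic);
        if (existing != null) {
            return existing; // piggyback on the request that is already in flight
        }
        CompletableFuture<String> created = new CompletableFuture<>();
        inflight.put(topic, created);
        requestsSent++; // a real manager would build and enqueue the network request here
        return created;
    }

    /** Completes every caller waiting on the topic and clears the in-flight entry. */
    void onResponse(String topic, String metadata) {
        CompletableFuture<String> future;
        synchronized (this) {
            future = inflight.remove(topic);
        }
        if (future != null) {
            future.complete(metadata); // completed outside the lock so callbacks cannot block it
        }
    }

    synchronized int requestsSent() {
        return requestsSent;
    }
}
```

Two calls to `request("orders")` before a response arrives return the same future and count as one request; a call made after the response starts a new one.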
[GitHub] [kafka] philipnee closed pull request #14362: KAFKA-14960: TopicMetadata request manager
philipnee closed pull request #14362: KAFKA-14960: TopicMetadata request manager URL: https://github.com/apache/kafka/pull/14362
[GitHub] [kafka] lianetm opened a new pull request, #14385: KAFKA-15306 - Integrating committed offsets for updating fetch positions
lianetm opened a new pull request, #14385: URL: https://github.com/apache/kafka/pull/14385 Support for using committed offsets to update fetch positions. This PR includes: - moving the `refreshCommittedOffsets` function out of the existing `ConsumerCoordinator` so it can be reused (no code changes) - using the above `refreshCommittedOffsets` to update fetch positions if the consumer has enabled Kafka-based offset management by defining a groupId
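The rule in the description (use committed offsets for fetch positions only when a groupId is defined) can be sketched as follows. This is an illustration under assumptions, not the PR's code: `FetchPositionSketch`, `applyCommittedOffsets`, and the use of plain strings for partitions are hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical illustration; partitions are plain strings such as "topic-0".
final class FetchPositionSketch {
    /**
     * Fills in positions for assigned partitions that lack one, using committed offsets
     * only when a group id is configured. Returns the partitions that still have no
     * position and would need some other mechanism, such as an offset reset.
     */
    static Set<String> applyCommittedOffsets(Optional<String> groupId,
                                             Set<String> assigned,
                                             Map<String, Long> committed,
                                             Map<String, Long> positions) {
        Set<String> stillMissing = new HashSet<>();
        for (String tp : assigned) {
            if (positions.containsKey(tp)) {
                continue; // already has a valid position
            }
            Long offset = groupId.isPresent() ? committed.get(tp) : null;
            if (offset != null) {
                positions.put(tp, offset);
            } else {
                stillMissing.add(tp);
            }
        }
        return stillMissing;
    }
}
```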
[jira] [Assigned] (KAFKA-15464) Allow dynamic reloading of certificates with different DN / SANs
[ https://issues.apache.org/jira/browse/KAFKA-15464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jakub Scholz reassigned KAFKA-15464: Assignee: Jakub Scholz > Allow dynamic reloading of certificates with different DN / SANs > > > Key: KAFKA-15464 > URL: https://issues.apache.org/jira/browse/KAFKA-15464 > Project: Kafka > Issue Type: Improvement >Reporter: Jakub Scholz >Assignee: Jakub Scholz >Priority: Major > > Kafka currently doesn't allow dynamic reloading of keystores when the new key > has a different DN or removes some of the SANs. While it might help to > prevent users from breaking their cluster, in some cases it would be great to > be able to bypass this validation when desired. > More details are in the [KIP-978: Allow dynamic reloading of certificates > with different DN / > SANs|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429128] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15464) Allow dynamic reloading of certificates with different DN / SANs
Jakub Scholz created KAFKA-15464: Summary: Allow dynamic reloading of certificates with different DN / SANs Key: KAFKA-15464 URL: https://issues.apache.org/jira/browse/KAFKA-15464 Project: Kafka Issue Type: Improvement Reporter: Jakub Scholz Kafka currently doesn't allow dynamic reloading of keystores when the new key has a different DN or removes some of the SANs. While it might help to prevent users from breaking their cluster, in some cases it would be great to be able to bypass this validation when desired. More details are in the [KIP-978: Allow dynamic reloading of certificates with different DN / SANs|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429128] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15452) Custom KafkaPrincipalBuilder Cannot Access SslPrincipalMapper
[ https://issues.apache.org/jira/browse/KAFKA-15452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764839#comment-17764839 ] Raghu Baddam commented on KAFKA-15452: -- [~hachikuji] /[~mimaison] /[~mjsax]: Would you mind validating this bug and assigning it to me? Tagging active contributors for visibility: [~divijvaidya], [~satish.duggana], [~calvinliu] Thanks in advance. > Custom KafkaPrincipalBuilder Cannot Access SslPrincipalMapper > - > > Key: KAFKA-15452 > URL: https://issues.apache.org/jira/browse/KAFKA-15452 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.1 >Reporter: Raghu Baddam >Priority: Minor > Labels: kafka, walmart > > When implementing a custom KafkaPrincipalBuilder, it is not possible to > access *SslPrincipalMapper* and {*}kerberosShortNamer{*}. This limits the > ability to parse Regex Rules from > BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG, resulting in a lack > of support for Mapping Rules as SslPrincipalMapper is null. > A possible workaround is to read the configuration and build another > SslPrincipalMapper. However, it would be beneficial if this issue could be > addressed at the ChannelBuilders or SslChannelBuilder level. > Git Reference: > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java#L223-L242] > If the community deems this to be a bug, I am willing to provide a fix.
[GitHub] [kafka] zhaohaidao commented on pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on PR #14271: URL: https://github.com/apache/kafka/pull/14271#issuecomment-1718110509 > @zhaohaidao Thanks for all the work you are doing on this. I really appreciate it. I have created a few jiras for the next steps: > > * https://issues.apache.org/jira/browse/KAFKA-15460 > * https://issues.apache.org/jira/browse/KAFKA-15461 > * https://issues.apache.org/jira/browse/KAFKA-15462 > > I have assigned all of them to you for now as you were interested in pursuing this work. Feel free to unassign them if you don't want or plan to tackle them. > > From a priority perspective, tackling https://issues.apache.org/jira/browse/KAFKA-14504 before the above-mentioned jiras would be really helpful as it will allow us to start end-to-end testing when we have all the existing APIs in place. Thank you for your suggestion. I will give priority to 14504 and then work through the other tasks one by one.
[GitHub] [kafka] splett2 commented on a diff in pull request #14053: KAFKA-15221; Fix the race between fetch requests from a rebooted follower.
splett2 commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1324902142 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -864,13 +866,19 @@ class Partition(val topicPartition: TopicPartition, // No need to calculate low watermark if there is no delayed DeleteRecordsRequest val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset -replica.updateFetchState( - followerFetchOffsetMetadata, - followerStartOffset, - followerFetchTimeMs, - leaderEndOffset, - brokerEpoch -) + +// Apply read lock here to avoid the race between ISR updates and the fetch requests from rebooted follower. It +// could break the broker epoch checks in the ISR expansion. +inReadLock(leaderIsrUpdateLock) { Review Comment: I think we should pass through the fetch request leader epoch and validate that it is still valid as well. Otherwise we can end up with a stale fetch updating the replica state for a new epoch. It may not be necessary to do it in this PR though. @dajac wdyt -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
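The locking pattern under discussion (follower fetch-state updates take the read side of `leaderIsrUpdateLock`, leadership and ISR changes take the write side) can be sketched in plain Java, together with the leader-epoch check suggested in the review comment. This is an assumption-laden illustration, not the Scala code from `Partition.scala`: `PartitionLockSketch` and its methods are hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration of the read-lock pattern; not Kafka's Partition class.
final class PartitionLockSketch {
    private final ReentrantReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();
    private int leaderEpoch = 0;                    // guarded by leaderIsrUpdateLock
    private volatile long followerEndOffset = -1L;  // written under the read lock, hence volatile

    /** Leadership or ISR changes take the write lock and therefore exclude all fetch updates. */
    void becomeLeader(int newEpoch) {
        leaderIsrUpdateLock.writeLock().lock();
        try {
            leaderEpoch = newEpoch;
            followerEndOffset = -1L; // replica state starts over in the new epoch
        } finally {
            leaderIsrUpdateLock.writeLock().unlock();
        }
    }

    /**
     * Fetch updates take the read lock: many fetches may run concurrently, but none can
     * interleave with a leadership or ISR change. A fetch carrying a stale leader epoch
     * is rejected so it cannot update replica state for a newer epoch.
     */
    boolean updateFollowerFetchState(int requestLeaderEpoch, long fetchOffset) {
        leaderIsrUpdateLock.readLock().lock();
        try {
            if (requestLeaderEpoch != leaderEpoch) {
                return false;
            }
            followerEndOffset = fetchOffset;
            return true;
        } finally {
            leaderIsrUpdateLock.readLock().unlock();
        }
    }

    long followerEndOffset() {
        return followerEndOffset;
    }
}
```

The read lock keeps fetch handling concurrent in the common case, while the epoch comparison inside the lock is what prevents a stale fetch from being applied after a leadership change.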
[GitHub] [kafka] zhaohaidao commented on pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on PR #14271: URL: https://github.com/apache/kafka/pull/14271#issuecomment-1718105046 > @zhaohaidao Thanks for the update. I made another pass on it and I left a bunch of small comments. We should be able to merge it when they are addressed. Could you also rebase the PR to fix the conflicts? Hey @dajac, except for a few comments that may still need discussion and remain unresolved, the other comments have been addressed. Please take a look when you have time.
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1324884926 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -431,14 +434,17 @@ public void rollback() { snapshotRegistry.revertToSnapshot(lastCommittedOffset); } +public void getOrCreateSnapshot() { +snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); +} Review Comment: This will cause testListGroups to fail because groupMetadataManager.listGroups always queries at lastCommittedOffset. If getOrCreateSnapshot(lastWrittenOffset) is called at the beginning of the replay function, the largest epoch in the snapshot registry is always 1 less than lastCommittedOffset, so listGroups cannot find the expected results. If getOrCreateSnapshot(lastWrittenOffset) is called at the end of the function, after lastWrittenOffset has been incremented, all cases pass. This is consistent with the semantics of CoordinatorRuntime: always update the in-memory state first, then update lastWrittenOffset, and then create the snapshot at the latest lastWrittenOffset. WDYT?
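The ordering argued for in this comment (update memory, advance lastWrittenOffset, then snapshot at the new offset) can be shown with a toy model. Everything here is a simplification under assumptions: `ReplayOrderSketch` copies the whole group set per offset, which is not how Kafka's snapshot registry works, and the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model of the replay ordering; snapshots are full copies keyed by offset.
final class ReplayOrderSketch {
    private final Set<String> groups = new TreeSet<>();
    private final TreeMap<Long, Set<String>> snapshots = new TreeMap<>();
    private long lastWrittenOffset = 0L;
    private long lastCommittedOffset = 0L;

    void replay(String groupId) {
        groups.add(groupId);   // 1. update the in-memory state
        lastWrittenOffset++;   // 2. advance the written offset
        // 3. snapshot at the new offset, so a read at that offset sees this record
        snapshots.putIfAbsent(lastWrittenOffset, Collections.unmodifiableSet(new TreeSet<>(groups)));
    }

    /** Models the log committing everything written so far. */
    void commit() {
        lastCommittedOffset = lastWrittenOffset;
    }

    /** Reads always go through the snapshot at lastCommittedOffset. */
    Set<String> listGroups() {
        Set<String> view = snapshots.get(lastCommittedOffset);
        return view == null ? Collections.<String>emptySet() : view;
    }
}
```

If step 3 ran before step 2, the newest snapshot key would trail lastCommittedOffset by one after a commit, and `listGroups` would find no snapshot at the offset it queries.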
[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
jolshan commented on code in PR #14378: URL: https://github.com/apache/kafka/pull/14378#discussion_r1324872270 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig, val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, origin, verifiedEntries, requiredAcks, requestLocal, verificationGuards.toMap) debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) - -val unverifiedResults = unverifiedEntries.map { case (topicPartition, error) => - val message = if (error == Errors.INVALID_TXN_STATE) "Partition was not added to the transaction" else error.message() - topicPartition -> LogAppendResult( -LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, -Some(error.exception(message)) - ) -} -val errorResults = errorsPerPartition.map { case (topicPartition, error) => - topicPartition -> LogAppendResult( -LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, -Some(error.exception()) - ) +def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult], +useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = { + appendResult.map { case (topicPartition, result) => +topicPartition -> ProducePartitionStatus( + result.info.lastOffset + 1, // required offset + new PartitionResponse( +result.error, +result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L), +result.info.lastOffset, +result.info.logAppendTime, +result.info.logStartOffset, +result.info.recordErrors, +if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage Review Comment: Correct -- when we use custom message, we have always used LogAppendInfo.UnknownLogAppendInfo. I'm not sure why it is this way. Perhaps if it is the default error message (which was the case previously) we didn't want to bloat the response. -- This is an automated message from the Apache Git Service. 
[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
dajac commented on code in PR #14378: URL: https://github.com/apache/kafka/pull/14378#discussion_r1324871934 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -2569,6 +2571,70 @@ class ReplicaManagerTest { assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0)) assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1)) + + // Test we convert the error correctly when trying to append and coordinator is not available + val tp0 = new TopicPartition(topic, 0) + val producerId = 24L + val producerEpoch = 0.toShort + val sequence = 0 + replicaManager.becomeLeaderOrFollower(1, +makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), +(_, _) => ()) + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, +new SimpleRecord("message".getBytes)) + val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(txnCoordinatorPartition1)) + val expectedError = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}" + assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(expectedError, result.assertFired.errorMessage) +} finally { + replicaManager.shutdown(checkpointHW = false) +} + } + + @Test + def testVerificationErrorConversions(): Unit = { +val tp0 = new TopicPartition(topic, 0) +val producerId = 24L +val producerEpoch = 0.toShort +val sequence = 0 +val node = new Node(0, "host1", 0) +val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + +val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node) +try { + replicaManager.becomeLeaderOrFollower(1, +makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), +(_, _) => ()) + + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, +new SimpleRecord("message".getBytes)) + + val transactionToAdd = new AddPartitionsToTxnTransaction() +.setTransactionalId(transactionalId) +.setProducerId(producerId) +.setProducerEpoch(producerEpoch) +.setVerifyOnly(true) +.setTopics(new AddPartitionsToTxnTopicCollection( + Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava +)) + + // Start verification and return the coordinator related errors. + var invocations = 1 + def verifyError(error: Errors): Unit = { +val expectedMessage = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${error.toString}" +val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0)) +val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback]) +verify(addPartitionsToTxnManager, times(invocations)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture()) + +// Confirm we did not write to the log and instead returned the converted error with the correct error message. +val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue() +callback(Map(tp0 -> error).toMap) +assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) +assertEquals(expectedMessage, result.assertFired.errorMessage) +invocations = invocations + 1 + } + + Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_)) Review Comment: That's fine. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
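The behaviour asserted by the test above is that coordinator-related errors seen during verification surface to the producer as NOT_ENOUGH_REPLICAS, with a message naming the underlying error. A minimal sketch of such a conversion follows. It uses its own `Err` enum and a hypothetical `convert` helper rather than Kafka's `Errors` class or the actual ReplicaManager code; only the error names and the message text come from the quoted test.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical illustration; Err stands in for Kafka's Errors enum.
final class VerificationErrorSketch {
    enum Err {
        NONE, NOT_COORDINATOR, CONCURRENT_TRANSACTIONS, COORDINATOR_LOAD_IN_PROGRESS,
        COORDINATOR_NOT_AVAILABLE, NOT_ENOUGH_REPLICAS
    }

    /** Error plus optional custom message, as it would be placed in the produce response. */
    static final class Converted {
        final Err error;
        final String message; // null means "use the default message"

        Converted(Err error, String message) {
            this.error = error;
            this.message = message;
        }
    }

    // The four coordinator errors exercised by the quoted tests.
    private static final Set<Err> COORDINATOR_RETRIABLE = EnumSet.of(
        Err.NOT_COORDINATOR, Err.CONCURRENT_TRANSACTIONS,
        Err.COORDINATOR_LOAD_IN_PROGRESS, Err.COORDINATOR_NOT_AVAILABLE);

    /** Maps coordinator errors to an error the producer already retries; others pass through. */
    static Converted convert(Err error) {
        if (COORDINATOR_RETRIABLE.contains(error)) {
            return new Converted(Err.NOT_ENOUGH_REPLICAS,
                "Unable to verify the partition has been added to the transaction. Underlying error:" + error);
        }
        return new Converted(error, null);
    }
}
```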
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1324870944 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws Exception { ); } +@Test +public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +int partitionCount = 3; +service.startup(() -> partitionCount); + +ListGroupsRequestData request = new ListGroupsRequestData(); + +List expectedResults = Arrays.asList( +new ListGroupsResponseData.ListedGroup() +.setGroupId("group0") +.setGroupState("Stable") +.setProtocolType("protocol1"), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group1") +.setGroupState("Empty") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group2") +.setGroupState("Dead") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +); +Map> expectResultMap = new HashMap<>(); +for (ListGroupsResponseData.ListedGroup result : expectedResults) { +expectResultMap.put(result.groupId(), Collections.singletonList(result)); +} +when(runtime.partitions()).thenReturn(Sets.newSet( +new TopicPartition("__consumer_offsets", 0), +new TopicPartition("__consumer_offsets", 1), +new TopicPartition("__consumer_offsets", 2))); +for (int i = 0; i < partitionCount; i++) { +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), +ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i; +} + +CompletableFuture responseFuture = service.listGroups( +requestContext(ApiKeys.LIST_GROUPS), +request +); + +List actualResults = 
responseFuture.get(5, TimeUnit.SECONDS).groups(); +assertEquals(expectedResults, actualResults); +assertEquals(expectResultMap.size(), actualResults.size()); +for (ListGroupsResponseData.ListedGroup result : actualResults) { +assertEquals(expectResultMap.get(result.groupId()), Collections.singletonList(result)); +} +} + +@Test +public void testListGroupsFailedWithNotCoordinatorException() +throws InterruptedException, ExecutionException, TimeoutException { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +int partitionCount = 3; +service.startup(() -> partitionCount); + +List expectedResults = Arrays.asList( +new ListGroupsResponseData.ListedGroup() +.setGroupId("group0") +.setGroupState("Stable") +.setProtocolType("protocol1"), +new ListGroupsResponseData.ListedGroup() +.setGroupId("group1") +.setGroupState("Empty") +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +); +Map> expectResultMap = new HashMap<>(); +for (ListGroupsResponseData.ListedGroup result : expectedResults) { +expectResultMap.put(result.groupId(), Collections.singletonList(result)); +} + +ListGroupsRequestData request = new ListGroupsRequestData(); +when(runtime.partitions()).thenReturn(Sets.newSet( +new TopicPartition("__consumer_offsets", 0), +new TopicPartition("__consumer_offsets", 1), +new TopicPartition("__consumer_offsets", 2))); +for (int i = 0; i < 2; i++) { +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), +ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i; +} + +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("list-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)), +ArgumentMatchers.any() +)).thenReturn(FutureUtils.failedFuture(new NotCoor
[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
jolshan commented on code in PR #14378: URL: https://github.com/apache/kafka/pull/14378#discussion_r1324870442 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -2569,6 +2571,70 @@ class ReplicaManagerTest { assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0)) assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1)) + + // Test we convert the error correctly when trying to append and coordinator is not available + val tp0 = new TopicPartition(topic, 0) + val producerId = 24L + val producerEpoch = 0.toShort + val sequence = 0 + replicaManager.becomeLeaderOrFollower(1, +makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), +(_, _) => ()) + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, +new SimpleRecord("message".getBytes)) + val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(txnCoordinatorPartition1)) + val expectedError = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}" + assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(expectedError, result.assertFired.errorMessage) +} finally { + replicaManager.shutdown(checkpointHW = false) +} + } + + @Test + def testVerificationErrorConversions(): Unit = { +val tp0 = new TopicPartition(topic, 0) +val producerId = 24L +val producerEpoch = 0.toShort +val sequence = 0 +val node = new Node(0, "host1", 0) +val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + +val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node) +try { + replicaManager.becomeLeaderOrFollower(1, +makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), +(_, _) => ()) + + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, +new SimpleRecord("message".getBytes)) + + val transactionToAdd = new AddPartitionsToTxnTransaction() +.setTransactionalId(transactionalId) +.setProducerId(producerId) +.setProducerEpoch(producerEpoch) +.setVerifyOnly(true) +.setTopics(new AddPartitionsToTxnTopicCollection( + Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava +)) + + // Start verification and return the coordinator related errors. + var invocations = 1 + def verifyError(error: Errors): Unit = { +val expectedMessage = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${error.toString}" +val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0)) +val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback]) +verify(addPartitionsToTxnManager, times(invocations)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture()) + +// Confirm we did not write to the log and instead returned the converted error with the correct error message. +val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue() +callback(Map(tp0 -> error).toMap) +assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) +assertEquals(expectedMessage, result.assertFired.errorMessage) +invocations = invocations + 1 + } + + Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_)) Review Comment: Oops. I already added it. I can remove it. Either way is fine with me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
dajac commented on code in PR #14378: URL: https://github.com/apache/kafka/pull/14378#discussion_r1324868208 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig, val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, origin, verifiedEntries, requiredAcks, requestLocal, verificationGuards.toMap) debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) - -val unverifiedResults = unverifiedEntries.map { case (topicPartition, error) => - val message = if (error == Errors.INVALID_TXN_STATE) "Partition was not added to the transaction" else error.message() - topicPartition -> LogAppendResult( -LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, -Some(error.exception(message)) - ) -} -val errorResults = errorsPerPartition.map { case (topicPartition, error) => - topicPartition -> LogAppendResult( -LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, -Some(error.exception()) - ) +def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult], +useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = { + appendResult.map { case (topicPartition, result) => +topicPartition -> ProducePartitionStatus( + result.info.lastOffset + 1, // required offset + new PartitionResponse( +result.error, +result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L), +result.info.lastOffset, +result.info.logAppendTime, +result.info.logStartOffset, +result.info.recordErrors, +if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage Review Comment: I suppose that you are saying that `result.info.errorMessage` is always null when `useCustomMessage` is true because we know that `LogAppendInfo.UnknownLogAppendInfo` does not set it. Am I right? What I was wondering is why don't we set it all the time... I suppose that what you did here is fine. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
dajac commented on code in PR #14378: URL: https://github.com/apache/kafka/pull/14378#discussion_r1324869043 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -2569,6 +2571,70 @@ class ReplicaManagerTest { assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0)) assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1)) + + // Test we convert the error correctly when trying to append and coordinator is not available + val tp0 = new TopicPartition(topic, 0) + val producerId = 24L + val producerEpoch = 0.toShort + val sequence = 0 + replicaManager.becomeLeaderOrFollower(1, +makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), +(_, _) => ()) + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, +new SimpleRecord("message".getBytes)) + val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(txnCoordinatorPartition1)) + val expectedError = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}" + assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(expectedError, result.assertFired.errorMessage) +} finally { + replicaManager.shutdown(checkpointHW = false) +} + } + + @Test + def testVerificationErrorConversions(): Unit = { +val tp0 = new TopicPartition(topic, 0) +val producerId = 24L +val producerEpoch = 0.toShort +val sequence = 0 +val node = new Node(0, "host1", 0) +val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + +val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node) +try { + replicaManager.becomeLeaderOrFollower(1, +makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), +(_, _) => ()) + + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, +new SimpleRecord("message".getBytes)) + + val transactionToAdd = new AddPartitionsToTxnTransaction() +.setTransactionalId(transactionalId) +.setProducerId(producerId) +.setProducerEpoch(producerEpoch) +.setVerifyOnly(true) +.setTopics(new AddPartitionsToTxnTopicCollection( + Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava +)) + + // Start verification and return the coordinator related errors. + var invocations = 1 + def verifyError(error: Errors): Unit = { +val expectedMessage = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${error.toString}" +val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0)) +val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback]) +verify(addPartitionsToTxnManager, times(invocations)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture()) + +// Confirm we did not write to the log and instead returned the converted error with the correct error message. +val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue() +callback(Map(tp0 -> error).toMap) +assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) +assertEquals(expectedMessage, result.assertFired.errorMessage) +invocations = invocations + 1 Review Comment: Personally, I would use the ParameterizedTest but I leave it up to you. The `invocations` feels a bit hacky. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
dajac commented on code in PR #14378: URL: https://github.com/apache/kafka/pull/14378#discussion_r1324868507 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -2569,6 +2571,70 @@ class ReplicaManagerTest { assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0)) assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1)) + + // Test we convert the error correctly when trying to append and coordinator is not available + val tp0 = new TopicPartition(topic, 0) + val producerId = 24L + val producerEpoch = 0.toShort + val sequence = 0 + replicaManager.becomeLeaderOrFollower(1, +makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), +(_, _) => ()) + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, +new SimpleRecord("message".getBytes)) + val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(txnCoordinatorPartition1)) + val expectedError = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}" + assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) + assertEquals(expectedError, result.assertFired.errorMessage) +} finally { + replicaManager.shutdown(checkpointHW = false) +} + } + + @Test + def testVerificationErrorConversions(): Unit = { +val tp0 = new TopicPartition(topic, 0) +val producerId = 24L +val producerEpoch = 0.toShort +val sequence = 0 +val node = new Node(0, "host1", 0) +val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + +val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node) +try { + replicaManager.becomeLeaderOrFollower(1, +makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), +(_, _) => ()) + + val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, +new SimpleRecord("message".getBytes)) + + val transactionToAdd = new AddPartitionsToTxnTransaction() +.setTransactionalId(transactionalId) +.setProducerId(producerId) +.setProducerEpoch(producerEpoch) +.setVerifyOnly(true) +.setTopics(new AddPartitionsToTxnTopicCollection( + Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava +)) + + // Start verification and return the coordinator related errors. + var invocations = 1 + def verifyError(error: Errors): Unit = { +val expectedMessage = s"Unable to verify the partition has been added to the transaction. 
Underlying error:${error.toString}" +val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0)) +val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback]) +verify(addPartitionsToTxnManager, times(invocations)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture()) + +// Confirm we did not write to the log and instead returned the converted error with the correct error message. +val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue() +callback(Map(tp0 -> error).toMap) +assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error) +assertEquals(expectedMessage, result.assertFired.errorMessage) +invocations = invocations + 1 + } + + Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_)) Review Comment: Understood. I think that it is fine as it is then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1324869392 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8574,6 +8584,93 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } +@Test +public void testListGroups() { +String consumerGroupId = "consumer-group-id"; +String genericGroupId = "generic-group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String genericGroupType = "generic"; +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)) +.build(); +context.replay(newGroupMetadataRecord( +genericGroupId, +new GroupMetadataValue() +.setMembers(Collections.emptyList()) +.setGeneration(2) +.setLeader(null) +.setProtocolType(genericGroupType) +.setProtocol("range") +.setCurrentStateTimestamp(context.time.milliseconds()), +MetadataVersion.latest())); +context.commit(); +context.getOrCreateSnapshot(); +GenericGroup genericGroup = context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, false); + +Map actualAllGroupMap = +context.sendListGroups(Collections.emptyList()) + .stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); +Map expectAllGroupMap = +Stream.of(new ListGroupsResponseData.ListedGroup() +.setGroupId(genericGroup.groupId()) +.setProtocolType(genericGroupType) +.setGroupState(EMPTY.toString()), +new ListGroupsResponseData.ListedGroup() +.setGroupId(consumerGroupId) +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + 
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())) + .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
dajac commented on code in PR #14378: URL: https://github.com/apache/kafka/pull/14378#discussion_r1324869297

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -753,39 +753,56 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, origin, verifiedEntries, requiredAcks, requestLocal, verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, error) =>
- val message = if (error == Errors.INVALID_TXN_STATE) "Partition was not added to the transaction" else error.message()
- topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
- )
-}
-val errorResults = errorsPerPartition.map { case (topicPartition, error) =>
- topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
- )
+def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult],
+useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = {
+ appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+ result.info.lastOffset + 1, // required offset
+ new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage
+ )
+) // response status
+ }
 }
-val allResults = localProduceResults ++ unverifiedResults ++ errorResults
+val unverifiedResults = unverifiedEntries.map {
+ case (topicPartition, error) =>
+val finalException =
+ error match {
+case Errors.INVALID_TXN_STATE => error.exception("Partition was not added to the transaction")
+case Errors.CONCURRENT_TRANSACTIONS |
+ Errors.COORDINATOR_LOAD_IN_PROGRESS |
+ Errors.COORDINATOR_NOT_AVAILABLE |
+ Errors.NOT_COORDINATOR => new NotEnoughReplicasException(
+ s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}")

Review Comment:
nit: Indentation.
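The conversion in the hunk above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `SketchError` and `VerificationErrorSketch` are hypothetical stand-ins rather than Kafka classes, and the pass-through for every other error is a guess, since the quoted hunk is cut off before the remaining `match` cases.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for org.apache.kafka.common.protocol.Errors.
// Only the constants named in the quoted diff are modeled.
enum SketchError {
    NONE,
    INVALID_TXN_STATE,
    CONCURRENT_TRANSACTIONS,
    COORDINATOR_LOAD_IN_PROGRESS,
    COORDINATOR_NOT_AVAILABLE,
    NOT_COORDINATOR,
    NOT_ENOUGH_REPLICAS
}

final class VerificationErrorSketch {
    // Coordinator-side retriable errors that the diff folds into NOT_ENOUGH_REPLICAS.
    private static final Set<SketchError> COORDINATOR_RETRIABLE = EnumSet.of(
        SketchError.CONCURRENT_TRANSACTIONS,
        SketchError.COORDINATOR_LOAD_IN_PROGRESS,
        SketchError.COORDINATOR_NOT_AVAILABLE,
        SketchError.NOT_COORDINATOR);

    // Error reported back to the producer for a partition that failed verification.
    static SketchError convert(SketchError error) {
        // Assumption: errors outside the set are passed through unchanged.
        return COORDINATOR_RETRIABLE.contains(error) ? SketchError.NOT_ENOUGH_REPLICAS : error;
    }

    // Custom message attached to the response, mirroring the strings in the diff.
    static String message(SketchError error) {
        if (error == SketchError.INVALID_TXN_STATE) {
            return "Partition was not added to the transaction";
        }
        if (COORDINATOR_RETRIABLE.contains(error)) {
            return "Unable to verify the partition has been added to the transaction. Underlying error: " + error;
        }
        // Assumption: placeholder for the error's default message.
        return error.toString();
    }

    public static void main(String[] args) {
        for (SketchError e : SketchError.values()) {
            System.out.println(e + " -> " + convert(e) + ": " + message(e));
        }
    }
}
```

The point of the mapping, as the PR title puts it, is that the producer receives an error it already knows how to handle, while the underlying coordinator error stays visible in the message text.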
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1324869129 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8574,6 +8584,93 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } +@Test +public void testListGroups() { +String consumerGroupId = "consumer-group-id"; +String genericGroupId = "generic-group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String genericGroupType = "generic"; +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)) +.build(); +context.replay(newGroupMetadataRecord( +genericGroupId, +new GroupMetadataValue() +.setMembers(Collections.emptyList()) +.setGeneration(2) +.setLeader(null) +.setProtocolType(genericGroupType) +.setProtocol("range") +.setCurrentStateTimestamp(context.time.milliseconds()), +MetadataVersion.latest())); +context.commit(); +context.getOrCreateSnapshot(); +GenericGroup genericGroup = context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, false); + +Map actualAllGroupMap = +context.sendListGroups(Collections.emptyList()) + .stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); +Map expectAllGroupMap = +Stream.of(new ListGroupsResponseData.ListedGroup() +.setGroupId(genericGroup.groupId()) +.setProtocolType(genericGroupType) +.setGroupState(EMPTY.toString()), +new ListGroupsResponseData.ListedGroup() +.setGroupId(consumerGroupId) +.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + 
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())) + .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); + +assertEquals(expectAllGroupMap.size(), actualAllGroupMap.size()); +for (Map.Entry entry : expectAllGroupMap.entrySet()) { +assertEquals(entry.getValue(), actualAllGroupMap.get(entry.getKey())); +} + + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1) +.setSubscribedTopicNames(Collections.singletonList(fooTopicName)) +.build())); +context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11)); +context.commit(); Review Comment: Can you be more detailed? I don’t quite understand this comment. ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8574,6 +8584,93 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } +@Test +public void testListGroups() { +String consumerGroupId = "consumer-group-id"; +String genericGroupId = "generic-group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String genericGroupType = "generic"; +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)) +.build(); +context.replay(newGroupMetadataRecord( +genericGroupId, +new GroupMetadataValue() +.setMembers(Collections.emptyList()) +.setGeneration(2) +.setLeader(null) +.setProtocolType(genericGroupType) +.setProtocol("range") +.setCurrentStateTimestamp(context.time.milliseconds()), +MetadataVersion.latest())); +context.commit(); +context.getOrCreateSnapshot(); +GenericGroup genericGroup = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, false); + +Map actualAllGroupMap
[GitHub] [kafka] splett2 commented on a diff in pull request #14053: KAFKA-15221; Fix the race between fetch requests from a rebooted follower.
splett2 commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1324845475

## core/src/main/scala/kafka/cluster/Partition.scala: ##
@@ -137,7 +137,8 @@ object Partition {
 delayedOperations = delayedOperations,
 metadataCache = replicaManager.metadataCache,
 logManager = replicaManager.logManager,
- alterIsrManager = replicaManager.alterPartitionManager)
+ alterIsrManager = replicaManager.alterPartitionManager,
+ zkMigrationEnabled = () => replicaManager.config.migrationEnabled)

Review Comment:
We can just pass this in as a simple boolean. My understanding is that ZK migration is not a dynamic config.

## core/src/main/scala/kafka/cluster/Replica.scala: ##
@@ -98,14 +103,22 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
 * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at
 * high frequency.
 */
- def updateFetchState(
+ def updateFetchStateOrThrow(
 followerFetchOffsetMetadata: LogOffsetMetadata,
 followerStartOffset: Long,
 followerFetchTimeMs: Long,
 leaderEndOffset: Long,
-brokerEpoch: Long
+brokerEpoch: Long,
+verifyBrokerEpoch: Boolean = false

Review Comment:
I would prefer not to use a default parameter. I guess we did it because `updateFetchStateOrThrow` has quite a few callers. Maybe we can pass the value for `verifyBrokerEpoch` into the `Replica` constructor to minimize some of the changes? That would also help us keep `updateFetchState` a bit simpler.
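The second suggestion, moving `verifyBrokerEpoch` from a per-call parameter to the `Replica` constructor, might look roughly like the Java sketch below. Everything here is an assumption made for illustration: `ReplicaSketch` is a hypothetical, heavily simplified class, and the stale-epoch check is a guess at what the flag guards, because the quoted diff shows only the parameter and not the method body.

```java
// Hypothetical, heavily simplified stand-in for kafka.cluster.Replica.
final class ReplicaSketch {
    // Fixed once at construction, so callers of updateFetchStateOrThrow
    // no longer need to pass (or default) the flag on every call.
    private final boolean verifyBrokerEpoch;
    private long lastBrokerEpoch = -1L;
    private long lastFetchOffset = -1L;

    ReplicaSketch(boolean verifyBrokerEpoch) {
        this.verifyBrokerEpoch = verifyBrokerEpoch;
    }

    void updateFetchStateOrThrow(long fetchOffset, long brokerEpoch) {
        // Assumed check: reject a fetch that carries an older broker epoch
        // than the one already recorded for this replica.
        if (verifyBrokerEpoch && brokerEpoch < lastBrokerEpoch) {
            throw new IllegalStateException(
                "Stale broker epoch " + brokerEpoch + ", already saw " + lastBrokerEpoch);
        }
        lastBrokerEpoch = brokerEpoch;
        lastFetchOffset = fetchOffset;
    }

    long lastFetchOffset() {
        return lastFetchOffset;
    }

    public static void main(String[] args) {
        ReplicaSketch replica = new ReplicaSketch(true);
        replica.updateFetchStateOrThrow(10L, 5L);
        System.out.println("last fetch offset: " + replica.lastFetchOffset());
    }
}
```

With the flag held as a field, the method signature stays the same for all existing callers, which is the "minimize some of the changes" benefit the comment mentions.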
[jira] [Updated] (KAFKA-15463) StreamsException: Accessing from an unknown node
[ https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15463: Priority: Major (was: Blocker) > StreamsException: Accessing from an unknown node > - > > Key: KAFKA-15463 > URL: https://issues.apache.org/jira/browse/KAFKA-15463 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.2.1 >Reporter: Yevgeny >Priority: Major > > After some time application was working fine, starting to get: > > This is springboot application runs in kubernetes as stateful pod. > > > > {code:java} > Exception in thread > "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown > node at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162) > at myclass1.java:28) at myclass2.java:48) at > java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) at > java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602) > at > java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129) > at > java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230) > at > java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637) > at myclass3.java:48) at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) > at > 
org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) > at > org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809) > at > org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551) > {code} > > stream-thread > [-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State > transition from PENDING_SHUTDOWN to DEAD > > > Transformer is Prototype bean, the supplier supplys new instance of the > 
Transformer: > > > {code:java} > @Override public Transformer> get() > { return ctx.getBean(MyTransformer.class); }{code} > > > The only way to recover is to delete all topics used by Kafka Streams; even after the application is restarted, the same exception is thrown. > If messages in the internal 'store-changelog' topics are deleted or their offsets are manipulated, can that cause the issue? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mjsax commented on a diff in pull request #14322: KAFKA-15418: update statement on decompression
mjsax commented on code in PR #14322: URL: https://github.com/apache/kafka/pull/14322#discussion_r1324811185

## docs/design.html: ##
@@ -136,8 +136,10 @@
-Kafka supports this with an efficient batching format. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will
-remain compressed in the log and will only be decompressed by the consumer.
+Kafka supports this with an efficient batching format. A batch of messages can be grouped together, compressed, and sent to the server in this form. The broker decompresses the batch in order to validate it. For
+example, it validates that the number of records in the batch is same as what batch header states. The broker may also potentially modify the batch (e.g., if the topic is compacted, the broker will filter out

Review Comment:
Well, that happens on the log-cleaner thread, i.e. when actual compaction runs. This part of the docs focuses on the write path, though.
[GitHub] [kafka] davetroiano commented on a diff in pull request #14322: KAFKA-15418: update statement on decompression
davetroiano commented on code in PR #14322: URL: https://github.com/apache/kafka/pull/14322#discussion_r1324799715

## docs/design.html: ##
@@ -136,8 +136,10 @@
-Kafka supports this with an efficient batching format. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will
-remain compressed in the log and will only be decompressed by the consumer.
+Kafka supports this with an efficient batching format. A batch of messages can be grouped together, compressed, and sent to the server in this form. The broker decompresses the batch in order to validate it. For
+example, it validates that the number of records in the batch is same as what batch header states. The broker may also potentially modify the batch (e.g., if the topic is compacted, the broker will filter out

Review Comment:
@divijvaidya @mjsax won't the broker decompress async to perform compaction filtering? ([here](https://github.com/apache/kafka/blob/a7e865c0a756504cc7ae6f4eb0772caddc53/core/src/main/scala/kafka/log/LogCleaner.scala#L801))
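The write-path validation described in the proposed docs text (the broker decompresses a batch and checks that the record count matches the batch header) can be illustrated with a toy Java sketch. It does not use Kafka's record-batch format: `BatchValidationSketch`, its `Batch` type, the gzip codec and the length-prefixed layout are all assumptions chosen to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class BatchValidationSketch {
    // Hypothetical batch: an uncompressed header field plus a compressed payload.
    static final class Batch {
        final int declaredCount;   // record count stated in the header
        final byte[] compressed;   // gzip-compressed, length-prefixed records

        Batch(int declaredCount, byte[] compressed) {
            this.declaredCount = declaredCount;
            this.compressed = compressed;
        }
    }

    // Producer side: group records, compress them, and record the count in the header.
    static Batch compress(List<String> records) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(bytes))) {
            for (String record : records) {
                byte[] payload = record.getBytes(StandardCharsets.UTF_8);
                out.writeInt(payload.length);
                out.write(payload);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Batch(records.size(), bytes.toByteArray());
    }

    // Broker side: decompress only to count records and compare with the header.
    static boolean isValid(Batch batch) {
        int seen = 0;
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(batch.compressed)))) {
            while (true) {
                int length;
                try {
                    length = in.readInt();
                } catch (EOFException endOfBatch) {
                    break; // no more records in the payload
                }
                in.readFully(new byte[length]);
                seen++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return seen == batch.declaredCount;
    }

    public static void main(String[] args) {
        Batch batch = compress(Arrays.asList("a", "b", "c"));
        System.out.println("valid: " + isValid(batch));
    }
}
```

Note that validation here never rewrites the payload, which matches the distinction drawn in the thread between the write path and the separate log-cleaner path that filters records during compaction.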
[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324792282

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
       assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
       assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+      // Test we convert the error correctly when trying to append and coordinator is not available
+      val tp0 = new TopicPartition(topic, 0)
+      val producerId = 24L
+      val producerEpoch = 0.toShort
+      val sequence = 0
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(txnCoordinatorPartition1))
+      val expectedError = s"Unable to verify the partition has been added to the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(expectedError, result.assertFired.errorMessage)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 0
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      // Start verification and return the coordinator related errors.
+      var invocations = 1
+      def verifyError(error: Errors): Unit = {
+        val expectedMessage = s"Unable to verify the partition has been added to the transaction. Underlying error:${error.toString}"
+        val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0))
+        val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+        verify(addPartitionsToTxnManager, times(invocations)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+        // Confirm we did not write to the log and instead returned the converted error with the correct error message.
+        val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
+        callback(Map(tp0 -> error).toMap)
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+        assertEquals(expectedMessage, result.assertFired.errorMessage)
+        invocations = invocations + 1
+      }
+
+      Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_))

Review Comment:
Added.
[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324791489

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ##
@@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws Exception {
         );
     }
+    @Test
+    public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException {
+        CoordinatorRuntime runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 3;
+        service.startup(() -> partitionCount);
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+
+        List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group0")
+                .setGroupState("Stable")
+                .setProtocolType("protocol1"),
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group1")
+                .setGroupState("Empty")
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+            new ListGroupsResponseData.ListedGroup()
+                .setGroupId("group2")
+                .setGroupState("Dead")
+                .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+        );
+        Map<String, List<ListGroupsResponseData.ListedGroup>> expectResultMap = new HashMap<>();
+        for (ListGroupsResponseData.ListedGroup result : expectedResults) {
+            expectResultMap.put(result.groupId(), Collections.singletonList(result));
+        }
+        when(runtime.partitions()).thenReturn(Sets.newSet(
+            new TopicPartition("__consumer_offsets", 0),
+            new TopicPartition("__consumer_offsets", 1),
+            new TopicPartition("__consumer_offsets", 2)));
+        for (int i = 0; i < partitionCount; i++) {
+            when(runtime.scheduleReadOperation(
+                ArgumentMatchers.eq("list-groups"),
+                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
+                ArgumentMatchers.any()
+            )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
+        }
+
+        CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
+            requestContext(ApiKeys.LIST_GROUPS),
+            request
+        );
+
+        List<ListGroupsResponseData.ListedGroup> actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups();
+        assertEquals(expectedResults, actualResults);
+        assertEquals(expectResultMap.size(), actualResults.size());
+        for (ListGroupsResponseData.ListedGroup result : actualResults) {
+            assertEquals(expectResultMap.get(result.groupId()), Collections.singletonList(result));
+        }

Review Comment:
A map is used here because the order of the list returned by the function may differ from the expected one.
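The point made in the review comment above (comparing through a map because the returned list order is not guaranteed) can be illustrated with a minimal, self-contained sketch. It does not use the Kafka classes: `Group` is a hypothetical stand-in for `ListGroupsResponseData.ListedGroup`, and `byGroupId` and `sameGroups` are illustrative names. The sketch assumes group IDs are unique within each list.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class OrderInsensitiveGroups {

    // Hypothetical stand-in for ListGroupsResponseData.ListedGroup.
    static final class Group {
        final String groupId;
        final String state;

        Group(String groupId, String state) {
            this.groupId = groupId;
            this.state = state;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Group)) return false;
            Group other = (Group) o;
            return groupId.equals(other.groupId) && state.equals(other.state);
        }

        @Override
        public int hashCode() {
            return Objects.hash(groupId, state);
        }
    }

    // Index groups by ID so that the order of the list no longer matters.
    // Assumption: group IDs are unique within a list.
    static Map<String, Group> byGroupId(List<Group> groups) {
        Map<String, Group> index = new HashMap<>();
        for (Group g : groups) {
            index.put(g.groupId, g);
        }
        return index;
    }

    // True when both lists hold the same groups, regardless of order.
    static boolean sameGroups(List<Group> expected, List<Group> actual) {
        return expected.size() == actual.size()
            && byGroupId(expected).equals(byGroupId(actual));
    }
}
```

A plain `List.equals` comparison is order-sensitive, so it would reject a result that contains the same groups in a different order, while `sameGroups` accepts it.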
[GitHub] [kafka] junrao commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager
junrao commented on code in PR #14359: URL: https://github.com/apache/kafka/pull/14359#discussion_r1323771865 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -63,6 +68,31 @@ public NetworkClientDelegate( this.unsentRequests = new ArrayDeque<>(); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +this.tryConnectNodes = new HashSet<>(); +} + +@Override +public boolean isUnavailable(Node node) { +return NetworkClientUtils.isUnavailable(client, node, time); +} + +@Override +public void maybeThrowAuthFailure(Node node) { +NetworkClientUtils.maybeThrowAuthFailure(client, node); +} + +@Override +public void tryConnect(Node node) { Review Comment: `tryConnect` and `maybeTryConnect` seem highly coupled. Why is only the former part of the `NodeStatusDetector` interface? Also, it seems that we have used `maybeTryConnect`, but not `tryConnect` in this class. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.TopicPartition; + +/** + * {@code FetchUtils} provides a place for disparate parts of the fetch logic to live. + */ +public class FetchUtils { + +/** + * Performs two combined actions based on the state related to the {@link TopicPartition}: + * + * + * + * Invokes {@link ConsumerMetadata#requestUpdate(boolean)} to signal that the metadata is incorrect and + * needs to be updated + * + * + * Invokes {@link SubscriptionState#clearPreferredReadReplica(TopicPartition)} to clear out any read replica + * information that may be present. + * + * + * + * This utility method should be invoked if the client detects (or is told by a node in the broker) that an + * attempt was made to fetch from a node that isn't the leader or preferred replica. + * + * @param metadata {@link ConsumerMetadata} for which to request an update + * @param subscriptions {@link SubscriptionState} to clear any internal read replica node + * @param topicPartition {@link TopicPartition} for which this state change is related + */ +static void requestMetadataUpdate(final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final TopicPartition topicPartition) { +metadata.requestUpdate(true); Review Comment: This was set to `false`. Why is it changed to `true`? I think `false` is correct since we don't want to reset the metadata retry count when receiving retriable error due to stale metadata. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeStatusDetector.java: ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.NetworkClientUtils; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationExcep
[GitHub] [kafka] OmniaGM commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools
OmniaGM commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1717905593

I found out why the tests were failing. It turns out that we are hitting a problem similar to Gradle [issue #847](https://github.com/gradle/gradle/issues/847), which causes transitive dependency problems: `storage:api` and `connect:api` were producing unintended conflicts. The Gradle issue does not seem to be fully resolved, so I pushed a fix with a workaround that renames the `storage:api` project to `storage:storage-api`. This shouldn't affect the name of the final jar. I pushed the changes and am waiting for the pipelines to pass.
[GitHub] [kafka] OmniaGM commented on pull request #13201: KAFKA-14596: Move TopicCommand to tools
OmniaGM commented on PR #13201:
URL: https://github.com/apache/kafka/pull/13201#issuecomment-1717877198

> Hi @OmniaGM, thanks. I left some comments.
>
> Additionally, this error message is different from previous implementation:
>
> ```shell
> ### OLD
> $ bin/kafka-topics.sh --bootstrap-server :9092 --describe --topic my-topic-foo --delete-config sdsad
> Option "[delete-config]" can't be used with option "[describe]"
>
> ### NEW
> $ bin/kafka-topics.sh --bootstrap-server :9092 --describe --topic my-topic-foo --delete-config sdsad
> Option "[delete-config]" can't be used with option "[bootstrap-server]"
> ```

Good catch @fvaleri. I missed that the `immutable.Set` used to define the invalid options in the old Scala code preserves insertion order, so the first option on the list to be compared is `describe`, not `bootstrap-server`. I updated the Java code to use `LinkedHashSet` instead of `Set`.
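The ordering effect described in the reply above can be sketched in isolation. The following is a minimal illustration, not the actual `TopicCommand` code: `firstConflict`, `ordered`, and the option list are hypothetical. It only shows that when the set of invalid options is a `LinkedHashSet`, the first conflict reported follows declaration order.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;

final class OptionConflictCheck {

    // Build a set that iterates in the order the options were declared.
    static Set<String> ordered(String... options) {
        return new LinkedHashSet<>(Arrays.asList(options));
    }

    // Return the first invalid option (in the iteration order of
    // invalidOptions) that is also present on the command line.
    static Optional<String> firstConflict(Set<String> invalidOptions, Set<String> presentOptions) {
        for (String option : invalidOptions) {
            if (presentOptions.contains(option)) {
                return Optional.of(option);
            }
        }
        return Optional.empty();
    }
}
```

With a hypothetical invalid-option list declared as `describe`, `create`, `bootstrap-server` and a command line containing both `describe` and `bootstrap-server`, `firstConflict` returns `describe`. With a plain `HashSet`, the iteration order is unspecified, so either option could be reported first.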
[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324699341

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
         val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, origin, verifiedEntries, requiredAcks, requestLocal, verificationGuards.toMap)
         debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-
-        val unverifiedResults = unverifiedEntries.map { case (topicPartition, error) =>
-          val message = if (error == Errors.INVALID_TXN_STATE) "Partition was not added to the transaction" else error.message()
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception(message))
-          )
-        }
-        val errorResults = errorsPerPartition.map { case (topicPartition, error) =>
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception())
-          )
+        def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult],
+                                useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = {
+          appendResult.map { case (topicPartition, result) =>
+            topicPartition -> ProducePartitionStatus(
+              result.info.lastOffset + 1, // required offset
+              new PartitionResponse(
+                result.error,
+                result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+                result.info.lastOffset,
+                result.info.logAppendTime,
+                result.info.logStartOffset,
+                result.info.recordErrors,
+                if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage
+              )
+            ) // response status
+          }
         }
-        val allResults = localProduceResults ++ unverifiedResults ++ errorResults
+        val unverifiedResults = unverifiedEntries.map {
+          case (topicPartition, error) =>
+            val finalError =
+              error match {
+                case Errors.CONCURRENT_TRANSACTIONS |
+                     Errors.COORDINATOR_LOAD_IN_PROGRESS |
+                     Errors.COORDINATOR_NOT_AVAILABLE |
+                     Errors.NOT_COORDINATOR => Errors.NOT_ENOUGH_REPLICAS
+                case _ => error
+                }
+            val message =
+              error match {
+                case Errors.INVALID_TXN_STATE => "Partition was not added to the transaction"
+                case Errors.CONCURRENT_TRANSACTIONS |
+                     Errors.COORDINATOR_LOAD_IN_PROGRESS |
+                     Errors.COORDINATOR_NOT_AVAILABLE |
+                     Errors.NOT_COORDINATOR => s"Unable to verify the partition has been added to the transaction. Underlying error:${error.toString}"
+                case _ => error.message()
+              }

Review Comment:
We can.
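The conversion in the hunk above (coordinator-related retriable errors are reported to the producer as `NOT_ENOUGH_REPLICAS`, with a message that names the underlying error) can be sketched as a standalone Java example. This is only an illustration under stated assumptions: `Err` is a hypothetical, trimmed-down stand-in for `org.apache.kafka.common.protocol.Errors`, and the `defaultMessage` parameter stands in for `error.message()`.

```java
import java.util.EnumSet;
import java.util.Set;

final class VerificationErrorConversion {

    // Hypothetical, trimmed-down stand-in for Kafka's Errors enum.
    enum Err {
        INVALID_TXN_STATE,
        CONCURRENT_TRANSACTIONS,
        COORDINATOR_LOAD_IN_PROGRESS,
        COORDINATOR_NOT_AVAILABLE,
        NOT_COORDINATOR,
        NOT_ENOUGH_REPLICAS,
        UNKNOWN_SERVER_ERROR
    }

    // Errors from the verification step that the diff converts.
    private static final Set<Err> COORDINATOR_RETRIABLE = EnumSet.of(
        Err.CONCURRENT_TRANSACTIONS,
        Err.COORDINATOR_LOAD_IN_PROGRESS,
        Err.COORDINATOR_NOT_AVAILABLE,
        Err.NOT_COORDINATOR);

    // Error code returned to the producer.
    static Err finalError(Err error) {
        return COORDINATOR_RETRIABLE.contains(error) ? Err.NOT_ENOUGH_REPLICAS : error;
    }

    // Error message returned to the producer; defaultMessage stands in
    // for the error's built-in message.
    static String message(Err error, String defaultMessage) {
        if (error == Err.INVALID_TXN_STATE) {
            return "Partition was not added to the transaction";
        }
        if (COORDINATOR_RETRIABLE.contains(error)) {
            return "Unable to verify the partition has been added to the transaction. Underlying error:" + error;
        }
        return defaultMessage;
    }
}
```

Keeping the four converted errors in one `EnumSet` means the code and the message are derived from the same list, whereas the Scala hunk repeats the pattern in both `match` expressions.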
[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324697600

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
       assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
       assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+      // Test we convert the error correctly when trying to append and coordinator is not available
+      val tp0 = new TopicPartition(topic, 0)
+      val producerId = 24L
+      val producerEpoch = 0.toShort
+      val sequence = 0
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(txnCoordinatorPartition1))
+      val expectedError = s"Unable to verify the partition has been added to the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(expectedError, result.assertFired.errorMessage)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 0
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      // Start verification and return the coordinator related errors.
+      var invocations = 1
+      def verifyError(error: Errors): Unit = {
+        val expectedMessage = s"Unable to verify the partition has been added to the transaction. Underlying error:${error.toString}"
+        val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0))
+        val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+        verify(addPartitionsToTxnManager, times(invocations)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+        // Confirm we did not write to the log and instead returned the converted error with the correct error message.
+        val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
+        callback(Map(tp0 -> error).toMap)
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+        assertEquals(expectedMessage, result.assertFired.errorMessage)
+        invocations = invocations + 1
+      }
+
+      Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_))

Review Comment:
It cannot be returned in this callback, which is why I have a separate test for it. But I can add it even if it won't be returned this way.
[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324696082

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
         val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, origin, verifiedEntries, requiredAcks, requestLocal, verificationGuards.toMap)
         debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-
-        val unverifiedResults = unverifiedEntries.map { case (topicPartition, error) =>
-          val message = if (error == Errors.INVALID_TXN_STATE) "Partition was not added to the transaction" else error.message()
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception(message))
-          )
-        }
-        val errorResults = errorsPerPartition.map { case (topicPartition, error) =>
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception())
-          )
+        def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult],
+                                useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = {
+          appendResult.map { case (topicPartition, result) =>
+            topicPartition -> ProducePartitionStatus(
+              result.info.lastOffset + 1, // required offset
+              new PartitionResponse(
+                result.error,
+                result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+                result.info.lastOffset,
+                result.info.logAppendTime,
+                result.info.logStartOffset,
+                result.info.recordErrors,
+                if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage

Review Comment:
I'm not sure I understand. Previously `result.info.errorMessage` was null, and it will continue to be null for all error partitions. `LogAppendInfo.UnknownLogAppendInfo` will always have a null error message. I thought that, for consistency, I should not change this.
[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error
jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324692053

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
       assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
       assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+      // Test we convert the error correctly when trying to append and coordinator is not available
+      val tp0 = new TopicPartition(topic, 0)
+      val producerId = 24L
+      val producerEpoch = 0.toShort
+      val sequence = 0
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(txnCoordinatorPartition1))
+      val expectedError = s"Unable to verify the partition has been added to the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(expectedError, result.assertFired.errorMessage)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 0
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      // Start verification and return the coordinator related errors.
+      var invocations = 1
+      def verifyError(error: Errors): Unit = {
+        val expectedMessage = s"Unable to verify the partition has been added to the transaction. Underlying error:${error.toString}"
+        val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0))
+        val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+        verify(addPartitionsToTxnManager, times(invocations)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+        // Confirm we did not write to the log and instead returned the converted error with the correct error message.
+        val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
+        callback(Map(tp0 -> error).toMap)
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+        assertEquals(expectedMessage, result.assertFired.errorMessage)
+        invocations = invocations + 1

Review Comment:
I thought it was a lot of extra overhead to create a new replica manager every time -- especially since we can just make another append call. What do you think?
[jira] [Resolved] (KAFKA-15163) Implement validatePositions functionality for new KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-15163.
-----------------------------
Fix Version/s: 3.7.0
Resolution: Fixed

This is covered in https://github.com/apache/kafka/pull/14346.

> Implement validatePositions functionality for new KafkaConsumer
> ---------------------------------------------------------------
>
> Key: KAFKA-15163
> URL: https://issues.apache.org/jira/browse/KAFKA-15163
> Project: Kafka
> Issue Type: Task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Major
> Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Introduce support for validating positions in the new OffsetsRequestManager.
> This task will include a new event for the validatePositions calls performed
> from the new consumer, and the logic for handling such events in the
> OffsetsRequestManager.
> The validate positions implementation will keep the same behaviour as the one
> in the old consumer, but adapted to the new threading model. It is therefore
> based on a VALIDATE_POSITIONS event that is submitted to the background thread
> and then processed by the ApplicationEventProcessor. The processing itself is
> done by the OffsetsRequestManager, given that this will require an
> OFFSET_FOR_LEADER_EPOCH request. This task will introduce support for such
> requests in the OffsetsRequestManager, which is responsible for offset-related
> requests.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14384: KAFKA-15415 On producer-batch retry, skip-backoff on a new leader
AndrewJSchofield commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1324590647

## clients/src/main/java/org/apache/kafka/common/PartitionInfo.java: ##
@@ -60,12 +69,19 @@ public int partition() {
     }
     /**
-     * The node id of the node currently acting as a leader for this partition or null if there is no leader
+     * The node currently acting as a leader for this partition or null if there is no leader
      */
     public Node leader() {
         return leader;
     }
+    /**
+     * The epoch of the partition's leader.

Review Comment:
I think it should be "The leader epoch of this partition if known".

## clients/src/main/java/org/apache/kafka/common/PartitionInfo.java: ##
@@ -20,26 +20,35 @@
  * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
+    public static final int UNKNOWN_LEADER_EPOCH = -1;
     private final String topic;
     private final int partition;
     private final Node leader;
+    private final int leaderEpoch;
     private final Node[] replicas;
     private final Node[] inSyncReplicas;
     private final Node[] offlineReplicas;
     public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
-        this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]);
+        this(topic, partition, leader, UNKNOWN_LEADER_EPOCH, replicas, inSyncReplicas, new Node[0]);

Review Comment:
And then `Optional.empty()` of course.

## clients/src/main/java/org/apache/kafka/common/Cluster.java: ##
@@ -277,6 +277,19 @@ public Node leaderFor(TopicPartition topicPartition) {
         return info.leader();
     }
+    /**
+     * Get the current leader's epoch for the given topic-partition.
+     * @param topicPartition
+     * @return The epoch for partition's leader, or UNKNOWN_LEADER_EPOCH if epoch unkown.

Review Comment:
Typo. "unkown" -> "unknown".

## clients/src/main/java/org/apache/kafka/common/PartitionInfo.java: ##
@@ -20,26 +20,35 @@
  * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
+    public static final int UNKNOWN_LEADER_EPOCH = -1;
     private final String topic;
     private final int partition;
     private final Node leader;
+    private final int leaderEpoch;

Review Comment:
I think it's more aligned with the existing Kafka interface to use `Optional<Integer> leaderEpoch`. For example, `org.apache.kafka.clients.consumer.OffsetAndMetadata`.

## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ##
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;
         recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
     }
+    /*
+     * Returns whether the leader epoch has changed since the last attempt.
+     * @param latestLeaderEpoch The latest leader epoch.
+     * @return true if the leader has changed, otherwise false.
+     */
+    boolean hasLeaderChanged(int latestLeaderEpoch) {
+        boolean leaderChanged = false;
+        // Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+        log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",

Review Comment:
There are 4 separate calls to `attempts()` (or the equivalent, `attempts.get()`) in this method. Capture the value with a single call and use that instead.

## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ##
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType());
+        this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+        this.leaderChangedAttempts = -1;

Review Comment:
I don't think there's any need for a special value of -1. Initializing to 0 would work fine.

## clients/src/main/java/org/apache/kafka/common/Cluster.java: ##
@@ -277,6 +277,19 @@ public Node leaderFor(TopicPartition topicPartition) {
         return info.leader();
     }
+    /**
+     * Get the current leader's epoch
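Two of the review suggestions above (representing an unknown leader epoch as `Optional.empty()` rather than a sentinel, and reading the attempt count once per call) can be sketched together. This is a hedged illustration only, not the logic of the PR: the class `LeaderEpochTracker`, its field, and the rule "a change counts only from the first retry onwards" are assumptions chosen to mirror the comments in the diff.

```java
import java.util.Optional;

final class LeaderEpochTracker {

    // Unknown epoch is modelled as Optional.empty() instead of -1.
    private Optional<Integer> currentLeaderEpoch = Optional.empty();

    // Hypothetical variant of hasLeaderChanged: the caller reads the
    // attempt counter once and passes the captured value in.
    boolean hasLeaderChanged(Optional<Integer> latestLeaderEpoch, int attempts) {
        if (!latestLeaderEpoch.isPresent()) {
            // Without a known epoch there is nothing to compare against.
            return false;
        }
        boolean newer = !currentLeaderEpoch.isPresent()
            || latestLeaderEpoch.get() > currentLeaderEpoch.get();
        if (newer) {
            currentLeaderEpoch = latestLeaderEpoch;
        }
        // A leader change is only meaningful from the first retry onwards.
        return newer && attempts >= 1;
    }

    Optional<Integer> currentLeaderEpoch() {
        return currentLeaderEpoch;
    }
}
```

On the initial attempt (`attempts == 0`) the tracker records the epoch but reports no change; on a later retry, a strictly newer epoch reports a change exactly once.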
[jira] [Resolved] (KAFKA-15115) Implement resetPositions functionality in OffsetsRequestManager
[ https://issues.apache.org/jira/browse/KAFKA-15115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-15115.
-----------------------------
Fix Version/s: 3.7.0
Resolution: Fixed

Merged the PR to trunk.

> Implement resetPositions functionality in OffsetsRequestManager
> ---------------------------------------------------------------
>
> Key: KAFKA-15115
> URL: https://issues.apache.org/jira/browse/KAFKA-15115
> Project: Kafka
> Issue Type: Task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Major
> Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Introduce support for resetting positions in the new OffsetsRequestManager.
> This task will include a new event for the resetPositions calls performed
> from the new consumer, and the logic for handling such events in the
> OffsetsRequestManager.
> The reset positions implementation will keep the same behaviour as the one in
> the old consumer, but adapted to the new threading model. It is therefore
> based on a RESET_POSITIONS event that is submitted to the background thread
> and then processed by the ApplicationEventProcessor. The processing itself is
> done by the OffsetsRequestManager, given that this will require a LIST_OFFSETS
> request for the partitions awaiting reset.
[GitHub] [kafka] junrao merged pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration
junrao merged PR #14346: URL: https://github.com/apache/kafka/pull/14346
[GitHub] [kafka] tylerbertrand commented on a diff in pull request #14344: Resolve checkstyle cache miss
tylerbertrand commented on code in PR #14344: URL: https://github.com/apache/kafka/pull/14344#discussion_r1324625070 ## build.gradle: ## @@ -685,6 +685,7 @@ subprojects { } checkstyle { +configDirectory = rootProject.layout.projectDirectory.dir("checkstyle") Review Comment: These all point to the same directory, though `rootProject.layout.projectDirectory.dir("checkstyle")` uses lazy properties, which are generally preferred.
[GitHub] [kafka] tylerbertrand commented on a diff in pull request #14344: Resolve checkstyle cache miss
tylerbertrand commented on code in PR #14344: URL: https://github.com/apache/kafka/pull/14344#discussion_r1324618554 ## build.gradle: ## @@ -685,6 +685,7 @@ subprojects { } checkstyle { +configDirectory = rootProject.layout.projectDirectory.dir("checkstyle") configFile = new File(rootDir, "checkstyle/checkstyle.xml") Review Comment: Good point. It turns out `${configDirectory}/checkstyle.xml` is [the default location](https://docs.gradle.org/current/dsl/org.gradle.api.plugins.quality.CheckstyleExtension.html#org.gradle.api.plugins.quality.CheckstyleExtension:configFile), so there is actually no need to specify `configFile` in this case. I pushed a commit to remove that line.
[GitHub] [kafka] satishd commented on pull request #14382: KAFKA-15442: add a section in doc for tiered storage
satishd commented on PR #14382: URL: https://github.com/apache/kafka/pull/14382#issuecomment-1717677707 >That sounds good. But since we don't have much time left for v3.6.0 RC build, we can add them after release. Doc updates can directly update in kafka-site repo. I'll add them in a separate PR. +1 on that. We can improve the doc in a followup. @kamalcph @showuon We can add quick start steps in the docs later.
[jira] [Commented] (KAFKA-15160) Message bytes duplication in Kafka headers when compression is enabled
[ https://issues.apache.org/jira/browse/KAFKA-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764677#comment-17764677 ] Phuc Hong Tran commented on KAFKA-15160: [~vikashmishra0808] Sorry for the delay, I'm investigating it. > Message bytes duplication in Kafka headers when compression is enabled > -- > > Key: KAFKA-15160 > URL: https://issues.apache.org/jira/browse/KAFKA-15160 > Project: Kafka > Issue Type: Bug > Components: clients, compression, consumer >Affects Versions: 3.2.3, 3.3.2 >Reporter: Vikash Mishra >Assignee: Phuc Hong Tran >Priority: Critical > Attachments: java heap dump.png, wireshark-min.png > > > I created a Spring Kafka consumer using @KafkaListener. > While doing so, I encountered a scenario where, when data is compressed (any > compression, snappy/gzip) and consumed by the consumer, I see in a > heap dump that there is a " byte" occupying the same amount of memory as the > message value. > This behavior is seen only when compressed data is consumed by > consumers, not in the case of uncompressed data. > I tried to capture Kafka's message through Wireshark; there it shows the proper > size of data incoming from the Kafka server and no extra bytes in headers. So this > is definitely something in the Kafka client. Spring doesn't take any action regarding > compression; the whole functionality is done internally in the Kafka client > library. > Attached are screenshots of the heap dump and Wireshark. > This seems like a critical issue, as message size in memory almost > doubles, impacting consumer memory and performance. Somewhere it feels like > the actual message value is copied to headers? > *To Reproduce* > # Produce compressed data on any topic. > # Create a simple consumer consuming from the above-created topic. > # Capture heap dump. > *Expected behavior* > Headers should not show bytes consuming memory equivalent to value.