[GitHub] [kafka] showuon commented on pull request #14287: [Minor] Check the existence of AppInfo for the given ID before creating a new mbean of the same name

2023-09-13 Thread via GitHub


showuon commented on PR #14287:
URL: https://github.com/apache/kafka/pull/14287#issuecomment-1718864019

   Failed tests are unrelated:
   ```
   Build / JDK 20 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
   Build / JDK 20 and Scala 2.13 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft
   Build / JDK 20 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
   Build / JDK 17 and Scala 2.13 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 11 and Scala 2.13 / kafka.api.SaslScramSslEndToEndAuthorizationTest.testAuthentications(String).quorum=zk
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee closed pull request #14207: KIP-848 Heartbeat request manager implementation

2023-09-13 Thread via GitHub


philipnee closed pull request #14207: KIP-848 Heartbeat request manager implementation
URL: https://github.com/apache/kafka/pull/14207


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14384: KAFKA-15415 On producer-batch retry, skip-backoff on a new leader

2023-09-13 Thread via GitHub


AndrewJSchofield commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1324599448


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, 
MemoryRecordsBuilder recordsBuilder, lon
 this.isSplitBatch = isSplitBatch;
 float compressionRatioEstimation = 
CompressionRatioEstimator.estimation(topicPartition.topic(),

 recordsBuilder.compressionType());
+this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+this.leaderChangedAttempts = -1;
 
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
 }
 
+/*
+ * Returns whether the leader epoch has changed since the last attempt.
+ * @param latestLeaderEpoch The latest leader epoch.
+ * @return true if the leader has changed, otherwise false.
+ */
+boolean hasLeaderChanged(int latestLeaderEpoch) {
+boolean leaderChanged = false;
+// Checking for leader change makes sense only from 1st retry 
onwards(attempt >=1).
+log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, 
leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",

Review Comment:
   There are 4 separate calls to `attempts()` (or the equivalent of
   `attempts.get()`) in this method. Capture the value with a single call and
   use it instead.
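   
   A minimal sketch of the suggested refactor, reusing the fields from the
   quoted diff; the comparison logic below is illustrative, not the PR's final
   code:
   
   ```java
   boolean hasLeaderChanged(int latestLeaderEpoch) {
       // Capture the attempt count once instead of calling attempts() repeatedly.
       final int currentAttempt = attempts();
       log.trace("For {}, currentLeaderEpoch: {}, latestLeaderEpoch: {}, current attempt: {}",
               topicPartition, currentLeaderEpoch, latestLeaderEpoch, currentAttempt);
       // Checking for a leader change only makes sense from the first retry onwards.
       if (currentAttempt < 1)
           return false;
       boolean leaderChanged = latestLeaderEpoch > currentLeaderEpoch;
       if (leaderChanged) {
           // Remember the epoch and the attempt on which the leader change was observed.
           currentLeaderEpoch = latestLeaderEpoch;
           leaderChangedAttempts = currentAttempt;
       }
       return leaderChanged;
   }
   ```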



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14384: KAFKA-15415 On producer-batch retry, skip-backoff on a new leader

2023-09-13 Thread via GitHub


AndrewJSchofield commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1324641156


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -3146,6 +3149,97 @@ public void testInvalidTxnStateIsAnAbortableError() 
throws Exception {
 
 txnManager.beginTransaction();
 }
+@Test
+public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws 
Exception {
+Metrics m = new Metrics();
+SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+try {
+// SETUP
+String metricGrpName = "producer-metrics-test-stats-1";
+long totalSize = 1024 * 1024;
+BufferPool pool = new BufferPool(totalSize, batchSize, metrics, 
time,
+metricGrpName);
+long retryBackoffMaxMs = 100L;
+// lingerMs is 0 to send batch as soon as any records are 
available on it.
+this.accumulator = new RecordAccumulator(logContext, batchSize,
+CompressionType.NONE, 0, 10L, retryBackoffMaxMs,
+DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, 
apiVersions, null, pool);
+Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false,
+MAX_REQUEST_SIZE, ACKS_ALL,
+10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, 
null,
+apiVersions);
+// Update metadata with leader-epochs.
+int tp0LeaderEpoch = 100;
+int epoch = tp0LeaderEpoch;
+this.client.updateMetadata(
+RequestTestUtils.metadataUpdateWith(1, 
Collections.singletonMap("test", 2),
+tp -> {
+if (tp0.equals(tp)) {
+return epoch;
+}  else if (tp1.equals(tp)) {
+return 0;
+} else {
+throw new RuntimeException("unexpected tp " + tp);
+}
+}));
+
+// Produce batch, it returns with a retry-able error like 
NOT_LEADER_OR_FOLLOWER, scheduled for retry.
+// This triggers a metadata-request, that discovers a new-leader 
for tp0.
+Future futureIsProduced = appendToAccumulator(tp0, 
0L, "key", "value");
+sender.runOnce(); // connect
+sender.runOnce(); // send produce request
+assertEquals(1, client.inFlightRequestCount(),
+"We should have a single produce request in flight.");
+assertEquals(1, sender.inFlightBatches(tp0).size());
+assertTrue(client.hasInFlightRequests());
+client.respond(produceResponse(tp0, -1, 
Errors.NOT_LEADER_OR_FOLLOWER, 0));
+sender.runOnce(); // receive produce response, batch scheduled for 
retry
+assertTrue(!futureIsProduced.isDone(), "Produce request is yet not 
done.");
+
+// TEST that as new-leader(with epochA) is discovered, the batch 
is retried immediately i.e. skips any backoff period.
+// Update leader epoch for tp0
+log.info("Test that to a new-leader, batch is retried 
immediately.");
+int newEpoch = ++tp0LeaderEpoch;
+this.client.updateMetadata(
+RequestTestUtils.metadataUpdateWith(1, 
Collections.singletonMap("test", 2),
+tp -> {
+if (tp0.equals(tp)) {
+return newEpoch;
+} else if (tp1.equals(tp)) {
+return 0;
+} else {
+throw new RuntimeException("unexpected tp " + tp);
+}
+}));
+sender.runOnce(); // send produce request, immediately.
+assertEquals(1, sender.inFlightBatches(tp0).size());
+assertTrue(client.hasInFlightRequests());
+client.respond(produceResponse(tp0, -1, 
Errors.NOT_LEADER_OR_FOLLOWER, 0));

Review Comment:
   Should there not be testing that uses the new `CurrentLeader` field in v9 of
   the ProduceResponse, given that this is part of KIP-951, which this PR is
   implementing?
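   
   For reference, a rough sketch of how that field could be populated on a test
   response; the generated accessors used below (`setCurrentLeader`,
   `LeaderIdAndEpoch`) are assumed from the KIP-951 protocol changes and may
   not match the helpers this PR ends up adding:
   
   ```java
   // Assumption: ProduceResponseData exposes the KIP-951 CurrentLeader field.
   ProduceResponseData.PartitionProduceResponse partitionResponse =
       new ProduceResponseData.PartitionProduceResponse()
           .setIndex(tp0.partition())
           .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
           // Point the producer at the new leader (id and epoch are illustrative values).
           .setCurrentLeader(new ProduceResponseData.LeaderIdAndEpoch()
               .setLeaderId(1)
               .setLeaderEpoch(tp0LeaderEpoch + 1));
   ```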



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #14382: KAFKA-15442: add a section in doc for tiered storage

2023-09-13 Thread via GitHub


showuon commented on code in PR #14382:
URL: https://github.com/apache/kafka/pull/14382#discussion_r1325394656


##
docs/ops.html:
##
@@ -3859,6 +3859,98 @@ Finalizing the migration
 
 # Other configs ...
 
+
+6.11 Tiered Storage
+
+Tiered Storage 
Overview
+
+Kafka data is mostly consumed in a streaming fashion using tail reads. Tail 
reads leverage OS's page cache to serve the data instead of disk reads.
+  Older data is typically read from the disk for backfill or failure recovery 
purposes and is infrequent.
+
+In the tiered storage approach, Kafka cluster is configured with two tiers 
of storage - local and remote.
+  The local tier is the same as the current Kafka that uses the local disks on 
the Kafka brokers to store the log segments.
+  The new remote tier uses external storage systems, such as HDFS or S3, to 
store the completed log segments.
+  Please check https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage";>KIP-405
 for more information.
+
+
+Note: Tiered storage is considered as an early access feature, and is 
not recommended for use in production environments
+
+Configuration
+
+Broker 
Configurations
+
+By default, Kafka server will not enable tiered storage feature. 
remote.log.storage.system.enable
+  is the property to control whether to enable tiered storage functionality in 
a broker or not. Setting it to "true" to enable this feature.
+
+
+RemoteStorageManager is an interface to provide the lifecycle 
of remote log segments and indexes. Kafka server
+  doesn't provide out-of-the-box implementation of RemoteStorageManager. 
Configuring remote.log.storage.manager.class.name
+  and remote.log.storage.manager.class.path to specify the 
implementation of RemoteStorageManager.
+
+
+RemoteLogMetadataManager is an interface to provide the 
lifecycle of metadata about remote log segments with strongly consistent 
semantics.
+  By default, Kafka provides an implementation with storage as an internal 
topic. This implementation can be changed by configuring
+  remote.log.metadata.manager.class.name and 
remote.log.metadata.manager.class.path.
+  When adopting the default kafka internal topic based implementation, 
remote.log.metadata.manager.listener.name
+  is a mandatory property to specify which listener the clients created by the 
default RemoteLogMetadataManager implementation.
+
+
+
+Topic 
Configurations
+
+After correctly configuring broker side configurations for tiered storage 
feature, there are still configurations in topic level needed to be set.
+  remote.storage.enable is the switch to determine if this topic 
want to use tiered storage or not. By default it is set as false.
+  After enabling remote.storage.enable property, the next thing 
to consider is the log retention.
+  When tiered storage is enabled in the topic, there will be 2 additional log 
retention configuration to set:
+
+
+  local.retention.ms
+  retention.ms
+  local.retention.bytes
+  retention.bytes
+
+
+  The configuration prefixed with local are to specify the 
time/size the "local" log file can accept before moving to remote storage, and 
then get deleted.
+  If unset, The value in retention.ms and 
retention.bytes will be used.
+
+
+Configurations 
Example
+
+Here is a sample configuration to enable tiered storage feature in broker 
side:
+
+# Sample Zookeeper/Kraft broker server.properties listening on 
PLAINTEXT://:9092
+remote.log.storage.system.enable=true
+# Please provide the implementation for remoteStorageManager. This is the 
mandatory configuration for tiered storage.
+# 
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager
+# Using the "PLAINTEXT" listener for the clients in RemoteLogMetadataManager 
to talk to the brokers.
+remote.log.metadata.manager.listener.name=PLAINTEXT
+
+
+
+After broker is started, creating a topic with tiered storage enabled, and 
a small log time retention value to try this feature:
+bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server 
localhost:9092 --config remote.storage.enable=true --config 
local.retention.ms=1000
+
+
+
+Then, after the active segment is rolled, the old segment should be moved 
to the remote storage and get deleted.
+
+
+Limitation

Review Comment:
   Good suggestion. Updated. Thanks.
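   
   As a side note, the topic-level retention settings described in the quoted
   section can also be applied to an existing topic from the command line; a
   minimal sketch assuming the broker and `tieredTopic` topic from the example
   above:
   
   ```
   bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
     --entity-type topics --entity-name tieredTopic \
     --add-config local.retention.ms=1000,retention.ms=86400000
   ```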



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #14382: KAFKA-15442: add a section in doc for tiered storage

2023-09-13 Thread via GitHub


satishd commented on code in PR #14382:
URL: https://github.com/apache/kafka/pull/14382#discussion_r1325382836


##
docs/ops.html:
##
@@ -3859,6 +3859,98 @@ Finalizing the migration
 
 # Other configs ...
 
+
+6.11 Tiered Storage
+
+Tiered Storage 
Overview
+
+Kafka data is mostly consumed in a streaming fashion using tail reads. Tail 
reads leverage OS's page cache to serve the data instead of disk reads.
+  Older data is typically read from the disk for backfill or failure recovery 
purposes and is infrequent.
+
+In the tiered storage approach, Kafka cluster is configured with two tiers 
of storage - local and remote.
+  The local tier is the same as the current Kafka that uses the local disks on 
the Kafka brokers to store the log segments.
+  The new remote tier uses external storage systems, such as HDFS or S3, to 
store the completed log segments.
+  Please check https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage";>KIP-405
 for more information.
+
+
+Note: Tiered storage is considered as an early access feature, and is 
not recommended for use in production environments
+
+Configuration
+
+Broker 
Configurations
+
+By default, Kafka server will not enable tiered storage feature. 
remote.log.storage.system.enable
+  is the property to control whether to enable tiered storage functionality in 
a broker or not. Setting it to "true" to enable this feature.
+
+
+RemoteStorageManager is an interface to provide the lifecycle 
of remote log segments and indexes. Kafka server
+  doesn't provide out-of-the-box implementation of RemoteStorageManager. 
Configuring remote.log.storage.manager.class.name
+  and remote.log.storage.manager.class.path to specify the 
implementation of RemoteStorageManager.
+
+
+RemoteLogMetadataManager is an interface to provide the 
lifecycle of metadata about remote log segments with strongly consistent 
semantics.
+  By default, Kafka provides an implementation with storage as an internal 
topic. This implementation can be changed by configuring
+  remote.log.metadata.manager.class.name and 
remote.log.metadata.manager.class.path.
+  When adopting the default kafka internal topic based implementation, 
remote.log.metadata.manager.listener.name
+  is a mandatory property to specify which listener the clients created by the 
default RemoteLogMetadataManager implementation.
+
+
+
+Topic 
Configurations
+
+After correctly configuring broker side configurations for tiered storage 
feature, there are still configurations in topic level needed to be set.
+  remote.storage.enable is the switch to determine if this topic 
want to use tiered storage or not. By default it is set as false.
+  After enabling remote.storage.enable property, the next thing 
to consider is the log retention.
+  When tiered storage is enabled in the topic, there will be 2 additional log 
retention configuration to set:
+
+
+  local.retention.ms
+  retention.ms
+  local.retention.bytes
+  retention.bytes
+
+
+  The configuration prefixed with local are to specify the 
time/size the "local" log file can accept before moving to remote storage, and 
then get deleted.
+  If unset, The value in retention.ms and 
retention.bytes will be used.
+
+
+Configurations 
Example
+
+Here is a sample configuration to enable tiered storage feature in broker 
side:
+
+# Sample Zookeeper/Kraft broker server.properties listening on 
PLAINTEXT://:9092
+remote.log.storage.system.enable=true
+# Please provide the implementation for remoteStorageManager. This is the 
mandatory configuration for tiered storage.
+# 
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager
+# Using the "PLAINTEXT" listener for the clients in RemoteLogMetadataManager 
to talk to the brokers.
+remote.log.metadata.manager.listener.name=PLAINTEXT
+
+
+
+After broker is started, creating a topic with tiered storage enabled, and 
a small log time retention value to try this feature:
+bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server 
localhost:9092 --config remote.storage.enable=true --config 
local.retention.ms=1000
+
+
+
+Then, after the active segment is rolled, the old segment should be moved 
to the remote storage and get deleted.
+
+
+Limitation

Review Comment:
   Please change `Limitation` -> `Limitations`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #14382: KAFKA-15442: add a section in doc for tiered storage

2023-09-13 Thread via GitHub


satishd commented on code in PR #14382:
URL: https://github.com/apache/kafka/pull/14382#discussion_r1325382836


##
docs/ops.html:
##
@@ -3859,6 +3859,98 @@ Finalizing the migration
 
 # Other configs ...
 
+
+6.11 Tiered Storage
+
+Tiered Storage 
Overview
+
+Kafka data is mostly consumed in a streaming fashion using tail reads. Tail 
reads leverage OS's page cache to serve the data instead of disk reads.
+  Older data is typically read from the disk for backfill or failure recovery 
purposes and is infrequent.
+
+In the tiered storage approach, Kafka cluster is configured with two tiers 
of storage - local and remote.
+  The local tier is the same as the current Kafka that uses the local disks on 
the Kafka brokers to store the log segments.
+  The new remote tier uses external storage systems, such as HDFS or S3, to 
store the completed log segments.
+  Please check https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage";>KIP-405
 for more information.
+
+
+Note: Tiered storage is considered as an early access feature, and is 
not recommended for use in production environments
+
+Configuration
+
+Broker 
Configurations
+
+By default, Kafka server will not enable tiered storage feature. 
remote.log.storage.system.enable
+  is the property to control whether to enable tiered storage functionality in 
a broker or not. Setting it to "true" to enable this feature.
+
+
+RemoteStorageManager is an interface to provide the lifecycle 
of remote log segments and indexes. Kafka server
+  doesn't provide out-of-the-box implementation of RemoteStorageManager. 
Configuring remote.log.storage.manager.class.name
+  and remote.log.storage.manager.class.path to specify the 
implementation of RemoteStorageManager.
+
+
+RemoteLogMetadataManager is an interface to provide the 
lifecycle of metadata about remote log segments with strongly consistent 
semantics.
+  By default, Kafka provides an implementation with storage as an internal 
topic. This implementation can be changed by configuring
+  remote.log.metadata.manager.class.name and 
remote.log.metadata.manager.class.path.
+  When adopting the default kafka internal topic based implementation, 
remote.log.metadata.manager.listener.name
+  is a mandatory property to specify which listener the clients created by the 
default RemoteLogMetadataManager implementation.
+
+
+
+Topic 
Configurations
+
+After correctly configuring broker side configurations for tiered storage 
feature, there are still configurations in topic level needed to be set.
+  remote.storage.enable is the switch to determine if this topic 
want to use tiered storage or not. By default it is set as false.
+  After enabling remote.storage.enable property, the next thing 
to consider is the log retention.
+  When tiered storage is enabled in the topic, there will be 2 additional log 
retention configuration to set:
+
+
+  local.retention.ms
+  retention.ms
+  local.retention.bytes
+  retention.bytes
+
+
+  The configuration prefixed with local are to specify the 
time/size the "local" log file can accept before moving to remote storage, and 
then get deleted.
+  If unset, The value in retention.ms and 
retention.bytes will be used.
+
+
+Configurations 
Example
+
+Here is a sample configuration to enable tiered storage feature in broker 
side:
+
+# Sample Zookeeper/Kraft broker server.properties listening on 
PLAINTEXT://:9092
+remote.log.storage.system.enable=true
+# Please provide the implementation for remoteStorageManager. This is the 
mandatory configuration for tiered storage.
+# 
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager
+# Using the "PLAINTEXT" listener for the clients in RemoteLogMetadataManager 
to talk to the brokers.
+remote.log.metadata.manager.listener.name=PLAINTEXT
+
+
+
+After broker is started, creating a topic with tiered storage enabled, and 
a small log time retention value to try this feature:
+bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server 
localhost:9092 --config remote.storage.enable=true --config 
local.retention.ms=1000
+
+
+
+Then, after the active segment is rolled, the old segment should be moved 
to the remote storage and get deleted.
+
+
+Limitation

Review Comment:
   Please change `Limitation` -> `Limitations`
   
   Both in the text and the links.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325383326


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+heartbeat

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325382420


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+heartbeat

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325381139


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+heartbeat

[GitHub] [kafka] showuon commented on pull request #14382: KAFKA-15442: add a section in doc for tiered storage

2023-09-13 Thread via GitHub


showuon commented on PR #14382:
URL: https://github.com/apache/kafka/pull/14382#issuecomment-1718792709

   @satishd, in the latest commit, I added:
   1. a note to say this is an early access feature and should not be used in
   production environments.
   2. a limitations section to describe the limitations in v3.6.0
   3. a notable change entry in v3.6.0
   
   The screenshot is updated in the PR description. Please take a look again.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325368170


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -168,28 +171,41 @@ public DefaultBackgroundThread(final Time time,
 logContext);
 CoordinatorRequestManager coordinatorRequestManager = null;
 CommitRequestManager commitRequestManager = null;
+HeartbeatRequestManager heartbeatRequestManager = null;
+MembershipManager membershipManaber = null;
 
+// TODO: consolidate groupState and memberState
 if (groupState.groupId != null) {
 coordinatorRequestManager = new CoordinatorRequestManager(
-this.time,
-logContext,
-retryBackoffMs,
-retryBackoffMaxMs,
-this.errorEventHandler,
-groupState.groupId);
+this.time,
+logContext,
+retryBackoffMs,
+retryBackoffMaxMs,
+this.errorEventHandler,
+groupState.groupId);

Review Comment:
   We should submit a patch to combine groupState into memberState. @lianetm 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on PR #14386:
URL: https://github.com/apache/kafka/pull/14386#issuecomment-1718779561

   Hey @junrao - Addressed your comments. I'm holding off on your request for
   `Could we change toString to include the fields in the parent class?` as I
   think it would be easier if I do that consistently for all other
   CompletableApplicationEvents. WDYT?
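   
   A rough illustration of the toString pattern being discussed; the class and
   field names below are hypothetical placeholders, not the actual event
   classes:
   
   ```java
   // Hypothetical CompletableApplicationEvent subclass: surface the parent
   // class's fields by delegating to super.toString().
   @Override
   public String toString() {
       return "TopicMetadataApplicationEvent{" +
               "topic=" + topic +
               ", " + super.toString() +
               '}';
   }
   ```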


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325364414


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets
+ * partitionsFor
+ * 
+ * 
+ * The manager checks the state of the {@link TopicMetadataRequestState} 
before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the 
broker with duplicate requests.
+ * The {@code inflightRequests} are memoized by topic name. If all topics are 
requested, then {@code null} is used as the key.
+ * Once a request is completed successfully, its corresponding entry is 
removed.
+ * 
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+private final boolean allowAutoTopicCreation;
+private final Map, TopicMetadataRequestState> 
inflightRequests;
+private final long retryBackoffMs;
+private final long retryBackoffMaxMs;
+private final Logger log;
+private final LogContext logContext;
+
+public TopicMetadataRequestManager(final LogContext logContext, final 
ConsumerConfig config) {
+this.logContext = logContext;
+this.log = logContext.logger(this.getClass());
+this.inflightRequests = new HashMap<>();
+this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+List requests = 
inflightRequests.values().stream()
+.map(req -> req.send(currentTimeMs))
+.filter(Optional::isPresent)
+.map(Optional::get)
+.collect(Collectors.toList());
+return requests.isEmpty() ?
+new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>()) :
+new NetworkClientDelegate.PollResult(0, 
Collections.unmodifiableList(requests));
+}
+
+/**
+ * return the future of the metadata request. Return the existing future 
if a request for the same topic is already
+ * inflight.
+ *
+ * @param topic to be requested. If empty, return the metadata for all 
topics.
+ * @return the future of the metadata request.
+ */
+public CompletableFuture>> 
requestTopicMetadata(final Optional topic) {
+if (inflightRequests.containsKey(topic)) {
+return inflightRequests.get(topic).future;
+}
+
+TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
+logContext,
+topic,
+retryBackoffMs,
+retryB

[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325355774


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets
+ * partitionsFor

Review Comment:
   As well as listTopics() - this is addressed in a separate PR. Would you be
   ok if we split this into multiple PRs? If you think we should remove the
   documentation due to inaccurate information, I'm willing to do so.
   
   The PR's commit is c71a18c95937dd18171f60afb4fd263ea39e9a1b - I anticipate
   @kirktrue will pick this into trunk soon.
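   
   For context on the in-flight request memoization that the
   TopicMetadataRequestManager javadoc (quoted earlier in this thread)
   describes, a self-contained sketch of the pattern; the names are
   illustrative, not the PR's classes, and an empty topic stands in for the
   "all topics" key:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;
   import java.util.concurrent.CompletableFuture;
   
   // Illustrative only: dedupe in-flight metadata requests by topic name.
   class InflightTopicRequests<T> {
       private final Map<Optional<String>, CompletableFuture<T>> inflight = new HashMap<>();
   
       CompletableFuture<T> request(Optional<String> topic) {
           // Return the existing future if a request for the same topic is already in flight.
           return inflight.computeIfAbsent(topic, t -> {
               CompletableFuture<T> future = new CompletableFuture<>();
               // Once the request completes successfully, drop the entry so later calls start fresh.
               future.whenComplete((result, error) -> {
                   if (error == null) {
                       inflight.remove(t);
                   }
               });
               return future;
           });
       }
   }
   ```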



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325353270


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -203,16 +204,17 @@ public static class UnsentRequest {
 private Timer timer;
 
 public UnsentRequest(final AbstractRequest.Builder requestBuilder, 
final Optional node) {
-this(requestBuilder, node, new FutureCompletionHandler());
+Objects.requireNonNull(requestBuilder);
+this.requestBuilder = requestBuilder;
+this.node = node;
+this.handler = new FutureCompletionHandler();
 }
 
 public UnsentRequest(final AbstractRequest.Builder requestBuilder,
  final Optional node,
- final FutureCompletionHandler handler) {
-Objects.requireNonNull(requestBuilder);
-this.requestBuilder = requestBuilder;
-this.node = node;
-this.handler = handler;
+ final BiConsumer 
callback) {

Review Comment:
   I think it is, according to my IDE. I think it is used in
   CommitRequestManager (lines 219 and 256) and TopicMetadataRequestManager
   (line 145).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325352296


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -381,41 +377,41 @@ public String toString() {
 }
 
 /**
- * This is used to stage the unsent {@link OffsetCommitRequestState} 
and {@link OffsetFetchRequestState}.
+ * This is used to stage the unsent {@link OffsetCommitRequest} and 
{@link OffsetFetchRequestState}.
  * unsentOffsetCommits holds the offset commit requests that have not 
been sent out
  * unsentOffsetFetches holds the offset fetch requests that have not 
been sent out
- * inflightOffsetFetches holds the offset fetch requests that have 
been sent out but incompleted.
- *
+ * inflightOffsetFetches holds the offset fetch requests that have 
been sent out but not completed.
+ * 
  * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the 
same requests.
  */
 
 class PendingRequests {
 // Queue is used to ensure the sequence of commit
-Queue unsentOffsetCommits = new 
LinkedList<>();
+Queue unsentOffsetCommits = new LinkedList<>();
 List unsentOffsetFetches = new ArrayList<>();
 List inflightOffsetFetches = new 
ArrayList<>();
 
-public boolean hasUnsentRequests() {
+boolean hasUnsentRequests() {

Review Comment:
   Sorry - leaving this protected as it is used in one of the tests to verify
   there are no more unsent requests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325349813


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -106,6 +104,7 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
 return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
 }
 
+pendingRequests.inflightOffsetFetches.forEach(System.out::println);

Review Comment:
   🤦 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325349677


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java:
##
@@ -19,22 +19,23 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-public class CommitApplicationEvent extends ApplicationEvent {
-final private CompletableFuture future;
+public class CommitApplicationEvent extends CompletableApplicationEvent {
 final private Map offsets;
 
 public CommitApplicationEvent(final Map 
offsets) {
 super(Type.COMMIT);
-this.offsets = offsets;
-Optional exception = isValid(offsets);
-if (exception.isPresent()) {
-throw new RuntimeException(exception.get());
+this.offsets = Collections.unmodifiableMap(offsets);
+
+for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {

Review Comment:
   There are quite a few events out there that would need to be refactored if
   we do it this way. Would it be ok with you to skip this for now? I will post
   a MINOR patch to address it after this is closed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325345190


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets
+ * partitionsFor
+ * 
+ * 
+ * The manager checks the state of the {@link TopicMetadataRequestState} 
before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the 
broker with duplicate requests.
+ * The {@code inflightRequests} are memoized by topic name. If all topics are 
requested, then {@code null} is used as the key.
+ * Once a request is completed successfully, its corresponding entry is 
removed.
+ * 
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+private final boolean allowAutoTopicCreation;
+private final Map, TopicMetadataRequestState> 
inflightRequests;
+private final long retryBackoffMs;
+private final long retryBackoffMaxMs;
+private final Logger log;
+private final LogContext logContext;
+
+public TopicMetadataRequestManager(final LogContext logContext, final 
ConsumerConfig config) {
+this.logContext = logContext;
+this.log = logContext.logger(this.getClass());
+this.inflightRequests = new HashMap<>();
+this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+List requests = 
inflightRequests.values().stream()
+.map(req -> req.send(currentTimeMs))
+.filter(Optional::isPresent)
+.map(Optional::get)
+.collect(Collectors.toList());
+return requests.isEmpty() ?
+new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>()) :
+new NetworkClientDelegate.PollResult(0, 
Collections.unmodifiableList(requests));
+}
+
+/**
+ * return the future of the metadata request. Return the existing future 
if a request for the same topic is already
+ * inflight.
+ *
+ * @param topic to be requested. If empty, return the metadata for all 
topics.
+ * @return the future of the metadata request.
+ */
+public CompletableFuture>> 
requestTopicMetadata(final Optional topic) {
+if (inflightRequests.containsKey(topic)) {
+return inflightRequests.get(topic).future;
+}
+
+TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
+logContext,
+topic,
+retryBackoffMs,
+retryB

[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325342579


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets
+ * partitionsFor
+ * 
+ * 
+ * The manager checks the state of the {@link TopicMetadataRequestState} 
before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the 
broker with duplicate requests.
+ * The {@code inflightRequests} are memoized by topic name. If all topics are 
requested, then {@code null} is used as the key.

Review Comment:
   Thanks - apologies for neglecting to update the doc.
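
As an aside for readers following the thread, the memoization that the Javadoc above describes boils down to the pattern sketched here. This is a simplified, hypothetical illustration (the class name and the trimmed-down logic are invented for the sketch), not the code under review:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.PartitionInfo;

// Illustrative sketch only: keep at most one in-flight request (and one future) per topic key.
class InflightTopicMetadataSketch {
    private final Map<Optional<String>, CompletableFuture<Map<String, List<PartitionInfo>>>> inflight = new HashMap<>();

    CompletableFuture<Map<String, List<PartitionInfo>>> request(Optional<String> topic) {
        CompletableFuture<Map<String, List<PartitionInfo>>> existing = inflight.get(topic);
        if (existing != null)
            return existing; // a request for this topic is already in flight; reuse its future
        CompletableFuture<Map<String, List<PartitionInfo>>> future = new CompletableFuture<>();
        inflight.put(topic, future);
        // Drop the entry once the request completes so a later call can send a fresh request.
        future.whenComplete((result, error) -> inflight.remove(topic));
        return future;
    }
}
```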



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325341358


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets

Review Comment:
   🤦  - listTopics() instead.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets

Review Comment:
   🤦  - listTopics() instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15245) Improve Tiered Storage Metrics

2023-09-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15245:
-
Fix Version/s: 3.6.0
   (was: 3.7.0)

> Improve Tiered Storage Metrics
> --
>
> Key: KAFKA-15245
> URL: https://issues.apache.org/jira/browse/KAFKA-15245
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Abhijeet Kumar
>Assignee: Abhijeet Kumar
>Priority: Major
> Fix For: 3.6.0
>
>
> Rename existing tiered storage metrics to remove ambiguity



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15245) Improve Tiered Storage Metrics

2023-09-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15245:
-
Description: Rename existing tiered storage metrics to remove ambiguity  
(was: Rename existing tiered storage metrics to remove ambiguity and add 
metrics for the RemoteIndexCache.)

> Improve Tiered Storage Metrics
> --
>
> Key: KAFKA-15245
> URL: https://issues.apache.org/jira/browse/KAFKA-15245
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Abhijeet Kumar
>Assignee: Abhijeet Kumar
>Priority: Major
> Fix For: 3.7.0
>
>
> Rename existing tiered storage metrics to remove ambiguity



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15245) Improve Tiered Storage Metrics

2023-09-13 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764959#comment-17764959
 ] 

Kamal Chandraprakash commented on KAFKA-15245:
--

KAFKA-15094 ticket is already filed to track the RemoteIndexCache stats metrics.

> Improve Tiered Storage Metrics
> --
>
> Key: KAFKA-15245
> URL: https://issues.apache.org/jira/browse/KAFKA-15245
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Abhijeet Kumar
>Assignee: Abhijeet Kumar
>Priority: Major
> Fix For: 3.7.0
>
>
> Rename existing tiered storage metrics to remove ambiguity



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15453) Enable `testFencingOnTransactionExpiration` in TransactionsWithTieredStoreTest

2023-09-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15453:
-
Fix Version/s: 3.6.0
   (was: 3.7.0)

> Enable `testFencingOnTransactionExpiration` in TransactionsWithTieredStoreTest
> --
>
> Key: KAFKA-15453
> URL: https://issues.apache.org/jira/browse/KAFKA-15453
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Reporter: Satish Duggana
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15160) Message bytes duplication in Kafka headers when compression is enabled

2023-09-13 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764956#comment-17764956
 ] 

Phuc Hong Tran edited comment on KAFKA-15160 at 9/14/23 3:32 AM:
-

[~vikashmishra0808] How did you set up ConsumerRecord and @KafkaListener in your 
codebase? Did the duplication happen for every ConsumerRecord, and could you also 
attach the heap dump? Thanks


was (Author: JIRAUSER301295):
[~vikashmishra0808] How did you set up ConsumerRecord and @KafkaListener in your 
codebase? Did the duplication happen for every ConsumerRecord?

> Message bytes duplication in Kafka headers when compression is enabled
> --
>
> Key: KAFKA-15160
> URL: https://issues.apache.org/jira/browse/KAFKA-15160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, compression, consumer
>Affects Versions: 3.2.3, 3.3.2
>Reporter: Vikash Mishra
>Assignee: Phuc Hong Tran
>Priority: Critical
> Attachments: java heap dump.png, wireshark-min.png
>
>
> I created a spring Kafka consumer using @KafkaListener.
> During this, I encountered a scenario where, when data is compressed (any 
> compression, snappy/gzip) and consumed by the consumer, the heap dump shows a 
> " byte" entry occupying the same amount of memory as the message value.
> This behavior is seen only in cases when compressed data is consumed by 
> consumers not in the case of uncompressed data.
> Tried to capture Kafka's message through Wireshark, there it shows the proper 
> size of data incoming from Kafka server & no extra bytes in headers. So, this 
> is definitely something in Kafka client. Spring doesn't do any actions about 
> compression; the whole functionality is done internally in the Kafka client 
> library.
> Attached is the screenshot of the heap dump and Wireshark.
> This seems like a critical issue, as the message size in memory almost 
> doubles, impacting consumer memory and performance. Somewhere it feels like 
> the actual message value is copied to the headers?
> *To Reproduce*
>  # Produce compressed data on any topic.
>  # Create a simple consumer consuming from the above-created topic.
>  # Capture heap dump.
> *Expected behavior*
> Headers should not show bytes consuming memory equivalent to value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15160) Message bytes duplication in Kafka headers when compression is enabled

2023-09-13 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764956#comment-17764956
 ] 

Phuc Hong Tran commented on KAFKA-15160:


[~vikashmishra0808] How did you set up ConsumerRecord and @KafkaListener in your 
codebase? Did the duplication happen for every ConsumerRecord?

> Message bytes duplication in Kafka headers when compression is enabled
> --
>
> Key: KAFKA-15160
> URL: https://issues.apache.org/jira/browse/KAFKA-15160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, compression, consumer
>Affects Versions: 3.2.3, 3.3.2
>Reporter: Vikash Mishra
>Assignee: Phuc Hong Tran
>Priority: Critical
> Attachments: java heap dump.png, wireshark-min.png
>
>
> I created a spring Kafka consumer using @KafkaListener.
> During this, I encountered a scenario where, when data is compressed (any 
> compression, snappy/gzip) and consumed by the consumer, the heap dump shows a 
> " byte" entry occupying the same amount of memory as the message value.
> This behavior is seen only in cases when compressed data is consumed by 
> consumers not in the case of uncompressed data.
> Tried to capture Kafka's message through Wireshark, there it shows the proper 
> size of data incoming from Kafka server & no extra bytes in headers. So, this 
> is definitely something in Kafka client. Spring doesn't do any actions about 
> compression; the whole functionality is done internally in the Kafka client 
> library.
> Attached is the screenshot of the heap dump and Wireshark.
> This seems like a critical issue, as the message size in memory almost 
> doubles, impacting consumer memory and performance. Somewhere it feels like 
> the actual message value is copied to the headers?
> *To Reproduce*
>  # Produce compressed data on any topic.
>  # Create a simple consumer consuming from the above-created topic.
>  # Capture heap dump.
> *Expected behavior*
> Headers should not show bytes consuming memory equivalent to value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on pull request #14381: KAFKA-14912:Add a dynamic config for remote index cache size

2023-09-13 Thread via GitHub


showuon commented on PR #14381:
URL: https://github.com/apache/kafka/pull/14381#issuecomment-1718627959

   @satishd , not sure if you want to have another look. If not, I'll merge it. 
Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #14381: KAFKA-14912:Add a dynamic config for remote index cache size

2023-09-13 Thread via GitHub


showuon commented on PR #14381:
URL: https://github.com/apache/kafka/pull/14381#issuecomment-1718627359

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / 
kafka.network.SocketServerTest.testSaslReauthenticationFailureNoKip152SaslAuthenticate()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
   Build / JDK 20 and Scala 2.13 / 
kafka.api.SslAdminIntegrationTest.testSynchronousAuthorizerAclUpdatesBlockRequestThreads()
   Build / JDK 20 and Scala 2.13 / 
kafka.api.UserQuotaTest.testThrottledProducerConsumer(String).quorum=kraft
   Build / JDK 20 and Scala 2.13 / 
kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
   Build / JDK 20 and Scala 2.13 / 
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
   Build / JDK 11 and Scala 2.13 / 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #14347: KAFKA-15439: Transactions test with tiered storage

2023-09-13 Thread via GitHub


showuon commented on PR #14347:
URL: https://github.com/apache/kafka/pull/14347#issuecomment-1718626560

   Backported to 3.6 branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #14347: KAFKA-15439: Transactions test with tiered storage

2023-09-13 Thread via GitHub


showuon merged PR #14347:
URL: https://github.com/apache/kafka/pull/14347


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #14347: KAFKA-15439: Transactions test with tiered storage

2023-09-13 Thread via GitHub


showuon commented on PR #14347:
URL: https://github.com/apache/kafka/pull/14347#issuecomment-1718624599

   Failed tests are unrelated:
   ```
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailurePositiveDelayTest.testInvalidPasswordSaslPlain()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest.shouldAssignRandomInput[enableRackAwareTaskAssignor=true]
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest.testFencingOnSend(String).quorum=zk
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 8 and Scala 2.12 / 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserPasses(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.DeleteConsumerGroupsTest.testDeleteCmdNonEmptyGroup(String).quorum=kraft
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15466) Add KIP-919 support for some admin APIs

2023-09-13 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15466:
-
Description: Add KIP-919 support for kafka-features.sh, metadata-quorum.sh, 
and kafka-cluster.sh.

> Add KIP-919 support for some admin APIs
> ---
>
> Key: KAFKA-15466
> URL: https://issues.apache.org/jira/browse/KAFKA-15466
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>
> Add KIP-919 support for kafka-features.sh, metadata-quorum.sh, and 
> kafka-cluster.sh.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15466) Add KIP-919 support for some admin APIs

2023-09-13 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15466:
-
Summary: Add KIP-919 support for some admin APIs  (was: Add KIP-919 support 
to kafka-features.sh, kafka-metadata-quorum.sh, kafka-cluster.sh)

> Add KIP-919 support for some admin APIs
> ---
>
> Key: KAFKA-15466
> URL: https://issues.apache.org/jira/browse/KAFKA-15466
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15466) Add KIP-919 support to kafka-features.sh, kafka-metadata-quorum.sh, kafka-cluster.sh

2023-09-13 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-15466:


 Summary: Add KIP-919 support to kafka-features.sh, 
kafka-metadata-quorum.sh, kafka-cluster.sh
 Key: KAFKA-15466
 URL: https://issues.apache.org/jira/browse/KAFKA-15466
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325175686


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -381,41 +377,41 @@ public String toString() {
 }
 
 /**
- * This is used to stage the unsent {@link OffsetCommitRequestState} 
and {@link OffsetFetchRequestState}.
+ * This is used to stage the unsent {@link OffsetCommitRequest} and 
{@link OffsetFetchRequestState}.
  * unsentOffsetCommits holds the offset commit requests that have not 
been sent out
  * unsentOffsetFetches holds the offset fetch requests that have not 
been sent out
- * inflightOffsetFetches holds the offset fetch requests that have 
been sent out but incompleted.
- *
+ * inflightOffsetFetches holds the offset fetch requests that have 
been sent out but not completed.
+ * 
  * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the 
same requests.
  */
 
 class PendingRequests {
 // Queue is used to ensure the sequence of commit
-Queue unsentOffsetCommits = new 
LinkedList<>();
+Queue unsentOffsetCommits = new LinkedList<>();
 List unsentOffsetFetches = new ArrayList<>();
 List inflightOffsetFetches = new 
ArrayList<>();
 
-public boolean hasUnsentRequests() {
+boolean hasUnsentRequests() {

Review Comment:
   Sorry - I left out commit.  Fixing those obvious mistakes right away.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager

2023-09-13 Thread via GitHub


kirktrue commented on code in PR #14359:
URL: https://github.com/apache/kafka/pull/14359#discussion_r1325174944


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeStatusDetector.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.NetworkClientUtils;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * Use {@code NodeStatusDetector} to determine the status of a given broker 
{@link Node}. It's also
+ * possible to check for previous authentication errors if the node isn't 
available.
+ *
+ * @see ConsumerNetworkClient
+ * @see NetworkClientDelegate
+ */
+public interface NodeStatusDetector {
+
+/**
+ * Check if the node is disconnected and unavailable for immediate 
reconnection (i.e. if it is in
+ * reconnect backoff window following the disconnect).
+ *
+ * @param node {@link Node} to check for availability
+ * @see NetworkClientUtils#isUnavailable(KafkaClient, Node, Time)
+ */
+boolean isUnavailable(Node node);
+
+/**
+ * Checks for an authentication error on a given node and throws the 
exception if it exists.
+ *
+ * @param node {@link Node} to check for a previous {@link 
AuthenticationException}; if found it is thrown
+ * @see NetworkClientUtils#maybeThrowAuthFailure(KafkaClient, Node)
+ */
+void maybeThrowAuthFailure(Node node);
+
+/**
+ * Initiate a connection if currently possible. This is only really useful 
for resetting

Review Comment:
   Per the above comments, I've removed `NodeStatusDetector` for now, so this 
should be moot for the time being.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager

2023-09-13 Thread via GitHub


kirktrue commented on code in PR #14359:
URL: https://github.com/apache/kafka/pull/14359#discussion_r1325174579


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -91,23 +105,70 @@ public void onFailure(RuntimeException e) {
 }
 };
 
-final RequestFuture future = 
client.send(fetchTarget, request);
+final RequestFuture future = 
nodeStatusDetector.send(fetchTarget, request);
 future.addListener(listener);
 }
 
 return fetchRequestMap.size();
 }
 
-public void close(final Timer timer) {
-if (!isClosed.compareAndSet(false, true)) {
-log.info("Fetcher {} is already closed.", this);
-return;
+public Fetch collectFetch() {
+return fetchCollector.collectFetch(fetchBuffer);
+}
+
+protected void maybeCloseFetchSessions(final Timer timer) {

Review Comment:
   Yes, it is a bit obtuse. My apologies.
   
   There are two reasons that `maybeCloseFetchSessions` exists in `Fetcher`:
   
   1. The forthcoming `FetchRequestManager` doesn't close sessions in the same 
way, so it doesn't make sense to have the method in the shared `AbstractFetch` 
class.
   2. `FetchRequestManager` (as with the other `RequestManager`s) doesn't use 
the `ConsumerNetworkClient` but uses the `NetworkClientDelegate`. The code in 
`AbstractFetch` that is shared by both `Fetcher` and—soon—`FetchRequestManager` 
needed a way to call some of the methods that `ConsumerNetworkClient` and 
`NetworkClientDelegate` have in common. Thus the `NodeStatusDetector` interface 
was created.
   
   Because the `FetchRequestManager` isn't in this PR, the reason for the current 
structure is understandably unclear. For now I've made the following 
changes:
   
   1. I moved `maybeCloseFetchSessions` to `AbstractFetch`
   2. I left `AbstractFetch` with a reference to a `ConsumerNetworkClient`
   3. I've removed the `NodeStatusDetector`
   
   As needed, the above may be changed in forthcoming PRs.
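
To make the shared-abstraction idea described in the comment above a bit more concrete, here is a minimal, self-contained sketch. All of the names below are hypothetical stand-ins chosen for illustration; they are not the actual Kafka classes or the PR's code:

```java
import org.apache.kafka.common.Node;

// Hypothetical stand-ins, purely to illustrate "shared code depends on a
// narrow interface; each network client implements it".
interface NodeStatusCheck {
    boolean isUnavailable(Node node);
    void maybeThrowAuthFailure(Node node);
}

// The shared fetch logic only needs the narrow interface...
abstract class SharedFetchLogic {
    protected final NodeStatusCheck nodeStatusCheck;

    protected SharedFetchLogic(NodeStatusCheck nodeStatusCheck) {
        this.nodeStatusCheck = nodeStatusCheck;
    }
}

// ...so either network client implementation can be plugged in behind it.
class BlockingClientAdapter implements NodeStatusCheck {
    @Override
    public boolean isUnavailable(Node node) {
        return false; // stub for illustration only
    }

    @Override
    public void maybeThrowAuthFailure(Node node) {
        // stub for illustration only
    }
}
```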




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager

2023-09-13 Thread via GitHub


kirktrue commented on code in PR #14359:
URL: https://github.com/apache/kafka/pull/14359#discussion_r1325171010


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -63,6 +68,31 @@ public NetworkClientDelegate(
 this.unsentRequests = new ArrayDeque<>();
 this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+this.tryConnectNodes = new HashSet<>();
+}
+
+@Override
+public boolean isUnavailable(Node node) {
+return NetworkClientUtils.isUnavailable(client, node, time);
+}
+
+@Override
+public void maybeThrowAuthFailure(Node node) {
+NetworkClientUtils.maybeThrowAuthFailure(client, node);
+}
+
+@Override
+public void tryConnect(Node node) {

Review Comment:
   Yes—`tryConnect` and `maybeTryConnect` were intended to be used together. 
The reason the latter wasn't in the interface was because of the difference in 
_when_ we perform network I/O. In the existing `Consumer`, we perform it 
immediately, whereas in the refactored `Consumer`, we delay that I/O until the 
last step. So the intent was for the new `Consumer` to queue up the 
connection attempts and perform them in one step in `maybeTryConnect` vs. 
immediately when `tryConnect` was called.
   
   That said, a) it's confusing, and b) it's not needed in this particular PR, 
so I've removed the whole `NodeStatusDetector` for now.
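
For anyone skimming the thread, the deferred-connection idea described above can be sketched roughly as follows. The field and method names mirror the diff, but the body is an illustrative guess, not the actual implementation:

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Node;

// Illustrative sketch of "queue the connection attempt now, do the I/O later".
class DeferredConnectionsSketch {
    private final KafkaClient client;
    private final Set<Node> tryConnectNodes = new HashSet<>();

    DeferredConnectionsSketch(KafkaClient client) {
        this.client = client;
    }

    void tryConnect(Node node) {
        // No network I/O here; just remember that this node wants a connection attempt.
        tryConnectNodes.add(node);
    }

    void maybeTryConnect(long currentTimeMs) {
        // Perform all queued connection attempts in a single step, then clear the queue.
        for (Node node : tryConnectNodes)
            client.ready(node, currentTimeMs);
        tryConnectNodes.clear();
    }
}
```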




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


junrao commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1325040397


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -91,7 +89,7 @@ public CommitRequestManager(
 }
 
 /**
- * Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} 
request if there's any. The function will
+ * Poll for the {@link OffsetFetchRequest} and {@link 
org.apache.kafka.common.requests.OffsetCommitRequest} request if there's any. 
The function will

Review Comment:
   Do we need to spell out the package name for OffsetCommitRequest? It doesn't 
seem this is done consistently.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listOffsets
+ * partitionsFor

Review Comment:
   It doesn't seem that we have wired `TopicMetadataRequestManager` to the 
`partitionsFor` call?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -381,41 +377,41 @@ public String toString() {
 }
 
 /**
- * This is used to stage the unsent {@link OffsetCommitRequestState} 
and {@link OffsetFetchRequestState}.
+ * This is used to stage the unsent {@link OffsetCommitRequest} and 
{@link OffsetFetchRequestState}.
  * unsentOffsetCommits holds the offset commit requests that have not 
been sent out
  * unsentOffsetFetches holds the offset fetch requests that have not 
been sent out
- * inflightOffsetFetches holds the offset fetch requests that have 
been sent out but incompleted.
- *
+ * inflightOffsetFetches holds the offset fetch requests that have 
been sent out but not completed.
+ * 
  * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the 
same requests.
  */
 
 class PendingRequests {
 // Queue is used to ensure the sequence of commit
-Queue unsentOffsetCommits = new 
LinkedList<>();
+Queue unsentOffsetCommits = new LinkedList<>();
 List unsentOffsetFetches = new ArrayList<>();
 List inflightOffsetFetches = new 
ArrayList<>();
 
-public boolean hasUnsentRequests() {
+boolean hasUnsentRequests() {

Review Comment:
   Could this be private? Ditto below.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the Li

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-13 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1325138384


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##
@@ -79,6 +79,9 @@ public int hashCode() {
 
 @Override
 public String toString() {
-return String.format("Assignor selection {type:%s, name:%s}", type, 
serverAssignor);
+return "AssignorSelection{" +

Review Comment:
   just to create a coherent format



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #14382: KAFKA-15442: add a section in doc for tiered storage

2023-09-13 Thread via GitHub


satishd commented on code in PR #14382:
URL: https://github.com/apache/kafka/pull/14382#discussion_r1324263153


##
docs/ops.html:
##
@@ -3859,6 +3859,80 @@ Finalizing the migration
 
 # Other configs ...
 
+
+6.11 Tiered Storage
+
+Tiered Storage 
overview
+
+Kafka data is mostly consumed in a streaming fashion using tail reads. Tail 
reads leverage OS's page cache to serve the data instead of disk reads.
+  Older data is typically read from the disk for backfill or failure recovery 
purposes and is infrequent.
+
+In the tiered storage approach, Kafka cluster is configured with two tiers 
of storage - local and remote.
+  The local tier is the same as the current Kafka that uses the local disks on 
the Kafka brokers to store the log segments.
+  The new remote tier uses external storage systems, such as HDFS or S3, to 
store the completed log segments.
+
+Configuration
+
+broker 
configurations
+
+By default, Kafka server will not enable tiered storage feature. 
remote.log.storage.system.enable
+  is the property to control whether to enable tiered storage functionality in 
a broker or not. Setting it to "true" to enable this feature.
+
+
+RemoteStorageManager is an interface to provide the lifecycle 
of remote log segments and indexes. Kafka server
+  doesn't provide out-of-the-box implementation of RemoteStorageManager. 
Configuring remote.log.storage.manager.class.name

Review Comment:
   1. `Configuring` -> `Configure`
   
   
   2. `remote.log.storage.manager.class.name` and `remote.log.storage.manager.class.path` 
instead of `remote.log.storage.manager.class.name` or `remote.log.storage.manager.class.path`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #14382: KAFKA-15442: add a section in doc for tiered storage

2023-09-13 Thread via GitHub


satishd commented on code in PR #14382:
URL: https://github.com/apache/kafka/pull/14382#discussion_r1324263153


##
docs/ops.html:
##
@@ -3859,6 +3859,80 @@ Finalizing the migration
 
 # Other configs ...
 
+
+6.11 Tiered Storage
+
+Tiered Storage 
overview
+
+Kafka data is mostly consumed in a streaming fashion using tail reads. Tail 
reads leverage OS's page cache to serve the data instead of disk reads.
+  Older data is typically read from the disk for backfill or failure recovery 
purposes and is infrequent.
+
+In the tiered storage approach, Kafka cluster is configured with two tiers 
of storage - local and remote.
+  The local tier is the same as the current Kafka that uses the local disks on 
the Kafka brokers to store the log segments.
+  The new remote tier uses external storage systems, such as HDFS or S3, to 
store the completed log segments.
+
+Configuration
+
+broker 
configurations
+
+By default, Kafka server will not enable tiered storage feature. 
remote.log.storage.system.enable
+  is the property to control whether to enable tiered storage functionality in 
a broker or not. Setting it to "true" to enable this feature.
+
+
+RemoteStorageManager is an interface to provide the lifecycle 
of remote log segments and indexes. Kafka server
+  doesn't provide out-of-the-box implementation of RemoteStorageManager. 
Configuring remote.log.storage.manager.class.name

Review Comment:
   `Configuring` -> `Configure`
   
   
   `remote.log.storage.manager.class.name` and `remote.log.storage.manager.class.path` 
instead of `remote.log.storage.manager.class.name` or `remote.log.storage.manager.class.path`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15459) Convert coordinator retriable errors to a known producer response error.

2023-09-13 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-15459.

Resolution: Fixed

> Convert coordinator retriable errors to a known producer response error.
> 
>
> Key: KAFKA-15459
> URL: https://issues.apache.org/jira/browse/KAFKA-15459
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.6.0
>
>
> KIP-890 Part 1 tries to address hanging transactions on old clients. Thus, 
> the produce version can not be bumped and no new errors can be added. 
> Currently we use the java client's notion of retriable and abortable errors 
> -- retriable errors are defined as such by extending the retriable error 
> class, fatal errors are defined explicitly, and abortable errors are the 
> remaining. However, many other clients treat unspecified errors as fatal, and 
> that means many retriable errors kill the application. This is not ideal.
> While reviewing [https://github.com/apache/kafka/pull/14370] I added some of 
> the documentation for the returned errors in the produce response as well.
> There were concerns about the new errors:
>  * {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
>  * {@link Errors#COORDINATOR_NOT_AVAILABLE}
>  * {@link Errors#INVALID_TXN_STATE}
>  * {@link Errors#INVALID_PRODUCER_ID_MAPPING}
>  * {@link Errors#CONCURRENT_TRANSACTIONS}
> The coordinator load, not available, and concurrent transactions errors 
> should be retriable.
> The invalid txn state and pid mapping errors should be abortable.
> This is how older java clients handle the errors, but it is unclear how other 
> clients handle them. It seems that rdkafka (for example) treats the abortable 
> errors as fatal instead. The coordinator errors are retriable but not the 
> concurrent transactions error. Generally anything not specified otherwise is 
> fatal.
> It seems acceptable for the abortable errors to be fatal on some clients 
> since the error is likely on a zombie producer or in a state that may be 
> harder to recover from. However, for the retriable errors, we can return 
> NOT_ENOUGH_REPLICAS which is a known retriable response. We can use the 
> produce api's response string to specify the real cause of the error for 
> debugging. 
> There were trade-offs between making the older clients work and keeping the 
> errors clear. This seems to be the best compromise.
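
As a rough illustration of the compromise described in the ticket (not the actual broker code; the helper class and method are invented for the sketch), the retriable coordinator errors could be translated to a known retriable error before the produce response is built, with the real cause carried in the response's error message string:

```java
import org.apache.kafka.common.protocol.Errors;

// Illustrative sketch only: map retriable coordinator errors to an error code
// that old clients already understand as retriable.
final class OldClientErrorMappingSketch {
    static Errors maybeConvertRetriableError(Errors error) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
            case COORDINATOR_NOT_AVAILABLE:
            case CONCURRENT_TRANSACTIONS:
                // Old clients may treat unknown errors as fatal, so return a
                // well-known retriable error instead; the original cause can be
                // reported in the produce response's error message string.
                return Errors.NOT_ENOUGH_REPLICAS;
            default:
                return error;
        }
    }
}
```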



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan merged pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


jolshan merged PR #14378:
URL: https://github.com/apache/kafka/pull/14378


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


jolshan commented on PR #14378:
URL: https://github.com/apache/kafka/pull/14378#issuecomment-1718329367

   Test failures look unrelated. I synced with Jason offline and he said we can 
proceed with the merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim opened a new pull request, #14387: [DRAFT] KAFKA-14519: New coordinator metrics

2023-09-13 Thread via GitHub


jeffkbkim opened a new pull request, #14387:
URL: https://github.com/apache/kafka/pull/14387

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager

2023-09-13 Thread via GitHub


kirktrue commented on code in PR #14359:
URL: https://github.com/apache/kafka/pull/14359#discussion_r1325061757


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -256,138 +258,23 @@ protected FetchRequest.Builder createFetchRequest(final 
Node fetchTarget,
 }
 
 /**
- * Return the fetched records, empty the record buffer and update the 
consumed position.
- *
- * 
+ * Return the list of fetchable partitions, which are the set of 
partitions to which we are subscribed,
+ * but excluding any partitions for which we still have buffered 
data. The idea is that since the user
+ * has yet to process the data for the partition that has already been 
fetch, we should not go send for more data

Review Comment:
   Fixed. Also made this method return a `Set`, which is more appropriate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager

2023-09-13 Thread via GitHub


kirktrue commented on code in PR #14359:
URL: https://github.com/apache/kafka/pull/14359#discussion_r1325058583


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * {@code FetchUtils} provides a place for disparate parts of the fetch logic 
to live.
+ */
+public class FetchUtils {
+
+/**
+ * Performs two combined actions based on the state related to the {@link 
TopicPartition}:
+ *
+ * 
+ * 
+ * Invokes {@link ConsumerMetadata#requestUpdate(boolean)} to 
signal that the metadata is incorrect and
+ * needs to be updated
+ * 
+ * 
+ * Invokes {@link 
SubscriptionState#clearPreferredReadReplica(TopicPartition)} to clear out any 
read replica
+ * information that may be present.
+ * 
+ * 
+ *
+ * This utility method should be invoked if the client detects (or is told 
by the broker) that an
+ * attempt was made to fetch from a node that isn't the leader or 
preferred replica.
+ *
+ * @param metadata {@link ConsumerMetadata} for which to request an update
+ * @param subscriptions {@link SubscriptionState} to clear any internal 
read replica node
+ * @param topicPartition {@link TopicPartition} for which this state 
change is related
+ */
+static void requestMetadataUpdate(final ConsumerMetadata metadata,
+  final SubscriptionState subscriptions,
+  final TopicPartition topicPartition) {
+metadata.requestUpdate(true);

Review Comment:
   You're right. I double-checked the (very helpful) comments in 
`requestUpdate()` and I agree, so I set it to `false`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager

2023-09-13 Thread via GitHub


kirktrue commented on code in PR #14359:
URL: https://github.com/apache/kafka/pull/14359#discussion_r1325056350


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Predicate;
+
+/**
+ * {@code FetchBuffer} buffers up the results from the broker responses as 
they are received. It is essentially a
+ * wrapper around a {@link java.util.Queue} of {@link CompletedFetch}.
+ *
+ * 
+ *
+ * Note: this class is not thread-safe and is intended to only be 
used from a single thread.
+ */
+public class FetchBuffer implements Closeable {
+
+private final Logger log;
+private final ConcurrentLinkedQueue completedFetches;

Review Comment:
   I hadn't addressed it because I wasn't 100% sure whether that was accurate. I 
temporarily added some assertion checking to verify it, re-ran the unit tests, 
and they passed, so I'll take that as sufficient confirmation. I've updated the 
class-level documentation to include that fact. LMK if you're OK with the 
comments. Thanks.
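   
   The kind of temporary assertion checking described here might look roughly like the
   following standalone sketch (hypothetical names; not the actual FetchBuffer code):
   ```java
   import java.util.concurrent.ConcurrentLinkedQueue;

   public class SingleThreadedBufferSketch {
       private final ConcurrentLinkedQueue<String> completedFetches = new ConcurrentLinkedQueue<>();
       // Remembers the first thread that touches the buffer; later accesses must match it.
       private volatile Thread owner;

       private void assertSingleThreadAccess() {
           Thread current = Thread.currentThread();
           if (owner == null)
               owner = current;
           else if (owner != current)
               throw new IllegalStateException("Accessed from " + current + ", expected " + owner);
       }

       void add(String completedFetch) {
           assertSingleThreadAccess();
           completedFetches.add(completedFetch);
       }

       String poll() {
           assertSingleThreadAccess();
           return completedFetches.poll();
       }

       public static void main(String[] args) {
           SingleThreadedBufferSketch buffer = new SingleThreadedBufferSketch();
           buffer.add("fetch-1");
           System.out.println(buffer.poll()); // fetch-1; a second thread calling add() would fail fast
       }
   }
   ```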



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15465) MM2 not working when its internal topics are pre-created on a cluster that disallows topic creation

2023-09-13 Thread Ahmed HIBOT (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764867#comment-17764867
 ] 

Ahmed HIBOT commented on KAFKA-15465:
-

[~omnia_h_ibrahim], can you please take a look and confirm my findings? Also, I 
will be happy to provide the fix, but I can't assign myself to the bug.

> MM2 not working when its internal topics are pre-created on a cluster that 
> disallows topic creation
> ---
>
> Key: KAFKA-15465
> URL: https://issues.apache.org/jira/browse/KAFKA-15465
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.4.1
>Reporter: Ahmed HIBOT
>Priority: Major
>
> h1. Replication steps
>  * Setup a source kafka cluster (alias SOURCE) which doesn't allow topic 
> creation to MM2 (therefore it doesn't allow the creation of internal topics)
>  * Create MM2 internal topics in the source kafka cluster
>  * Setup a target kafka cluster (alias TARGET)
>  * Enable one way replication SOURCE->TARGET
> MM2 will attempt to create or find its internal topics on the source cluster 
> but it will fail with the following stack trace
> {code:java}
> {"log_timestamp": "2023-09-13T09:39:25.612+", "log_level": "ERROR", 
> "process_id": 1, "process_name": "mirror-maker", "thread_id": 1, 
> "thread_name": "Scheduler for MirrorSourceConnector-creating upstream 
> offset-syncs topic", "action_name": 
> "org.apache.kafka.connect.mirror.Scheduler", "log_message": "Scheduler for 
> MirrorSourceConnector caught exception in scheduled task: creating upstream 
> offset-syncs topic"}
> org.apache.kafka.connect.errors.ConnectException: Error while attempting to 
> create/find topic 'mm2-offset-syncs.TARGET.internal'
>   at 
> org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:155)
>   at 
> org.apache.kafka.connect.mirror.MirrorUtils.createSinglePartitionCompactedTopic(MirrorUtils.java:161)
>   at 
> org.apache.kafka.connect.mirror.MirrorSourceConnector.createOffsetSyncsTopic(MirrorSourceConnector.java:328)
>   at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>   at 
> org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>   at 
> org.apache.kafka.connect.mirror.Scheduler.lambda$execute$2(Scheduler.java:63)
> [...]
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicAuthorizationException: Authorization 
> failed.
>   at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>   at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>   at 
> org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:124)
>   ... 11 more
> Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: 
> Authorization failed. {code}
>  
> h1. Root cause analysis
> The changes introduced by KAFKA-13401 in 
> [{{{}MirrorUtils{}}}|https://github.com/apache/kafka/pull/12577/files#diff-fa8f595307a4ade20cc22253a7721828e3b55c96f778e9c4842c978801e0a1a4]
>  are supposed to follow the same logic as 
> [{{{}TopicAdmin{}}}|https://github.com/apache/kafka/blob/a7e865c0a756504cc7ae6f4eb0772caddc53/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L423]
>  according to the contributor's 
> [comment|https://github.com/apache/kafka/pull/12577#discussion_r991566108]
> {{TopicAdmin.createOrFindTopics(...)}} and 
> {{MirrorUtils.createCompactedTopic(...)}} aren't aligned in terms of allowed 
> exceptions
> ||Exception||TopicAdmin||MirrorUtils||
> |TopicExistsException|OK|OK|
> |UnsupportedVersionException|OK|_KO_|
> |ClusterAuthorizationException|OK|_KO_|
> |TopicAuthorizationException|OK|_KO_|
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15465) MM2 not working when its internal topics are pre-created on a cluster that disallows topic creation

2023-09-13 Thread Ahmed HIBOT (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed HIBOT updated KAFKA-15465:

Description: 
h1. Replication steps
 * Set up a source Kafka cluster (alias SOURCE) that doesn't allow MM2 to create 
topics (and therefore doesn't allow it to create its internal topics)
 * Create the MM2 internal topics in the source Kafka cluster
 * Set up a target Kafka cluster (alias TARGET)
 * Enable one-way replication SOURCE->TARGET

MM2 will attempt to create or find its internal topics on the source cluster 
but it will fail with the following stack trace
{code:java}
{"log_timestamp": "2023-09-13T09:39:25.612+", "log_level": "ERROR", 
"process_id": 1, "process_name": "mirror-maker", "thread_id": 1, "thread_name": 
"Scheduler for MirrorSourceConnector-creating upstream offset-syncs topic", 
"action_name": "org.apache.kafka.connect.mirror.Scheduler", "log_message": 
"Scheduler for MirrorSourceConnector caught exception in scheduled task: 
creating upstream offset-syncs topic"}
org.apache.kafka.connect.errors.ConnectException: Error while attempting to 
create/find topic 'mm2-offset-syncs.TARGET.internal'
at 
org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:155)
at 
org.apache.kafka.connect.mirror.MirrorUtils.createSinglePartitionCompactedTopic(MirrorUtils.java:161)
at 
org.apache.kafka.connect.mirror.MirrorSourceConnector.createOffsetSyncsTopic(MirrorSourceConnector.java:328)
at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
at 
org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
at 
org.apache.kafka.connect.mirror.Scheduler.lambda$execute$2(Scheduler.java:63)
[...]
Caused by: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TopicAuthorizationException: Authorization 
failed.
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:124)
... 11 more
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: 
Authorization failed. {code}
 
h1. Root cause analysis

The changes introduced by KAFKA-13401 in 
[{{{}MirrorUtils{}}}|https://github.com/apache/kafka/pull/12577/files#diff-fa8f595307a4ade20cc22253a7721828e3b55c96f778e9c4842c978801e0a1a4]
 are supposed to follow the same logic as 
[{{{}TopicAdmin{}}}|https://github.com/apache/kafka/blob/a7e865c0a756504cc7ae6f4eb0772caddc53/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L423]
 according to the contributor's 
[comment|https://github.com/apache/kafka/pull/12577#discussion_r991566108]

{{TopicAdmin.createOrFindTopics(...)}} and 
{{MirrorUtils.createCompactedTopic(...)}} aren't aligned in terms of allowed 
exceptions
||Exception||TopicAdmin||MirrorUtils||
|TopicExistsException|OK|OK|
|UnsupportedVersionException|OK|_KO_|
|ClusterAuthorizationException|OK|_KO_|
|TopicAuthorizationException|OK|_KO_|

 

> MM2 not working when its internal topics are pre-created on a cluster that 
> disallows topic creation
> ---
>
> Key: KAFKA-15465
> URL: https://issues.apache.org/jira/browse/KAFKA-15465
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.4.1
>Reporter: Ahmed HIBOT
>Priority: Major
>
> h1. Replication steps
>  * Setup a source kafka cluster (alias SOURCE) which doesn't allow topic 
> creation to MM2 (therefore it doesn't allow the creation of internal topics)
>  * Create MM2 internal topics in the source kafka cluster
>  * Setup a target kafka cluster (alias TARGET)
>  * Enable one way replication SOURCE->TARGET
> MM2 will attempt to create or find its internal topics on the source cluster 
> but it will fail with the following stack trace
> {code:java}
> {"log_timestamp": "2023-09-13T09:39:25.612+", "log_level": "ERROR", 
> "process_id": 1, "process_name": "mirror-maker", "thread_id": 1, 
> "thread_name": "Scheduler for MirrorSourceConnector-creating upstream 
> offset-syncs topic", "action_name": 
> "org.apache.kafka.connect.mirror.Scheduler", "log_message": "Scheduler for 
> MirrorSourceConnector caught exception in scheduled task: creating upstream 
> offset-syncs topic"}
> org.apache.kafka.connect.errors.ConnectException: Error while attempting to 
> create/find topic 'mm2-offset-syncs.TARGET.internal'
>   at 
> org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:155)
>   at 
> org.apach

[GitHub] [kafka] kirktrue commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager

2023-09-13 Thread via GitHub


kirktrue commented on code in PR #14359:
URL: https://github.com/apache/kafka/pull/14359#discussion_r1325038436


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Predicate;
+
+/**
+ * {@code FetchBuffer} buffers up the results from the broker responses as 
they are received. It is essentially a
+ * wrapper around a {@link java.util.Queue} of {@link CompletedFetch}.
+ *
+ * 
+ *
+ * Note: this class is not thread-safe and is intended to only be 
used from a single thread.
+ */
+public class FetchBuffer implements Closeable {
+
+private final Logger log;
+private final ConcurrentLinkedQueue completedFetches;
+private final IdempotentCloser idempotentCloser = new IdempotentCloser();
+
+private CompletedFetch nextInLineFetch;
+
+public FetchBuffer(final LogContext logContext) {
+this.log = logContext.logger(FetchBuffer.class);
+this.completedFetches = new ConcurrentLinkedQueue<>();
+}
+
+/**
+ * Returns {@code true} if there are no completed fetches pending to 
return to the user.
+ *
+ * @return {@code true} if the buffer is empty, {@code false} otherwise
+ */
+boolean isEmpty() {
+return completedFetches.isEmpty();
+}
+
+/**
+ * Return whether we have any completed fetches pending return to the 
user. This method is thread-safe. Has
+ * visibility for testing.
+ *
+ * @return {@code true} if there are completed fetches that match the 
{@link Predicate}, {@code false} otherwise
+ */
+boolean hasCompletedFetches(Predicate predicate) {
+return completedFetches.stream().anyMatch(predicate);
+}
+
+void add(CompletedFetch completedFetch) {
+completedFetches.add(completedFetch);
+}
+
+void addAll(Collection completedFetches) {
+this.completedFetches.addAll(completedFetches);
+}
+
+CompletedFetch nextInLineFetch() {
+return nextInLineFetch;
+}
+
+void setNextInLineFetch(CompletedFetch completedFetch) {
+this.nextInLineFetch = completedFetch;
+}
+
+CompletedFetch peek() {
+return completedFetches.peek();
+}
+
+CompletedFetch poll() {
+return completedFetches.poll();
+}
+
+/**
+ * Updates the buffer to retain only the fetch data that corresponds to 
the given partitions. Any previously
+ * {@link CompletedFetch fetched data} is removed if its partition is not 
in the given set of partitions.
+ *
+ * @param partitions {@link Set} of {@link TopicPartition}s for which any 
buffered data should be kept
+ */
+void retainAll(final Set partitions) {
+completedFetches.removeIf(cf -> maybeDrain(partitions, cf));
+
+if (maybeDrain(partitions, nextInLineFetch))
+nextInLineFetch = null;
+}
+
+boolean maybeDrain(final Set partitions, final 
CompletedFetch completedFetch) {
+if (completedFetch != null && 
!partitions.contains(completedFetch.partition)) {
+log.debug("Removing {} from buffered fetch data as it is not in 
the set of partitions to retain ({})", completedFetch.partition, partitions);
+completedFetch.drain();
+return true;
+} else {
+return false;
+}
+}
+
+Set partitions() {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15465) MM2 not working when its internal topics are pre-created on a cluster that disallows topic creation

2023-09-13 Thread Ahmed HIBOT (Jira)
Ahmed HIBOT created KAFKA-15465:
---

 Summary: MM2 not working when its internal topics are pre-created 
on a cluster that disallows topic creation
 Key: KAFKA-15465
 URL: https://issues.apache.org/jira/browse/KAFKA-15465
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.4.1
Reporter: Ahmed HIBOT






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on a diff in pull request #14376: KAFKA-15458: Fully resolve endpoint information before registering controllers

2023-09-13 Thread via GitHub


mumrah commented on code in PR #14376:
URL: https://github.com/apache/kafka/pull/14376#discussion_r1324939918


##
metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java:
##
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterControllerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * ListenerInfo contains information about the listeners of either a 
controller or a broker.
+ * ListenerInfo objects are immutable; they cannot be modified once created. 
The intention is
+ * that you store either controller listeners or broker listeners here, but 
not both. On a
+ * combined KRaft node, which has both broker and controller roles, you would 
have two
+ * separate ListenerInfo objects to represent the listeners of each role.
+ *
+ * Listener information is stored in a linked hash map. This maintains 
ordering while still
+ * allowing the traditional O(1) hash map access. By convention, the first 
listener is special,
+ * corresponding to either the inter-controller listener or the inter-broker 
listener.
+ * This is the only listener that other nodes will attempt to use to 
communicate with this node.
+ *
+ * You may wonder why nodes support multiple listeners, given that 
inter-cluster communication only
+ * ever uses the first one. Well, one reason is that external clients may wish 
to use the additional
+ * listeners. It is a good practice to separate external and internal traffic. 
In some cases,
+ * external traffic may be encrypted while internal traffic is not. (Although 
other admins may wish
+ * to encrypt everything.) Another reason is that supporting multiple 
listeners allows us to change
+ * the effective inter-cluster listener via a roll. During such a roll, half 
of the brokers
+ * (or controllers) might be using one listener, while the other half use 
another. This lets us,
+ * for example, transition from using a PLAINTEXT inter broker listener to 
using an SSL one without
+ * taking any downtime.
+ *
+ * The ListenerInfo class is intended to handle translating endpoint 
information between various
+ * different data structures, and also to handle the two big gotchas of Kafka 
endpoints.
+ *
+ * The first gotcha is that the hostname will be null or blank if we are 
listening on 0.0.0.0.
+ * The withWildcardHostnamesResolved function creates a ListenerInfo object 
where all such hostnames
+ * are replaced by specific hostnames. (It's not perfect because we have to 
choose a single hostname
+ * out of multiple possibilities. In production scenarios it would be better 
to set the desired
+ * hostname explicitly in the configuration rather than binding to 0.0.0.0.)
+ *
+ * The second gotcha is that if someone configures an ephemeral port (aka port 
0), we need to fill
+ * The withEphemeralPortsCorrected resolves this by filling in the missing 
information for ephemeral

Review Comment:
   seems some words are missing here? 



##
metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java:
##
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writin

[GitHub] [kafka] philipnee opened a new pull request, #14386: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee opened a new pull request, #14386:
URL: https://github.com/apache/kafka/pull/14386

   TopicMetadataRequestManager is responsible for sending topic metadata requests. The 
manager manages the API requests and builds each request accordingly. Topic metadata 
requests for the same topic are chained together to avoid sending duplicate requests.
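   
   The chaining can be pictured as one in-flight future per topic that concurrent callers
   share (an illustrative sketch with made-up names, not the actual request manager API):
   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;

   public class ChainedTopicMetadataSketch {
       // One pending future per topic; callers asking for the same topic chain onto it
       // instead of triggering another metadata request.
       private final Map<String, CompletableFuture<String>> inflight = new ConcurrentHashMap<>();

       CompletableFuture<String> requestTopicMetadata(String topic) {
           return inflight.computeIfAbsent(topic, this::sendRequest);
       }

       private CompletableFuture<String> sendRequest(String topic) {
           CompletableFuture<String> future = new CompletableFuture<>();
           // In the real manager the network layer completes this future; once it completes,
           // drop it so a later call for the same topic triggers a fresh request.
           future.whenComplete((metadata, error) -> inflight.remove(topic, future));
           return future;
       }
   }
   ```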


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee closed pull request #14362: KAFKA-14960: TopicMetadata request manager

2023-09-13 Thread via GitHub


philipnee closed pull request #14362: KAFKA-14960: TopicMetadata request manager
URL: https://github.com/apache/kafka/pull/14362


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm opened a new pull request, #14385: KAFKA-15306 - Integrating committed offsets for updating fetch positions

2023-09-13 Thread via GitHub


lianetm opened a new pull request, #14385:
URL: https://github.com/apache/kafka/pull/14385

   Support for using committed offsets to update fetch positions.
   
   This PR includes:
   - moving the `refreshCommittedOffsets` function out of the existing 
`ConsumerCoordinator` so it can be reused (no code changes)
   - using the above `refreshCommittedOffsets` to update fetch positions when the 
consumer has enabled Kafka-based offset management by defining a groupId (see the 
sketch below)
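   
   In outline, the second point amounts to something like this sketch (plain types and
   hypothetical names; not the actual consumer internals):
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;

   public class RefreshCommittedOffsetsSketch {
       // For each partition without a valid position, seek to its committed offset if one
       // exists; partitions without a committed offset are left for the reset-strategy step
       // that follows in the real consumer.
       static Map<String, Long> refreshFetchPositions(Map<String, Optional<Long>> committed,
                                                      Map<String, Long> positions) {
           Map<String, Long> updated = new HashMap<>(positions);
           committed.forEach((partition, offset) -> {
               if (!updated.containsKey(partition))
                   offset.ifPresent(o -> updated.put(partition, o));
           });
           return updated;
       }

       public static void main(String[] args) {
           Map<String, Optional<Long>> committed = new HashMap<>();
           committed.put("t-0", Optional.of(42L));
           committed.put("t-1", Optional.empty());
           // Prints {t-0=42}; t-1 falls back to the offset reset strategy.
           System.out.println(refreshFetchPositions(committed, new HashMap<>()));
       }
   }
   ```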
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15464) Allow dynamic reloading of certificates with different DN / SANs

2023-09-13 Thread Jakub Scholz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jakub Scholz reassigned KAFKA-15464:


Assignee: Jakub Scholz

> Allow dynamic reloading of certificates with different DN / SANs
> 
>
> Key: KAFKA-15464
> URL: https://issues.apache.org/jira/browse/KAFKA-15464
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jakub Scholz
>Assignee: Jakub Scholz
>Priority: Major
>
> Kafka currently doesn't allow dynamic reloading of keystores when the new key 
> has a different DN or removes some of the SANs. While it might help to 
> prevent users from breaking their cluster, in some cases it would be great to 
> be able to bypass this validation when desired.
> More details are in the [KIP-978: Allow dynamic reloading of certificates 
> with different DN / 
> SANs|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429128]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15464) Allow dynamic reloading of certificates with different DN / SANs

2023-09-13 Thread Jakub Scholz (Jira)
Jakub Scholz created KAFKA-15464:


 Summary: Allow dynamic reloading of certificates with different DN 
/ SANs
 Key: KAFKA-15464
 URL: https://issues.apache.org/jira/browse/KAFKA-15464
 Project: Kafka
  Issue Type: Improvement
Reporter: Jakub Scholz


Kafka currently doesn't allow dynamic reloading of keystores when the new key 
has a different DN or removes some of the SANs. While this check might help to 
prevent users from breaking their cluster, in some cases it would be useful to be 
able to bypass the validation when desired.

More details are in the [KIP-978: Allow dynamic reloading of certificates with 
different DN / 
SANs|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429128]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15452) Custom KafkaPrincipalBuilder Cannot Access SslPrincipalMapper

2023-09-13 Thread Raghu Baddam (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764839#comment-17764839
 ] 

Raghu Baddam commented on KAFKA-15452:
--

[~hachikuji] /[~mimaison] /[~mjsax]: Do you mind validating this bug and 
assigning it to me?

Tagging Active Contributors for visibility: [~divijvaidya] , [~satish.duggana], 
[~calvinliu] 

Thanks in advance.

> Custom KafkaPrincipalBuilder Cannot Access SslPrincipalMapper
> -
>
> Key: KAFKA-15452
> URL: https://issues.apache.org/jira/browse/KAFKA-15452
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.1
>Reporter: Raghu Baddam
>Priority: Minor
>  Labels: kafka, walmart
>
> When implementing a custom KafkaPrincipalBuilder, it is not possible to 
> access *SslPrincipalMapper* and {*}kerberosShortNamer{*}. This limits the 
> ability to parse the regex rules from 
> BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG, resulting in a lack 
> of support for mapping rules because SslPrincipalMapper is null.
> A possible workaround is to read the configuration and build another 
> SslPrincipalMapper. However, it would be beneficial if this issue could be 
> addressed at the ChannelBuilders or SslChannelBuilder level.
> Git Reference: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java#L223-L242]
> If the community deems this to be a bug, I am willing to provide a fix.
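
The workaround mentioned above could be sketched roughly as follows. The Kafka interfaces
shown are real, but reading the rules and building a replacement mapper is the hypothetical
part; constructing an SslPrincipalMapper is omitted because it is an internal class:
{code:java}
import java.util.Map;

import javax.net.ssl.SSLPeerUnverifiedException;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;

public class CustomPrincipalBuilder implements KafkaPrincipalBuilder, Configurable {

    private Object mappingRules;

    @Override
    public void configure(Map<String, ?> configs) {
        // Workaround: read the mapping rules ourselves, because the broker does not
        // hand its SslPrincipalMapper to custom principal builders.
        mappingRules = configs.get("ssl.principal.mapping.rules");
        // Building a mapper from these rules is omitted here; exposing the broker's
        // mapper (or a way to build one) is what this issue asks for.
    }

    @Override
    public KafkaPrincipal build(AuthenticationContext context) {
        if (context instanceof SslAuthenticationContext) {
            try {
                String dn = ((SslAuthenticationContext) context).session()
                    .getPeerPrincipal().getName();
                // Apply mappingRules to dn here once a mapper is available.
                return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, dn);
            } catch (SSLPeerUnverifiedException e) {
                return KafkaPrincipal.ANONYMOUS;
            }
        }
        return KafkaPrincipal.ANONYMOUS;
    }
}
{code}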



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] zhaohaidao commented on pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on PR #14271:
URL: https://github.com/apache/kafka/pull/14271#issuecomment-1718110509

   > @zhaohaidao Thanks for all the work you are doing on this. I really 
appreciate it. I have created a few jiras for the next steps:
   > 
   > * https://issues.apache.org/jira/browse/KAFKA-15460
   > * https://issues.apache.org/jira/browse/KAFKA-15461
   > * https://issues.apache.org/jira/browse/KAFKA-15462
   > 
   > I have assigned all of them to you for now as you were interested in 
pursuing this work. Feel free to unassign them if you don't want or plan to 
tackle them.
   > 
   > From a priority perspective, tacking 
https://issues.apache.org/jira/browse/KAFKA-14504 before the above mentioned 
jiras would be really helpful as it will allow us to start end-to-end testing 
when we have all the existing APIs in place.
   
   Thank you for your suggestion. I will give priority to KAFKA-14504 and work 
through the other tasks one after another.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] splett2 commented on a diff in pull request #14053: KAFKA-15221; Fix the race between fetch requests from a rebooted follower.

2023-09-13 Thread via GitHub


splett2 commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1324902142


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -864,13 +866,19 @@ class Partition(val topicPartition: TopicPartition,
 // No need to calculate low watermark if there is no delayed 
DeleteRecordsRequest
 val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) 
lowWatermarkIfLeader else -1L
 val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
-replica.updateFetchState(
-  followerFetchOffsetMetadata,
-  followerStartOffset,
-  followerFetchTimeMs,
-  leaderEndOffset,
-  brokerEpoch
-)
+
+// Apply read lock here to avoid the race between ISR updates and the 
fetch requests from rebooted follower. It
+// could break the broker epoch checks in the ISR expansion.
+inReadLock(leaderIsrUpdateLock) {

Review Comment:
   I think we should pass through the fetch request's leader epoch and check that it 
is still valid as well. Otherwise, we can end up with a stale fetch updating the 
replica state for a new epoch. It may not be necessary to do that in this PR though. 
@dajac wdyt



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on PR #14271:
URL: https://github.com/apache/kafka/pull/14271#issuecomment-1718105046

   > @zhaohaidao Thanks for the update. I made another pass on it and I left a 
bunch of small comments. We should be able to merge it when they are addressed. 
Could you also rebase the PR to fix the conflicts?
   
   Hey @dajac, except for a few comments that may still need discussion and remain 
unresolved, the other comments have been addressed. Please take a look when you 
have time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324884926


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -431,14 +434,17 @@ public void rollback() {
 snapshotRegistry.revertToSnapshot(lastCommittedOffset);
 }
 
+public void getOrCreateSnapshot() {
+snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+}

Review Comment:
   This will cause testListGroups to fail because 
groupMetadataManager.listGroups always queries at lastCommittedOffset. Putting 
getOrCreateSnapshot(lastWrittenOffset) at the beginning of the replay function 
means the largest epoch in the snapshot registry is always one less than 
lastCommittedOffset, so listGroups cannot find the expected results.
   Putting getOrCreateSnapshot(lastWrittenOffset) at the end of the function, 
specifically after lastWrittenOffset + 1, lets all cases pass, which is 
consistent with the semantics of CoordinatorRuntime: always update the 
in-memory state first, then advance lastWrittenOffset, and use the latest 
lastWrittenOffset to create the snapshot.
   WDYT?
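   
   The ordering being argued for can be shown with a tiny stand-alone harness (made-up
   names; not the real CoordinatorRuntime or test code):
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.TreeMap;

   public class ReplayOrderingSketch {
       private final List<String> groups = new ArrayList<>();                  // in-memory state
       private final TreeMap<Long, List<String>> snapshots = new TreeMap<>();  // offset -> snapshot
       private long lastWrittenOffset = 0L;
       private long lastCommittedOffset = 0L;

       void replay(String groupRecord) {
           groups.add(groupRecord);                                    // 1. update memory first
           lastWrittenOffset += 1;                                     // 2. then advance lastWrittenOffset
           snapshots.put(lastWrittenOffset, new ArrayList<>(groups));  // 3. snapshot at the new offset
       }

       void commitUpTo(long offset) {
           lastCommittedOffset = offset;
       }

       List<String> listGroups() {
           // Reads are served at lastCommittedOffset, so a snapshot must exist for that offset.
           return snapshots.getOrDefault(lastCommittedOffset, List.of());
       }

       public static void main(String[] args) {
           ReplayOrderingSketch harness = new ReplayOrderingSketch();
           harness.replay("group0");
           harness.commitUpTo(1L);
           System.out.println(harness.listGroups()); // [group0]
       }
   }
   ```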



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324872270


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
   origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-  val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
-  )
-}
 
-val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
-  )
+def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+  appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+  result.info.lastOffset + 1, // required offset
+  new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage

Review Comment:
   Correct -- when we use a custom message, we have always used 
LogAppendInfo.UnknownLogAppendInfo.
   
   I'm not sure why it is this way. Perhaps because it was the default error message 
(which was the case previously), we didn't want to bloat the response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324871934


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
   assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
   assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+  // Test we convert the error correctly when trying to append and 
coordinator is not available
+  val tp0 = new TopicPartition(topic, 0)
+  val producerId = 24L
+  val producerEpoch = 0.toShort
+  val sequence = 0
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+  val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+  assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+  assertEquals(expectedError, result.assertFired.errorMessage)
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+val producerId = 24L
+val producerEpoch = 0.toShort
+val sequence = 0
+val node = new Node(0, "host1", 0)
+val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+try {
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+
+  val transactionToAdd = new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+))
+
+  // Start verification and return the coordinator related errors.
+  var invocations = 1
+  def verifyError(error: Errors): Unit = {
+val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+// Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+callback(Map(tp0 -> error).toMap)
+assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+assertEquals(expectedMessage, result.assertFired.errorMessage)
+invocations = invocations + 1
+  }
+
+  Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_))

Review Comment:
   That's fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324870944


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws 
Exception {
 );
 }
 
+@Test
+public void testListGroups() throws ExecutionException, 
InterruptedException, TimeoutException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+int partitionCount = 3;
+service.startup(() -> partitionCount);
+
+ListGroupsRequestData request = new ListGroupsRequestData();
+
+List expectedResults = 
Arrays.asList(
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group0")
+.setGroupState("Stable")
+.setProtocolType("protocol1"),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group1")
+.setGroupState("Empty")
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group2")
+.setGroupState("Dead")
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+);
+Map> expectResultMap 
= new HashMap<>();
+for (ListGroupsResponseData.ListedGroup result : expectedResults) {
+expectResultMap.put(result.groupId(), 
Collections.singletonList(result));
+}
+when(runtime.partitions()).thenReturn(Sets.newSet(
+new TopicPartition("__consumer_offsets", 0),
+new TopicPartition("__consumer_offsets", 1),
+new TopicPartition("__consumer_offsets", 2)));
+for (int i = 0; i < partitionCount; i++) {
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("list-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
i)),
+ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i;
+}
+
+CompletableFuture responseFuture = 
service.listGroups(
+requestContext(ApiKeys.LIST_GROUPS),
+request
+);
+
+List actualResults = 
responseFuture.get(5, TimeUnit.SECONDS).groups();
+assertEquals(expectedResults, actualResults);
+assertEquals(expectResultMap.size(), actualResults.size());
+for (ListGroupsResponseData.ListedGroup result : actualResults) {
+assertEquals(expectResultMap.get(result.groupId()), 
Collections.singletonList(result));
+}
+}
+
+@Test
+public void testListGroupsFailedWithNotCoordinatorException()
+throws InterruptedException, ExecutionException, TimeoutException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+int partitionCount = 3;
+service.startup(() -> partitionCount);
+
+List expectedResults = 
Arrays.asList(
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group0")
+.setGroupState("Stable")
+.setProtocolType("protocol1"),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group1")
+.setGroupState("Empty")
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+);
+Map> expectResultMap 
= new HashMap<>();
+for (ListGroupsResponseData.ListedGroup result : expectedResults) {
+expectResultMap.put(result.groupId(), 
Collections.singletonList(result));
+}
+
+ListGroupsRequestData request = new ListGroupsRequestData();
+when(runtime.partitions()).thenReturn(Sets.newSet(
+new TopicPartition("__consumer_offsets", 0),
+new TopicPartition("__consumer_offsets", 1),
+new TopicPartition("__consumer_offsets", 2)));
+for (int i = 0; i < 2; i++) {
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("list-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
i)),
+ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i;
+}
+
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("list-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+ArgumentMatchers.any()
+)).thenReturn(FutureUtils.failedFuture(new 
NotCoor

[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324870442


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
   assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
   assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+  // Test we convert the error correctly when trying to append and 
coordinator is not available
+  val tp0 = new TopicPartition(topic, 0)
+  val producerId = 24L
+  val producerEpoch = 0.toShort
+  val sequence = 0
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+  val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+  assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+  assertEquals(expectedError, result.assertFired.errorMessage)
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+val producerId = 24L
+val producerEpoch = 0.toShort
+val sequence = 0
+val node = new Node(0, "host1", 0)
+val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+try {
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+
+  val transactionToAdd = new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+))
+
+  // Start verification and return the coordinator related errors.
+  var invocations = 1
+  def verifyError(error: Errors): Unit = {
+val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+// Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+callback(Map(tp0 -> error).toMap)
+assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+assertEquals(expectedMessage, result.assertFired.errorMessage)
+invocations = invocations + 1
+  }
+
+  Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_))

Review Comment:
   Oops. I already added it. I can remove it. Either way is fine with me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324868208


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
   origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-  val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
-  )
-}
 
-val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
-  )
+def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+  appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+  result.info.lastOffset + 1, // required offset
+  new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage

Review Comment:
   I suppose that you are saying that `result.info.errorMessage` is always null 
when `useCustomMessage` is true because we know that 
`LogAppendInfo.UnknownLogAppendInfo` does not set it. Am I right?
   
   What I was wondering is why we don't set it all the time... I suppose that 
what you did here is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324869043


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
   assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
   assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+  // Test we convert the error correctly when trying to append and 
coordinator is not available
+  val tp0 = new TopicPartition(topic, 0)
+  val producerId = 24L
+  val producerEpoch = 0.toShort
+  val sequence = 0
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+  val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+  assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+  assertEquals(expectedError, result.assertFired.errorMessage)
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+val producerId = 24L
+val producerEpoch = 0.toShort
+val sequence = 0
+val node = new Node(0, "host1", 0)
+val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+try {
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+
+  val transactionToAdd = new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+))
+
+  // Start verification and return the coordinator related errors.
+  var invocations = 1
+  def verifyError(error: Errors): Unit = {
+val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+// Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+callback(Map(tp0 -> error).toMap)
+assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+assertEquals(expectedMessage, result.assertFired.errorMessage)
+invocations = invocations + 1

Review Comment:
   Personally, I would use a ParameterizedTest, but I leave it up to you. The 
`invocations` counter feels a bit hacky.
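   
   For illustration, here is a minimal sketch of the ParameterizedTest variant 
(imports and names are assumed, not the PR's code); each error then runs as its 
own test invocation, so the counter goes away:
   
   ```scala
   import org.apache.kafka.common.protocol.Errors
   import org.junit.jupiter.params.ParameterizedTest
   import org.junit.jupiter.params.provider.EnumSource

   @ParameterizedTest
   @EnumSource(value = classOf[Errors],
     names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS"))
   def testVerificationErrorConversions(error: Errors): Unit = {
     // build a fresh ReplicaManager, make a single append, then assert that the
     // captured callback converts `error` to NOT_ENOUGH_REPLICAS with the expected message
   }
   ```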



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324868507


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
   assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
   assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+  // Test we convert the error correctly when trying to append and 
coordinator is not available
+  val tp0 = new TopicPartition(topic, 0)
+  val producerId = 24L
+  val producerEpoch = 0.toShort
+  val sequence = 0
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+  val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+  assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+  assertEquals(expectedError, result.assertFired.errorMessage)
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+val producerId = 24L
+val producerEpoch = 0.toShort
+val sequence = 0
+val node = new Node(0, "host1", 0)
+val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+try {
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+
+  val transactionToAdd = new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+))
+
+  // Start verification and return the coordinator related errors.
+  var invocations = 1
+  def verifyError(error: Errors): Unit = {
+val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+// Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+callback(Map(tp0 -> error).toMap)
+assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+assertEquals(expectedMessage, result.assertFired.errorMessage)
+invocations = invocations + 1
+  }
+
+  Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_))

Review Comment:
   Understood. I think that it is fine as it is then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324868208


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
   origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-  val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
-  )
-}
 
-val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
-  )
+def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+  appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+  result.info.lastOffset + 1, // required offset
+  new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage

Review Comment:
   I suppose that you are saying that `result.info.errorMessage` is always null 
when `useCustomMessage` is true because we know that 
`LogAppendInfo.UnknownLogAppendInfo` does not set it. Am I right?
   
   What I was wondering is why we don't set it all the time. I suppose that 
what you did here is fine.
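   
   For illustration, a small self-contained sketch (not the PR's code) of the 
"set it all the time" alternative: derive the response message from the 
exception whenever one is present, with no flag. Here `exception` and 
`defaultMessage` stand in for `result.exception` and `result.info.errorMessage` 
from the diff above:
   
   ```scala
   def responseErrorMessage(exception: Option[Throwable], defaultMessage: String): String =
     exception.map(_.getMessage).getOrElse(defaultMessage)

   // responseErrorMessage(Some(new Exception("custom")), null) == "custom"
   // responseErrorMessage(None, null) == null, i.e. today's behaviour for plain errors
   ```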



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324869392


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8574,6 +8584,93 @@ public void 
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
 assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
 }
 
+@Test
+public void testListGroups() {
+String consumerGroupId = "consumer-group-id";
+String genericGroupId = "generic-group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String genericGroupType = "generic";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
+.build();
+context.replay(newGroupMetadataRecord(
+genericGroupId,
+new GroupMetadataValue()
+.setMembers(Collections.emptyList())
+.setGeneration(2)
+.setLeader(null)
+.setProtocolType(genericGroupType)
+.setProtocol("range")
+.setCurrentStateTimestamp(context.time.milliseconds()),
+MetadataVersion.latest()));
+context.commit();
+context.getOrCreateSnapshot();
+GenericGroup genericGroup = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, 
false);
+
+Map actualAllGroupMap =
+context.sendListGroups(Collections.emptyList())
+
.stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+Map expectAllGroupMap =
+Stream.of(new ListGroupsResponseData.ListedGroup()
+.setGroupId(genericGroup.groupId())
+.setProtocolType(genericGroupType)
+.setGroupState(EMPTY.toString()),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId(consumerGroupId)
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString()))
+
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324869297


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -753,39 +753,56 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
   origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-  val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
-  )
-}
 
-val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
-  )
+def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+  appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+  result.info.lastOffset + 1, // required offset
+  new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage
+  )
+) // response status
+  }
 }
 
-val allResults = localProduceResults ++ unverifiedResults ++ 
errorResults
+val unverifiedResults = unverifiedEntries.map {
+  case (topicPartition, error) =>
+val finalException =
+  error match {
+case Errors.INVALID_TXN_STATE => error.exception("Partition 
was not added to the transaction")
+case Errors.CONCURRENT_TRANSACTIONS |
+ Errors.COORDINATOR_LOAD_IN_PROGRESS |
+ Errors.COORDINATOR_NOT_AVAILABLE |
+ Errors.NOT_COORDINATOR => new NotEnoughReplicasException(
+  s"Unable to verify the partition has been added to the 
transaction. Underlying error: ${error.toString}")

Review Comment:
   nit: Indentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324869129


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8574,6 +8584,93 @@ public void 
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
 assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
 }
 
+@Test
+public void testListGroups() {
+String consumerGroupId = "consumer-group-id";
+String genericGroupId = "generic-group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String genericGroupType = "generic";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
+.build();
+context.replay(newGroupMetadataRecord(
+genericGroupId,
+new GroupMetadataValue()
+.setMembers(Collections.emptyList())
+.setGeneration(2)
+.setLeader(null)
+.setProtocolType(genericGroupType)
+.setProtocol("range")
+.setCurrentStateTimestamp(context.time.milliseconds()),
+MetadataVersion.latest()));
+context.commit();
+context.getOrCreateSnapshot();
+GenericGroup genericGroup = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, 
false);
+
+Map actualAllGroupMap =
+context.sendListGroups(Collections.emptyList())
+
.stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+Map expectAllGroupMap =
+Stream.of(new ListGroupsResponseData.ListedGroup()
+.setGroupId(genericGroup.groupId())
+.setProtocolType(genericGroupType)
+.setGroupState(EMPTY.toString()),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId(consumerGroupId)
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString()))
+
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, 
Function.identity()));
+
+assertEquals(expectAllGroupMap.size(), actualAllGroupMap.size());
+for (Map.Entry entry : 
expectAllGroupMap.entrySet()) {
+assertEquals(entry.getValue(), 
actualAllGroupMap.get(entry.getKey()));
+}
+
+
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new 
ConsumerGroupMember.Builder(memberId1)
+.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+.build()));
+context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11));
+context.commit();

Review Comment:
   Can you be more detailed? I don’t quite understand this comment.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8574,6 +8584,93 @@ public void 
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
 assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
 }
 
+@Test
+public void testListGroups() {
+String consumerGroupId = "consumer-group-id";
+String genericGroupId = "generic-group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String genericGroupType = "generic";
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
+.build();
+context.replay(newGroupMetadataRecord(
+genericGroupId,
+new GroupMetadataValue()
+.setMembers(Collections.emptyList())
+.setGeneration(2)
+.setLeader(null)
+.setProtocolType(genericGroupType)
+.setProtocol("range")
+.setCurrentStateTimestamp(context.time.milliseconds()),
+MetadataVersion.latest()));
+context.commit();
+context.getOrCreateSnapshot();
+GenericGroup genericGroup = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, 
false);
+
+Map actualAllGroupMap

[GitHub] [kafka] splett2 commented on a diff in pull request #14053: KAFKA-15221; Fix the race between fetch requests from a rebooted follower.

2023-09-13 Thread via GitHub


splett2 commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1324845475


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -137,7 +137,8 @@ object Partition {
   delayedOperations = delayedOperations,
   metadataCache = replicaManager.metadataCache,
   logManager = replicaManager.logManager,
-  alterIsrManager = replicaManager.alterPartitionManager)
+  alterIsrManager = replicaManager.alterPartitionManager,
+  zkMigrationEnabled = () => replicaManager.config.migrationEnabled)

Review Comment:
   We can just pass this in as a simple boolean. My understanding is that ZK 
migration is not a dynamic config.
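   
   For illustration, a tiny self-contained sketch of the two shapes (class and 
field names are made up, not Kafka's); the plain value is enough when the 
setting cannot change at runtime:
   
   ```scala
   object ZkMigrationFlagSketch {
     // Supplier style: the flag is re-evaluated on every read.
     class PartitionWithSupplier(zkMigrationEnabled: () => Boolean) {
       def migrating: Boolean = zkMigrationEnabled()
     }

     // Plain-value style: simpler, and sufficient for a non-dynamic config.
     class PartitionWithFlag(val zkMigrationEnabled: Boolean)

     val enabled = true // stands in for replicaManager.config.migrationEnabled
     val partition = new PartitionWithFlag(enabled)
   }
   ```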



##
core/src/main/scala/kafka/cluster/Replica.scala:
##
@@ -98,14 +103,22 @@ class Replica(val brokerId: Int, val topicPartition: 
TopicPartition) extends Log
* fetch request is always smaller than the leader's LEO, which can happen 
if small produce requests are received at
* high frequency.
*/
-  def updateFetchState(
+  def updateFetchStateOrThrow(
 followerFetchOffsetMetadata: LogOffsetMetadata,
 followerStartOffset: Long,
 followerFetchTimeMs: Long,
 leaderEndOffset: Long,
-brokerEpoch: Long
+brokerEpoch: Long,
+verifyBrokerEpoch: Boolean = false

Review Comment:
   I would prefer not to use a default parameter. I guess we did it because 
`updateFetchStateOrThrow` has quite a few callers.
   
   Maybe we can pass the value for `verifyBrokerEpoch` into the `Replica` 
constructor to minimize some of the changes? It would also help keep 
`updateFetchState` a bit simpler.
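   
   For illustration, a rough sketch of that shape (made-up names and a 
placeholder check, not the PR's code), just to show where the flag would live 
so callers keep a single method signature:
   
   ```scala
   class ReplicaSketch(val brokerId: Int, val verifyBrokerEpoch: Boolean) {
     def updateFetchStateOrThrow(followerBrokerEpoch: Long, cachedBrokerEpoch: Long): Unit = {
       // only verify when the feature is enabled for this replica
       if (verifyBrokerEpoch && followerBrokerEpoch != -1 && followerBrokerEpoch < cachedBrokerEpoch)
         throw new IllegalStateException(
           s"Stale broker epoch $followerBrokerEpoch, expected at least $cachedBrokerEpoch")
       // ...update the follower fetch state as before...
     }
   }
   ```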



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15463) StreamsException: Accessing from an unknown node

2023-09-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15463:

Priority: Major  (was: Blocker)

>  StreamsException: Accessing from an unknown node
> -
>
> Key: KAFKA-15463
> URL: https://issues.apache.org/jira/browse/KAFKA-15463
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.1
>Reporter: Yevgeny
>Priority: Major
>
> The application was working fine; after some time it started getting the following:
>  
> This is a Spring Boot application that runs in Kubernetes as a stateful pod.
>  
>  
>  
> {code:java}
>   Exception in thread 
> "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown 
> node at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>  at myclass1.java:28) at myclass2.java:48) at 
> java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) at 
> java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>  at 
> java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>  at 
> java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
>  at myclass3.java:48) at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>  at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
>  at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
>  at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
>    {code}
>  
> stream-thread 
> [-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State 
> transition from PENDING_SHUTDOWN to DEAD
>  
>  
> The Transformer is a prototype bean; the supplier supplies a new instance of 
> the Transformer:
>  
>  
> {code:java}
> @Override public Transformer> get() 
> {     return ctx.getBean(MyTransformer.class); }{code}
>  
>  
> The only way to recover is to delete all topics used by Kafka Streams; even if 
> the application is restarted, the same exception is thrown.
> If messages in the internal 'store-changelog' topics are deleted or offsets 
> are manipulated, can that cause the issue?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on a diff in pull request #14322: KAFKA-15418: update statement on decompression

2023-09-13 Thread via GitHub


mjsax commented on code in PR #14322:
URL: https://github.com/apache/kafka/pull/14322#discussion_r1324811185


##
docs/design.html:
##
@@ -136,8 +136,10 @@ 
-Kafka supports this with an efficient batching format. A batch of messages 
can be clumped together compressed and sent to the server in this form. This 
batch of messages will be written in compressed form and will
-remain compressed in the log and will only be decompressed by the consumer.
+Kafka supports this with an efficient batching format. A batch of messages 
can be grouped together, compressed, and sent to the server in this form. The 
broker decompresses the batch in order to validate it. For
+example, it validates that the number of records in the batch is same as 
what batch header states. The broker may also potentially modify the batch 
(e.g., if the topic is compacted, the broker will filter out 

Review Comment:
   Well, that happens on the log-cleaner thread, i.e., when actual compaction runs.
   
   This part of the docs is focusing on the write path, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] davetroiano commented on a diff in pull request #14322: KAFKA-15418: update statement on decompression

2023-09-13 Thread via GitHub


davetroiano commented on code in PR #14322:
URL: https://github.com/apache/kafka/pull/14322#discussion_r1324799715


##
docs/design.html:
##
@@ -136,8 +136,10 @@ 
-Kafka supports this with an efficient batching format. A batch of messages 
can be clumped together compressed and sent to the server in this form. This 
batch of messages will be written in compressed form and will
-remain compressed in the log and will only be decompressed by the consumer.
+Kafka supports this with an efficient batching format. A batch of messages 
can be grouped together, compressed, and sent to the server in this form. The 
broker decompresses the batch in order to validate it. For
+example, it validates that the number of records in the batch is same as 
what batch header states. The broker may also potentially modify the batch 
(e.g., if the topic is compacted, the broker will filter out 

Review Comment:
   @divijvaidya @mjsax won't the broker decompress asynchronously to perform 
compaction filtering? 
([here](https://github.com/apache/kafka/blob/a7e865c0a756504cc7ae6f4eb0772caddc53/core/src/main/scala/kafka/log/LogCleaner.scala#L801))



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324792282


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
   assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
   assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+  // Test we convert the error correctly when trying to append and 
coordinator is not available
+  val tp0 = new TopicPartition(topic, 0)
+  val producerId = 24L
+  val producerEpoch = 0.toShort
+  val sequence = 0
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+  val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+  assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+  assertEquals(expectedError, result.assertFired.errorMessage)
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+val producerId = 24L
+val producerEpoch = 0.toShort
+val sequence = 0
+val node = new Node(0, "host1", 0)
+val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+try {
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+
+  val transactionToAdd = new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+))
+
+  // Start verification and return the coordinator related errors.
+  var invocations = 1
+  def verifyError(error: Errors): Unit = {
+val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+// Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+callback(Map(tp0 -> error).toMap)
+assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+assertEquals(expectedMessage, result.assertFired.errorMessage)
+invocations = invocations + 1
+  }
+
+  Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_))

Review Comment:
   Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-13 Thread via GitHub


zhaohaidao commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1324791489


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -599,6 +608,164 @@ public void testHeartbeatCoordinatorException() throws 
Exception {
 );
 }
 
+@Test
+public void testListGroups() throws ExecutionException, 
InterruptedException, TimeoutException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+int partitionCount = 3;
+service.startup(() -> partitionCount);
+
+ListGroupsRequestData request = new ListGroupsRequestData();
+
+List expectedResults = 
Arrays.asList(
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group0")
+.setGroupState("Stable")
+.setProtocolType("protocol1"),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group1")
+.setGroupState("Empty")
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+new ListGroupsResponseData.ListedGroup()
+.setGroupId("group2")
+.setGroupState("Dead")
+.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+);
+Map> expectResultMap 
= new HashMap<>();
+for (ListGroupsResponseData.ListedGroup result : expectedResults) {
+expectResultMap.put(result.groupId(), 
Collections.singletonList(result));
+}
+when(runtime.partitions()).thenReturn(Sets.newSet(
+new TopicPartition("__consumer_offsets", 0),
+new TopicPartition("__consumer_offsets", 1),
+new TopicPartition("__consumer_offsets", 2)));
+for (int i = 0; i < partitionCount; i++) {
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("list-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
i)),
+ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i;
+}
+
+CompletableFuture responseFuture = 
service.listGroups(
+requestContext(ApiKeys.LIST_GROUPS),
+request
+);
+
+List actualResults = 
responseFuture.get(5, TimeUnit.SECONDS).groups();
+assertEquals(expectedResults, actualResults);
+assertEquals(expectResultMap.size(), actualResults.size());
+for (ListGroupsResponseData.ListedGroup result : actualResults) {
+assertEquals(expectResultMap.get(result.groupId()), 
Collections.singletonList(result));
+}

Review Comment:
   A Map is used here because the order of the list returned by the function 
may differ from the expected order.
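   
   As a small self-contained illustration of the idea (generic example, not the 
test's code), keying both sides by an identifier makes the comparison 
insensitive to ordering:
   
   ```scala
   object OrderInsensitiveCompare extends App {
     val expected = List("group0" -> "Stable", "group1" -> "Empty", "group2" -> "Dead")
     val actual   = List("group2" -> "Dead", "group0" -> "Stable", "group1" -> "Empty")
     assert(expected.toMap == actual.toMap) // passes even though the list order differs
     println("order-insensitive comparison passed")
   }
   ```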



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #14359: KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager

2023-09-13 Thread via GitHub


junrao commented on code in PR #14359:
URL: https://github.com/apache/kafka/pull/14359#discussion_r1323771865


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -63,6 +68,31 @@ public NetworkClientDelegate(
 this.unsentRequests = new ArrayDeque<>();
 this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+this.tryConnectNodes = new HashSet<>();
+}
+
+@Override
+public boolean isUnavailable(Node node) {
+return NetworkClientUtils.isUnavailable(client, node, time);
+}
+
+@Override
+public void maybeThrowAuthFailure(Node node) {
+NetworkClientUtils.maybeThrowAuthFailure(client, node);
+}
+
+@Override
+public void tryConnect(Node node) {

Review Comment:
   `tryConnect` and `maybeTryConnect` seem highly coupled. Why is only the 
former part of the `NodeStatusDetector` interface? Also, it seems that we have 
used `maybeTryConnect`, but not `tryConnect` in this class.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * {@code FetchUtils} provides a place for disparate parts of the fetch logic 
to live.
+ */
+public class FetchUtils {
+
+/**
+ * Performs two combined actions based on the state related to the {@link 
TopicPartition}:
+ *
+ * 
+ * 
+ * Invokes {@link ConsumerMetadata#requestUpdate(boolean)} to 
signal that the metadata is incorrect and
+ * needs to be updated
+ * 
+ * 
+ * Invokes {@link 
SubscriptionState#clearPreferredReadReplica(TopicPartition)} to clear out any 
read replica
+ * information that may be present.
+ * 
+ * 
+ *
+ * This utility method should be invoked if the client detects (or is told 
by a node in the broker) that an
+ * attempt was made to fetch from a node that isn't the leader or 
preferred replica.
+ *
+ * @param metadata {@link ConsumerMetadata} for which to request an update
+ * @param subscriptions {@link SubscriptionState} to clear any internal 
read replica node
+ * @param topicPartition {@link TopicPartition} for which this state 
change is related
+ */
+static void requestMetadataUpdate(final ConsumerMetadata metadata,
+  final SubscriptionState subscriptions,
+  final TopicPartition topicPartition) {
+metadata.requestUpdate(true);

Review Comment:
   This was set to `false`. Why is it changed to `true`? I think `false` is 
correct, since we don't want to reset the metadata retry count when receiving 
a retriable error due to stale metadata.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeStatusDetector.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.NetworkClientUtils;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationExcep

[GitHub] [kafka] OmniaGM commented on pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-09-13 Thread via GitHub


OmniaGM commented on PR #13204:
URL: https://github.com/apache/kafka/pull/13204#issuecomment-1717905593

   I found out why the tests were failing. It turns out that we are hitting a 
problem similar to this gradle 
[issue#847](https://github.com/gradle/gradle/issues/847), which causes 
transitive dependency problems: `storage:api` and `connect:api` were creating 
unintended conflicts. The gradle issue does not seem to be 100% solved, so I 
pushed a fix with a workaround that renames the `storage:api` project to 
`storage:storage-api`. This shouldn't impact the name of the final jar.  
   I pushed the changes and am waiting for the pipelines to pass. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on pull request #13201: KAFKA-14596: Move TopicCommand to tools

2023-09-13 Thread via GitHub


OmniaGM commented on PR #13201:
URL: https://github.com/apache/kafka/pull/13201#issuecomment-1717877198

   > Hi @OmniaGM, thanks. I left some comments.
   > 
   > Additionally, this error message is different from previous implementation:
   > 
   > ```shell
   > ### OLD
   > $ bin/kafka-topics.sh --bootstrap-server :9092 --describe --topic 
my-topic-foo --delete-config sdsad
   > Option "[delete-config]" can't be used with option "[describe]"
   > 
   > ### NEW
   > $ bin/kafka-topics.sh --bootstrap-server :9092 --describe --topic 
my-topic-foo --delete-config sdsad
   > Option "[delete-config]" can't be used with option "[bootstrap-server]"
   > ```
   
   Good catch @fvaleri, I missed that the `immutable.Set` used to define 
invalid options in the old Scala code preserves insertion order, so when we 
check, the `bootstrap` option isn't the first one in the list to compare 
against; instead it's `describe`. I updated the code to use `LinkedHashSet` 
instead of `Set` in Java.
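   
   For reference, a small runnable illustration of the ordering difference (not 
the PR's code): `HashSet` gives no iteration-order guarantee, while 
`LinkedHashSet` iterates in insertion order, matching the behaviour the old 
Scala `immutable.Set` happened to give for small sets:
   
   ```scala
   object InvalidOptionOrderSketch extends App {
     val options = Seq("describe", "bootstrap-server", "alter")

     val unordered = new java.util.HashSet[String]()
     val ordered = new java.util.LinkedHashSet[String]()
     options.foreach { o => unordered.add(o); ordered.add(o) }

     println(unordered) // order unspecified; "describe" may not come first
     println(ordered)   // [describe, bootstrap-server, alter]
   }
   ```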


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324699341


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
   origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-  val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
-  )
-}
 
-val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
-  )
+def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+  appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+  result.info.lastOffset + 1, // required offset
+  new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage
+  )
+) // response status
+  }
 }
 
-val allResults = localProduceResults ++ unverifiedResults ++ 
errorResults
+val unverifiedResults = unverifiedEntries.map {
+  case (topicPartition, error) =>
+val finalError =
+  error match {
+case Errors.CONCURRENT_TRANSACTIONS |
+ Errors.COORDINATOR_LOAD_IN_PROGRESS |
+ Errors.COORDINATOR_NOT_AVAILABLE |
+ Errors.NOT_COORDINATOR => Errors.NOT_ENOUGH_REPLICAS
+case _ => error
+}
+val message =
+  error match {
+case Errors.INVALID_TXN_STATE => "Partition was not added to 
the transaction"
+case Errors.CONCURRENT_TRANSACTIONS |
+ Errors.COORDINATOR_LOAD_IN_PROGRESS |
+ Errors.COORDINATOR_NOT_AVAILABLE |
+ Errors.NOT_COORDINATOR => s"Unable to verify the 
partition has been added to the transaction. Underlying error:${error.toString}"
+case _ => error.message()
+  }

Review Comment:
   We can.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324697600


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
   assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
   assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+  // Test we convert the error correctly when trying to append and 
coordinator is not available
+  val tp0 = new TopicPartition(topic, 0)
+  val producerId = 24L
+  val producerEpoch = 0.toShort
+  val sequence = 0
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+  val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+  assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+  assertEquals(expectedError, result.assertFired.errorMessage)
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+val producerId = 24L
+val producerEpoch = 0.toShort
+val sequence = 0
+val node = new Node(0, "host1", 0)
+val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+try {
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+
+  val transactionToAdd = new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+))
+
+  // Start verification and return the coordinator related errors.
+  var invocations = 1
+  def verifyError(error: Errors): Unit = {
+val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+// Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+callback(Map(tp0 -> error).toMap)
+assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+assertEquals(expectedMessage, result.assertFired.errorMessage)
+invocations = invocations + 1
+  }
+
+  Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_))

Review Comment:
   It cannot be returned in this callback, which is why I have a separate test 
for it. But I can add it even if it won't be returned this way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324696082


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
   origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-  val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
-  )
-}
 
-val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
-  )
+def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+  appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+  result.info.lastOffset + 1, // required offset
+  new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage

Review Comment:
   I'm not sure I understand. Previously, result.info.errorMessage was null 
and it will continue to be so for all error partitions. 
LogAppendInfo.UnknownLogAppendInfo will always have a null error message.
   
   I thought for consistency I should not change this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


jolshan commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324692053


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
   assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
   assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+  // Test we convert the error correctly when trying to append and 
coordinator is not available
+  val tp0 = new TopicPartition(topic, 0)
+  val producerId = 24L
+  val producerEpoch = 0.toShort
+  val sequence = 0
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+  val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+  assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+  assertEquals(expectedError, result.assertFired.errorMessage)
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+val producerId = 24L
+val producerEpoch = 0.toShort
+val sequence = 0
+val node = new Node(0, "host1", 0)
+val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+try {
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+
+  val transactionToAdd = new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+))
+
+  // Start verification and return the coordinator related errors.
+  var invocations = 1
+  def verifyError(error: Errors): Unit = {
+val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+// Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+callback(Map(tp0 -> error).toMap)
+assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+assertEquals(expectedMessage, result.assertFired.errorMessage)
+invocations = invocations + 1

Review Comment:
   I thought it was a lot of extra overhead to create a new replica manager 
every time -- especially since we can just make another append call. What do 
you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15163) Implement validatePositions functionality for new KafkaConsumer

2023-09-13 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15163.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

This is covered in https://github.com/apache/kafka/pull/14346.

> Implement validatePositions functionality for new KafkaConsumer
> ---
>
> Key: KAFKA-15163
> URL: https://issues.apache.org/jira/browse/KAFKA-15163
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Introduce support for validating positions in the new OffsetsRequestManager. 
> This task will include a new event for the validatePositions calls performed 
> from the new consumer, and the logic for handling such events in the 
> OffsetsRequestManager.
> The validate positions implementation will keep the same behaviour as the one 
> in the old consumer, but adapted to the new threading model. It is based on a 
> VALIDATE_POSITIONS event that is submitted to the background thread and then 
> processed by the ApplicationEventProcessor. The processing itself is done by the 
> OffsetsRequestManager, given that it requires an OFFSET_FOR_LEADER_EPOCH request. 
> This task will introduce support for such requests in the OffsetsRequestManager, 
> which is responsible for offset-related requests.
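The hand-off described above can be pictured with a small, self-contained sketch. This is illustrative only: the event type and queue below are hypothetical stand-ins, not the actual ApplicationEvent or request-manager classes from the consumer refactor.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ValidatePositionsFlowSketch {
    // Hypothetical event type; the real refactor defines its own event hierarchy.
    enum ApplicationEvent { VALIDATE_POSITIONS }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ApplicationEvent> eventQueue = new LinkedBlockingQueue<>();

        // Application thread: validating positions only enqueues an event.
        eventQueue.put(ApplicationEvent.VALIDATE_POSITIONS);

        // Background thread: the event processor routes the event to the offsets
        // request manager, which builds the OFFSET_FOR_LEADER_EPOCH request.
        ApplicationEvent event = eventQueue.take();
        if (event == ApplicationEvent.VALIDATE_POSITIONS) {
            System.out.println("offsets request manager: send OFFSET_FOR_LEADER_EPOCH for partitions needing validation");
        }
    }
}
```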



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14384: KAFKA-15415 On producer-batch retry, skip-backoff on a new leader

2023-09-13 Thread via GitHub


AndrewJSchofield commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1324590647


##
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##
@@ -60,12 +69,19 @@ public int partition() {
 }
 
 /**
- * The node id of the node currently acting as a leader for this partition or null if there is no leader
+ * The node currently acting as a leader for this partition or null if there is no leader
  */
 public Node leader() {
 return leader;
 }
 
+/**
+ * The epoch of the partition's leader.

Review Comment:
   I think it's the "The leader epoch of this partition if known".



##
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##
@@ -20,26 +20,35 @@
  * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
+public static final int UNKNOWN_LEADER_EPOCH = -1;
 private final String topic;
 private final int partition;
 private final Node leader;
+private final int leaderEpoch;
 private final Node[] replicas;
 private final Node[] inSyncReplicas;
 private final Node[] offlineReplicas;
 
 public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
-this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]);
+this(topic, partition, leader, UNKNOWN_LEADER_EPOCH, replicas, inSyncReplicas, new Node[0]);

Review Comment:
   And then `Optional.empty()` of course.



##
clients/src/main/java/org/apache/kafka/common/Cluster.java:
##
@@ -277,6 +277,19 @@ public Node leaderFor(TopicPartition topicPartition) {
 return info.leader();
 }
 
+/**
+ * Get the current leader's epoch for the given topic-partition.
+ * @param topicPartition
+ * @return The epoch for partition's leader, or UNKNOWN_LEADER_EPOCH if epoch unkown.

Review Comment:
   Typo. "unkown" -> "unknown".



##
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##
@@ -20,26 +20,35 @@
  * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
+public static final int UNKNOWN_LEADER_EPOCH = -1;
 private final String topic;
 private final int partition;
 private final Node leader;
+private final int leaderEpoch;

Review Comment:
   I think it's more aligned with the existing Kafka interface to use 
`Optional<Integer> leaderEpoch`. For example, 
`org.apache.kafka.clients.consumer.OffsetAndMetadata`.
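   For reference, a minimal sketch of what an Optional-based accessor could look like, mirroring `OffsetAndMetadata.leaderEpoch()`; the class below is illustrative only and is not the actual change proposed in the PR.

```java
import java.util.Optional;

public class PartitionLeaderEpochSketch {
    private final Optional<Integer> leaderEpoch;

    // An unknown epoch is passed as null and surfaces as Optional.empty(),
    // instead of a sentinel value such as -1.
    public PartitionLeaderEpochSketch(Integer leaderEpoch) {
        this.leaderEpoch = Optional.ofNullable(leaderEpoch);
    }

    public Optional<Integer> leaderEpoch() {
        return leaderEpoch;
    }
}
```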



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
 this.isSplitBatch = isSplitBatch;
 float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                          recordsBuilder.compressionType());
+this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+this.leaderChangedAttempts = -1;
 recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
 }
 
+/*
+ * Returns whether the leader epoch has changed since the last attempt.
+ * @param latestLeaderEpoch The latest leader epoch.
+ * @return true if the leader has changed, otherwise false.
+ */
+boolean hasLeaderChanged(int latestLeaderEpoch) {
+boolean leaderChanged = false;
+// Checking for leader change makes sense only from 1st retry onwards(attempt >=1).
+log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",

Review Comment:
   There are 4 separate calls to `attempts()` (or the equivalent `attempts.get()`) in 
this method. Capture the value with a single call and use that instead.
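   A rough sketch of the suggestion, with hypothetical field and method names: read the counter once into a local and reuse it for both the log line and the comparison.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AttemptsCaptureSketch {
    private final AtomicInteger attempts = new AtomicInteger();

    boolean hasLeaderChanged(int currentLeaderEpoch, int latestLeaderEpoch) {
        final int attempt = attempts.get(); // single read, reused below
        System.out.printf("currentLeaderEpoch=%d latestLeaderEpoch=%d attempt=%d%n",
                currentLeaderEpoch, latestLeaderEpoch, attempt);
        // A leader change is only meaningful from the first retry onwards.
        return attempt >= 1 && latestLeaderEpoch > currentLeaderEpoch;
    }
}
```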



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
 this.isSplitBatch = isSplitBatch;
 float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                          recordsBuilder.compressionType());
+this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+this.leaderChangedAttempts = -1;

Review Comment:
   I don't think there's any need for a special value of -1. Initializing to 0 
would work fine.



##
clients/src/main/java/org/apache/kafka/common/Cluster.java:
##
@@ -277,6 +277,19 @@ public Node leaderFor(TopicPartition topicPartition) {
 return info.leader();
 }
 
+/**
+ * Get the current leader's epoch 

[jira] [Resolved] (KAFKA-15115) Implement resetPositions functionality in OffsetsRequestManager

2023-09-13 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15115.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk

> Implement resetPositions functionality in OffsetsRequestManager
> ---
>
> Key: KAFKA-15115
> URL: https://issues.apache.org/jira/browse/KAFKA-15115
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Introduce support for resetting positions in the new OffsetsRequestManager. 
> This task will include a new event for the resetPositions calls performed 
> from the new consumer, and the logic for handling such events in the 
> OffsetsRequestManager.
> The reset positions implementation will keep the same behaviour as the one in 
> the old consumer, but adapted to the new threading model. It is based on a 
> RESET_POSITIONS event that is submitted to the background thread and then 
> processed by the ApplicationEventProcessor. The processing itself is done by the 
> OffsetsRequestManager, given that it requires a LIST_OFFSETS request for the 
> partitions awaiting reset.
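As a rough illustration of the LIST_OFFSETS step, the sketch below maps a reset strategy to the sentinel timestamps used by the ListOffsets API (-2 for earliest, -1 for latest). The enum and method are defined locally for the example and are not the consumer's actual classes.

```java
public class ResetPositionsSketch {
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    // Sentinel timestamps accepted by the ListOffsets API.
    static long targetTimestamp(ResetStrategy strategy) {
        switch (strategy) {
            case EARLIEST:
                return -2L; // earliest available offset
            case LATEST:
                return -1L; // latest offset
            default:
                throw new IllegalStateException("No reset strategy configured for partition awaiting reset");
        }
    }

    public static void main(String[] args) {
        System.out.println(targetTimestamp(ResetStrategy.EARLIEST)); // prints -2
    }
}
```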



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao merged pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration

2023-09-13 Thread via GitHub


junrao merged PR #14346:
URL: https://github.com/apache/kafka/pull/14346


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tylerbertrand commented on a diff in pull request #14344: Resolve checkstyle cache miss

2023-09-13 Thread via GitHub


tylerbertrand commented on code in PR #14344:
URL: https://github.com/apache/kafka/pull/14344#discussion_r1324625070


##
build.gradle:
##
@@ -685,6 +685,7 @@ subprojects {
   }
 
   checkstyle {
+configDirectory = rootProject.layout.projectDirectory.dir("checkstyle")

Review Comment:
   These all point to the same directory, though 
`rootProject.layout.projectDirectory.dir("checkstyle")` uses lazy properties, 
which are generally preferred.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tylerbertrand commented on a diff in pull request #14344: Resolve checkstyle cache miss

2023-09-13 Thread via GitHub


tylerbertrand commented on code in PR #14344:
URL: https://github.com/apache/kafka/pull/14344#discussion_r1324618554


##
build.gradle:
##
@@ -685,6 +685,7 @@ subprojects {
   }
 
   checkstyle {
+configDirectory = rootProject.layout.projectDirectory.dir("checkstyle")
 configFile = new File(rootDir, "checkstyle/checkstyle.xml")

Review Comment:
   Good point. It turns out `${configDirectory}/checkstyle.xml` is [the default 
location](https://docs.gradle.org/current/dsl/org.gradle.api.plugins.quality.CheckstyleExtension.html#org.gradle.api.plugins.quality.CheckstyleExtension:configFile),
 so there is actually no need to specify `configFile` in this case. I pushed a 
commit to remove that line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #14382: KAFKA-15442: add a section in doc for tiered storage

2023-09-13 Thread via GitHub


satishd commented on PR #14382:
URL: https://github.com/apache/kafka/pull/14382#issuecomment-1717677707

   >That sounds good. But since we don't have much time left for the v3.6.0 RC 
build, we can add them after the release. Doc updates can be made directly in the 
kafka-site repo. I'll add them in a separate PR.
   
   +1 on that. We can improve the doc in a followup.
   
   @kamalcph @showuon We can add quick start steps in the docs later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15160) Message bytes duplication in Kafka headers when compression is enabled

2023-09-13 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764677#comment-17764677
 ] 

Phuc Hong Tran commented on KAFKA-15160:


[~vikashmishra0808] Sorry for the delay, I'm investigating it

> Message bytes duplication in Kafka headers when compression is enabled
> --
>
> Key: KAFKA-15160
> URL: https://issues.apache.org/jira/browse/KAFKA-15160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, compression, consumer
>Affects Versions: 3.2.3, 3.3.2
>Reporter: Vikash Mishra
>Assignee: Phuc Hong Tran
>Priority: Critical
> Attachments: java heap dump.png, wireshark-min.png
>
>
> I created a Spring Kafka consumer using @KafkaListener.
> During this, I encountered a scenario where, when data is compressed (with any 
> compression: snappy/gzip) and consumed by the consumer, the heap dump shows a 
> "byte" occupying the same amount of memory as the message value.
> This behavior is seen only when compressed data is consumed by consumers, not 
> when the data is uncompressed.
> I tried to capture Kafka's messages with Wireshark; it shows the proper size of 
> the data coming from the Kafka server and no extra bytes in the headers. So this 
> is definitely something in the Kafka client. Spring does not do anything about 
> compression; the whole functionality is handled internally in the Kafka client 
> library.
> Attached are screenshots of the heap dump and Wireshark.
> This seems like a critical issue, as the message size in memory almost doubles, 
> impacting consumer memory and performance. It feels as if the actual message 
> value is being copied into the headers somewhere?
> *To Reproduce*
>  # Produce compressed data on any topic.
>  # Create a simple consumer consuming from the above-created topic.
>  # Capture heap dump.
> *Expected behavior*
> Headers should not show bytes consuming memory equivalent to value.
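A minimal sketch of step 1 of the reproduction above; the bootstrap server, topic name, and payload are placeholders, and any compression.type (snappy or gzip) will do.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompressedProducerRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "snappy");             // or "gzip"

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>("test-topic", "key-" + i, "value-" + i));
            }
        }
        // Steps 2 and 3: consume "test-topic" with a plain consumer and capture a
        // heap dump (e.g. with jmap) while the consumer holds the fetched records.
    }
}
```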



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

