This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 94f50393504 KAFKA-17378 Fixes for performance testing (#16942)
94f50393504 is described below

commit 94f5039350432e25191d708fd8500c6e285f7bd9
Author: Andrew Schofield <aschofi...@confluent.io>
AuthorDate: Wed Aug 21 23:53:21 2024 +0100

    KAFKA-17378 Fixes for performance testing (#16942)
    
    Reviewers: Apoorv Mittal <apoorvmitta...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../internals/ShareConsumeRequestManager.java      | 49 ++++++++++++----------
 .../java/kafka/server/share/SharePartition.java    | 24 +++++------
 .../kafka/server/share/SharePartitionManager.java  |  2 +-
 3 files changed, 40 insertions(+), 35 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 92da67600c6..4c83c99a657 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -259,36 +259,41 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
                                                       long currentTimeMs,
                                                       boolean onCommitAsync,
                                                       AtomicBoolean isAsyncDone) {
-        if (acknowledgeRequestState == null || (!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
-            if (onCommitAsync) {
-                isAsyncDone.set(true);
+        boolean asyncDone = true;
+        try {
+            if (acknowledgeRequestState == null || (!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
+                return Optional.empty();
             }
-            return Optional.empty();
-        } else if (!acknowledgeRequestState.maybeExpire()) {
-            if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
-                acknowledgeRequestState.onSendAttempt(currentTimeMs);
-                if (onCommitAsync) {
-                    isAsyncDone.set(true);
+
+            if (acknowledgeRequestState.maybeExpire()) {
+                // Fill in TimeoutException
+                for (TopicIdPartition tip : acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+                    metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+                    acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
                 }
-                return Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
-            } else {
+                acknowledgeRequestState.incompleteAcknowledgements.clear();
+                return Optional.empty();
+            }
+
+            if (!acknowledgeRequestState.canSendRequest(currentTimeMs)) {
                 // We wait for the backoff before we can send this request.
-                if (onCommitAsync) {
-                    isAsyncDone.set(false);
-                }
+                asyncDone = false;
+                return Optional.empty();
             }
-        } else {
-            // Fill in TimeoutException
-            for (TopicIdPartition tip : acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
-                metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
-                acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+
+            UnsentRequest request = acknowledgeRequestState.buildRequest(currentTimeMs);
+            if (request == null) {
+                asyncDone = false;
+                return Optional.empty();
             }
-            acknowledgeRequestState.incompleteAcknowledgements.clear();
+
+            acknowledgeRequestState.onSendAttempt(currentTimeMs);
+            return Optional.of(request);
+        } finally {
             if (onCommitAsync) {
-                isAsyncDone.set(true);
+                isAsyncDone.set(asyncDone);
             }
         }
-        return Optional.empty();
     }
 
     /**
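
The refactored method above moves the isAsyncDone callback into a single finally block, so every early return reports completion exactly once instead of each branch calling isAsyncDone.set(...) separately. A minimal sketch of the pattern, with illustrative names rather than the actual Kafka types:

    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicBoolean;

    class AsyncDoneFlagSketch {
        // Sketch only: "ready" stands in for the expiry/backoff checks above.
        Optional<String> maybeBuildRequest(boolean ready, boolean onCommitAsync, AtomicBoolean isAsyncDone) {
            boolean asyncDone = true;           // assume done unless a branch defers the work
            try {
                if (!ready) {
                    asyncDone = false;          // still backing off; caller should retry later
                    return Optional.empty();
                }
                return Optional.of("request");  // happy path: a request was built
            } finally {
                if (onCommitAsync) {
                    isAsyncDone.set(asyncDone); // runs on every return path above
                }
            }
        }
    }
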
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index b49a05d2ddb..cb507f6a8be 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -351,7 +351,7 @@ public class SharePartition {
         String memberId,
         FetchPartitionData fetchPartitionData
     ) {
-        log.trace("Received acquire request for share partition: {}-{}", 
memberId, fetchPartitionData);
+        log.trace("Received acquire request for share partition: {}-{} 
memberId: {}", groupId, topicIdPartition, memberId);
         RecordBatch lastBatch = 
fetchPartitionData.records.lastBatch().orElse(null);
         if (lastBatch == null) {
             // Nothing to acquire.
@@ -386,7 +386,7 @@ public class SharePartition {
             }
 
             log.trace("Overlap exists with in-flight records. Acquire the 
records if available for"
-                + " the share group: {}-{}", groupId, topicIdPartition);
+                + " the share partition: {}-{}", groupId, topicIdPartition);
             List<AcquiredRecords> result = new ArrayList<>();
             // The fetched records are already part of the in-flight records. The records might
             // be available for re-delivery hence try acquiring same. The request batches could
@@ -399,7 +399,7 @@ public class SharePartition {
                 if (!fullMatch || inFlightBatch.offsetState() != null) {
                     log.trace("Subset or offset tracked batch record found for 
share partition,"
                             + " batch: {} request offsets - first: {}, last: 
{} for the share"
-                            + " group: {}-{}", inFlightBatch, 
firstBatch.baseOffset(),
+                            + " partition: {}-{}", inFlightBatch, 
firstBatch.baseOffset(),
                         lastBatch.lastOffset(), groupId, topicIdPartition);
                     if (inFlightBatch.offsetState() == null) {
                         // Though the request is a subset of in-flight batch but the offset
@@ -408,7 +408,7 @@ public class SharePartition {
                         // complete batch is available yet. Hence, do a pre-check to avoid exploding
                         // the in-flight offset tracking unnecessarily.
                         if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
-                            log.trace("The batch is not available to acquire in share group: {}-{}, skipping: {}"
+                            log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {}"
                                     + " skipping offset tracking for batch as well.", groupId,
                                 topicIdPartition, inFlightBatch);
                             continue;
@@ -423,14 +423,14 @@ public class SharePartition {
 
                 // The in-flight batch is a full match hence change the state of the complete batch.
                 if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
-                    log.trace("The batch is not available to acquire in share group: {}-{}, skipping: {}",
+                    log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {}",
                         groupId, topicIdPartition, inFlightBatch);
                     continue;
                 }
 
                 InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, true, maxDeliveryCount, memberId);
                 if (updateResult == null) {
-                    log.info("Unable to acquire records for the batch: {} in share group: {}-{}",
+                    log.info("Unable to acquire records for the batch: {} in share partition: {}-{}",
                         inFlightBatch, groupId, topicIdPartition);
                     continue;
                 }
@@ -972,7 +972,7 @@ public class SharePartition {
 
                 if (offsetState.getValue().state != RecordState.AVAILABLE) {
                     log.trace("The offset is not available skipping, offset: 
{} batch: {}"
-                            + " for the share group: {}-{}", 
offsetState.getKey(), inFlightBatch,
+                            + " for the share partition: {}-{}", 
offsetState.getKey(), inFlightBatch,
                         groupId, topicIdPartition);
                     continue;
                 }
@@ -981,7 +981,7 @@ public class SharePartition {
                     memberId);
                 if (updateResult == null) {
                     log.trace("Unable to acquire records for the offset: {} in 
batch: {}"
-                            + " for the share group: {}-{}", 
offsetState.getKey(), inFlightBatch,
+                            + " for the share partition: {}-{}", 
offsetState.getKey(), inFlightBatch,
                         groupId, topicIdPartition);
                     continue;
                 }
@@ -1601,7 +1601,7 @@ public class SharePartition {
             if (!stateBatches.isEmpty() && !isWriteShareGroupStateSuccessful(stateBatches)) {
 
                 // Even if write share group state RPC call fails, we will still go ahead with the state transition.
-                log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId {}. " +
+                log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}. " +
                                 "Proceeding with state transition.", groupId, topicIdPartition, memberId);
             }
 
@@ -1637,7 +1637,7 @@ public class SharePartition {
             return;
         }
         log.debug("The batch is not in acquired state while release of 
acquisition lock on timeout, skipping, batch: {}"
-                + " for the share group: {}-{}-{}", inFlightBatch, groupId, 
memberId, topicIdPartition);
+                + " for the share partition: {}-{} memberId: {}", 
inFlightBatch, groupId, topicIdPartition, memberId);
     }
 
     private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFlightBatch,
@@ -1658,7 +1658,7 @@ public class SharePartition {
             }
             if (offsetState.getValue().state != RecordState.ACQUIRED) {
                 log.debug("The offset is not in acquired state while release 
of acquisition lock on timeout, skipping, offset: {} batch: {}"
-                                + " for the share group: {}-{} memberId: {}", 
offsetState.getKey(), inFlightBatch,
+                                + " for the share partition: {}-{} memberId: 
{}", offsetState.getKey(), inFlightBatch,
                         groupId, topicIdPartition, memberId);
                 continue;
             }
@@ -1669,7 +1669,7 @@ public class SharePartition {
                     EMPTY_MEMBER_ID);
             if (updateResult == null) {
                 log.error("Unable to release acquisition lock on timeout for 
the offset: {} in batch: {}"
-                                + " for the share group: {}-{} memberId: {}", 
offsetState.getKey(), inFlightBatch,
+                                + " for the share partition: {}-{} memberId: 
{}", offsetState.getKey(), inFlightBatch,
                         groupId, topicIdPartition, memberId);
                 continue;
             }
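
The SharePartition.java changes above correct log messages that said "share group" where the identifiers name a share partition, and the first hunk also fixes a trace line whose arguments did not match its message. As a reminder of how SLF4J pairs each {} placeholder positionally with the trailing arguments, here is a stripped-down sketch (hypothetical class, same call shape as the fixed line):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LogPlaceholderSketch {
        private static final Logger log = LoggerFactory.getLogger(LogPlaceholderSketch.class);

        void onAcquire(String groupId, String topicIdPartition, String memberId) {
            // Three placeholders, three arguments, in the same order:
            log.trace("Received acquire request for share partition: {}-{} memberId: {}",
                groupId, topicIdPartition, memberId);
        }
    }
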
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 9141416cf48..f62d5ba4a60 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -461,7 +461,7 @@ public class SharePartitionManager implements AutoCloseable {
                     if (shareSession.epoch != reqMetadata.epoch()) {
                         log.debug("Share session error for {}: expected epoch 
{}, but got {} instead", key,
                                 shareSession.epoch, reqMetadata.epoch());
-                        throw  Errors.INVALID_SHARE_SESSION_EPOCH.exception();
+                        throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
                     } else {
                         cache.touch(shareSession, time.milliseconds());
                         shareSession.epoch = ShareRequestMetadata.nextEpoch(shareSession.epoch);
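
For context, the code around this whitespace fix validates that a request carries the epoch the broker expects for its share session, then advances the expected epoch once the request is accepted. A stripped-down sketch of that check, using plain Java types in place of ShareSession, ShareRequestMetadata and Errors:

    class SessionEpochSketch {
        private int expectedEpoch = 1;

        synchronized void validateAndAdvance(int requestEpoch) {
            if (requestEpoch != expectedEpoch) {
                throw new IllegalStateException(
                    "expected epoch " + expectedEpoch + ", but got " + requestEpoch + " instead");
            }
            expectedEpoch++;  // the next request must present the advanced epoch
        }
    }
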
