This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 94f50393504 KAFKA-17378 Fixes for performance testing (#16942) 94f50393504 is described below commit 94f5039350432e25191d708fd8500c6e285f7bd9 Author: Andrew Schofield <aschofi...@confluent.io> AuthorDate: Wed Aug 21 23:53:21 2024 +0100 KAFKA-17378 Fixes for performance testing (#16942) Reviewers: Apoorv Mittal <apoorvmitta...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../internals/ShareConsumeRequestManager.java | 49 ++++++++++++---------- .../java/kafka/server/share/SharePartition.java | 24 +++++------ .../kafka/server/share/SharePartitionManager.java | 2 +- 3 files changed, 40 insertions(+), 35 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 92da67600c6..4c83c99a657 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -259,36 +259,41 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi long currentTimeMs, boolean onCommitAsync, AtomicBoolean isAsyncDone) { - if (acknowledgeRequestState == null || (!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) { - if (onCommitAsync) { - isAsyncDone.set(true); + boolean asyncDone = true; + try { + if (acknowledgeRequestState == null || (!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) { + return Optional.empty(); } - return Optional.empty(); - } else if (!acknowledgeRequestState.maybeExpire()) { - if (acknowledgeRequestState.canSendRequest(currentTimeMs)) { - acknowledgeRequestState.onSendAttempt(currentTimeMs); - if (onCommitAsync) { - isAsyncDone.set(true); + + if (acknowledgeRequestState.maybeExpire()) { + // Fill in 
TimeoutException + for (TopicIdPartition tip : acknowledgeRequestState.incompleteAcknowledgements.keySet()) { + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip)); + acknowledgeRequestState.handleAcknowledgeTimedOut(tip); } - return Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs)); - } else { + acknowledgeRequestState.incompleteAcknowledgements.clear(); + return Optional.empty(); + } + + if (!acknowledgeRequestState.canSendRequest(currentTimeMs)) { // We wait for the backoff before we can send this request. - if (onCommitAsync) { - isAsyncDone.set(false); - } + asyncDone = false; + return Optional.empty(); } - } else { - // Fill in TimeoutException - for (TopicIdPartition tip : acknowledgeRequestState.incompleteAcknowledgements.keySet()) { - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip)); - acknowledgeRequestState.handleAcknowledgeTimedOut(tip); + + UnsentRequest request = acknowledgeRequestState.buildRequest(currentTimeMs); + if (request == null) { + asyncDone = false; + return Optional.empty(); } - acknowledgeRequestState.incompleteAcknowledgements.clear(); + + acknowledgeRequestState.onSendAttempt(currentTimeMs); + return Optional.of(request); + } finally { if (onCommitAsync) { - isAsyncDone.set(true); + isAsyncDone.set(asyncDone); } } - return Optional.empty(); } /** diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index b49a05d2ddb..cb507f6a8be 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -351,7 +351,7 @@ public class SharePartition { String memberId, FetchPartitionData fetchPartitionData ) { - log.trace("Received acquire request for share partition: {}-{}", memberId, fetchPartitionData); + log.trace("Received acquire request for share partition: {}-{} 
memberId: {}", groupId, topicIdPartition, memberId); RecordBatch lastBatch = fetchPartitionData.records.lastBatch().orElse(null); if (lastBatch == null) { // Nothing to acquire. @@ -386,7 +386,7 @@ public class SharePartition { } log.trace("Overlap exists with in-flight records. Acquire the records if available for" - + " the share group: {}-{}", groupId, topicIdPartition); + + " the share partition: {}-{}", groupId, topicIdPartition); List<AcquiredRecords> result = new ArrayList<>(); // The fetched records are already part of the in-flight records. The records might // be available for re-delivery hence try acquiring same. The request batches could @@ -399,7 +399,7 @@ public class SharePartition { if (!fullMatch || inFlightBatch.offsetState() != null) { log.trace("Subset or offset tracked batch record found for share partition," + " batch: {} request offsets - first: {}, last: {} for the share" - + " group: {}-{}", inFlightBatch, firstBatch.baseOffset(), + + " partition: {}-{}", inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset(), groupId, topicIdPartition); if (inFlightBatch.offsetState() == null) { // Though the request is a subset of in-flight batch but the offset @@ -408,7 +408,7 @@ public class SharePartition { // complete batch is available yet. Hence, do a pre-check to avoid exploding // the in-flight offset tracking unnecessarily. if (inFlightBatch.batchState() != RecordState.AVAILABLE) { - log.trace("The batch is not available to acquire in share group: {}-{}, skipping: {}" + log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {}" + " skipping offset tracking for batch as well.", groupId, topicIdPartition, inFlightBatch); continue; @@ -423,14 +423,14 @@ public class SharePartition { // The in-flight batch is a full match hence change the state of the complete batch. 
if (inFlightBatch.batchState() != RecordState.AVAILABLE) { - log.trace("The batch is not available to acquire in share group: {}-{}, skipping: {}", + log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {}", groupId, topicIdPartition, inFlightBatch); continue; } InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, true, maxDeliveryCount, memberId); if (updateResult == null) { - log.info("Unable to acquire records for the batch: {} in share group: {}-{}", + log.info("Unable to acquire records for the batch: {} in share partition: {}-{}", inFlightBatch, groupId, topicIdPartition); continue; } @@ -972,7 +972,7 @@ public class SharePartition { if (offsetState.getValue().state != RecordState.AVAILABLE) { log.trace("The offset is not available skipping, offset: {} batch: {}" - + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + + " for the share partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId, topicIdPartition); continue; } @@ -981,7 +981,7 @@ public class SharePartition { memberId); if (updateResult == null) { log.trace("Unable to acquire records for the offset: {} in batch: {}" - + " for the share group: {}-{}", offsetState.getKey(), inFlightBatch, + + " for the share partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId, topicIdPartition); continue; } @@ -1601,7 +1601,7 @@ public class SharePartition { if (!stateBatches.isEmpty() && !isWriteShareGroupStateSuccessful(stateBatches)) { // Even if write share group state RPC call fails, we will still go ahead with the state transition. - log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId {}. " + + log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}. 
" + "Proceeding with state transition.", groupId, topicIdPartition, memberId); } @@ -1637,7 +1637,7 @@ public class SharePartition { return; } log.debug("The batch is not in acquired state while release of acquisition lock on timeout, skipping, batch: {}" - + " for the share group: {}-{}-{}", inFlightBatch, groupId, memberId, topicIdPartition); + + " for the share partition: {}-{} memberId: {}", inFlightBatch, groupId, topicIdPartition, memberId); } private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFlightBatch, @@ -1658,7 +1658,7 @@ public class SharePartition { } if (offsetState.getValue().state != RecordState.ACQUIRED) { log.debug("The offset is not in acquired state while release of acquisition lock on timeout, skipping, offset: {} batch: {}" - + " for the share group: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch, + + " for the share partition: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch, groupId, topicIdPartition, memberId); continue; } @@ -1669,7 +1669,7 @@ public class SharePartition { EMPTY_MEMBER_ID); if (updateResult == null) { log.error("Unable to release acquisition lock on timeout for the offset: {} in batch: {}" - + " for the share group: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch, + + " for the share partition: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch, groupId, topicIdPartition, memberId); continue; } diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 9141416cf48..f62d5ba4a60 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -461,7 +461,7 @@ public class SharePartitionManager implements AutoCloseable { if (shareSession.epoch != reqMetadata.epoch()) { log.debug("Share session error for {}: expected epoch {}, but got {} instead", key, shareSession.epoch, 
reqMetadata.epoch()); - throw Errors.INVALID_SHARE_SESSION_EPOCH.exception(); + throw Errors.INVALID_SHARE_SESSION_EPOCH.exception(); } else { cache.touch(shareSession, time.milliseconds()); shareSession.epoch = ShareRequestMetadata.nextEpoch(shareSession.epoch);