junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1815510878
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1525,6 +1537,24 @@ private Optional<Throwable> acknowledgeCompleteBatch(
return Optional.empty();
}
+ protected void updateLatestFetchOffsetMetadata(LogOffsetMetadata fetchOffsetMetadata) {
+ lock.writeLock().lock();
+ try {
+ latestFetchOffsetMetadata = fetchOffsetMetadata;
Review Comment:
We need to reset `latestFetchOffsetMetadata` every time `endOffset` changes.
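For example, a minimal sketch of such a reset, assuming a hypothetical place where `endOffset` is advanced (the field and lock names are taken from this diff; this is not the actual implementation):

```java
// Hypothetical sketch: clear the cached offset metadata whenever endOffset moves,
// so a stale segment position is never used for the next minBytes estimate.
private void advanceEndOffset(long newEndOffset) {
    lock.writeLock().lock();
    try {
        if (newEndOffset != endOffset) {
            endOffset = newEndOffset;
            latestFetchOffsetMetadata = null; // forces a fresh readFromLog on the next tryComplete
        }
    } finally {
        lock.writeLock().unlock();
    }
}
```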
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -542,6 +552,8 @@ public ShareAcquiredRecords acquire(
RecordBatch firstBatch = fetchPartitionData.records.batches().iterator().next();
lock.writeLock().lock();
try {
+ // Update the latest fetch offset metadata for any future queries.
+ this.latestFetchOffsetMetadata = fetchOffsetMetadata;
Review Comment:
The `acquire` call always comes after `DelayedShareFetch.tryComplete`, which
already updates `latestFetchOffsetMetadata`. So, it seems that we don't need
to update `latestFetchOffsetMetadata` in `acquire`?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +202,117 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private void maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+ Map<TopicIdPartition, FetchPartitionOffsetData> replicaManagerReadResponseData = null;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition);
+ if (sharePartition == null) {
+ log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata() == null) {
Review Comment:
Perhaps it's clearer to make sharePartition.latestFetchOffsetMetadata() an
Optional?
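For illustration, a sketch of what the Optional-returning accessor could look like (names from this diff; `java.util.Optional` import and a ReadWriteLock, as the `writeLock()` calls suggest, are assumed):

```java
// Returning Optional makes the "not populated yet" case explicit at every call site,
// instead of relying on callers to remember the null check.
protected Optional<LogOffsetMetadata> latestFetchOffsetMetadata() {
    lock.readLock().lock();
    try {
        return Optional.ofNullable(latestFetchOffsetMetadata);
    } finally {
        lock.readLock().unlock();
    }
}
```

The check above would then read `if (sharePartition.latestFetchOffsetMetadata().isEmpty())`.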
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -153,13 +141,23 @@ public boolean tryComplete() {
shareFetchData.groupId(), shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
- topicPartitionDataFromTryComplete = acquirablePartitions();
+ Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
- if (!topicPartitionDataFromTryComplete.isEmpty())
- return forceComplete();
- log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
- "topic partitions {}", shareFetchData.groupId(),
- shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet());
+ if (!topicPartitionData.isEmpty()) {
+ maybeUpdateFetchOffsetMetadataForTopicPartitions(topicPartitionData);
Review Comment:
It's possible that `maybeUpdateFetchOffsetMetadataForTopicPartitions` calls
`readFromLog`, which returns enough bytes. In that case, it's more efficient to
reuse the fetched result instead of calling `readFromLog` again in `onComplete`.
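One hedged sketch of such reuse, using a hypothetical cache field and helper (not the actual implementation):

```java
// Hypothetical: remember the readFromLog result produced while populating offset metadata
// in tryComplete, so onComplete can reuse it instead of reading the log a second time.
private Map<TopicIdPartition, FetchPartitionOffsetData> cachedReadResponse;

private Map<TopicIdPartition, FetchPartitionOffsetData> readFromLogOrReuse(
        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
    // Reuse only if the cached read covered all requested partitions; otherwise read again.
    if (cachedReadResponse != null && cachedReadResponse.keySet().containsAll(topicPartitionData.keySet())) {
        return cachedReadResponse;
    }
    return readFromLog(topicPartitionData);
}
```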
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +202,117 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private void maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+ Map<TopicIdPartition, FetchPartitionOffsetData> replicaManagerReadResponseData = null;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition);
+ if (sharePartition == null) {
+ log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata() == null) {
+ if (replicaManagerReadResponseData == null) {
+ replicaManagerReadResponseData = readFromLog(topicPartitionData);
Review Comment:
Hmm, we don't want to read all partitions if only one partition's offset
metadata is missing, right?
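A sketch of restricting the read to only the partitions whose offset metadata is missing (names taken from this diff; `java.util.LinkedHashMap` import assumed):

```java
// Collect only the partitions still missing offset metadata and read just those,
// rather than issuing a readFromLog for every acquirable partition.
Map<TopicIdPartition, FetchRequest.PartitionData> missingOffsetMetadata = new LinkedHashMap<>();
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
    SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), entry.getKey());
    if (sharePartition != null && sharePartition.latestFetchOffsetMetadata() == null) {
        missingOffsetMetadata.put(entry.getKey(), entry.getValue());
    }
}
if (!missingOffsetMetadata.isEmpty()) {
    Map<TopicIdPartition, FetchPartitionOffsetData> readResponse = readFromLog(missingOffsetMetadata);
    // ... update each share partition's latestFetchOffsetMetadata from readResponse ...
}
```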
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +202,117 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private void maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+ Map<TopicIdPartition, FetchPartitionOffsetData> replicaManagerReadResponseData = null;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition);
+ if (sharePartition == null) {
+ log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
+ }
+ if (sharePartition.latestFetchOffsetMetadata() == null) {
+ if (replicaManagerReadResponseData == null) {
+ replicaManagerReadResponseData = readFromLog(topicPartitionData);
+ }
+ FetchPartitionOffsetData fetchPartitionOffsetData = replicaManagerReadResponseData.get(topicIdPartition);
+ if (fetchPartitionOffsetData == null) {
+ log.debug("Replica manager read log result {} does not contain topic partition {}",
+ replicaManagerReadResponseData, topicIdPartition);
+ continue;
+ }
+ sharePartition.updateLatestFetchOffsetMetadata(fetchPartitionOffsetData.logOffsetMetadata());
+ }
+ }
+ }
+
+ private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+ AtomicLong accumulatedSize = new AtomicLong(0);
Review Comment:
Could this just be a `long`?
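Since the accumulation happens sequentially inside this method, a local `long` would do, e.g. (a sketch with a hypothetical per-partition size helper, and `minBytes` standing in for however the request's minimum bytes is exposed here):

```java
long accumulatedSize = 0;
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
    long bytesAvailable = estimateAvailableBytes(entry.getKey()); // hypothetical helper
    accumulatedSize += bytesAvailable;
    if (accumulatedSize >= minBytes) { // enough bytes across partitions, request is satisfiable
        return true;
    }
}
return false;
```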
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +202,117 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+ // In case, fetch offset metadata doesn't exist for any topic partition in the list of topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private void maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+ Map<TopicIdPartition, FetchPartitionOffsetData> replicaManagerReadResponseData = null;
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition);
+ if (sharePartition == null) {
+ log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+ continue;
Review Comment:
In DelayedFetch, if a partition no longer exists, we complete the operation
immediately.
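A rough sketch of mirroring that behaviour here, rather than skipping the partition (not the existing code):

```java
if (sharePartition == null) {
    // Mirror DelayedFetch: a partition that no longer exists cannot become acquirable
    // by waiting longer, so complete the delayed share fetch right away.
    log.debug("Share partition for groupId={}, topicIdPartition={} no longer exists; completing the operation.",
        shareFetchData.groupId(), topicIdPartition);
    forceComplete();
    return;
}
```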
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -277,6 +279,12 @@ public static RecordState forId(byte id) {
*/
private final ReplicaManager replicaManager;
+ /**
+ * We maintain the latest fetch offset metadata in order to know the last segment position that has been fetched
Review Comment:
1. Perhaps "We maintain the latest fetch offset metadata to estimate the
minBytes requirement more efficiently."?
2. Also, could we keep it together with `endOffset`, since they are related?
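Putting both suggestions together, the declarations could read roughly like this (a sketch; the `endOffset` javadoc here is only a placeholder for the existing one):

```java
/**
 * The end offset of the share partition. (Existing field, shown only for placement.)
 */
private long endOffset;

/**
 * We maintain the latest fetch offset metadata to estimate the minBytes requirement more efficiently.
 * It must be reset whenever endOffset changes.
 */
private LogOffsetMetadata latestFetchOffsetMetadata;
```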