junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1826157479
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1576,6 +1582,34 @@ private Optional<Throwable> acknowledgeCompleteBatch(
return Optional.empty();
}
+ protected void updateEndOffsetAndFetchOffsetMetadata(long endOffset,
Optional<LogOffsetMetadata> fetchOffsetMetadata) {
Review Comment:
There is no need to pass in `fetchOffsetMetadata` since it's always empty.
Perhaps also rename the method: updateEndOffsetAndFetchOffsetMetadata =>
updateEndOffsetAndResetFetchOffsetMetadata?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,47 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
- topicPartitionDataFromTryComplete = acquirablePartitions();
+ if (anySharePartitionNoLongerExists()) {
Review Comment:
Is this problem still there?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,47 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
- topicPartitionDataFromTryComplete = acquirablePartitions();
+ if (anySharePartitionNoLongerExists()) {
+ return forceComplete();
+ }
+ Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
acquirablePartitions();
- if (!topicPartitionDataFromTryComplete.isEmpty()) {
+ try {
+ if (!topicPartitionData.isEmpty()) {
+ Map<TopicIdPartition, LogReadResult>
replicaManagerReadResponse =
maybeReadFromLogAndUpdateFetchOffsetMetadata(topicPartitionData);
+ if
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) ||
isMinBytesSatisfied(topicPartitionData)) {
+ partitionsToComplete = topicPartitionData;
+ if (!replicaManagerReadResponse.isEmpty())
+ partitionsAlreadyFetched = replicaManagerReadResponse;
+ boolean completedByMe = forceComplete();
+ // If invocation of forceComplete is not successful, then
that means the request is already completed
+ // hence release the acquired locks.
+ if (!completedByMe) {
+ releasePartitionLocks(partitionsToComplete.keySet());
Review Comment:
Should we reset `partitionsToComplete` and `partitionsAlreadyFetched` too
when we release the locks?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1576,6 +1582,34 @@ private Optional<Throwable> acknowledgeCompleteBatch(
return Optional.empty();
}
+ protected void updateEndOffsetAndFetchOffsetMetadata(long endOffset,
Optional<LogOffsetMetadata> fetchOffsetMetadata) {
+ lock.writeLock().lock();
+ try {
+ this.endOffset = endOffset;
Review Comment:
Let's be consistent with the usage of `this`. Most other places don't use
`this`.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1576,6 +1582,34 @@ private Optional<Throwable> acknowledgeCompleteBatch(
return Optional.empty();
}
+ protected void updateEndOffsetAndFetchOffsetMetadata(long endOffset,
Optional<LogOffsetMetadata> fetchOffsetMetadata) {
+ lock.writeLock().lock();
Review Comment:
All callers hold the lock. So we could remove the locking here and add a
comment that the caller is expected to hold the lock when calling this method.
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,47 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
- topicPartitionDataFromTryComplete = acquirablePartitions();
+ if (anySharePartitionNoLongerExists()) {
+ return forceComplete();
+ }
+ Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
acquirablePartitions();
- if (!topicPartitionDataFromTryComplete.isEmpty()) {
+ try {
+ if (!topicPartitionData.isEmpty()) {
+ Map<TopicIdPartition, LogReadResult>
replicaManagerReadResponse =
maybeReadFromLogAndUpdateFetchOffsetMetadata(topicPartitionData);
+ if
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) ||
isMinBytesSatisfied(topicPartitionData)) {
+ partitionsToComplete = topicPartitionData;
+ if (!replicaManagerReadResponse.isEmpty())
Review Comment:
This check is unnecessary since `partitionsAlreadyFetched` is initialized to
an empty map.
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +239,159 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
- topicIdPartitions.forEach(tp -> {
- SharePartition sharePartition =
sharePartitionManager.sharePartition(groupId, tp);
+ // In case, fetch offset metadata doesn't exist for one or more topic
partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private Map<TopicIdPartition, LogReadResult>
maybeReadFromLogAndUpdateFetchOffsetMetadata(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitions.get(topicIdPartition);
+ if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+ }
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ // We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
+ Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData =
readFromLog(missingFetchOffsetMetadataTopicPartitions);
+ return
updateFetchOffsetMetadataForMissingTopicPartitions(replicaManagerReadResponseData);
+ }
+
+ private Map<TopicIdPartition, LogReadResult>
updateFetchOffsetMetadataForMissingTopicPartitions(
Review Comment:
updateFetchOffsetMetadataForMissingTopicPartitions =>
updateFetchOffsetMetadata ?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +239,159 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
- topicIdPartitions.forEach(tp -> {
- SharePartition sharePartition =
sharePartitionManager.sharePartition(groupId, tp);
+ // In case, fetch offset metadata doesn't exist for one or more topic
partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private Map<TopicIdPartition, LogReadResult>
maybeReadFromLogAndUpdateFetchOffsetMetadata(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitions.get(topicIdPartition);
+ if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
Review Comment:
This code can be a bit simpler.
```
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
if (sharePartition.fetchOffsetMetadata().isEmpty()) {
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
partitionData);
}
});
```
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,47 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
- topicPartitionDataFromTryComplete = acquirablePartitions();
+ if (anySharePartitionNoLongerExists()) {
+ return forceComplete();
+ }
+ Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData =
acquirablePartitions();
Review Comment:
Should we just assign the return value to partitionsToComplete directly? We
already acquired the locks for those partitions and partitionsToComplete is the
only place to track them for releasing.
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +239,159 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
- topicIdPartitions.forEach(tp -> {
- SharePartition sharePartition =
sharePartitionManager.sharePartition(groupId, tp);
+ // In case, fetch offset metadata doesn't exist for one or more topic
partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata.
+ private Map<TopicIdPartition, LogReadResult>
maybeReadFromLogAndUpdateFetchOffsetMetadata(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ Map<TopicIdPartition, FetchRequest.PartitionData>
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+ for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry :
topicPartitionData.entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ SharePartition sharePartition =
sharePartitions.get(topicIdPartition);
+ if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition,
entry.getValue());
+ }
+ }
+ if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ // We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
+ Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData =
readFromLog(missingFetchOffsetMetadataTopicPartitions);
+ return
updateFetchOffsetMetadataForMissingTopicPartitions(replicaManagerReadResponseData);
Review Comment:
Perhaps call `updateFetchOffsetMetadataForMissingTopicPartitions` in the
caller instead? Then the purpose of this method is simpler and its name can
just be `maybeReadFromLog`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]