apoorvmittal10 commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013454652
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long
partitionDataStartOffset) thro
}
}
+ private ShareAcquiredRecords
filterAbortedTransactionalAcquiredRecords(FetchPartitionData
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords
shareAcquiredRecords) {
+ if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+ return shareAcquiredRecords;
+ // When FetchIsolation.TXN_COMMITTED is used as isolation type by the
share group, we need to filter any
+ // transactions that were aborted/did not commit due to timeout.
+ List<AcquiredRecords> result =
filterAbortedTransactionalRecords(fetchPartitionData.records.batches(),
shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+ int acquiredCount = 0;
+ for (AcquiredRecords records : result) {
+ acquiredCount += (int) (records.lastOffset() -
records.firstOffset() + 1);
+ }
+ return new ShareAcquiredRecords(result, acquiredCount);
+ }
+
+ private List<AcquiredRecords> filterAbortedTransactionalRecords(
+ Iterable<? extends RecordBatch> batches,
+ List<AcquiredRecords> acquiredRecords,
+ Optional<List<FetchResponseData.AbortedTransaction>>
abortedTransactions
Review Comment:
Are there any guarantees for `abortedTransactions` to be in some defined
order with `firstOffset` for the RecordBatch?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long
partitionDataStartOffset) thro
}
}
+ private ShareAcquiredRecords
filterAbortedTransactionalAcquiredRecords(FetchPartitionData
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords
shareAcquiredRecords) {
+ if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+ return shareAcquiredRecords;
+ // When FetchIsolation.TXN_COMMITTED is used as isolation type by the
share group, we need to filter any
+ // transactions that were aborted/did not commit due to timeout.
+ List<AcquiredRecords> result =
filterAbortedTransactionalRecords(fetchPartitionData.records.batches(),
shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+ int acquiredCount = 0;
+ for (AcquiredRecords records : result) {
+ acquiredCount += (int) (records.lastOffset() -
records.firstOffset() + 1);
+ }
+ return new ShareAcquiredRecords(result, acquiredCount);
+ }
+
+ private List<AcquiredRecords> filterAbortedTransactionalRecords(
Review Comment:
Shouldn't the method name be consistent with earlier method
`filterAbortedTransactionalAcquiredRecords` as we are sending the `acquired`
records list here as well. Hence earlier method can have `maybe` prefix and
this one as `filterAbortedTransactionalAcquiredRecords`.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long
partitionDataStartOffset) thro
}
}
+ private ShareAcquiredRecords
filterAbortedTransactionalAcquiredRecords(FetchPartitionData
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords
shareAcquiredRecords) {
Review Comment:
```suggestion
private ShareAcquiredRecords
maybeFilterAbortedTransactionalAcquiredRecords(FetchPartitionData
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords
shareAcquiredRecords) {
```
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long
partitionDataStartOffset) thro
}
}
+ private ShareAcquiredRecords
filterAbortedTransactionalAcquiredRecords(FetchPartitionData
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords
shareAcquiredRecords) {
+ if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+ return shareAcquiredRecords;
+ // When FetchIsolation.TXN_COMMITTED is used as isolation type by the
share group, we need to filter any
Review Comment:
Why not to have the below check in the same method and pass the aborted
transactions to `filterAbortedTransactionalRecords` without optional. This can
make sure that when it's required to filter then only call goes to further
methods, and maybe method should have all these pre-checks.
```
if (abortedTransactions.isEmpty())
return acquiredRecords;
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]