smjn commented on code in PR #20815:
URL: https://github.com/apache/kafka/pull/20815#discussion_r2486408540
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1856,26 +1867,21 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch
return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
}
- private Map<Long, RecordState> fetchRecordStateMapForAcknowledgementBatch(
- ShareAcknowledgementBatch batch) {
+ // Visibility for test
+ static Map<Long, Byte> fetchAckTypeMapForBatch(ShareAcknowledgementBatch batch) {
// Client can either send a single entry in acknowledgeTypes which represents the state
// of the complete batch or can send individual offsets state. Construct a map with record state
// for each offset in the batch, if single acknowledge type is sent, the map will have only one entry.
- Map<Long, RecordState> recordStateMap = new HashMap<>();
+ Map<Long, Byte> ackTypeMap = new HashMap<>();
for (int index = 0; index < batch.acknowledgeTypes().size(); index++) {
- recordStateMap.put(batch.firstOffset() + index,
- fetchRecordState(batch.acknowledgeTypes().get(index)));
+ byte ackType = batch.acknowledgeTypes().get(index);
+ // Validate
Review Comment:
> So this is going to throw an exception if the ackType is invalid, right?
Yes, correct. `AcknowledgeType.forId(ackType)` will throw an exception for an
invalid ackType. In this case the caller is `SharePartition.acknowledge`, which
wraps the call to this method in a try-catch.
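
To illustrate the validate-then-catch flow described above, here is a minimal, self-contained sketch. It does not use the real Kafka classes: `AckType`, its id values, `buildAckTypeMap`, and the `acknowledge` wrapper are all hypothetical stand-ins, and the choice of `IllegalArgumentException` is an assumption for the sketch rather than something taken from this PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AckTypeSketch {

    // Hypothetical stand-in for AcknowledgeType; the ids are illustrative assumptions.
    enum AckType {
        ACCEPT((byte) 1), RELEASE((byte) 2), REJECT((byte) 3);

        final byte id;

        AckType(byte id) {
            this.id = id;
        }

        // Returns the matching type, or throws if the id is unknown.
        static AckType forId(byte id) {
            for (AckType type : values()) {
                if (type.id == id) {
                    return type;
                }
            }
            throw new IllegalArgumentException("Unknown acknowledge type id: " + id);
        }
    }

    // Builds an offset -> ack type map, validating each byte along the way.
    static Map<Long, Byte> buildAckTypeMap(long firstOffset, List<Byte> ackTypes) {
        Map<Long, Byte> ackTypeMap = new HashMap<>();
        for (int index = 0; index < ackTypes.size(); index++) {
            byte ackType = ackTypes.get(index);
            AckType.forId(ackType); // validation only: throws on an invalid id
            ackTypeMap.put(firstOffset + index, ackType);
        }
        return ackTypeMap;
    }

    // Hypothetical caller: turns the validation failure into an error result.
    static String acknowledge(long firstOffset, List<Byte> ackTypes) {
        try {
            Map<Long, Byte> ackTypeMap = buildAckTypeMap(firstOffset, ackTypes);
            return "ok:" + ackTypeMap.size();
        } catch (IllegalArgumentException e) {
            return "error:" + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(acknowledge(10L, List.of((byte) 1, (byte) 3))); // prints "ok:2"
        System.out.println(acknowledge(10L, List.of((byte) 1, (byte) 9))); // prints "error:Unknown acknowledge type id: 9"
    }
}
```

Because the map-building helper is static and does the validation itself, a test can call it directly with an invalid byte and assert on the thrown exception, without going through the caller.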