apoorvmittal10 commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1816303214
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1968,6 +1977,29 @@ private void
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
}
}
+ private void setStartOffsetDuringInitialization(PartitionAllData
partitionData) throws Exception {
Review Comment:
We don't use set/get in Kafka, please rename to
`updateStartOffsetDuringInitialization`. Moreover I would say to return a
offset from this method which should be set in the called initialization
method. Else you need to take a lock again in this method. Hence rename this
method to `startOffsetDuringInitialization` and return a long.
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -128,4 +131,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition
topicIdPartition, Replic
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 :
timestampAndOffset.get().offset;
}
+
+ /**
+ * The method is used to get the offset for the latest timestamp for the
topic-partition.
+ *
+ * @return The offset for the latest timestamp.
+ */
+ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition,
ReplicaManager replicaManager) {
+ // Isolation level is set to READ_UNCOMMITTED, matching with that used
in share fetch requests
+ Option<FileRecords.TimestampAndOffset> timestampAndOffset =
replicaManager.fetchOffsetForTimestamp(
+ topicIdPartition.topicPartition(),
ListOffsetsRequest.LATEST_TIMESTAMP, new
Some<>(IsolationLevel.READ_UNCOMMITTED),
+ Optional.empty(), true).timestampAndOffsetOpt();
+ return timestampAndOffset.isEmpty() ? (long) 0 :
timestampAndOffset.get().offset;
Review Comment:
Is it safe to return `0` from this method when timestampAndOffset is empty?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1968,6 +1977,29 @@ private void
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
}
}
+ private void setStartOffsetDuringInitialization(PartitionAllData
partitionData) throws Exception {
+ // Set the state epoch and end offset from the persisted state.
+ if (partitionData.startOffset() !=
PartitionFactory.UNINITIALIZED_START_OFFSET) {
+ startOffset = partitionData.startOffset();
Review Comment:
```suggestion
return partitionData.startOffset();
```
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -314,6 +318,7 @@ public static RecordState forId(byte id) {
* @return The method returns a future which is completed when the share
partition is initialized
* or completes with an exception if the share partition is in
non-initializable state.
*/
+ @SuppressWarnings("CyclomaticComplexity")
Review Comment:
Is the suppression still needed?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -382,8 +387,12 @@ public CompletableFuture<Void> maybeInitialize() {
return;
}
- // Set the state epoch and end offset from the persisted state.
- startOffset = partitionData.startOffset() != -1 ?
partitionData.startOffset() : 0;
+ try {
+ setStartOffsetDuringInitialization(partitionData);
Review Comment:
Why to pass complete `partitionData` when we just need
`partitionData.startOffset()` in the method?
##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -1027,6 +1029,9 @@ public void testMultipleSequentialShareFetches() {
partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES);
ReplicaManager replicaManager = mock(ReplicaManager.class);
+ FileRecords.TimestampAndOffset timestampAndOffset = new
FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+ Mockito.doReturn(new
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
Review Comment:
Can we please move it to a method and call that method in 3 test cases. Also
Mockito.anyBoolean => anyBoolean, etc. Already static imports exists.
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -117,4 +119,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition
topicIdPartition, Replic
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 :
timestampAndOffset.get().offset;
}
+
+ /**
+ * The method is used to get the offset for the latest timestamp for the
topic-partition.
+ *
+ * @return The offset for the latest timestamp.
+ */
+ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition,
ReplicaManager replicaManager) {
Review Comment:
My bad, sorry the other method is also used within package only so no need
of visibile of testing there as well.
##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -113,9 +111,8 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Send the share fetch request to the non-replica and verify the error
code
val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest, nonReplicaId)
- val partitionData =
shareFetchResponse.responseData(topicNames).get(topicIdPartition)
- assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode)
- assertEquals(leader, partitionData.currentLeader().leaderId())
+ // Top level error thrown while fetching the "LATEST" offset for the
partition during share partition initialization
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code,
shareFetchResponse.data().errorCode())
Review Comment:
This seems not corrected.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]