apoorvmittal10 commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1816303214


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1968,6 +1977,29 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
         }
     }
 
+    private void setStartOffsetDuringInitialization(PartitionAllData partitionData) throws Exception {

Review Comment:
   We don't use set/get prefixes in Kafka, so please rename to `updateStartOffsetDuringInitialization`. Moreover, I would suggest returning an offset from this method, which should then be set in the calling initialization method; otherwise you need to take the lock again in this method. Hence, rename this method to `startOffsetDuringInitialization` and return a long.
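   For illustration, the suggested shape could look roughly like this (a minimal sketch; the fallback branch and the use of `topicIdPartition`/`replicaManager` as fields in scope are my assumptions, not part of this PR):
   ```java
   private long startOffsetDuringInitialization(PartitionAllData partitionData) throws Exception {
       if (partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET) {
           return partitionData.startOffset();
       }
       // Illustrative fallback only: resolve an initial offset from the log when nothing
       // has been persisted yet, e.g. via the ShareFetchUtils helper added in this PR.
       return ShareFetchUtils.offsetForLatestTimestamp(topicIdPartition, replicaManager);
   }
   ```
   The calling initialization method, which already holds the lock, would then assign the returned value to `startOffset`.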
   



##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -128,4 +131,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
                 Optional.empty(), true).timestampAndOffsetOpt();
         return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
     }
+
+    /**
+     * The method is used to get the offset for the latest timestamp for the topic-partition.
+     *
+     * @return The offset for the latest timestamp.
+     */
+    static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
+        // Isolation level is set to READ_UNCOMMITTED, matching with that used in share fetch requests
+        Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
+            topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
+            Optional.empty(), true).timestampAndOffsetOpt();
+        return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;

Review Comment:
   Is it safe to return `0` from this method when timestampAndOffset is empty? 
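   For illustration only, one hedged alternative would be to surface the missing offset instead of silently defaulting to 0 (the exception choice here is an assumption, not part of this PR):
   ```java
   static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
       // Isolation level is set to READ_UNCOMMITTED, matching with that used in share fetch requests.
       Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
           topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
           Optional.empty(), true).timestampAndOffsetOpt();
       if (timestampAndOffset.isEmpty()) {
           // Illustrative: fail loudly rather than treating "no offset found" as offset 0.
           throw new IllegalStateException("No latest offset found for " + topicIdPartition);
       }
       return timestampAndOffset.get().offset;
   }
   ```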



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1968,6 +1977,29 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
         }
     }
 
+    private void setStartOffsetDuringInitialization(PartitionAllData partitionData) throws Exception {
+        // Set the state epoch and end offset from the persisted state.
+        if (partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET) {
+            startOffset = partitionData.startOffset();

Review Comment:
   ```suggestion
               return partitionData.startOffset();
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -314,6 +318,7 @@ public static RecordState forId(byte id) {
      * @return The method returns a future which is completed when the share partition is initialized
      *         or completes with an exception if the share partition is in non-initializable state.
      */
+    @SuppressWarnings("CyclomaticComplexity")

Review Comment:
   Is the suppression still needed?



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -382,8 +387,12 @@ public CompletableFuture<Void> maybeInitialize() {
                     return;
                 }
 
-                // Set the state epoch and end offset from the persisted state.
-                startOffset = partitionData.startOffset() != -1 ? partitionData.startOffset() : 0;
+                try {
+                    setStartOffsetDuringInitialization(partitionData);

Review Comment:
   Why pass the complete `partitionData` when we just need `partitionData.startOffset()` in the method?
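   For illustration, with the narrower parameter the call site could look roughly like this (a sketch, assuming the rename suggested earlier):
   ```java
   startOffset = startOffsetDuringInitialization(partitionData.startOffset());
   ```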



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -1027,6 +1029,9 @@ public void testMultipleSequentialShareFetches() {
         partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES);
 
         ReplicaManager replicaManager = mock(ReplicaManager.class);
+        FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+        Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+            when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());

Review Comment:
   Can we please move this into a method and call that method in the 3 test cases? Also, `Mockito.anyBoolean` => `anyBoolean`, etc.; static imports already exist.
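   For illustration, the shared setup could look roughly like this (the helper name is mine, and it assumes the static imports for the matchers and `doReturn` mentioned above):
   ```java
   private void mockFetchOffsetForTimestamp(ReplicaManager replicaManager) {
       FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
       doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty()))
           .when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
   }
   ```
   Each of the 3 test cases would then just call `mockFetchOffsetForTimestamp(replicaManager)` after creating the mock.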



##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -117,4 +119,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
                 Optional.empty(), true).timestampAndOffsetOpt();
         return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
     }
+
+    /**
+     * The method is used to get the offset for the latest timestamp for the topic-partition.
+     *
+     * @return The offset for the latest timestamp.
+     */
+    static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {

Review Comment:
   My bad, sorry; the other method is also used only within the package, so there is no need for visible-for-testing there either.



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -113,9 +111,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
     // Send the share fetch request to the non-replica and verify the error code
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
     val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest, nonReplicaId)
-    val partitionData = shareFetchResponse.responseData(topicNames).get(topicIdPartition)
-    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode)
-    assertEquals(leader, partitionData.currentLeader().leaderId())
+    // Top level error thrown while fetching the "LATEST" offset for the partition during share partition initialization
+    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, shareFetchResponse.data().errorCode())

Review Comment:
   This does not seem correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]