AndrewJSchofield commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1818965965


##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -128,4 +130,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition 
topicIdPartition, Replic
                 Optional.empty(), true).timestampAndOffsetOpt();

Review Comment:
   Unfortunately, I think this is more complicated that it seems. If KIP-405 is 
enabled, then returning `EARLIEST_TIMESTAMP` is asynchronous. It may be 
necessary to wait before there's an answer to the earliest offset. If we put 
that concern aside for this PR, we do need an issue to track this.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -238,4 +264,39 @@ public int shareHeartbeatIntervalMs() {
     public int shareRecordLockDurationMs() {
         return shareRecordLockDurationMs;
     }
+
+    /**
+     * The share group auto offset reset strategy.
+     */
+    public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
+        return ShareGroupAutoOffsetReset.forStrategy(shareAutoOffsetReset);
+    }
+
+    public enum ShareGroupAutoOffsetReset {

Review Comment:
   I expect that `OffsetResetStrategy` will evolve independently and I prefer 
to separate them now.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -585,6 +585,7 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
+    cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, 
GroupConfig.defaultShareAutoOffsetReset().toString())

Review Comment:
   nit: Scala doesn't need `()`



##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -128,4 +131,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition 
topicIdPartition, Replic
                 Optional.empty(), true).timestampAndOffsetOpt();
         return timestampAndOffset.isEmpty() ? (long) 0 : 
timestampAndOffset.get().offset;
     }
+
+    /**
+     * The method is used to get the offset for the latest timestamp for the 
topic-partition.
+     *
+     * @return The offset for the latest timestamp.
+     */
+    static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, 
ReplicaManager replicaManager) {
+        // Isolation level is set to READ_UNCOMMITTED, matching with that used 
in share fetch requests
+        Option<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
+            topicIdPartition.topicPartition(), 
ListOffsetsRequest.LATEST_TIMESTAMP, new 
Some<>(IsolationLevel.READ_UNCOMMITTED),
+            Optional.empty(), true).timestampAndOffsetOpt();
+        return timestampAndOffset.isEmpty() ? (long) 0 : 
timestampAndOffset.get().offset;

Review Comment:
   I think there is the possibility of an exception in the slightly unusual 
situation of a new leader which is lagging behind the current epoch. While I'm 
sure we're not going to bump into that right now, it is a real situation in the 
replica manager I think and the interface here needs to be flexible enough for 
an exception and we need to have a plan for what to do if the exception occurs. 
@mumrah can you help here please? I guess we'd want to do some kind of retry, 
which is perhaps best achieved by aborting the share-partition initialisation 
and letting the next attempt to initialize the share-partition have another 
try. The edges of the replication behaviour are a bit challenging I think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to