AndrewJSchofield commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1818965965
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -128,4 +130,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
             Optional.empty(), true).timestampAndOffsetOpt();
Review Comment:
Unfortunately, I think this is more complicated than it seems. If KIP-405 is
enabled, then resolving `EARLIEST_TIMESTAMP` is asynchronous, so it may be
necessary to wait before an answer for the earliest offset is available. Even
if we put that concern aside for this PR, we need an issue to track this.
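
If it helps the tracking issue, a rough sketch of what an asynchronous variant could look like is below; the method name, the `remoteLogExecutor` parameter, and the delegation to the existing synchronous helper are all hypothetical, not part of this PR:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical async variant: with KIP-405 tiered storage enabled, the
// earliest offset may only be resolvable after consulting remote storage,
// so the lookup completes off-thread rather than blocking the caller.
static CompletableFuture<Long> offsetForEarliestTimestampAsync(TopicIdPartition topicIdPartition,
                                                               ReplicaManager replicaManager,
                                                               Executor remoteLogExecutor) {
    return CompletableFuture.supplyAsync(
        // Delegates to the existing synchronous helper purely for illustration;
        // a real implementation would hook into the remote log manager's own async path.
        () -> offsetForEarliestTimestamp(topicIdPartition, replicaManager),
        remoteLogExecutor);
}
```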
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -238,4 +264,39 @@ public int shareHeartbeatIntervalMs() {
     public int shareRecordLockDurationMs() {
         return shareRecordLockDurationMs;
     }
+
+    /**
+     * The share group auto offset reset strategy.
+     */
+    public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
+        return ShareGroupAutoOffsetReset.forStrategy(shareAutoOffsetReset);
+    }
+
+    public enum ShareGroupAutoOffsetReset {
Review Comment:
I expect that `OffsetResetStrategy` will evolve independently and I prefer
to separate them now.
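
To make the separation concrete, a minimal standalone sketch might look like this; the constant names and the case-insensitive parsing are assumptions about what the PR defines, not taken from it:

```java
import java.util.Locale;

// Sketch of a share-group-specific reset strategy, deliberately separate from
// the consumer's OffsetResetStrategy so the two can evolve independently.
public enum ShareGroupAutoOffsetReset {
    LATEST, EARLIEST;

    // Parses the configured value ("latest" or "earliest"), case-insensitively,
    // rejecting anything unknown rather than falling back to a default.
    public static ShareGroupAutoOffsetReset forStrategy(String strategy) {
        switch (strategy.toLowerCase(Locale.ROOT)) {
            case "latest":
                return LATEST;
            case "earliest":
                return EARLIEST;
            default:
                throw new IllegalArgumentException("Unknown share auto offset reset strategy: " + strategy);
        }
    }
}
```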
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -585,6 +585,7 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
+    cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.defaultShareAutoOffsetReset().toString())
Review Comment:
nit: Scala doesn't need `()`, so `GroupConfig.defaultShareAutoOffsetReset.toString` would be more idiomatic.
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -128,4 +131,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
             Optional.empty(), true).timestampAndOffsetOpt();
         return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
     }
+
+    /**
+     * The method is used to get the offset for the latest timestamp for the topic-partition.
+     *
+     * @return The offset for the latest timestamp.
+     */
+    static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
+        // Isolation level is set to READ_UNCOMMITTED, matching with that used in share fetch requests
+        Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
+            topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
+            Optional.empty(), true).timestampAndOffsetOpt();
+        return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
Review Comment:
I think there is the possibility of an exception in the slightly unusual
situation of a new leader which is lagging behind the current epoch. While I'm
sure we're not going to bump into that right now, it is a real situation in the
replica manager, so the interface here needs to be flexible enough to surface
an exception, and we need a plan for what to do when one occurs. @mumrah can
you help here please? I guess we'd want some kind of retry, which is perhaps
best achieved by aborting the share-partition initialisation and letting the
next attempt to initialise the share-partition have another try. The edges of
the replication behaviour are a bit challenging, I think.
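
A minimal sketch of that abort-and-retry shape, assuming a hypothetical wrapper around `offsetForLatestTimestamp` (the wrapper name and exception type are assumptions):

```java
// Hypothetical wrapper: if the lookup fails (e.g. a new leader still lagging
// behind the current epoch), fail this attempt so the share partition stays
// uninitialised and the next share fetch retries initialisation.
static long offsetForLatestTimestampOrAbort(TopicIdPartition topicIdPartition,
                                            ReplicaManager replicaManager) {
    try {
        return offsetForLatestTimestamp(topicIdPartition, replicaManager);
    } catch (RuntimeException e) {
        // Propagating here aborts the share-partition initialisation; the caller
        // is expected to leave the partition uninitialised rather than cache a
        // possibly wrong start offset.
        throw new IllegalStateException("Could not resolve latest offset for " + topicIdPartition, e);
    }
}
```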