apoorvmittal10 commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1813348831
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -117,4 +119,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
}
+
+ /**
+ * The method is used to get the offset for the latest timestamp for the topic-partition.
+ *
+ * @return The offset for the latest timestamp.
+ */
+ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
+ // Isolation level is set to READ_UNCOMMITTED, matching with that used in share fetch requests
+ Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
+ topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
+ Optional.empty(), true).timestampAndOffsetOpt();
+ return timestampAndOffset.get().offset;
Review Comment:
Do we need to check that `timestampAndOffset` is non-empty before calling `get()`?
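For illustration, a minimal sketch of such a guard. It uses `java.util.Optional` as a stand-in for the Scala `Option` in the diff, and the `TimestampAndOffset` class and `offsetOrThrow` helper are hypothetical names, not part of the PR. Whether to throw or to fall back to a default offset is left to the author.

```java
import java.util.Optional;

public class LatestOffsetGuardSketch {
    // Hypothetical stand-in for FileRecords.TimestampAndOffset; only the offset matters here.
    static final class TimestampAndOffset {
        final long timestamp;
        final long offset;

        TimestampAndOffset(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    // Returns the offset when a value is present; otherwise throws a descriptive
    // exception instead of the NoSuchElementException an unguarded get() would raise.
    static long offsetOrThrow(Optional<TimestampAndOffset> timestampAndOffset) {
        return timestampAndOffset
            .map(t -> t.offset)
            .orElseThrow(() -> new IllegalStateException(
                "No offset found for the latest timestamp"));
    }

    public static void main(String[] args) {
        System.out.println(offsetOrThrow(Optional.of(new TimestampAndOffset(-1L, 42L))));
    }
}
```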
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -117,4 +119,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
}
+
+ /**
+ * The method is used to get the offset for the latest timestamp for the topic-partition.
+ *
+ * @return The offset for the latest timestamp.
+ */
+ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
Review Comment:
// Visible for testing
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -209,11 +224,12 @@ public void testSubscribeSubscribeEmptyPollFails() {
@Test
public void testSubscriptionAndPoll() {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
Review Comment:
Why change all the tests rather than add one test that makes the specific
updates to the group offset using the admin client?
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -117,4 +119,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
}
+
+ /**
+ * The method is used to get the offset for the latest timestamp for the topic-partition.
+ *
+ * @return The offset for the latest timestamp.
+ */
+ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
Review Comment:
While you are in this file, please add the same for the other method.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -382,7 +387,35 @@ public CompletableFuture<Void> maybeInitialize() {
}
// Set the state epoch and end offset from the persisted state.
- startOffset = partitionData.startOffset() != -1 ? partitionData.startOffset() : 0;
+ if (partitionData.startOffset() != PartitionFactory.DEFAULT_START_OFFSET) {
+ startOffset = partitionData.startOffset();
+ } else {
+ GroupConfig.ShareGroupAutoOffsetReset offsetResetStrategy;
+ if (groupConfigManager.groupConfig(groupId).isPresent()) {
+ offsetResetStrategy = groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset();
+ if (offsetResetStrategy == GroupConfig.ShareGroupAutoOffsetReset.UNKNOWN) {
+ offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
+ }
+ } else {
+ offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
+ }
+
+ if (offsetResetStrategy == GroupConfig.ShareGroupAutoOffsetReset.EARLIEST) {
+ try {
+ startOffset = offsetForEarliestTimestamp(topicIdPartition, replicaManager);
+ } catch (Exception e) {
+ completeInitializationWithException(future, e);
+ return;
+ }
+ } else {
+ try {
+ startOffset = offsetForLatestTimestamp(topicIdPartition, replicaManager);
+ } catch (Exception e) {
+ completeInitializationWithException(future, e);
+ return;
+ }
+ }
+ }
Review Comment:
Can we move it to a method?
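One possible shape for the extracted method, as a rough sketch only. The `ResetStrategy` enum, the `-1` sentinel value, the `LATEST` default and the `LongSupplier` lookups are all assumptions standing in for `GroupConfig.ShareGroupAutoOffsetReset`, `PartitionFactory.DEFAULT_START_OFFSET`, `GroupConfig.defaultShareAutoOffsetReset()` and the two `offsetFor...Timestamp` calls, so that the example compiles on its own.

```java
import java.util.Optional;
import java.util.function.LongSupplier;

public class StartOffsetResolverSketch {
    // Hypothetical stand-in for GroupConfig.ShareGroupAutoOffsetReset.
    enum ResetStrategy { EARLIEST, LATEST, UNKNOWN }

    // Assumed sentinel meaning "nothing persisted yet"; the real value lives in PartitionFactory.
    static final long DEFAULT_START_OFFSET = -1L;

    // Assumed default strategy; the PR obtains it from GroupConfig.defaultShareAutoOffsetReset().
    static final ResetStrategy DEFAULT_STRATEGY = ResetStrategy.LATEST;

    // Resolves the start offset: a persisted value wins, otherwise the configured
    // (or default) reset strategy decides which lookup to run.
    static long resolveStartOffset(long persistedStartOffset,
                                   Optional<ResetStrategy> configured,
                                   LongSupplier earliestLookup,
                                   LongSupplier latestLookup) {
        if (persistedStartOffset != DEFAULT_START_OFFSET) {
            return persistedStartOffset;
        }
        // A missing group config and an UNKNOWN strategy both fall back to the default.
        ResetStrategy strategy = configured
            .filter(s -> s != ResetStrategy.UNKNOWN)
            .orElse(DEFAULT_STRATEGY);
        return strategy == ResetStrategy.EARLIEST
            ? earliestLookup.getAsLong()
            : latestLookup.getAsLong();
    }

    public static void main(String[] args) {
        System.out.println(resolveStartOffset(-1L, Optional.of(ResetStrategy.EARLIEST), () -> 3L, () -> 100L));
    }
}
```

With this shape the caller would need only one try/catch around the call, completing the future exceptionally on failure, instead of one per branch.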
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -238,4 +264,39 @@ public int shareHeartbeatIntervalMs() {
public int shareRecordLockDurationMs() {
return shareRecordLockDurationMs;
}
+
+ /**
+ * The share group auto offset reset strategy.
+ */
+ public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
+ return ShareGroupAutoOffsetReset.forStrategy(shareAutoOffsetReset);
+ }
+
+ public enum ShareGroupAutoOffsetReset {
Review Comment:
Why not use the existing `OffsetResetStrategy`?
##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -113,9 +111,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// Send the share fetch request to the non-replica and verify the error code
val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest, nonReplicaId)
- val partitionData = shareFetchResponse.responseData(topicNames).get(topicIdPartition)
- assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode)
- assertEquals(leader, partitionData.currentLeader().leaderId())
+ // Top level error thrown while fetching the "LATEST" offset for the partition during share partition initialization
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, shareFetchResponse.data().errorCode())
Review Comment:
Why do we want this behaviour? Shouldn't the exception be at the
per-topic-partition level?
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -209,11 +224,12 @@ public void testSubscribeSubscribeEmptyPollFails() {
@Test
public void testSubscriptionAndPoll() {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
Review Comment:
Do we have a test that does not specify the offset, i.e. one where the
default offset is used?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -314,6 +318,7 @@ public static RecordState forId(byte id) {
* @return The method returns a future which is completed when the share partition is initialized
* or completes with an exception if the share partition is in non-initializable state.
*/
+ @SuppressWarnings("CyclomaticComplexity")
Review Comment:
Why can't the added functionality go in a separate method?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -382,7 +387,35 @@ public CompletableFuture<Void> maybeInitialize() {
}
// Set the state epoch and end offset from the persisted state.
- startOffset = partitionData.startOffset() != -1 ? partitionData.startOffset() : 0;
+ if (partitionData.startOffset() != PartitionFactory.DEFAULT_START_OFFSET) {
Review Comment:
Can't `partitionData.startOffset()` be -1? If not, how do we differentiate
between a persisted value of zero and nothing persisted yet?
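To make the concern concrete, here is a small sketch under the assumption that the "nothing persisted" marker is a plain sentinel `long`. The `isPersisted` helper is hypothetical and the actual value of `PartitionFactory.DEFAULT_START_OFFSET` is not shown in this diff. It only illustrates that a sentinel of 0 cannot be told apart from a genuinely persisted offset of 0, whereas -1 (the value the removed line compared against) can.

```java
public class StartOffsetSentinelSketch {
    // Hypothetical helper: a start offset counts as persisted when it differs from the sentinel.
    static boolean isPersisted(long startOffset, long sentinel) {
        return startOffset != sentinel;
    }

    public static void main(String[] args) {
        // With a sentinel of -1, a persisted offset of 0 is recognised as persisted.
        System.out.println(isPersisted(0L, -1L));
        // With a sentinel of 0, the same persisted offset of 0 looks like "not persisted".
        System.out.println(isPersisted(0L, 0L));
    }
}
```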
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]