apoorvmittal10 commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1818778889
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1968,6 +1976,26 @@ private void
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
}
}
+ private long updateStartOffsetDuringInitialization(long
partitionDataStartOffset) throws Exception {
Review Comment:
```suggestion
private long startOffsetDuringInitialization(long
partitionDataStartOffset) throws Exception {
```
##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -90,6 +89,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
),
brokers = 2
)
+ @Disabled
Review Comment:
Can you write a comment as well regarding why it's diabled. And add a jira
for myself to fix this.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java:
##########
@@ -57,8 +57,6 @@ public void testFromPropsInvalid() {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
} else if
(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
- } else {
- assertPropertyInvalid(name, "not_a_number", "-1");
}
Review Comment:
Why this change?
##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -138,17 +138,23 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
+ var acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
+ var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap, 10)
+ var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
- val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
- val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
- val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ // Send the second share fetch request to fetch the records produced above
+ metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+ acknowledgementsMap = Map.empty
+ shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
+ shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
Review Comment:
Again please clarify the question on changing order of tests initialization
and why it's needed.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java:
##########
@@ -165,6 +170,7 @@ private Properties createValidGroupConfig() {
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "45000");
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
+ props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
Review Comment:
Is there any test case which confirms beahviour with `earliet` offset.
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -209,11 +224,12 @@ public void testSubscribeSubscribeEmptyPollFails() {
@Test
public void testSubscriptionAndPoll() {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
Review Comment:
Thanks @chirag-wadhwa5 , I think then it's fine to add this additional line.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -238,4 +264,39 @@ public int shareHeartbeatIntervalMs() {
public int shareRecordLockDurationMs() {
return shareRecordLockDurationMs;
}
+
+ /**
+ * The share group auto offset reset strategy.
+ */
+ public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
+ return ShareGroupAutoOffsetReset.forStrategy(shareAutoOffsetReset);
+ }
+
+ public enum ShareGroupAutoOffsetReset {
Review Comment:
> It's better to use OffsetResetStrategy, so I'll remove the newly added
enum.
Did you miss it or changed mind?
What's the final decision, are we keeping it? I would prefer to use
`OffsetResetStrategy` as don't see any difference to one defined as new. If we
need to have an independent one, in future, because of some more states then
create a new one. Else it's unnecessary.
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -209,11 +224,12 @@ public void testSubscribeSubscribeEmptyPollFails() {
@Test
public void testSubscriptionAndPoll() {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
Review Comment:
Though not sure why you have to move the order of the calls? Previously we
were producing and then consuming. Now we start consuming and then produce. Why?
Isn't produce and then consumer from earliest will give same results?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -182,6 +200,15 @@ private static void validateValues(Map<?, ?> valueMaps,
GroupCoordinatorConfig g
throw new
InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be
greater than " +
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
}
+ if
(ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT))
== ShareGroupAutoOffsetReset.NONE) {
+ throw new
InvalidConfigurationException(SHARE_AUTO_OFFSET_RESET_CONFIG + " must be " +
+ ShareGroupAutoOffsetReset.LATEST + " or " +
ShareGroupAutoOffsetReset.EARLIEST);
+ }
+ if
(ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT))
!= ShareGroupAutoOffsetReset.LATEST &&
+
ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT))
!= ShareGroupAutoOffsetReset.EARLIEST) {
+ throw new
InvalidConfigurationException(SHARE_AUTO_OFFSET_RESET_CONFIG + " must be " +
+ ShareGroupAutoOffsetReset.LATEST + " or " +
ShareGroupAutoOffsetReset.EARLIEST);
+ }
Review Comment:
Do you need this check, as the config is backed by an enum then will it not
be an inherent check?
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -5151,6 +5367,11 @@ private SharePartitionBuilder withMaxDeliveryCount(int
maxDeliveryCount) {
return this;
}
+ private SharePartitionBuilder withReplicaManager(ReplicaManager
replicaManager) {
+ this.replicaManager = replicaManager;
+ return this;
+ }
+
Review Comment:
What about tests when exception is thrown while fetching start offset, there
is a catch block in the code?
##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -1027,6 +1029,9 @@ public void testMultipleSequentialShareFetches() {
partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES);
ReplicaManager replicaManager = mock(ReplicaManager.class);
+ FileRecords.TimestampAndOffset timestampAndOffset = new
FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+ Mockito.doReturn(new
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
Review Comment:
I suggested to have a mockReplicaManagerOffestForTimestamp() method and move
code common for 3 tests there it self. It's same. Anyways I left it on you.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java:
##########
@@ -124,6 +122,11 @@ public void testInvalidProps() {
// Check for invalid shareRecordLockDurationMs, > MAX
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "70000");
doTestInvalidProps(props);
+ props = createValidGroupConfig();
+
+ // Check for invalid shareAutoOffsetReset
+ props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "none");
+ doTestInvalidProps(props);
Review Comment:
What about a test where the string is not one of the enums?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]