apoorvmittal10 commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1818778889


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1968,6 +1976,26 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
         }
     }
 
+    private long updateStartOffsetDuringInitialization(long partitionDataStartOffset) throws Exception {

Review Comment:
   ```suggestion
       private long startOffsetDuringInitialization(long partitionDataStartOffset) throws Exception {
   ```



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -90,6 +89,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
     ),
     brokers = 2
   )
+  @Disabled

Review Comment:
   Can you also add a comment explaining why it's disabled, and file a JIRA for me to fix this?
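   
   A minimal sketch of what I mean (the reason text and JIRA id below are placeholders, not the actual ones):
   ```java
   // Placeholder sketch: put the reason and the tracking JIRA directly on the annotation
   @Disabled("Fails after this change; tracked by KAFKA-XXXXX (placeholder id)")
   ```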



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java:
##########
@@ -57,8 +57,6 @@ public void testFromPropsInvalid() {
                 assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
            } else if (GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
-            } else {
-                assertPropertyInvalid(name, "not_a_number", "-1");
             }

Review Comment:
   Why this change?



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -138,17 +138,23 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
+    var acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
+    var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap, 10)
+    var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
-    val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
-    val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
-    val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
+    // Send the second share fetch request to fetch the records produced above
+    metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+    acknowledgementsMap = Map.empty
+    shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
+    shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)

Review Comment:
   Again, please clarify the earlier question about changing the order of the test initialization and why it's needed.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java:
##########
@@ -165,6 +170,7 @@ private Properties createValidGroupConfig() {
         props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "45000");
         props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
         props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
+        props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");

Review Comment:
   Is there any test case which confirms the behaviour with the `earliest` offset?
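   
   If not, a sketch of what that could look like, mirroring the invalid-props pattern in this test (the valid-props helper name is an assumption):
   ```java
   // Hypothetical sketch: confirm "earliest" is also accepted as a valid strategy
   Properties props = createValidGroupConfig();
   props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");
   doTestValidProps(props); // assumed counterpart of doTestInvalidProps
   ```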



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -209,11 +224,12 @@ public void testSubscribeSubscribeEmptyPollFails() {
 
     @Test
     public void testSubscriptionAndPoll() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");

Review Comment:
   Thanks @chirag-wadhwa5 , I think then it's fine to add this additional line.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -238,4 +264,39 @@ public int shareHeartbeatIntervalMs() {
     public int shareRecordLockDurationMs() {
         return shareRecordLockDurationMs;
     }
+
+    /**
+     * The share group auto offset reset strategy.
+     */
+    public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
+        return ShareGroupAutoOffsetReset.forStrategy(shareAutoOffsetReset);
+    }
+
+    public enum ShareGroupAutoOffsetReset {

Review Comment:
   > It's better to use OffsetResetStrategy, so I'll remove the newly added enum.
   
   Did you miss it or change your mind?
   
   What's the final decision, are we keeping it? I would prefer to use `OffsetResetStrategy`, as I don't see any difference from the newly defined enum. If we need an independent one in the future because of additional states, we can create a new one then. Otherwise it's unnecessary.
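   
   For reference, a rough sketch of reusing the existing enum instead (assuming the module can depend on `org.apache.kafka.clients.consumer.OffsetResetStrategy`):
   ```java
   // Hypothetical sketch: return the client-side enum rather than a new ShareGroupAutoOffsetReset
   public OffsetResetStrategy shareAutoOffsetReset() {
       return OffsetResetStrategy.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT));
   }
   ```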



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -209,11 +224,12 @@ public void testSubscribeSubscribeEmptyPollFails() {
 
     @Test
     public void testSubscriptionAndPoll() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");

Review Comment:
   Though I'm not sure why you had to change the order of the calls. Previously we produced and then consumed; now we start consuming and then produce. Why?
   
   Wouldn't producing first and then consuming from `earliest` give the same results?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -182,6 +200,15 @@ private static void validateValues(Map<?, ?> valueMaps, GroupCoordinatorConfig g
             throw new InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
                 SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
         }
+        if (ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT)) == ShareGroupAutoOffsetReset.NONE) {
+            throw new InvalidConfigurationException(SHARE_AUTO_OFFSET_RESET_CONFIG + " must be " +
+                ShareGroupAutoOffsetReset.LATEST + " or " + ShareGroupAutoOffsetReset.EARLIEST);
+        }
+        if (ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT)) != ShareGroupAutoOffsetReset.LATEST &&
+            ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT)) != ShareGroupAutoOffsetReset.EARLIEST) {
+            throw new InvalidConfigurationException(SHARE_AUTO_OFFSET_RESET_CONFIG + " must be " +
+                ShareGroupAutoOffsetReset.LATEST + " or " + ShareGroupAutoOffsetReset.EARLIEST);
+        }

Review Comment:
   Do you need this check? Since the config is backed by an enum, won't that already be an inherent check?
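   
   For example, if the config definition constrained the allowed strings, this block would be redundant (default value, doc text, and importance below are assumptions):
   ```java
   // Hypothetical sketch: validate at the ConfigDef level instead of in validateValues()
   ConfigDef configDef = new ConfigDef()
       .define(SHARE_AUTO_OFFSET_RESET_CONFIG,
           ConfigDef.Type.STRING,
           "latest",                                        // assumed default
           ConfigDef.ValidString.in("latest", "earliest"),  // rejects "none" and unknown values
           ConfigDef.Importance.MEDIUM,
           "The strategy to initialize the share-partition start offset."); // assumed doc text
   ```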



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -5151,6 +5367,11 @@ private SharePartitionBuilder withMaxDeliveryCount(int maxDeliveryCount) {
             return this;
         }
 
+        private SharePartitionBuilder withReplicaManager(ReplicaManager replicaManager) {
+            this.replicaManager = replicaManager;
+            return this;
+        }
+

Review Comment:
   What about tests where an exception is thrown while fetching the start offset? There is a catch block in the code.
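   
   Roughly something like this (the builder entry point and the final assertion are assumptions about this test's helpers):
   ```java
   // Hypothetical sketch: have fetchOffsetForTimestamp throw so the catch block is exercised
   ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
   Mockito.doThrow(new RuntimeException("failed to fetch offset for timestamp"))
       .when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
           Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
   SharePartition sharePartition = SharePartitionBuilder.builder() // assumed entry point
       .withReplicaManager(replicaManager)
       .build();
   // ... then assert that initialization handles/propagates the failure as expected
   ```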
   



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -1027,6 +1029,9 @@ public void testMultipleSequentialShareFetches() {
         partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES);
 
         ReplicaManager replicaManager = mock(ReplicaManager.class);
+        FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+        Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+            when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());

Review Comment:
   I suggested adding a mockReplicaManagerOffsetForTimestamp() method and moving the code common to the 3 tests there. It's the same either way, so I'll leave it up to you.
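   
   i.e. something along these lines, lifted from the setup duplicated across the tests:
   ```java
   // Sketch of the shared helper (name as suggested earlier; mirrors the mocked call in this diff)
   private void mockReplicaManagerOffsetForTimestamp(ReplicaManager replicaManager) {
       FileRecords.TimestampAndOffset timestampAndOffset =
           new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
       Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty()))
           .when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
               Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
   }
   ```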



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java:
##########
@@ -124,6 +122,11 @@ public void testInvalidProps() {
         // Check for invalid shareRecordLockDurationMs, > MAX
         props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "70000");
         doTestInvalidProps(props);
+        props = createValidGroupConfig();
+
+        // Check for invalid shareAutoOffsetReset
+        props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "none");
+        doTestInvalidProps(props);

Review Comment:
   What about a test where the string is not one of the enum values?
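   
   For example, following the same pattern as the `none` case above:
   ```java
   // Hypothetical addition: a string that maps to no enum value at all should also be rejected
   props = createValidGroupConfig();
   props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "not_a_strategy");
   doTestInvalidProps(props);
   ```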



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
