chia7712 commented on code in PR #21467:
URL: https://github.com/apache/kafka/pull/21467#discussion_r2889941592


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2298,6 +2303,11 @@ private Optional<Throwable> 
acknowledgePerOffsetBatchRecords(
                 byte ackType = ackTypeMap.size() > 1 ? 
ackTypeMap.get(offsetState.getKey()) : batch.acknowledgeTypes().get(0);
 
                 if (ackType == AcknowledgeType.RENEW.id) {
+                    if (!configProvider.isRenewAcknowledgeEnabled(groupId)) {
+                        log.warn("Renew acknowledge is not enabled for the 
group: {}", groupId);

Review Comment:
   I'm not sure if `WARN` is appropriate here, as other cases that return 
`InvalidRecordStateException` use the `DEBUG` level. Should we stay consistent 
with those?



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3082,6 +3082,36 @@ public void testRenewAcknowledgementInvalidStateRecord() 
{
         verifyYammerMetricCount("ackType=Renew", 0);
     }
 
+    @ClusterTest
+    public void testRenewAcknowledgementDisabled() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareRenewAcknowledgeEnable("group1", false);
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"Message".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(List.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 1);
+            assertEquals(1, records.count());
+
+            for (ConsumerRecord<byte[], byte[]> rec : records) {
+                shareConsumer.acknowledge(rec, AcknowledgeType.RENEW);
+            }
+
+            // The RENEW should be rejected by the broker because 
share.renew.acknowledge.enable defaults to false.

Review Comment:
   The default value is `true`, right?



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2371,6 +2381,11 @@ private Optional<Throwable> acknowledgeCompleteBatch(
             // Before reaching this point, it should be verified that it is 
full batch ack and
             // not per offset ack as well as startOffset not moved.
             if (ackType == AcknowledgeType.RENEW.id) {
+                if (!configProvider.isRenewAcknowledgeEnabled(groupId)) {
+                    log.warn("Renew acknowledge is not enabled for the group: 
{}", groupId);

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to