DL1231 commented on code in PR #20148:
URL: https://github.com/apache/kafka/pull/20148#discussion_r2217121724


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -843,6 +846,141 @@ public void testExplicitAcknowledgeThrowsNotInBatch() {
         }
     }
 
+    @ClusterTest
+    public void testExplicitOverrideAcknowledgeCorruptedMessage() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT),
+                null,
+                mockErrorDeserializer(3))) {
+
+            ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            ProducerRecord<byte[], byte[]> record2 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            ProducerRecord<byte[], byte[]> record3 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            producer.send(record1);
+            producer.send(record2);
+            producer.send(record3);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(2, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = 
records.iterator();
+
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            assertEquals(1L, secondRecord.offset());
+            shareConsumer.acknowledge(firstRecord);
+            shareConsumer.acknowledge(secondRecord);
+
+            RecordDeserializationException rde = 
assertThrows(RecordDeserializationException.class, () -> 
shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(2, rde.offset());
+            shareConsumer.commitSync();
+
+            // The corrupted record was automatically released, so we can still obtain it.
+            rde = assertThrows(RecordDeserializationException.class, () -> 
shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(2, rde.offset());
+
+            // Reject this record
+            shareConsumer.acknowledge(rde.topicPartition().topic(), 
rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+            shareConsumer.commitSync();
+
+            records = shareConsumer.poll(Duration.ZERO);
+            assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testExplicitAcknowledgeOffsetThrowsNotException() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))) {
+
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(1, records.count());
+            ConsumerRecord<byte[], byte[]> consumedRecord = 
records.records(tp).get(0);
+            assertEquals(0L, consumedRecord.offset());
+
+            assertThrows(IllegalStateException.class, () -> 
shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(), 
AcknowledgeType.ACCEPT));
+
+            shareConsumer.acknowledge(consumedRecord);
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testExplicitAcknowledgeOffsetThrowsParametersError() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT),
+                null,
+                mockErrorDeserializer(2))) {
+
+            ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            ProducerRecord<byte[], byte[]> record2 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            producer.send(record1);
+            producer.send(record2);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(1, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = 
records.iterator();
+
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            shareConsumer.acknowledge(firstRecord);
+
+            final RecordDeserializationException rde = 
assertThrows(RecordDeserializationException.class, () -> 
shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(1, rde.offset());
+
+            assertThrows(IllegalStateException.class, () -> 
shareConsumer.acknowledge("foo", rde.topicPartition().partition(), 
rde.offset(), AcknowledgeType.REJECT));
+            assertThrows(IllegalStateException.class, () -> 
shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(), 
AcknowledgeType.REJECT));
+            assertThrows(IllegalStateException.class, () -> 
shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0, 
AcknowledgeType.REJECT));
+
+            // Reject this record
+            shareConsumer.acknowledge(rde.topicPartition().topic(), 
rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+            shareConsumer.commitSync();

Review Comment:
   Good catch! I have updated the patch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to