DL1231 commented on code in PR #20148: URL: https://github.com/apache/kafka/pull/20148#discussion_r2217121724
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java: ########## @@ -843,6 +846,141 @@ public void testExplicitAcknowledgeThrowsNotInBatch() { } } + @ClusterTest + public void testExplicitOverrideAcknowledgeCorruptedMessage() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer<byte[], byte[]> producer = createProducer(); + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer( + "group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT), + null, + mockErrorDeserializer(3))) { + + ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record1); + producer.send(record2); + producer.send(record3); + producer.flush(); + + shareConsumer.subscribe(Set.of(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60)); + assertEquals(2, records.count()); + Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator(); + + ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); + ConsumerRecord<byte[], byte[]> secondRecord = iterator.next(); + assertEquals(0L, firstRecord.offset()); + assertEquals(1L, secondRecord.offset()); + shareConsumer.acknowledge(firstRecord); + shareConsumer.acknowledge(secondRecord); + + RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60))); + assertEquals(2, rde.offset()); + shareConsumer.commitSync(); + + // The corrupted record was automatically released, so we can still obtain it. 
+ rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60))); + assertEquals(2, rde.offset()); + + // Reject this record + shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT); + shareConsumer.commitSync(); + + records = shareConsumer.poll(Duration.ZERO); + assertEquals(0, records.count()); + verifyShareGroupStateTopicRecordsProduced(); + } + } + + @ClusterTest + public void testExplicitAcknowledgeOffsetThrowsNotException() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer<byte[], byte[]> producer = createProducer(); + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer( + "group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { + + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(Set.of(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60)); + assertEquals(1, records.count()); + ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0); + assertEquals(0L, consumedRecord.offset()); + + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(), AcknowledgeType.ACCEPT)); + + shareConsumer.acknowledge(consumedRecord); + verifyShareGroupStateTopicRecordsProduced(); + } + } + + @ClusterTest + public void testExplicitAcknowledgeOffsetThrowsParametersError() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer<byte[], byte[]> producer = createProducer(); + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer( + "group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT), + null, + mockErrorDeserializer(2))) { + + ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record1); + producer.send(record2); + producer.flush(); + + shareConsumer.subscribe(Set.of(tp.topic())); + + ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60)); + assertEquals(1, records.count()); + Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator(); + + ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); + assertEquals(0L, firstRecord.offset()); + shareConsumer.acknowledge(firstRecord); + + final RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60))); + assertEquals(1, rde.offset()); + + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge("foo", rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT)); + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(), AcknowledgeType.REJECT)); + assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0, AcknowledgeType.REJECT)); + + // Reject this record + shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT); + shareConsumer.commitSync(); Review Comment: Good catch! I have updated the patch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org