AndrewJSchofield commented on code in PR #20900:
URL: https://github.com/apache/kafka/pull/20900#discussion_r2536924233
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2998,6 +3008,119 @@ public void testRenewAcknowledgementOnCommitSync() {
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
}
}
+ verifyYammerMetricCount("ackType=Renew", 5);
+ }
+
+ @ClusterTest
+ public void testRenewAcknowledgementInvalidStateRecord() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))
+ ) {
+ AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition,
exception) ->
+ offsetsByTopicPartition.forEach((tip, offsets) ->
acknowledgementsCommitted.addAndGet(offsets.size())));
+
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message
".getBytes());
+ producer.send(record);
+ producer.flush();
+
+ shareConsumer.subscribe(List.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+
+ for (ConsumerRecord<byte[], byte[]> rec : records) {
+ shareConsumer.acknowledge(rec, AcknowledgeType.REJECT);
+ shareConsumer.commitSync();
+ assertThrows(IllegalStateException.class, () ->
shareConsumer.acknowledge(rec, AcknowledgeType.RENEW));
+ }
+ }
+ verifyYammerMetricCount("ackType=Renew", 0);
+ }
+
+ @ClusterTest(
+ brokers = 1,
+ serverProperties = {
+ @ClusterConfigProperty(key =
"group.share.record.lock.duration.ms", value = "12000"),
+ @ClusterConfigProperty(key =
"group.share.min.record.lock.duration.ms", value = "12000"),
+ }
+ )
+ public void testRenewAcknowledgementNoResultInPoll() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))
+ ) {
+ AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition,
exception) ->
+ offsetsByTopicPartition.forEach((tip, offsets) ->
acknowledgementsCommitted.addAndGet(offsets.size())));
+
+ for (int i = 0; i < 10; i++) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message
" + i).getBytes());
+ producer.send(record);
+ }
+ producer.flush();
+
+ shareConsumer.subscribe(List.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 10);
+ assertEquals(10, records.count());
+
+ int count = 0;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ if (count % 2 == 0) {
+ shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+ } else {
+ shareConsumer.acknowledge(record, AcknowledgeType.RENEW);
+ }
+ count++;
+ }
+
+ // 5 more records (total 15 produced).
+ for (int i = 10; i < 15; i++) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message
" + i).getBytes());
+ producer.send(record);
+ }
+
+ // Get the rest of all 5 records.
+ records = waitedPoll(shareConsumer, 11500L, 0); // This will send the acks but not return next 5 records (10-15)
+ assertEquals(10, acknowledgementsCommitted.get());
+ assertEquals(0, records.count());
+ verifyYammerMetricCount("ackType=Renew", 5);
+
+ // Renewal duration passed, now records will be back.
+ records = waitedPoll(shareConsumer, 2500L, 5); // Renewed records as well as 10-15 records.
+ assertEquals(5, records.count());
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+ }
+
+ shareConsumer.commitSync();
+
+ records = waitedPoll(shareConsumer, 2500L, 5); // 10-15 records.
+ assertEquals(5, records.count());
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+ }
+
+ shareConsumer.commitSync();
+
+ // Initial - 5 renew + 5 accept, Subsequent - 5 renewed accepted + 5 fresh accepted (10-15)
+ assertEquals(20, acknowledgementsCommitted.get());
+ }
+ verifyYammerMetricCount("ackType=Renew", 5);
+ }
+
+ private void verifyYammerMetricCount(String filterString, int count) {
+ com.yammer.metrics.core.Metric renewAck =
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
Review Comment:
nit: The local variable name `renewAck` is misleading: the filter string is
an argument, so the metric it holds is not necessarily a renew
acknowledgement. However, that's such a tiny nit that we can fix it next
time through this file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]