exceptionfactory commented on code in PR #10769:
URL: https://github.com/apache/nifi/pull/10769#discussion_r2764601238
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java:
##########
@@ -135,6 +161,13 @@ public Iterable<ByteRecord> poll(final Duration maxWaitDuration) {
return List.of();
}
+ // Track the maximum offset for each partition to commit during rebalance
+ for (final ConsumerRecord<byte[], byte[]> record : consumerRecords) {
Review Comment:
This approach requires iterating over all Records multiple times, once here,
and again in actual usage in `RecordIterable`. The `RecordIterable` is designed
for optimal iteration and client usage. Instead of this loop, I recommend
adjusting `RecordIterable` to track the offset retrieved. This should also
provide the opportunity to get the list of `TopicPartitions` once, and avoid
creating a new instance for every Record.
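
A minimal sketch of that direction (illustrative only, not the existing `RecordIterable` implementation): an iterator that walks `ConsumerRecords` partition by partition, so each `TopicPartition` is resolved once, and records the highest offset it hands out into a map shared with the rebalance listener.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Sketch only: class and field names are illustrative
class OffsetTrackingRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
    private final ConsumerRecords<byte[], byte[]> consumerRecords;
    private final Iterator<TopicPartition> partitions;
    private final Map<TopicPartition, Long> uncommittedOffsets; // shared with onPartitionsRevoked()

    private TopicPartition currentPartition;
    private Iterator<ConsumerRecord<byte[], byte[]>> currentRecords = Collections.emptyIterator();

    OffsetTrackingRecordIterator(final ConsumerRecords<byte[], byte[]> consumerRecords,
                                 final Map<TopicPartition, Long> uncommittedOffsets) {
        this.consumerRecords = consumerRecords;
        this.partitions = consumerRecords.partitions().iterator();
        this.uncommittedOffsets = uncommittedOffsets;
    }

    @Override
    public boolean hasNext() {
        // Advance to the next partition with remaining records, resolving each TopicPartition once
        while (!currentRecords.hasNext() && partitions.hasNext()) {
            currentPartition = partitions.next();
            currentRecords = consumerRecords.records(currentPartition).iterator();
        }
        return currentRecords.hasNext();
    }

    @Override
    public ConsumerRecord<byte[], byte[]> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        final ConsumerRecord<byte[], byte[]> record = currentRecords.next();
        // Track the highest offset returned for this partition while the caller iterates
        uncommittedOffsets.merge(currentPartition, record.offset(), Math::max);
        return record;
    }
}
```

With something along these lines, `onPartitionsRevoked` could build the commit map (offset + 1) from `uncommittedOffsets` for the revoked partitions only, with no extra pass over the records.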
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerServiceTest.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.consumer;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class Kafka3ConsumerServiceTest {
+
+ private static final String TOPIC = "test-topic";
+ private static final String GROUP_ID = "test-group";
+ private static final int PARTITION_0 = 0;
+ private static final int PARTITION_1 = 1;
+
+ @Mock
+ private Consumer<byte[], byte[]> consumer;
+
+ @Mock
+ private ComponentLog componentLog;
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> offsetsCaptor;
+
+ private Kafka3ConsumerService consumerService;
+
+ @BeforeEach
+ void setUp() {
+ final Subscription subscription = new Subscription(GROUP_ID, Collections.singletonList(TOPIC), AutoOffsetReset.EARLIEST);
+ consumerService = new Kafka3ConsumerService(componentLog, consumer, subscription);
+ }
+
+ @Test
+ void testOnPartitionsRevokedCommitsUncommittedOffsets() {
+ // Arrange: Simulate polling records from two partitions
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+ final TopicPartition partition1 = new TopicPartition(TOPIC, PARTITION_1);
+
+ final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+ final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_1, 10L);
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+ recordsMap.put(partition0, List.of(record0));
+ recordsMap.put(partition1, List.of(record1));
+ final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+ // Act: Poll records (this should track the offsets internally)
+ final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+ // Consume the iterator to ensure records are processed
+ for (ByteRecord ignored : polledRecords) {
+ // Just iterate through
+ }
+
+ // Act: Simulate rebalance - partitions being revoked
+ final Collection<TopicPartition> revokedPartitions = List.of(partition0, partition1);
+ consumerService.onPartitionsRevoked(revokedPartitions);
+
+ // Assert: Verify that offsets were committed for the revoked partitions
+ verify(consumer).commitSync(offsetsCaptor.capture());
+ final Map<TopicPartition, OffsetAndMetadata> committedOffsets = offsetsCaptor.getValue();
+
+ assertEquals(2, committedOffsets.size());
+ // Offset should be record.offset + 1 (next offset to consume)
+ assertEquals(6L, committedOffsets.get(partition0).offset());
+ assertEquals(11L, committedOffsets.get(partition1).offset());
+ }
+
+ @Test
+ void testOnPartitionsRevokedWithNoUncommittedOffsets() {
+ // Arrange: No records polled
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+
+ // Act: Simulate rebalance without any prior polling
+ consumerService.onPartitionsRevoked(List.of(partition0));
+
+ // Assert: No commit should be called since there are no uncommitted offsets
+ verify(consumer, never()).commitSync(anyMap());
+ }
+
+ @Test
+ void testOnPartitionsRevokedOnlyCommitsRevokedPartitions() {
+ // Arrange: Poll records from two partitions
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+ final TopicPartition partition1 = new TopicPartition(TOPIC, PARTITION_1);
+
+ final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+ final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_1, 10L);
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+ recordsMap.put(partition0, List.of(record0));
+ recordsMap.put(partition1, List.of(record1));
+ final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+ // Act: Poll records
+ final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+ for (ByteRecord ignored : polledRecords) {
+ // Just iterate through
+ }
+
+ // Act: Only revoke partition 0, keep partition 1
+ consumerService.onPartitionsRevoked(List.of(partition0));
+
+ // Assert: Only partition 0 should be committed
+ verify(consumer).commitSync(offsetsCaptor.capture());
+ final Map<TopicPartition, OffsetAndMetadata> committedOffsets = offsetsCaptor.getValue();
+
+ assertEquals(1, committedOffsets.size());
+ assertEquals(6L, committedOffsets.get(partition0).offset());
+ assertFalse(committedOffsets.containsKey(partition1));
+ }
+
+ @Test
+ void testOnPartitionsRevokedTracksMaxOffset() {
+ // Arrange: Poll multiple records from same partition
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+
+ final ConsumerRecord<byte[], byte[]> record1 = createRecord(TOPIC, PARTITION_0, 5L);
+ final ConsumerRecord<byte[], byte[]> record2 = createRecord(TOPIC, PARTITION_0, 7L);
+ final ConsumerRecord<byte[], byte[]> record3 = createRecord(TOPIC, PARTITION_0, 6L); // Out of order
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+ recordsMap.put(partition0, List.of(record1, record2, record3));
+ final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+ // Act: Poll records
+ final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+ for (ByteRecord ignored : polledRecords) {
+ // Just iterate through
+ }
+
+ // Act: Revoke partition
+ consumerService.onPartitionsRevoked(List.of(partition0));
+
+ // Assert: Should commit max offset + 1 (7 + 1 = 8)
+ verify(consumer).commitSync(offsetsCaptor.capture());
+ final Map<TopicPartition, OffsetAndMetadata> committedOffsets = offsetsCaptor.getValue();
+
+ assertEquals(1, committedOffsets.size());
+ assertEquals(8L, committedOffsets.get(partition0).offset());
+ }
+
+ @Test
+ void testRollbackClearsUncommittedOffsets() {
+ // Arrange: Poll records
+ final TopicPartition partition0 = new TopicPartition(TOPIC, PARTITION_0);
+
+ final ConsumerRecord<byte[], byte[]> record0 = createRecord(TOPIC, PARTITION_0, 5L);
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
+ recordsMap.put(partition0, List.of(record0));
+ final ConsumerRecords<byte[], byte[]> consumerRecords = createConsumerRecords(recordsMap);
+
+ when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+ when(consumer.assignment()).thenReturn(Collections.singleton(partition0));
+ when(consumer.committed(any())).thenReturn(Collections.singletonMap(partition0, new OffsetAndMetadata(0L)));
+
+ // Act: Poll records
+ final Iterable<ByteRecord> polledRecords = consumerService.poll(Duration.ofMillis(100));
+ for (ByteRecord ignored : polledRecords) {
+ // Just iterate through
+ }
+
+ // Act: Rollback
Review Comment:
I generally recommend avoiding the `Arrange / Act / Assert` comments. Some of them provide useful details, so those could be retained, but others just restate the name of the method being called.
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
+import org.apache.nifi.kafka.service.consumer.Subscription;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Integration tests for verifying that ConsumeKafka correctly handles consumer group rebalances
+ * without causing duplicate message processing.
+ */
+class ConsumeKafkaRebalanceIT extends AbstractConsumeKafkaIT {
+
+ /**
+ * Tests that when onPartitionsRevoked is called (simulating rebalance), the consumer
+ * correctly commits offsets, and a subsequent consumer in the same group doesn't
+ * re-consume the same messages (no duplicates).
+ *
+ * This test:
+ * 1. Produces messages to a multi-partition topic
+ * 2. Consumer 1 polls and processes messages
+ * 3. Simulates rebalance by calling onPartitionsRevoked on Consumer 1
+ * 4. Consumer 2 joins and continues consuming from committed offsets
+ * 5. Verifies no duplicate messages were consumed
+ */
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.SECONDS)
+ void testRebalanceDoesNotCauseDuplicates() throws Exception {
+ final String topic = "rebalance-test-" + UUID.randomUUID();
+ final String groupId = "rebalance-group-" + UUID.randomUUID();
+ final int numPartitions = 3;
+ final int messagesPerPartition = 20;
+ final int totalMessages = numPartitions * messagesPerPartition;
+
+ // Create topic with multiple partitions
+ createTopic(topic, numPartitions);
+
+ // Produce messages to all partitions
+ produceMessagesToTopic(topic, numPartitions, messagesPerPartition);
+
+ // Track consumed messages to detect duplicates
+ final Set<String> consumedMessages = new HashSet<>();
+ final AtomicInteger duplicateCount = new AtomicInteger(0);
+
+ final ComponentLog mockLog = mock(ComponentLog.class);
+
+ // Consumer 1: Poll some messages, then simulate rebalance
+ final Properties props1 = getConsumerProperties(groupId);
+ try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new KafkaConsumer<>(props1)) {
+ final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+ final Kafka3ConsumerService service1 = new Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);
+
+ // Poll about half the messages
+ int consumer1Count = 0;
+ int maxAttempts = 20;
+ while (consumer1Count < totalMessages / 2 && maxAttempts-- > 0) {
+ for (ByteRecord record : service1.poll(Duration.ofSeconds(2))) {
+ final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
+ if (!consumedMessages.add(messageId)) {
+ duplicateCount.incrementAndGet();
+ }
+ consumer1Count++;
+ }
+ }
+
+ // Simulate rebalance - this should commit the offsets
+ final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
+ service1.onPartitionsRevoked(assignment);
+ service1.close();
+ }
+
+ // Consumer 2: Should continue from where Consumer 1 left off (no duplicates)
+ final Properties props2 = getConsumerProperties(groupId);
+ try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new KafkaConsumer<>(props2)) {
+ final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+ final Kafka3ConsumerService service2 = new Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);
+
+ // Poll remaining messages
+ int emptyPolls = 0;
+ while (emptyPolls < 5 && consumedMessages.size() < totalMessages) {
+ boolean hasRecords = false;
+ for (ByteRecord record : service2.poll(Duration.ofSeconds(2))) {
+ hasRecords = true;
+ final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
+ if (!consumedMessages.add(messageId)) {
+ duplicateCount.incrementAndGet();
+ }
+ }
+ if (!hasRecords) {
+ emptyPolls++;
+ } else {
+ emptyPolls = 0;
+ }
+ }
+
+ service2.close();
+ }
+
+ // Verify results
Review Comment:
```suggestion
```
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
+import org.apache.nifi.kafka.service.consumer.Subscription;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Integration tests for verifying that ConsumeKafka correctly handles consumer group rebalances
+ * without causing duplicate message processing.
+ */
+class ConsumeKafkaRebalanceIT extends AbstractConsumeKafkaIT {
+
+ /**
+ * Tests that when onPartitionsRevoked is called (simulating rebalance), the consumer
+ * correctly commits offsets, and a subsequent consumer in the same group doesn't
+ * re-consume the same messages (no duplicates).
+ *
+ * This test:
+ * 1. Produces messages to a multi-partition topic
+ * 2. Consumer 1 polls and processes messages
+ * 3. Simulates rebalance by calling onPartitionsRevoked on Consumer 1
+ * 4. Consumer 2 joins and continues consuming from committed offsets
+ * 5. Verifies no duplicate messages were consumed
+ */
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.SECONDS)
+ void testRebalanceDoesNotCauseDuplicates() throws Exception {
+ final String topic = "rebalance-test-" + UUID.randomUUID();
+ final String groupId = "rebalance-group-" + UUID.randomUUID();
Review Comment:
Recommend declaring static values for topic and partition across test
methods.
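
For example (a sketch only, with illustrative names; each test could still append a UUID where a unique topic or group is required):

```java
private static final String TOPIC_PREFIX = "rebalance-test-";
private static final String GROUP_ID_PREFIX = "rebalance-group-";
private static final int NUM_PARTITIONS = 3;
private static final int MESSAGES_PER_PARTITION = 20;
```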
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
+import org.apache.nifi.kafka.service.consumer.Subscription;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Integration tests for verifying that ConsumeKafka correctly handles consumer group rebalances
+ * without causing duplicate message processing.
+ */
+class ConsumeKafkaRebalanceIT extends AbstractConsumeKafkaIT {
+
+ /**
+ * Tests that when onPartitionsRevoked is called (simulating rebalance), the consumer
+ * correctly commits offsets, and a subsequent consumer in the same group doesn't
+ * re-consume the same messages (no duplicates).
+ *
+ * This test:
+ * 1. Produces messages to a multi-partition topic
+ * 2. Consumer 1 polls and processes messages
+ * 3. Simulates rebalance by calling onPartitionsRevoked on Consumer 1
+ * 4. Consumer 2 joins and continues consuming from committed offsets
+ * 5. Verifies no duplicate messages were consumed
+ */
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.SECONDS)
+ void testRebalanceDoesNotCauseDuplicates() throws Exception {
+ final String topic = "rebalance-test-" + UUID.randomUUID();
+ final String groupId = "rebalance-group-" + UUID.randomUUID();
+ final int numPartitions = 3;
+ final int messagesPerPartition = 20;
+ final int totalMessages = numPartitions * messagesPerPartition;
+
+ // Create topic with multiple partitions
+ createTopic(topic, numPartitions);
+
+ // Produce messages to all partitions
+ produceMessagesToTopic(topic, numPartitions, messagesPerPartition);
+
+ // Track consumed messages to detect duplicates
+ final Set<String> consumedMessages = new HashSet<>();
+ final AtomicInteger duplicateCount = new AtomicInteger(0);
+
+ final ComponentLog mockLog = mock(ComponentLog.class);
+
+ // Consumer 1: Poll some messages, then simulate rebalance
+ final Properties props1 = getConsumerProperties(groupId);
+ try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new KafkaConsumer<>(props1)) {
+ final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+ final Kafka3ConsumerService service1 = new Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);
+
+ // Poll about half the messages
+ int consumer1Count = 0;
+ int maxAttempts = 20;
+ while (consumer1Count < totalMessages / 2 && maxAttempts-- > 0) {
+ for (ByteRecord record : service1.poll(Duration.ofSeconds(2))) {
+ final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
+ if (!consumedMessages.add(messageId)) {
+ duplicateCount.incrementAndGet();
+ }
+ consumer1Count++;
+ }
+ }
+
+ // Simulate rebalance - this should commit the offsets
+ final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
+ service1.onPartitionsRevoked(assignment);
+ service1.close();
+ }
+
+ // Consumer 2: Should continue from where Consumer 1 left off (no duplicates)
+ final Properties props2 = getConsumerProperties(groupId);
+ try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new KafkaConsumer<>(props2)) {
+ final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+ final Kafka3ConsumerService service2 = new Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);
+
+ // Poll remaining messages
+ int emptyPolls = 0;
+ while (emptyPolls < 5 && consumedMessages.size() < totalMessages) {
+ boolean hasRecords = false;
+ for (ByteRecord record : service2.poll(Duration.ofSeconds(2))) {
+ hasRecords = true;
+ final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
+ if (!consumedMessages.add(messageId)) {
+ duplicateCount.incrementAndGet();
+ }
+ }
+ if (!hasRecords) {
+ emptyPolls++;
+ } else {
+ emptyPolls = 0;
+ }
+ }
+
+ service2.close();
+ }
+
+ // Verify results
+ assertEquals(0, duplicateCount.get(),
+ "Expected no duplicate messages but found " +
duplicateCount.get());
+ assertEquals(totalMessages, consumedMessages.size(),
+ "Expected to consume " + totalMessages + " unique messages but
got " + consumedMessages.size());
+ }
+
+ /**
+ * Tests that offsets are committed during rebalance by simulating the onPartitionsRevoked callback.
+ * This test:
+ * 1. Creates a consumer and polls messages
+ * 2. Manually invokes onPartitionsRevoked (simulating what Kafka does during rebalance)
+ * 3. Verifies that offsets were committed to Kafka
+ */
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.SECONDS)
+ void testOffsetsCommittedDuringRebalance() throws Exception {
+ final String topic = "rebalance-offset-test-" + UUID.randomUUID();
+ final String groupId = "rebalance-offset-group-" + UUID.randomUUID();
+
+ // Create topic with multiple partitions
+ createTopic(topic, 3);
+
+ // Produce some messages (10 per partition = 30 total)
+ final int messagesPerPartition = 10;
+ produceMessagesToTopic(topic, 3, messagesPerPartition);
+
+ final ComponentLog mockLog = mock(ComponentLog.class);
+
+ // Create consumer and poll messages
+ final Properties props = getConsumerProperties(groupId);
+ try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props)) {
+ final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+ final Kafka3ConsumerService service = new Kafka3ConsumerService(mockLog, kafkaConsumer, subscription);
+
+ // Poll messages until we have some
+ int polledCount = 0;
+ int maxAttempts = 20;
+ while (polledCount < 15 && maxAttempts-- > 0) {
+ for (ByteRecord ignored : service.poll(Duration.ofSeconds(2))) {
+ polledCount++;
+ }
+ }
+
+ assertTrue(polledCount > 0, "Should have polled at least some messages");
+
+ // Get the current assignment before simulating rebalance
+ final Set<TopicPartition> assignment = kafkaConsumer.assignment();
+ assertFalse(assignment.isEmpty(), "Consumer should have partition assignments");
+
+ // Simulate rebalance by calling onPartitionsRevoked
+ // This is what Kafka calls when a rebalance occurs
+ service.onPartitionsRevoked(assignment);
+
+ // Close the service
+ service.close();
+ }
+
+ // Verify that offsets were committed by checking with a new consumer
+ try (KafkaConsumer<byte[], byte[]> verifyConsumer = new KafkaConsumer<>(getConsumerProperties(groupId))) {
+ final Set<TopicPartition> partitions = new HashSet<>();
+ for (int i = 0; i < 3; i++) {
+ partitions.add(new TopicPartition(topic, i));
+ }
+
+ final Map<TopicPartition, OffsetAndMetadata> committedOffsets = verifyConsumer.committed(partitions);
+
+ // At least some offsets should be committed
+ long totalCommitted = committedOffsets.values().stream()
+ .filter(o -> o != null)
+ .mapToLong(OffsetAndMetadata::offset)
+ .sum();
+
+ assertTrue(totalCommitted > 0,
+ "Expected offsets to be committed during
onPartitionsRevoked, but total committed offset was " + totalCommitted);
+ }
+ }
+
+ /**
+ * Produces messages to a specific topic with a given number of partitions.
+ */
+ private void produceMessagesToTopic(final String topic, final int numPartitions, final int messagesPerPartition) throws Exception {
+ final Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ for (int partition = 0; partition < numPartitions; partition++) {
+ for (int i = 0; i < messagesPerPartition; i++) {
+ final String key = "key-" + partition + "-" + i;
+ final String value = "value-" + partition + "-" + i;
+ producer.send(new ProducerRecord<>(topic, partition, key, value)).get();
+ }
+ }
+ }
+ }
+
+ private void createTopic(final String topic, final int numPartitions) throws Exception {
+ final Properties adminProps = new Properties();
+ adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+
+ try (Admin admin = Admin.create(adminProps)) {
+ final NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1);
+ admin.createTopics(Collections.singletonList(newTopic)).all().get(30, TimeUnit.SECONDS);
+ }
+
+ // Wait for topic to be fully created
+ Thread.sleep(1000);
Review Comment:
Is there a better way to do this than sleeping?
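
One possible alternative (a sketch only; the helper name is illustrative and assumes the `Admin` client opened in `createTopic` is still available): poll the cluster metadata until the topic is visible, bounded by a deadline, instead of a fixed sleep.

```java
// Sketch: wait for the topic to appear in cluster metadata rather than sleeping a fixed interval
private void awaitTopicVisible(final Admin admin, final String topic) throws Exception {
    final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
    while (System.currentTimeMillis() < deadline) {
        final Set<String> topicNames = admin.listTopics().names().get(5, TimeUnit.SECONDS);
        if (topicNames.contains(topic)) {
            return;
        }
        Thread.sleep(100); // short backoff between metadata checks
    }
    throw new IllegalStateException("Topic " + topic + " was not visible before the deadline");
}
```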