AndrewJSchofield commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1829634776
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -237,4 +266,20 @@ public int shareHeartbeatIntervalMs() {
public int shareRecordLockDurationMs() {
return shareRecordLockDurationMs;
}
+
+ /**
+ * The share group auto offset reset strategy.
+ */
+ public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
+ return
ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT));
+ }
+
+ public enum ShareGroupAutoOffsetReset {
+ LATEST, EARLIEST, NONE;
Review Comment:
This doesn't really have the value "NONE", does it? The KIP mentions only
LATEST and EARLIEST, I think.
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1542,14 +1629,135 @@ public void testLsoMovementByRecordsDeletion(String
persister) {
totalMessagesConsumed = new AtomicInteger(0);
future = new CompletableFuture<>();
- consumeMessages(totalMessagesConsumed, 0, "group1", 1, 5, true,
future);
+ consumeMessages(totalMessagesConsumed, 0, groupId, 1, 5, true, future);
assertEquals(0, totalMessagesConsumed.get());
try {
assertEquals(0, future.get());
} catch (Exception e) {
fail("Exception occurred : " + e.getMessage());
}
- adminClient.close();
+ producer.close();
+ }
+
+ @ParameterizedTest(name = "{displayName}.persister={0}")
+ @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+ public void testShareAutoOffsetResetDefaultValue(String persister) {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
+ // Producing a record.
+ producer.send(record);
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
+ // No records should be consumed because share.auto.offset.reset has a
default of "latest". Since the record
+ // was produced before share partition was initialized (which happens
after the first share fetch request
+ // in the poll method), the start offset would be the latest offset,
i.e. 1 (the next offset after the already
+ // present 0th record)
+ assertEquals(0, records.count());
+ // Producing another record.
+ producer.send(record);
+ records = shareConsumer.poll(Duration.ofMillis(5000));
+ // Now the next record should be consumed successfully
+ assertEquals(1, records.count());
+ shareConsumer.close();
+ producer.close();
+ }
+
+ @ParameterizedTest(name = "{displayName}.persister={0}")
+ @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+ public void testShareAutoOffsetResetEarliest(String persister) {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        // Changing the value of share.auto.offset.reset to "earliest"
+ alterShareAutoOffsetReset("group1", "earliest");
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
+ // Producing a record.
+ producer.send(record);
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
+ // Since the value for share.auto.offset.reset has been altered to
"earliest", the consumer should consume
+ // all messages present on the partition
+ assertEquals(1, records.count());
+ // Producing another record.
+ producer.send(record);
+ records = shareConsumer.poll(Duration.ofMillis(5000));
+        // The next record should also be consumed successfully
+ assertEquals(1, records.count());
+ shareConsumer.close();
+ producer.close();
+ }
+
+ @ParameterizedTest(name = "{displayName}.persister={0}")
+ @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+ public void testShareAutoOffsetResetEarliestAfterLsoMovement(String
persister) throws Exception {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        // Changing the value of share.auto.offset.reset to "earliest"
+ alterShareAutoOffsetReset("group1", "earliest");
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
+ // We write 10 records to the topic, so they would be written from
offsets 0-9 on the topic.
+ try {
+ for (int i = 0; i < 10; i++) {
+ producer.send(record).get();
+ }
+ } catch (Exception e) {
+ fail("Failed to send records: " + e);
+ }
+
+ // We delete records before offset 5, so the LSO should move to 5.
+ adminClient.deleteRecords(Collections.singletonMap(tp,
RecordsToDelete.beforeOffset(5L)));
+
+ AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
+ CompletableFuture<Integer> future = new CompletableFuture<>();
+ consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true,
future);
+ // The records returned belong to offsets 5-9.
+ assertEquals(5, totalMessagesConsumed.get());
+ assertEquals(5, future.get());
+
+ shareConsumer.close();
+ producer.close();
+ }
+
+ @ParameterizedTest(name = "{displayName}.persister={0}")
+ @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+ public void
testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
Review Comment:
How about `shareConsumerEarliest` and `shareConsumerLatest`? Much more
descriptive than 1 or 2.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]