This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f737ef31d9b KAFKA-18900: Implement share.acknowledgement.mode to
choose acknowledgement mode (#19417)
f737ef31d9b is described below
commit f737ef31d9bd5e8bbce11ffd7b9afd2e424cce3d
Author: Shivsundar R <[email protected]>
AuthorDate: Tue Apr 15 11:38:33 2025 -0400
KAFKA-18900: Implement share.acknowledgement.mode to choose acknowledgement
mode (#19417)
Choose the acknowledgement mode based on the config
(`share.acknowledgement.mode`) rather than inferring it from the method
calls the application uses to fetch and commit.
- The default value of the config is `IMPLICIT`, so if any
empty/null/invalid value is configured, then the mode defaults to
`IMPLICIT`.
- Removed AcknowledgementModes `UNKNOWN` and `PENDING` as they are no
longer required.
- Added a check for `explicit` mode: if the application calls `poll()`
while it still has unacknowledged records from the previous batch, an
`IllegalStateException` is thrown. In `explicit` mode, all the records
returned by a `poll()` are expected to be acknowledged before the next
call to `poll()`.
- Modified the `ConsoleShareConsumer` to set the mode to `explicit`,
because it acknowledges records explicitly.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 116 +++++++++------------
.../kafka/clients/consumer/ConsumerConfig.java | 22 ++--
.../kafka/clients/consumer/KafkaShareConsumer.java | 84 +++++----------
.../internals/ShareAcknowledgementMode.java | 116 +++++++++++++++++++++
.../consumer/internals/ShareConsumerImpl.java | 78 ++++----------
.../internals/ShareAcknowledgementModeTest.java | 67 ++++++++++++
.../consumer/internals/ShareConsumerImplTest.java | 89 +++++++++++++++-
.../kafka/tools/consumer/ConsoleShareConsumer.java | 8 +-
8 files changed, 376 insertions(+), 204 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 8d9e4dbd610..e402a4344c1 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -135,6 +135,8 @@ public class ShareConsumerTest {
private List<TopicPartition> sgsTopicPartitions;
private static final String KEY = "content-type";
private static final String VALUE = "application/octet-stream";
+ private static final String EXPLICIT = "explicit";
+ private static final String IMPLICIT = "implicit";
public ShareConsumerTest(ClusterInstance cluster) {
this.cluster = cluster;
@@ -594,7 +596,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeSuccess() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -615,7 +617,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeCommitSuccess() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -638,7 +640,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgementCommitAsync() throws
InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
- ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1");
+ ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
ShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer("group1")) {
ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
@@ -663,15 +665,16 @@ public class ShareConsumerTest {
// Acknowledging 2 out of the 3 records received via commitAsync.
ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+ ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
assertEquals(1L, secondRecord.offset());
shareConsumer1.acknowledge(firstRecord);
shareConsumer1.acknowledge(secondRecord);
+ shareConsumer1.acknowledge(thirdRecord, AcknowledgeType.RELEASE);
shareConsumer1.commitAsync();
- // The 3rd record should be reassigned to 2nd consumer when it
polls, kept higher wait time
- // as time out for locks is 15 secs.
+ // The 3rd record should be reassigned to 2nd consumer when it
polls.
TestUtils.waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> records2 =
shareConsumer2.poll(Duration.ofMillis(1000));
return records2.count() == 1 &&
records2.iterator().next().offset() == 2L;
@@ -690,51 +693,16 @@ public class ShareConsumerTest {
}
}
- @ClusterTest
- public void testImplicitModeNotTriggeredByPollWhenNoAcksToSend() throws
InterruptedException {
- alterShareAutoOffsetReset("group1", "earliest");
- try (Producer<byte[], byte[]> producer = createProducer();
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
-
- shareConsumer.subscribe(Set.of(tp.topic()));
-
- Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new
HashMap<>();
- shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap1, Map.of()));
-
- // The acknowledgement mode moves to PENDING from UNKNOWN.
- ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
- assertEquals(0, records.count());
- shareConsumer.commitAsync();
-
- ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
- producer.send(record1);
- producer.flush();
-
- // The acknowledgement mode remains in PENDING because no records
were returned.
- records = shareConsumer.poll(Duration.ofMillis(5000));
- assertEquals(1, records.count());
-
- // The acknowledgement mode now moves to EXPLICIT.
- shareConsumer.acknowledge(records.iterator().next());
- shareConsumer.commitAsync();
-
- TestUtils.waitForCondition(() -> {
- shareConsumer.poll(Duration.ofMillis(500));
- return partitionOffsetsMap1.containsKey(tp);
- }, 30000, 100L, () -> "Didn't receive call to callback");
- verifyShareGroupStateTopicRecordsProduced();
- }
- }
-
@ClusterTest
public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
- ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1")) {
+ ShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer("group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
ProducerRecord<byte[], byte[]> record2 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
ProducerRecord<byte[], byte[]> record3 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ ProducerRecord<byte[], byte[]> record4 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record1);
producer.send(record2);
producer.send(record3);
@@ -753,6 +721,7 @@ public class ShareConsumerTest {
// Acknowledging 2 out of the 3 records received via commitAsync.
ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+ ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
assertEquals(1L, secondRecord.offset());
@@ -760,21 +729,25 @@ public class ShareConsumerTest {
shareConsumer1.acknowledge(secondRecord);
shareConsumer1.commitAsync();
- // The 3rd record should be re-presented to the consumer when it
polls again.
- records = shareConsumer1.poll(Duration.ofMillis(5000));
- assertEquals(1, records.count());
- iterator = records.iterator();
- firstRecord = iterator.next();
- assertEquals(2L, firstRecord.offset());
+ producer.send(record4);
+ producer.flush();
+
+ // The next poll() should throw an IllegalStateException as there
is still 1 unacknowledged record.
+ // In EXPLICIT acknowledgement mode, we are not allowed to have
unacknowledged records from a batch.
+ assertThrows(IllegalStateException.class, () ->
shareConsumer1.poll(Duration.ofMillis(5000)));
+
+ // Acknowledging the 3rd record
+ shareConsumer1.acknowledge(thirdRecord);
+ shareConsumer1.commitAsync();
- // And poll again without acknowledging - the callback will
receive the acknowledgement responses too
+ // The next poll() will not throw an exception, it would continue
to fetch more records.
records = shareConsumer1.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
iterator = records.iterator();
- firstRecord = iterator.next();
- assertEquals(2L, firstRecord.offset());
+ ConsumerRecord<byte[], byte[]> fourthRecord = iterator.next();
+ assertEquals(3L, fourthRecord.offset());
- shareConsumer1.acknowledge(firstRecord);
+ shareConsumer1.acknowledge(fourthRecord);
// The callback will receive the acknowledgement responses after
polling. The callback is
// called on entry to the poll method or during close. The commit
is being performed asynchronously, so
@@ -784,6 +757,7 @@ public class ShareConsumerTest {
shareConsumer1.close();
assertFalse(partitionExceptionMap.containsKey(tp));
+ assertTrue(partitionOffsetsMap.containsKey(tp) &&
partitionOffsetsMap.get(tp).size() == 4);
verifyShareGroupStateTopicRecordsProduced();
}
}
@@ -792,7 +766,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeReleasePollAccept() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -815,7 +789,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeReleaseAccept() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -835,7 +809,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeReleaseClose() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -853,7 +827,7 @@ public class ShareConsumerTest {
public void testExplicitAcknowledgeThrowsNotInBatch() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -970,7 +944,7 @@ public class ShareConsumerTest {
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
-
Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit")))
{
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
"explicit"))) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -995,7 +969,7 @@ public class ShareConsumerTest {
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
-
Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "implicit")))
{
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
IMPLICIT))) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
@@ -2069,7 +2043,7 @@ public class ShareConsumerTest {
cluster.bootstrapServers(),
topicName,
groupId,
- Map.of()
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)
);
service.schedule(
@@ -2145,7 +2119,8 @@ public class ShareConsumerTest {
alterShareAutoOffsetReset("group1", "earliest");
alterShareIsolationLevel("group1", "read_uncommitted");
try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1", Map.of(
+ ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)))
{
shareConsumer.subscribe(Set.of(tp.topic()));
transactionalProducer.initTransactions();
try {
@@ -2231,7 +2206,8 @@ public class ShareConsumerTest {
alterShareAutoOffsetReset("group1", "earliest");
alterShareIsolationLevel("group1", "read_committed");
try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1", Map.of(
+ ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)))
{
shareConsumer.subscribe(Set.of(tp.topic()));
transactionalProducer.initTransactions();
@@ -2282,7 +2258,11 @@ public class ShareConsumerTest {
// Wait for the aborted marker offset for Message 4 (7L) to be
fetched and acknowledged by the consumer.
TestUtils.waitForCondition(() -> {
- shareConsumer.poll(Duration.ofMillis(500));
+ ConsumerRecords<byte[], byte[]> pollRecords =
shareConsumer.poll(Duration.ofMillis(500));
+ if (pollRecords.count() > 0) {
+ // We will release Message 3 again if it was received
in this poll().
+ pollRecords.forEach(consumerRecord ->
shareConsumer.acknowledge(consumerRecord, AcknowledgeType.RELEASE));
+ }
return partitionOffsetsMap2.containsKey(tp) &&
partitionOffsetsMap2.get(tp).contains(7L);
}, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort
transaction marker offset for Message 4");
@@ -2299,7 +2279,7 @@ public class ShareConsumerTest {
produceCommittedTransaction(transactionalProducer, "Message
8");
// Since isolation level is READ_UNCOMMITTED, we can consume
Message 3 (committed transaction that was released), Message 5, Message 6,
Message 7 and Message 8.
- List<String> finalMessages = new ArrayList<>();
+ Set<String> finalMessages = new HashSet<>();
TestUtils.waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> pollRecords =
shareConsumer.poll(Duration.ofMillis(5000));
if (pollRecords.count() > 0) {
@@ -2311,11 +2291,8 @@ public class ShareConsumerTest {
return finalMessages.size() == 5;
}, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all
records post altering share isolation level");
- assertEquals("Message 3", finalMessages.get(0));
- assertEquals("Message 5", finalMessages.get(1));
- assertEquals("Message 6", finalMessages.get(2));
- assertEquals("Message 7", finalMessages.get(3));
- assertEquals("Message 8", finalMessages.get(4));
+ Set<String> expected = Set.of("Message 3", "Message 5",
"Message 6", "Message 7", "Message 8");
+ assertEquals(expected, finalMessages);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
@@ -2330,7 +2307,8 @@ public class ShareConsumerTest {
alterShareAutoOffsetReset("group1", "earliest");
alterShareIsolationLevel("group1", "read_committed");
try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
- ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1", Map.of(
+ ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)))
{
shareConsumer.subscribe(Set.of(tp.topic()));
transactionalProducer.initTransactions();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index e5917d9d593..68fe89a4147 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@@ -381,13 +382,10 @@ public class ConsumerConfig extends AbstractConfig {
private static final String SECURITY_PROVIDERS_DOC =
SecurityConfig.SECURITY_PROVIDERS_DOC;
/**
- * <code>share.acknowledgement.mode</code> is being evaluated as a new
configuration to control the acknowledgement mode
- * for share consumers. It will be removed or converted to a proper
configuration before release.
- * An alternative being considered is
<code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
+ * <code>share.acknowledgement.mode</code>
*/
- public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG =
"internal.share.acknowledgement.mode";
- private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC =
"Controls the acknowledgement mode for a share consumer." +
- " If unset, the acknowledgement mode of the consumer is decided by
the method calls it uses to fetch and commit." +
+ public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG =
"share.acknowledgement.mode";
+ private static final String SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the
acknowledgement mode for a share consumer." +
" If set to <code>implicit</code>, the acknowledgement mode of the
consumer is implicit and it must not" +
" use
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to
acknowledge delivery of records. Instead," +
" delivery is acknowledged implicitly on the next call to poll or
commit." +
@@ -401,7 +399,7 @@ public class ConsumerConfig extends AbstractConfig {
*/
private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS =
List.of(
GROUP_REMOTE_ASSIGNOR_CONFIG,
- INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
+ SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
);
/**
@@ -411,7 +409,7 @@ public class ConsumerConfig extends AbstractConfig {
PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
HEARTBEAT_INTERVAL_MS_CONFIG,
SESSION_TIMEOUT_MS_CONFIG,
- INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
+ SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
);
static {
@@ -695,12 +693,12 @@ public class ConsumerConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
-
.define(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
+
.define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
Type.STRING,
- null,
- in(null, "implicit", "explicit"),
+
ShareAcknowledgementMode.IMPLICIT.name(),
+ new
ShareAcknowledgementMode.Validator(),
Importance.MEDIUM,
-
ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC);
+
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_DOC);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index d694984db57..b5a862f239d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -116,33 +116,33 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* {@code group.share.record.lock.partition.limit}. By limiting the duration
of the acquisition lock and automatically
* releasing the locks, the broker ensures delivery progresses even in the
presence of consumer failures.
* <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord,
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
- * <ul>
- * <li>The application calls {@link #commitSync()} or {@link
#commitAsync()} which commits the acknowledgements to Kafka.
- * If any records in the batch were not acknowledged, they remain acquired
and will be presented to the application
- * in response to a future poll.</li>
- * <li>The application calls {@link #poll(Duration)} without committing
first, which commits the acknowledgements to
- * Kafka asynchronously. In this case, no exception is thrown by a failure
to commit the acknowledgement.
- * If any records in the batch were not acknowledged, they remain acquired
and will be presented to the application
- * in response to a future poll.</li>
- * <li>The application calls {@link #close()} which attempts to commit any
pending acknowledgements and
- * releases any remaining acquired records.</li>
- * </ul>
- * If the application does not call {@link #acknowledge(ConsumerRecord,
AcknowledgeType)} for any record in the batch,
- * it is using <em>implicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the
records it processes by configuring the
+ * consumer {@code share.acknowledgement.mode} property.
+ * <p>
+ * If the application sets the property to "implicit" or does not set it at
all, then the consumer is using
+ * <em>implicit acknowledgement</em>. In this mode, the application
acknowledges delivery by:
* <ul>
- * <li>The application calls {@link #commitSync()} or {@link
#commitAsync()} which implicitly acknowledges all of
- * the delivered records as processed successfully and commits the
acknowledgements to Kafka.</li>
- * <li>The application calls {@link #poll(Duration)} without committing,
which also implicitly acknowledges all of
+ * <li>Calling {@link #poll(Duration)} without committing, which also
implicitly acknowledges all
* the delivered records and commits the acknowledgements to Kafka
asynchronously. In this case, no exception is
* thrown by a failure to commit the acknowledgements.</li>
- * <li>The application calls {@link #close()} which releases any acquired
records without acknowledgement.</li>
+ * <li>Calling {@link #commitSync()} or {@link #commitAsync()} which
implicitly acknowledges all
+ * the delivered records as processed successfully and commits the
acknowledgements to Kafka.</li>
+ * <li>Calling {@link #close()} which releases any acquired records
without acknowledgement.</li>
+ * </ul>
+ * If the application sets the property to "explicit", then the consumer is
using <em>explicit acknowledgment</em>.
+ * The application must acknowledge all records returned from {@link
#poll(Duration)} using
+ * {@link #acknowledge(ConsumerRecord, AcknowledgeType)} before its next call
to {@link #poll(Duration)}.
+ * If the application calls {@link #poll(Duration)} without having
acknowledged all records, an
+ * {@link IllegalStateException} is thrown. The remaining unacknowledged
records can still be acknowledged.
+ * In this mode, the application acknowledges delivery by:
+ * <ul>
+ * <li>Calling {@link #poll(Duration)} after it has acknowledged all
records, which commits the acknowledgements
+ * to Kafka asynchronously. In this case, no exception is thrown by a
failure to commit the acknowledgements.</li>
+ * <li>Calling {@link #commitSync()} or {@link #commitAsync()} which
commits any pending
+ * acknowledgements to Kafka.</li>
+ * <li>Calling {@link #close()} which attempts to commit any pending
acknowledgements and releases
+ * any remaining acquired records.</li>
* </ul>
- * <p>The consumer can optionally use the {@code
internal.share.acknowledgement.mode} configuration property to choose
- * between implicit and explicit acknowledgement, specifying
<code>"implicit"</code> or <code>"explicit"</code> as required.
- * <p>
* The consumer guarantees that the records returned in the {@code
ConsumerRecords} object for a specific topic-partition
* are in order of increasing offset. For each topic-partition, Kafka
guarantees that acknowledgements for the records
* in a batch are performed atomically. This makes error handling
significantly more straightforward because there can be
@@ -195,12 +195,14 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
*
* <h4>Per-record acknowledgement (explicit acknowledgement)</h4>
* This example demonstrates using different acknowledgement types depending
on the outcome of processing the records.
+ * Here the {@code share.acknowledgement.mode} property is set to "explicit"
so the consumer must explicitly acknowledge each record.
* <pre>
* Properties props = new Properties();
* props.setProperty("bootstrap.servers",
"localhost:9092");
* props.setProperty("group.id", "test");
* props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
* props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("share.acknowledgement.mode",
"explicit");
* KafkaShareConsumer<String, String> consumer = new
KafkaShareConsumer<>(props);
* consumer.subscribe(Arrays.asList("foo"));
* while (true) {
@@ -227,42 +229,6 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* It is only once {@link #commitSync()} is called that the acknowledgements
are committed by sending the new state
* information to Kafka.
*
- * <h4>Per-record acknowledgement, ending processing of the batch on an error
(explicit acknowledgement)</h4>
- * This example demonstrates ending processing of a batch of records on the
first error.
- * <pre>
- * Properties props = new Properties();
- * props.setProperty("bootstrap.servers",
"localhost:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * KafkaShareConsumer<String, String> consumer = new
KafkaShareConsumer<>(props);
- * consumer.subscribe(Arrays.asList("foo"));
- * while (true) {
- * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
- * for (ConsumerRecord<String, String> record : records) {
- * try {
- * doProcessing(record);
- * consumer.acknowledge(record, AcknowledgeType.ACCEPT);
- * } catch (Exception e) {
- * consumer.acknowledge(record, AcknowledgeType.REJECT);
- * break;
- * }
- * }
- * consumer.commitSync();
- * }
- * </pre>
- * There are the following cases in this example:
- * <ol>
- * <li>The batch contains no records, in which case the application just
polls again. The call to {@link #commitSync()}
- * just does nothing because the batch was empty.</li>
- * <li>All of the records in the batch are processed successfully. The
calls to {@link #acknowledge(ConsumerRecord, AcknowledgeType)}
- * specifying {@code AcknowledgeType.ACCEPT} mark all records in the batch
as successfully processed.</li>
- * <li>One of the records encounters an exception. The call to {@link
#acknowledge(ConsumerRecord, AcknowledgeType)} specifying
- * {@code AcknowledgeType.REJECT} rejects that record. Earlier records in
the batch have already been marked as successfully
- * processed. The call to {@link #commitSync()} commits the
acknowledgements, but the records after the failed record
- * remain acquired as part of the same delivery attempt and will be
presented to the application in response to another poll.</li>
- * </ol>
- *
* <h3>Reading Transactional Records</h3>
* The way that share groups handle transactional records is controlled by the
{@code group.share.isolation.level}</code>
* configuration property. In a share group, the isolation level applies to
the entire share group, not just individual
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java
new file mode 100644
index 00000000000..0f8effa30a1
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class ShareAcknowledgementMode {
+ public enum AcknowledgementMode {
+ IMPLICIT, EXPLICIT;
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase(Locale.ROOT);
+ }
+ }
+
+ private final AcknowledgementMode acknowledgementMode;
+
+ public static final ShareAcknowledgementMode IMPLICIT = new
ShareAcknowledgementMode(AcknowledgementMode.IMPLICIT);
+ public static final ShareAcknowledgementMode EXPLICIT = new
ShareAcknowledgementMode(AcknowledgementMode.EXPLICIT);
+
+ private ShareAcknowledgementMode(AcknowledgementMode acknowledgementMode) {
+ this.acknowledgementMode = acknowledgementMode;
+ }
+
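+ /**
+ * Returns the ShareAcknowledgementMode matching the given string. The match is
+ * case-sensitive: only the lower-case names "implicit" and "explicit" are accepted.
+ *
+ * @throws IllegalArgumentException if the string is null or is not a valid mode name
+ */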
+ public static ShareAcknowledgementMode fromString(String
acknowledgementMode) {
+ if (acknowledgementMode == null) {
+ throw new IllegalArgumentException("Acknowledgement mode is null");
+ }
+
+ if
(Arrays.asList(Utils.enumOptions(AcknowledgementMode.class)).contains(acknowledgementMode))
{
+ AcknowledgementMode mode =
AcknowledgementMode.valueOf(acknowledgementMode.toUpperCase(Locale.ROOT));
+ switch (mode) {
+ case IMPLICIT:
+ return IMPLICIT;
+ case EXPLICIT:
+ return EXPLICIT;
+ default:
+ throw new IllegalArgumentException("Invalid
acknowledgement mode: " + acknowledgementMode);
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid acknowledgement mode:
" + acknowledgementMode);
+ }
+ }
+
+ /**
+ * Returns the lower-case name of the acknowledgement mode, i.e. "implicit" or "explicit".
+ */
+ public String name() {
+ return acknowledgementMode.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ShareAcknowledgementMode that = (ShareAcknowledgementMode) o;
+ return acknowledgementMode == that.acknowledgementMode;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(acknowledgementMode);
+ }
+
+ @Override
+ public String toString() {
+ return "ShareAcknowledgementMode{" +
+ "mode=" + acknowledgementMode +
+ '}';
+ }
+
+ public static class Validator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(String name, Object value) {
+ String acknowledgementMode = (String) value;
+ try {
+ fromString(acknowledgementMode);
+ } catch (Exception e) {
+ throw new ConfigException(name, value, "Invalid value `" +
acknowledgementMode + "` for configuration " +
+ name + ". The value must either be 'implicit' or
'explicit'.");
+ }
+ }
+
+ @Override
+ public String toString() {
+ String values =
Arrays.stream(ShareAcknowledgementMode.AcknowledgementMode.values())
+
.map(ShareAcknowledgementMode.AcknowledgementMode::toString).collect(Collectors.joining(",
"));
+ return "[" + values + "]";
+ }
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index c77443e39a1..bb5193f8dc6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -176,20 +176,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
private ShareFetch<K, V> currentFetch;
private AcknowledgementCommitCallbackHandler
acknowledgementCommitCallbackHandler;
private final List<Map<TopicIdPartition, Acknowledgements>>
completedAcknowledgements;
-
- private enum AcknowledgementMode {
- /** Acknowledgement mode is not yet known */
- UNKNOWN,
- /** Acknowledgement mode is pending, meaning that {@link
#poll(Duration)} has been called once and
- * {@link #acknowledge(ConsumerRecord, AcknowledgeType)} has not been
called */
- PENDING,
- /** Acknowledgements are explicit, using {@link
#acknowledge(ConsumerRecord, AcknowledgeType)} */
- EXPLICIT,
- /** Acknowledgements are implicit, not using {@link
#acknowledge(ConsumerRecord, AcknowledgeType)} */
- IMPLICIT
- }
-
- private AcknowledgementMode acknowledgementMode;
+ private final ShareAcknowledgementMode acknowledgementMode;
/**
* A thread-safe {@link ShareFetchBuffer fetch buffer} for the results
that are populated in the
@@ -457,7 +444,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final ConsumerMetadata metadata,
final int requestTimeoutMs,
final int defaultApiTimeoutMs,
- final String groupId) {
+ final String groupId,
+ final String acknowledgementModeConfig) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = clientId;
@@ -472,7 +460,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.metadata = metadata;
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
- this.acknowledgementMode = initializeAcknowledgementMode(null, log);
+ this.acknowledgementMode =
ShareAcknowledgementMode.fromString(acknowledgementModeConfig);
this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer, metrics);
this.currentFetch = ShareFetch.empty();
this.applicationEventHandler = applicationEventHandler;
@@ -582,7 +570,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
handleCompletedAcknowledgements();
// If using implicit acknowledgement, acknowledge the previously
fetched records
- acknowledgeBatchIfImplicitAcknowledgement(true);
+ acknowledgeBatchIfImplicitAcknowledgement();
kafkaShareConsumerMetrics.recordPollStart(timer.currentTimeMs());
@@ -674,6 +662,10 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
// Notify the network thread to wake up and start the next
round of fetching
applicationEventHandler.wakeupNetworkThread();
}
+ if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
+ // We cannot leave unacknowledged records in EXPLICIT
acknowledgement mode, so we throw an exception to the application.
+ throw new IllegalStateException("All records must be
acknowledged in explicit acknowledgement mode.");
+ }
return currentFetch;
}
}
@@ -719,7 +711,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
handleCompletedAcknowledgements();
// If using implicit acknowledgement, acknowledge the previously
fetched records
- acknowledgeBatchIfImplicitAcknowledgement(false);
+ acknowledgeBatchIfImplicitAcknowledgement();
Timer requestTimer = time.timer(timeout.toMillis());
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap =
acknowledgementsToSend();
@@ -763,7 +755,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
handleCompletedAcknowledgements();
// If using implicit acknowledgement, acknowledge the previously
fetched records
- acknowledgeBatchIfImplicitAcknowledgement(false);
+ acknowledgeBatchIfImplicitAcknowledgement();
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap =
acknowledgementsToSend();
if (!acknowledgementsMap.isEmpty()) {
@@ -1040,29 +1032,11 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
}
/**
- * Called to progressively move the acknowledgement mode into IMPLICIT if
it is not known to be EXPLICIT.
* If the acknowledgement mode is IMPLICIT, acknowledges all records in
the current batch.
- *
- * @param calledOnPoll If true, called on poll. Otherwise, called on
commit.
*/
- private void acknowledgeBatchIfImplicitAcknowledgement(boolean
calledOnPoll) {
- if (calledOnPoll) {
- if (acknowledgementMode == AcknowledgementMode.UNKNOWN) {
- // The first call to poll(Duration) moves into PENDING
- acknowledgementMode = AcknowledgementMode.PENDING;
- } else if (acknowledgementMode == AcknowledgementMode.PENDING &&
!currentFetch.isEmpty()) {
- // If there are records to acknowledge and PENDING, moves into
IMPLICIT
- acknowledgementMode = AcknowledgementMode.IMPLICIT;
- }
- } else {
- // If there are records to acknowledge and PENDING, moves into
IMPLICIT
- if (acknowledgementMode == AcknowledgementMode.PENDING &&
!currentFetch.isEmpty()) {
- acknowledgementMode = AcknowledgementMode.IMPLICIT;
- }
- }
-
+ private void acknowledgeBatchIfImplicitAcknowledgement() {
// If IMPLICIT, acknowledge all records
- if (acknowledgementMode == AcknowledgementMode.IMPLICIT) {
+ if (acknowledgementMode == ShareAcknowledgementMode.IMPLICIT) {
currentFetch.acknowledgeAll(AcknowledgeType.ACCEPT);
}
}
@@ -1075,36 +1049,20 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
}
/**
- * Called to move the acknowledgement mode into EXPLICIT, if it is not
known to be IMPLICIT.
+ * Verifies that the acknowledgement mode is EXPLICIT; otherwise throws
an {@link IllegalStateException}.
*/
private void ensureExplicitAcknowledgement() {
- if (acknowledgementMode == AcknowledgementMode.PENDING) {
- // If poll(Duration) has been called once, moves into EXPLICIT
- acknowledgementMode = AcknowledgementMode.EXPLICIT;
- } else if (acknowledgementMode == AcknowledgementMode.IMPLICIT) {
+ if (acknowledgementMode == ShareAcknowledgementMode.IMPLICIT) {
throw new IllegalStateException("Implicit acknowledgement of
delivery is being used.");
- } else if (acknowledgementMode == AcknowledgementMode.UNKNOWN) {
- throw new IllegalStateException("Acknowledge called before poll.");
}
}
/**
* Initializes the acknowledgement mode based on the configuration.
*/
- private static AcknowledgementMode
initializeAcknowledgementMode(ConsumerConfig config, Logger log) {
- if (config == null) {
- return AcknowledgementMode.UNKNOWN;
- }
- String acknowledgementModeStr =
config.getString(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
- if ((acknowledgementModeStr == null) ||
acknowledgementModeStr.isEmpty()) {
- return AcknowledgementMode.UNKNOWN;
- } else if (acknowledgementModeStr.equalsIgnoreCase("implicit")) {
- return AcknowledgementMode.IMPLICIT;
- } else if (acknowledgementModeStr.equalsIgnoreCase("explicit")) {
- return AcknowledgementMode.EXPLICIT;
- }
- log.warn("Invalid value for config {}: \"{}\"",
ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
acknowledgementModeStr);
- return AcknowledgementMode.UNKNOWN;
+ private static ShareAcknowledgementMode
initializeAcknowledgementMode(ConsumerConfig config, Logger log) {
+ String s =
config.getString(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
+ return ShareAcknowledgementMode.fromString(s);
}
/**
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java
new file mode 100644
index 00000000000..1ae0df0634d
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ShareAcknowledgementModeTest {
+
+ @Test
+ public void testFromString() {
+ assertEquals(ShareAcknowledgementMode.IMPLICIT,
ShareAcknowledgementMode.fromString("implicit"));
+ assertEquals(ShareAcknowledgementMode.EXPLICIT,
ShareAcknowledgementMode.fromString("explicit"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareAcknowledgementMode.fromString("invalid"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareAcknowledgementMode.fromString("IMPLICIT"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareAcknowledgementMode.fromString("EXPLICIT"));
+ assertThrows(IllegalArgumentException.class, () ->
ShareAcknowledgementMode.fromString(""));
+ assertThrows(IllegalArgumentException.class, () ->
ShareAcknowledgementMode.fromString(null));
+ }
+
+ @Test
+ public void testValidator() {
+ ShareAcknowledgementMode.Validator validator = new
ShareAcknowledgementMode.Validator();
+ assertDoesNotThrow(() -> validator.ensureValid("test", "implicit"));
+ assertDoesNotThrow(() -> validator.ensureValid("test", "explicit"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "invalid"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "IMPLICIT"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", "EXPLICIT"));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", ""));
+ assertThrows(ConfigException.class, () ->
validator.ensureValid("test", null));
+ }
+
+ @Test
+ public void testEqualsAndHashCode() {
+ ShareAcknowledgementMode mode1 = ShareAcknowledgementMode.IMPLICIT;
+ ShareAcknowledgementMode mode2 = ShareAcknowledgementMode.IMPLICIT;
+ ShareAcknowledgementMode mode3 = ShareAcknowledgementMode.EXPLICIT;
+
+ assertEquals(mode1, mode2);
+ assertNotEquals(mode1, mode3);
+ assertNotEquals(mode2, mode3);
+
+ assertEquals(mode1.hashCode(), mode2.hashCode());
+ assertNotEquals(mode1.hashCode(), mode3.hashCode());
+ }
+
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 342540c5885..64c729730fb 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
@@ -56,6 +57,7 @@ import org.mockito.Mockito;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -145,14 +147,16 @@ public class ShareConsumerImplTest {
mock(ShareFetchBuffer.class),
subscriptions,
"group-id",
- "client-id");
+ "client-id",
+ "implicit");
}
private ShareConsumerImpl<String, String> newConsumer(
ShareFetchBuffer fetchBuffer,
SubscriptionState subscriptions,
String groupId,
- String clientId
+ String clientId,
+ String acknowledgementMode
) {
final int defaultApiTimeoutMs = 1000;
final int requestTimeoutMs = 30000;
@@ -173,7 +177,8 @@ public class ShareConsumerImplTest {
metadata,
requestTimeoutMs,
defaultApiTimeoutMs,
- groupId
+ groupId,
+ acknowledgementMode
);
}
@@ -345,6 +350,84 @@ public class ShareConsumerImplTest {
assertDoesNotThrow(() -> consumer.close());
}
+ @Test
+ public void testExplicitModeUnacknowledgedRecords() {
+ // Setup consumer with explicit acknowledgement mode
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(
+ mock(ShareFetchBuffer.class),
+ subscriptions,
+ "group-id",
+ "client-id",
+ "explicit");
+
+ // Setup test data
+ String topic = "test-topic";
+ int partition = 0;
+ TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(),
partition, topic);
+ ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0,
tip);
+ batch.addRecord(new ConsumerRecord<>(topic, partition, 0, "key1",
"value1"));
+ batch.addRecord(new ConsumerRecord<>(topic, partition, 1, "key2",
"value2"));
+
+ // Setup first fetch to return records
+ ShareFetch<String, String> firstFetch = ShareFetch.empty();
+ firstFetch.add(tip, batch);
+ doReturn(firstFetch)
+ .doReturn(ShareFetch.empty())
+ .when(fetchCollector)
+ .collect(any(ShareFetchBuffer.class));
+
+ // Setup subscription
+ List<String> topics = Collections.singletonList(topic);
+
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
topics);
+ consumer.subscribe(topics);
+
+ // First poll should succeed and return records
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
+ assertEquals(2, records.count(), "Should have received 2 records");
+
+ // Second poll should fail because records weren't acknowledged
+ IllegalStateException exception = assertThrows(
+ IllegalStateException.class,
+ () -> consumer.poll(Duration.ofMillis(100))
+ );
+ assertTrue(
+ exception.getMessage().contains("All records must be acknowledged
in explicit acknowledgement mode."),
+ "Unexpected error message: " + exception.getMessage()
+ );
+
+ // Verify that acknowledging only some of the records still throws an
exception
+ Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
+ consumer.acknowledge(iterator.next());
+ exception = assertThrows(
+ IllegalStateException.class,
+ () -> consumer.poll(Duration.ofMillis(100))
+ );
+ assertTrue(
+ exception.getMessage().contains("All records must be acknowledged
in explicit acknowledgement mode."),
+ "Unexpected error message: " + exception.getMessage()
+ );
+
+ // Verify that after acknowledging all records, poll succeeds
+ consumer.acknowledge(iterator.next());
+
+ // Setup second fetch to return new records
+ ShareFetch<String, String> secondFetch = ShareFetch.empty();
+ ShareInFlightBatch<String, String> newBatch = new
ShareInFlightBatch<>(2, tip);
+ newBatch.addRecord(new ConsumerRecord<>(topic, partition, 2, "key3",
"value3"));
+ newBatch.addRecord(new ConsumerRecord<>(topic, partition, 3, "key4",
"value4"));
+ secondFetch.add(tip, newBatch);
+
+ // Reset mock to return new records
+ doReturn(secondFetch)
+ .when(fetchCollector)
+ .collect(any(ShareFetchBuffer.class));
+
+ // Verify that poll succeeds and returns new records
+ ConsumerRecords<String, String> newRecords =
consumer.poll(Duration.ofMillis(100));
+ assertEquals(2, newRecords.count(), "Should have received 2 new
records");
+ }
+
@Test
public void testCloseWithTopicAuthorizationException() {
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
index 699397f9a70..43ba5185dc0 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
@@ -17,6 +17,7 @@
package org.apache.kafka.tools.consumer;
import org.apache.kafka.clients.consumer.AcknowledgeType;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;
@@ -36,6 +37,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
+import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -66,7 +68,11 @@ public class ConsoleShareConsumer {
messageCount = 0;
long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() :
Long.MAX_VALUE;
- ShareConsumer<byte[], byte[]> consumer = new
KafkaShareConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new
ByteArrayDeserializer());
+ Properties consumerProps = opts.consumerProps();
+ // Set share acknowledgement mode to explicit.
+ consumerProps.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
"explicit");
+
+ ShareConsumer<byte[], byte[]> consumer = new
KafkaShareConsumer<>(consumerProps, new ByteArrayDeserializer(), new
ByteArrayDeserializer());
ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts.topicArg(),
consumer, timeoutMs);
addShutdownHook(consumerWrapper);