This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b909544e996 MINOR: Improve consistency of acknowledge type terminology
(#20282)
b909544e996 is described below
commit b909544e996cdde0972099fb57af65cd4e83dfdc
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Aug 1 21:17:22 2025 +0100
MINOR: Improve consistency of acknowledge type terminology (#20282)
The code had a mixture of "acknowledgement type" and "acknowledge type".
The latter is preferred.
Reviewers: TengYao Chi <[email protected]>, Lan Ding
<[email protected]>
---
.../apache/kafka/clients/consumer/ShareConsumerTest.java | 10 +++++-----
.../kafka/clients/consumer/KafkaShareConsumer.java | 2 +-
.../clients/consumer/internals/Acknowledgements.java | 6 +++---
.../clients/consumer/internals/ShareCompletedFetch.java | 2 +-
.../kafka/clients/consumer/internals/ShareFetch.java | 4 ++--
.../clients/consumer/internals/AcknowledgementsTest.java | 6 +++---
.../test/java/kafka/server/share/SharePartitionTest.java | 16 ++++++++--------
.../src/test/scala/unit/kafka/server/KafkaApisTest.scala | 4 ++--
.../share/acknowledge/ShareAcknowledgementBatch.java | 2 +-
.../apache/kafka/server/share/fetch/InFlightBatch.java | 2 +-
10 files changed, 27 insertions(+), 27 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 957d72cea01..2335c223b07 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -362,7 +362,7 @@ public class ShareConsumerTest {
return partitionOffsetsMap.containsKey(tp);
}, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive call to
callback");
- // We expect no exception as the acknowledgment error code is null.
+ // We expect no exception as the acknowledgement error code is
null.
assertFalse(partitionExceptionMap.containsKey(tp));
verifyShareGroupStateTopicRecordsProduced();
}
@@ -391,7 +391,7 @@ public class ShareConsumerTest {
shareConsumer.poll(Duration.ofMillis(1000));
shareConsumer.close();
- // We expect no exception as the acknowledgment error code is null.
+ // We expect no exception as the acknowledgement error code is
null.
assertFalse(partitionExceptionMap.containsKey(tp));
verifyShareGroupStateTopicRecordsProduced();
}
@@ -1500,7 +1500,7 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer));
shareConsumer.subscribe(Set.of(tp.topic()));
- // The acknowledgment commit callback will try to call a method of
ShareConsumer
+ // The acknowledgement commit callback will try to call a method
of ShareConsumer
shareConsumer.poll(Duration.ofMillis(5000));
// The second poll sends the acknowledgements implicitly.
// The acknowledgement commit callback will be called and the
exception is thrown.
@@ -1540,14 +1540,14 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
- // The acknowledgment commit callback will try to call a method of
ShareConsumer
+ // The acknowledgement commit callback will try to call a method
of ShareConsumer
shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer));
shareConsumer.subscribe(Set.of(tp.topic()));
TestUtils.waitForCondition(() ->
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records
for share consumer");
- // The second poll sends the acknowledgments implicitly.
+ // The second poll sends the acknowledgements implicitly.
shareConsumer.poll(Duration.ofMillis(2000));
// Till now acknowledgement commit callback has not been called,
so no exception thrown yet.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index 743c77b4228..655788e8469 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -195,7 +195,7 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* </pre>
*
* <h4>Per-record acknowledgement (explicit acknowledgement)</h4>
- * This example demonstrates using different acknowledgement types depending
on the outcome of processing the records.
+ * This example demonstrates using different acknowledge types depending on
the outcome of processing the records.
* Here the {@code share.acknowledgement.mode} property is set to "explicit"
so the consumer must explicitly acknowledge each record.
* <pre>
* Properties props = new Properties();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java
index 8d3fab23587..5bce77651b9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java
@@ -185,7 +185,7 @@ public class Acknowledgements {
currentBatch.acknowledgeTypes().add(ACKNOWLEDGE_TYPE_GAP);
}
}
- List<AcknowledgementBatch> optimalBatches =
maybeOptimiseAcknowledgementTypes(currentBatch);
+ List<AcknowledgementBatch> optimalBatches =
maybeOptimiseAcknowledgeTypes(currentBatch);
optimalBatches.forEach(batch -> {
if (canOptimiseForSingleAcknowledgeType(batch)) {
@@ -204,7 +204,7 @@ public class Acknowledgements {
*/
private AcknowledgementBatch maybeCreateNewBatch(AcknowledgementBatch
currentBatch, Long nextOffset, List<AcknowledgementBatch> batches) {
if (nextOffset != currentBatch.lastOffset() + 1) {
- List<AcknowledgementBatch> optimalBatches =
maybeOptimiseAcknowledgementTypes(currentBatch);
+ List<AcknowledgementBatch> optimalBatches =
maybeOptimiseAcknowledgeTypes(currentBatch);
optimalBatches.forEach(batch -> {
if (canOptimiseForSingleAcknowledgeType(batch)) {
@@ -228,7 +228,7 @@ public class Acknowledgements {
* whose count exceeds the default value. In this case, the batch is split
into 2 such that the
* batch with the continuous records has only 1 acknowledge type in its
array.
*/
- private List<AcknowledgementBatch>
maybeOptimiseAcknowledgementTypes(AcknowledgementBatch currentAcknowledgeBatch)
{
+ private List<AcknowledgementBatch>
maybeOptimiseAcknowledgeTypes(AcknowledgementBatch currentAcknowledgeBatch) {
List<AcknowledgementBatch> batches = new ArrayList<>();
if (currentAcknowledgeBatch == null) return batches;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index f2664050bc8..2c337782dd4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -154,7 +154,7 @@ public class ShareCompletedFetch {
* @param maxRecords The number of records to return; the number returned
may be {@code 0 <= maxRecords}
* @param checkCrcs Whether to check the CRC of fetched records
*
- * @return {@link ShareInFlightBatch The ShareInFlightBatch containing
records and their acknowledgments}
+ * @return {@link ShareInFlightBatch The ShareInFlightBatch containing
records and their acknowledgements}
*/
<K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V>
deserializers,
final int maxRecords,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
index d587e29f382..406110fe502 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
@@ -110,7 +110,7 @@ public class ShareFetch<K, V> {
* Acknowledge a single record in the current batch.
*
* @param record The record to acknowledge
- * @param type The acknowledgment type which indicates whether it was
processed successfully
+ * @param type The acknowledge type which indicates whether it was
processed successfully
*/
public void acknowledge(final ConsumerRecord<K, V> record, final
AcknowledgeType type) {
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch :
batches.entrySet()) {
@@ -129,7 +129,7 @@ public class ShareFetch<K, V> {
* @param topic The topic of the record to acknowledge
* @param partition The partition of the record
* @param offset The offset of the record
- * @param type The acknowledgment type which indicates whether it was
processed successfully
+ * @param type The acknowledge type which indicates whether it was
processed successfully
*/
public void acknowledge(final String topic, final int partition, final
long offset, final AcknowledgeType type) {
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch :
batches.entrySet()) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java
index 779df4fb43c..b6818ab51b5 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java
@@ -83,7 +83,7 @@ public class AcknowledgementsTest {
}
@Test
- public void testSingleAcknowledgementTypeExceedingLimit() {
+ public void testSingleAcknowledgeTypeExceedingLimit() {
int i = 0;
for (; i < maxRecordsWithSameAcknowledgeType; i++) {
acks.add(i, AcknowledgeType.ACCEPT);
@@ -119,7 +119,7 @@ public class AcknowledgementsTest {
}
@Test
- public void testSingleAcknowledgementTypeWithGap() {
+ public void testSingleAcknowledgeTypeWithGap() {
for (int i = 0; i < maxRecordsWithSameAcknowledgeType; i++) {
acks.add(i, null);
}
@@ -186,7 +186,7 @@ public class AcknowledgementsTest {
}
@Test
- public void testSingleAcknowledgementTypeWithinLimit() {
+ public void testSingleAcknowledgeTypeWithinLimit() {
acks.add(0L, AcknowledgeType.ACCEPT);
acks.add(1L, AcknowledgeType.ACCEPT);
acks.add(2L, AcknowledgeType.ACCEPT);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index c4fc0a0454d..c17ce391b37 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -4462,7 +4462,7 @@ public class SharePartitionTest {
assertEquals(20, sharePartition.startOffset());
assertEquals(36, sharePartition.endOffset());
- // For cached state corresponding to entry 2, the batch state will be
ACKNOWLEDGED, hence it will be cleared as part of acknowledgment.
+ // For cached state corresponding to entry 2, the batch state will be
ACKNOWLEDGED, hence it will be cleared as part of acknowledgement.
assertEquals(6, sharePartition.cachedState().size());
assertEquals(MEMBER_ID,
sharePartition.cachedState().get(7L).batchMemberId());
@@ -4768,7 +4768,7 @@ public class SharePartitionTest {
}
@Test
- public void testLsoMovementAheadOfEndOffsetPostAcknowledgment() {
+ public void testLsoMovementAheadOfEndOffsetPostAcknowledgement() {
SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
@@ -4884,7 +4884,7 @@ public class SharePartitionTest {
}
@Test
- public void testLsoMovementPostGapsInAcknowledgments() {
+ public void testLsoMovementPostGapsInAcknowledgements() {
SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
MemoryRecords records1 = memoryRecords(2, 5);
@@ -5733,7 +5733,7 @@ public class SharePartitionTest {
}
@Test
- public void testMaybeUpdateCachedStateWhenAcknowledgementTypeAccept() {
+ public void testMaybeUpdateCachedStateWhenAcknowledgeTypeAccept() {
SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250);
@@ -5753,7 +5753,7 @@ public class SharePartitionTest {
}
@Test
- public void testMaybeUpdateCachedStateWhenAcknowledgementTypeReject() {
+ public void testMaybeUpdateCachedStateWhenAcknowledgeTypeReject() {
SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250);
@@ -5773,7 +5773,7 @@ public class SharePartitionTest {
}
@Test
- public void testMaybeUpdateCachedStateWhenAcknowledgementTypeRelease() {
+ public void testMaybeUpdateCachedStateWhenAcknowledgeTypeRelease() {
SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250);
@@ -5937,7 +5937,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(100, 80), 100);
assertFalse(sharePartition.canAcquireRecords());
- // Final Acknowledgment, all records are acknowledged here.
+ // Final Acknowledgement, all records are acknowledged here.
sharePartition.acknowledge(MEMBER_ID, List.of(
new ShareAcknowledgementBatch(50, 179, List.of((byte) 3))));
@@ -5984,7 +5984,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10);
assertTrue(sharePartition.canAcquireRecords());
- // Sending acknowledgment for the first batch from 11 to 20
+ // Sending acknowledgement for the first batch from 11 to 20
sharePartition.acknowledge(MEMBER_ID, List.of(
new ShareAcknowledgementBatch(11, 20, List.of((byte) 1))));
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d7b35de63e5..5676e454887 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -7979,12 +7979,12 @@ class KafkaApisTest extends Logging {
private def compareAcknowledgementBatches(baseOffset: Long,
endOffset: Long,
- acknowledgementType: Byte,
+ acknowledgeType: Byte,
acknowledgementBatch:
ShareAcknowledgementBatch
): Boolean = {
if (baseOffset == acknowledgementBatch.firstOffset()
&& endOffset == acknowledgementBatch.lastOffset()
- && acknowledgementType ==
acknowledgementBatch.acknowledgeTypes().get(0)) {
+ && acknowledgeType == acknowledgementBatch.acknowledgeTypes().get(0)) {
return true
}
false
diff --git
a/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
b/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
index 50aabb3903e..bc29f37c62f 100644
---
a/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
+++
b/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
@@ -21,7 +21,7 @@ import java.util.List;
/**
* The ShareAcknowledgementBatch containing the fields required to acknowledge
the fetched records.
- * The class abstracts the acknowledgment request for
<code>SharePartition</code> class constructed
+ * The class abstracts the acknowledgement request for
<code>SharePartition</code> class constructed
* from {@link
org.apache.kafka.common.message.ShareFetchRequestData.AcknowledgementBatch} and
* {@link
org.apache.kafka.common.message.ShareAcknowledgeRequestData.AcknowledgementBatch}
classes.
*/
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
index c6a16c5056e..c3e2d353328 100644
---
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
@@ -48,7 +48,7 @@ public class InFlightBatch {
// The offset state map is used to track the state of the records per
offset. However, the
// offset state map is only required when the state of the offsets within
same batch are
- // different. The states can be different when explicit offset
acknowledgment is done which
+ // different. The states can be different when explicit offset
acknowledgement is done which
// is different from the batch state.
private NavigableMap<Long, InFlightState> offsetState;