This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c5ae7b41281 KAFKA-18794: Add check to send one ShareFetchEvent per
poll in ShareConsumer. (#20794)
c5ae7b41281 is described below
commit c5ae7b4128148c97845843db7ea818c8e798eef9
Author: Shivsundar R <[email protected]>
AuthorDate: Mon Nov 3 10:11:38 2025 -0500
KAFKA-18794: Add check to send one ShareFetchEvent per poll in
ShareConsumer. (#20794)
*What*
- Currently in `ShareConsumerImpl`, there is a chance that we send
multiple `ShareFetchEvent`s for a single `poll()`.
- This can happen when the `ShareFetchBuffer` is still empty after the
first wait and the `pollTimer` has not yet expired; in that case we send
multiple events.
- Usually the wait on the `ShareFetchBuffer` lasts until the poll
timeout, so we do not send multiple events. However, the buffer wait
time is also bounded by `applicationEventHandler.maximumTimeToWait()`,
which can return lower values (even 0) in some cases (especially during
startup of the heartbeat request manager).
- If this happens, multiple events are sent, and this can even lead to
multiple fetches from the broker (effectively a fetch and a pre-fetch)
if the response for the previous `ShareFetchEvent` arrives before the
next `ShareFetchEvent` is processed.
- To avoid this, we now have a check that ensures only one
`ShareFetchEvent` is sent per poll.
- This was the reason a couple of tests in KafkaShareConsumerTest were
flaky (see https://issues.apache.org/jira/browse/KAFKA-18794). This PR
fixes the flakiness; the tests now pass reliably locally.
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/ShareConsumeRequestManager.java | 6 +-
.../consumer/internals/ShareConsumerImpl.java | 40 ++++++++-----
.../events/ApplicationEventProcessor.java | 2 +-
.../consumer/internals/events/ShareFetchEvent.java | 10 +---
.../clients/consumer/KafkaShareConsumerTest.java | 7 ---
.../internals/ShareConsumeRequestManagerTest.java | 66 +++++++---------------
.../consumer/internals/ShareConsumerImplTest.java | 42 ++++++++++----
7 files changed, 82 insertions(+), 91 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index e7bfdaefe49..703f7530a5c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -283,16 +283,14 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
}
}
- public void fetch(Map<TopicIdPartition, NodeAcknowledgements>
acknowledgementsMap,
- Map<TopicIdPartition, NodeAcknowledgements>
controlRecordAcknowledgements) {
+ public void fetch(Map<TopicIdPartition, NodeAcknowledgements>
acknowledgementsMap) {
if (!fetchMoreRecords) {
log.debug("Fetch more data");
fetchMoreRecords = true;
}
- // Process both acknowledgement maps and sends them in the next
ShareFetch.
+ // Store the acknowledgements and send them in the next ShareFetch.
processAcknowledgementsMap(acknowledgementsMap);
- processAcknowledgementsMap(controlRecordAcknowledgements);
}
private void processAcknowledgementsMap(Map<TopicIdPartition,
NodeAcknowledgements> acknowledgementsMap) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 0530155e538..501821a2310 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -203,6 +203,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
// and is used to prevent multithreaded access
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
private final AtomicInteger refCount = new AtomicInteger(0);
+ private boolean shouldSendShareFetchEvent = false;
ShareConsumerImpl(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
@@ -581,6 +582,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
throw new IllegalStateException("Consumer is not subscribed to
any topics.");
}
+ shouldSendShareFetchEvent = true;
+
do {
// Make sure the network thread can tell the application is
actively polling
applicationEventHandler.add(new
PollEvent(timer.currentTimeMs()));
@@ -654,28 +657,29 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
if (currentFetch.isEmpty()) {
final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
if (fetch.isEmpty()) {
- // Check for any acknowledgements which could have come from
control records (GAP) and include them.
- applicationEventHandler.add(new
ShareFetchEvent(acknowledgementsMap, fetch.takeAcknowledgedRecords()));
+ Map<TopicIdPartition, NodeAcknowledgements>
controlRecordAcknowledgements = fetch.takeAcknowledgedRecords();
- // Notify the network thread to wake up and start the next
round of fetching
- applicationEventHandler.wakeupNetworkThread();
+ if (!controlRecordAcknowledgements.isEmpty()) {
+ // Asynchronously commit any waiting acknowledgements from
control records.
+
sendShareAcknowledgeAsyncEvent(controlRecordAcknowledgements);
+ }
+ // We only send one ShareFetchEvent per poll call.
+ if (shouldSendShareFetchEvent) {
+ // Send the pending acknowledgements with the fetch request; any
acknowledgements from control records (GAP) were already sent above.
+ applicationEventHandler.add(new
ShareFetchEvent(acknowledgementsMap));
+ shouldSendShareFetchEvent = false;
+ // Notify the network thread to wake up and start the next
round of fetching
+ applicationEventHandler.wakeupNetworkThread();
+ }
} else if (!acknowledgementsMap.isEmpty()) {
// Asynchronously commit any waiting acknowledgements
- Timer timer = time.timer(defaultApiTimeoutMs);
- applicationEventHandler.add(new
ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));
-
- // Notify the network thread to wake up and start the next
round of fetching
- applicationEventHandler.wakeupNetworkThread();
+ sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
}
return fetch;
} else {
if (!acknowledgementsMap.isEmpty()) {
// Asynchronously commit any waiting acknowledgements
- Timer timer = time.timer(defaultApiTimeoutMs);
- applicationEventHandler.add(new
ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));
-
- // Notify the network thread to wake up and start the next
round of fetching
- applicationEventHandler.wakeupNetworkThread();
+ sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
}
if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
// We cannot leave unacknowledged records in EXPLICIT
acknowledgement mode, so we throw an exception to the application.
@@ -685,6 +689,14 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
}
}
+ private void sendShareAcknowledgeAsyncEvent(Map<TopicIdPartition,
NodeAcknowledgements> acknowledgementsMap) {
+ Timer timer = time.timer(defaultApiTimeoutMs);
+ applicationEventHandler.add(new
ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));
+
+ // Notify the network thread to wake up and start the next round of
fetching
+ applicationEventHandler.wakeupNetworkThread();
+ }
+
/**
* {@inheritDoc}
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 31eef662ce8..2ac1be587a6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -514,7 +514,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* Process event that tells the share consume request manager to fetch
more records.
*/
private void process(final ShareFetchEvent event) {
- requestManagers.shareConsumeRequestManager.ifPresent(scrm ->
scrm.fetch(event.acknowledgementsMap(), event.controlRecordAcknowledgements()));
+ requestManagers.shareConsumeRequestManager.ifPresent(scrm ->
scrm.fetch(event.acknowledgementsMap()));
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
index 2bc38b07b8b..1f2f0cc9a17 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
@@ -25,23 +25,15 @@ public class ShareFetchEvent extends ApplicationEvent {
private final Map<TopicIdPartition, NodeAcknowledgements>
acknowledgementsMap;
- private final Map<TopicIdPartition, NodeAcknowledgements>
controlRecordAcknowledgements;
-
- public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements>
acknowledgementsMap,
- Map<TopicIdPartition, NodeAcknowledgements>
controlRecordAcknowledgements) {
+ public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements>
acknowledgementsMap) {
super(Type.SHARE_FETCH);
this.acknowledgementsMap = acknowledgementsMap;
- this.controlRecordAcknowledgements = controlRecordAcknowledgements;
}
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
return acknowledgementsMap;
}
- public Map<TopicIdPartition, NodeAcknowledgements>
controlRecordAcknowledgements() {
- return controlRecordAcknowledgements;
- }
-
@Override
protected String toStringBase() {
return super.toStringBase() + ", acknowledgementsMap=" +
acknowledgementsMap;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
index 1346ea7fd09..e1a4aef6498 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java
@@ -49,7 +49,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -141,9 +140,6 @@ public class KafkaShareConsumerTest {
}
}
- // This test is proving sufficiently flaky that it has been disabled
pending investigation
- @Disabled
- // @Flaky("KAFKA-18488")
@Test
public void testVerifyFetchAndCommitSyncImplicit() {
ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0,
Long.MAX_VALUE, false,
@@ -218,9 +214,6 @@ public class KafkaShareConsumerTest {
}
}
- // This test is proving sufficiently flaky that it has been disabled
pending investigation
- @Disabled
- //@Flaky("KAFKA-18794")
@Test
public void testVerifyFetchAndCloseImplicit() {
ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0,
Long.MAX_VALUE, false,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 6c47aea4171..d2caebb1c4c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -252,7 +252,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
sendFetchAndVerifyResponse(records,
ShareCompletedFetchTest.acquiredRecords(2L, 1), Errors.NONE);
assertEquals(1.0,
@@ -265,7 +265,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(2L, AcknowledgeType.REJECT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements2)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements2)));
// Preparing a response with an acknowledgement error.
sendFetchAndVerifyResponse(records, Collections.emptyList(),
Errors.NONE, Errors.INVALID_RECORD_STATE);
@@ -391,7 +391,7 @@ public class ShareConsumeRequestManagerTest {
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
// Piggyback acknowledgements
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
// Remaining acknowledgements sent with close().
Acknowledgements acknowledgements2 = getAcknowledgements(2,
AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@@ -797,7 +797,7 @@ public class ShareConsumeRequestManagerTest {
fetchRecords();
// Piggyback acknowledgements
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -807,7 +807,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(3L, AcknowledgeType.ACCEPT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements2)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements2)));
client.prepareResponse(fullFetchResponse(tip0, records,
acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
@@ -935,7 +935,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
// Send acknowledgements via ShareFetch
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
fetchRecords();
// Subscription changes.
subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
@@ -974,7 +974,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = getAcknowledgements(0,
AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
// Send acknowledgements via ShareFetch
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
fetchRecords();
// Subscription changes.
subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
@@ -1362,7 +1362,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements1 = getAcknowledgements(2,
AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements1)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements1)));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -1406,7 +1406,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
NetworkClientDelegate.PollResult pollResult =
shareConsumeRequestManager.sendFetchesReturnPollResult();
assertEquals(1, pollResult.unsentRequests.size());
@@ -1442,7 +1442,7 @@ public class ShareConsumeRequestManagerTest {
// The acknowledgements for the initial fetch from tip0 are processed
now and sent to the background thread.
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
assertEquals(0, completedAcknowledgements.size());
@@ -1467,7 +1467,7 @@ public class ShareConsumeRequestManagerTest {
// The acknowledgements for the initial fetch from tip0 are processed
now and sent to the background thread.
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
// We attempt to send the acknowledgements piggybacking on the fetch.
assertEquals(1, sendFetches());
@@ -1813,7 +1813,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
assertEquals(startingClusterMetadata, metadata.fetch());
@@ -1918,7 +1918,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
// The metadata snapshot will have been updated with the new leader
information
assertNotEquals(startingClusterMetadata, metadata.fetch());
@@ -2013,7 +2013,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
assertEquals(startingClusterMetadata, metadata.fetch());
@@ -2301,7 +2301,7 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgementsTp1 = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
- shareConsumeRequestManager.fetch(Map.of(tip1, new
NodeAcknowledgements(1, acknowledgementsTp1)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip1, new
NodeAcknowledgements(1, acknowledgementsTp1)));
// Move the leadership of tp0 onto node 1
HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders =
new HashMap<>();
@@ -2384,13 +2384,13 @@ public class ShareConsumeRequestManagerTest {
Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1, AcknowledgeType.ACCEPT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
assertEquals(startingClusterMetadata, metadata.fetch());
acknowledgements = Acknowledgements.empty();
acknowledgements.add(1, AcknowledgeType.ACCEPT);
- shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
assertEquals(2, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -2431,7 +2431,7 @@ public class ShareConsumeRequestManagerTest {
assertNotEquals(startingClusterMetadata, metadata.fetch());
- shareConsumeRequestManager.fetch(Map.of(tip1, new
NodeAcknowledgements(1, acknowledgements)), Collections.emptyMap());
+ shareConsumeRequestManager.fetch(Map.of(tip1, new
NodeAcknowledgements(1, acknowledgements)));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -2714,14 +2714,14 @@ public class ShareConsumeRequestManagerTest {
}
private int sendFetches() {
- fetch(new HashMap<>(), new HashMap<>());
+ fetch(new HashMap<>());
NetworkClientDelegate.PollResult pollResult =
poll(time.milliseconds());
networkClientDelegate.addAll(pollResult.unsentRequests);
return pollResult.unsentRequests.size();
}
private NetworkClientDelegate.PollResult sendFetchesReturnPollResult()
{
- fetch(new HashMap<>(), new HashMap<>());
+ fetch(new HashMap<>());
NetworkClientDelegate.PollResult pollResult =
poll(time.milliseconds());
networkClientDelegate.addAll(pollResult.unsentRequests);
return pollResult;
@@ -2861,32 +2861,6 @@ public class ShareConsumeRequestManagerTest {
}
}
- @Test
- void testFetchWithControlRecords() {
- buildRequestManager();
-
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
-
- Map<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsMap =
new HashMap<>();
-
- Acknowledgements acknowledgements = Acknowledgements.empty();
- acknowledgements.add(1L, AcknowledgeType.ACCEPT);
- nodeAcknowledgementsMap.put(tip0, new NodeAcknowledgements(0,
acknowledgements));
-
- Map<TopicIdPartition, NodeAcknowledgements>
nodeAcknowledgementsControlRecordMap = new HashMap<>();
-
- Acknowledgements controlAcknowledgements = Acknowledgements.empty();
- controlAcknowledgements.addGap(2L);
- nodeAcknowledgementsControlRecordMap.put(tip0, new
NodeAcknowledgements(0, controlAcknowledgements));
-
- shareConsumeRequestManager.fetch(nodeAcknowledgementsMap,
nodeAcknowledgementsControlRecordMap);
-
- Map<TopicIdPartition, Acknowledgements> fetchAcksToSend =
shareConsumeRequestManager.getFetchAcknowledgementsToSend(0);
- assertEquals(1, fetchAcksToSend.size());
- assertEquals(AcknowledgeType.ACCEPT,
fetchAcksToSend.get(tip0).get(1L));
- assertEquals(2, fetchAcksToSend.get(tip0).size());
- assertNull(fetchAcksToSend.get(tip0).get(3L));
- }
-
private void sendFetchAndVerifyResponse(MemoryRecords records,
List<ShareFetchResponseData.AcquiredRecords> acquiredRecords,
Errors... error) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index edb244b06aa..1765563adcd 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
@@ -269,20 +270,15 @@ public class ShareConsumerImplTest {
consumer.poll(Duration.ZERO);
- // Verify that next ShareFetchEvent was sent with the acknowledgement
GAP for offset 1
+ // Verify that a ShareAcknowledgeAsyncEvent was sent with the
acknowledgement GAP for offset 1
verify(applicationEventHandler).add(argThat(event -> {
- if (!(event instanceof ShareFetchEvent)) {
+ if (!(event instanceof ShareAcknowledgeAsyncEvent)) {
return false;
}
- ShareFetchEvent fetchEvent = (ShareFetchEvent) event;
+ ShareAcknowledgeAsyncEvent shareAcknowledgeAsyncEvent =
(ShareAcknowledgeAsyncEvent) event;
- // Regular acknowledgements map should be empty
- if (!fetchEvent.acknowledgementsMap().isEmpty()) {
- return false;
- }
-
- // Control record acknowledgements map should contain the GAP for
offset 1
- Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcks =
fetchEvent.controlRecordAcknowledgements();
+ // Acknowledgements map should contain the GAP for offset 1
+ Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcks =
shareAcknowledgeAsyncEvent.acknowledgementsMap();
return controlRecordAcks.containsKey(tip) &&
controlRecordAcks.get(tip).acknowledgements().get(1L) ==
null; // Null indicates GAP
}));
@@ -346,6 +342,32 @@ public class ShareConsumerImplTest {
assertEquals("This consumer has already been closed.",
res.getMessage());
}
+ @Test
+ public void testShouldSendOneShareFetchEventPerPoll() {
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(subscriptions);
+
+ // Setup test data
+ String topic = "test-topic";
+ // Setup an empty fetch.
+ ShareFetch<String, String> firstFetch = ShareFetch.empty();
+
+ doReturn(firstFetch)
+ .doReturn(ShareFetch.empty())
+ .when(fetchCollector)
+ .collect(any(ShareFetchBuffer.class));
+
+ // Setup subscription
+ List<String> topics = Collections.singletonList(topic);
+
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
topics);
+ consumer.subscribe(topics);
+
+ doReturn(0L).when(applicationEventHandler).maximumTimeToWait();
+ // Check that only 1 ShareFetchEvent is sent per poll
+ consumer.poll(Duration.ofMillis(100));
+ verify(applicationEventHandler, times(1)).add(argThat(event -> event
instanceof ShareFetchEvent));
+ }
+
@Test
public void testUnsubscribeWithTopicAuthorizationException() {
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);