AndrewJSchofield commented on code in PR #20855:
URL: https://github.com/apache/kafka/pull/20855#discussion_r2534920965
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1853,8 +1843,7 @@ public void
testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeade
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
- assertTrue(partitionRecords.containsKey(tp0));
- assertFalse(partitionRecords.containsKey(tp1));
+ assertTrue(partitionRecords.containsKey(tp0) &&
!partitionRecords.containsKey(tp1));
Review Comment:
I wonder whether this change is redundant.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1936,16 +1912,9 @@ public void
testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderIn
assertEquals(2, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
- LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData = new LinkedHashMap<>();
- partitionData.put(tip0,
- new ShareFetchResponseData.PartitionData()
- .setPartitionIndex(tip0.topicPartition().partition())
- .setErrorCode(Errors.NONE.code())
- .setRecords(records)
-
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
- .setAcknowledgeErrorCode(Errors.NONE.code()));
+ LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData = buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
Review Comment:
This line is very long again.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1899,13 +1879,9 @@ public void
testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeade
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
partitionRecords = fetchRecords();
- assertTrue(partitionRecords.containsKey(tp0));
- assertTrue(partitionRecords.containsKey(tp1));
-
- fetchedRecords = partitionRecords.get(tp0);
- assertEquals(1, fetchedRecords.size());
- fetchedRecords = partitionRecords.get(tp1);
- assertEquals(1, fetchedRecords.size());
+ assertTrue(partitionRecords.containsKey(tp0) &&
partitionRecords.containsKey(tp1));
Review Comment:
I would leave this line unchanged as two assertions just to minimise the
amount of change.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -2053,8 +2003,7 @@ public void
testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) {
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
- assertTrue(partitionRecords.containsKey(tp0));
- assertFalse(partitionRecords.containsKey(tp1));
+ assertTrue(partitionRecords.containsKey(tp0) &&
!partitionRecords.containsKey(tp1));
Review Comment:
And here.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -2137,23 +2073,9 @@ void testLeadershipChangeAfterFetchBeforeCommitAsync() {
assertEquals(2, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
- LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData = new LinkedHashMap<>();
- partitionData.put(tip0,
- new ShareFetchResponseData.PartitionData()
- .setPartitionIndex(tip0.topicPartition().partition())
- .setErrorCode(Errors.NONE.code())
- .setRecords(records)
-
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
- .setAcknowledgeErrorCode(Errors.NONE.code()));
+ LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData = buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
Review Comment:
And here.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1958,8 +1927,7 @@ public void
testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderIn
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
- assertTrue(partitionRecords.containsKey(tp0));
- assertFalse(partitionRecords.containsKey(tp1));
+ assertTrue(partitionRecords.containsKey(tp0) &&
!partitionRecords.containsKey(tp1));
Review Comment:
And here.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -2509,6 +2394,90 @@ void testWhenLeadershipChangedAfterDisconnected() {
assertEquals(1, fetchedRecords.size());
}
+ @Test
+ public void testFetchOneNodeAtATimeForRecordLimitMode() {
+ // We will simulate two nodes, each with one partition. The first node
will have more records
+ buildRequestManager(ShareAcquireMode.RECORD_LIMIT);
+
+ subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp0);
+ partitions.add(tp1);
+ subscriptions.assignFromSubscribed(partitions);
Review Comment:
I don't think this order is necessarily deterministic. I worry that this
might be a new flaky test.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1834,14 +1831,7 @@ public void
testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeade
assertEquals(2, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
- LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData = new LinkedHashMap<>();
- partitionData.put(tip0,
- new ShareFetchResponseData.PartitionData()
- .setPartitionIndex(tip0.topicPartition().partition())
- .setErrorCode(Errors.NONE.code())
- .setRecords(records)
-
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
- .setAcknowledgeErrorCode(Errors.NONE.code()));
+ LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData = buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
Review Comment:
nit: This is an excessively long line. Please break it at `=`.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1997,13 +1958,9 @@ public void
testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderIn
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
partitionRecords = fetchRecords();
- assertTrue(partitionRecords.containsKey(tp0));
- assertTrue(partitionRecords.containsKey(tp1));
-
- fetchedRecords = partitionRecords.get(tp0);
- assertEquals(1, fetchedRecords.size());
- fetchedRecords = partitionRecords.get(tp1);
- assertEquals(1, fetchedRecords.size());
+ assertTrue(partitionRecords.containsKey(tp0) &&
partitionRecords.containsKey(tp1));
Review Comment:
And here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org