AndrewJSchofield commented on code in PR #20855:
URL: https://github.com/apache/kafka/pull/20855#discussion_r2534920965


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1853,8 +1843,7 @@ public void 
testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeade
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
-        assertTrue(partitionRecords.containsKey(tp0));
-        assertFalse(partitionRecords.containsKey(tp1));
+        assertTrue(partitionRecords.containsKey(tp0) && 
!partitionRecords.containsKey(tp1));

Review Comment:
   I wonder whether this change is redundant.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1936,16 +1912,9 @@ public void 
testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderIn
         assertEquals(2, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
 
-        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData = new LinkedHashMap<>();
-        partitionData.put(tip0,
-            new ShareFetchResponseData.PartitionData()
-                .setPartitionIndex(tip0.topicPartition().partition())
-                .setErrorCode(Errors.NONE.code())
-                .setRecords(records)
-                
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
-                .setAcknowledgeErrorCode(Errors.NONE.code()));
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData = buildPartitionDataMap(tip0, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);

Review Comment:
   Very long again.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1899,13 +1879,9 @@ public void 
testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeade
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
         partitionRecords = fetchRecords();
-        assertTrue(partitionRecords.containsKey(tp0));
-        assertTrue(partitionRecords.containsKey(tp1));
-
-        fetchedRecords = partitionRecords.get(tp0);
-        assertEquals(1, fetchedRecords.size());
-        fetchedRecords = partitionRecords.get(tp1);
-        assertEquals(1, fetchedRecords.size());
+        assertTrue(partitionRecords.containsKey(tp0) && 
partitionRecords.containsKey(tp1));

Review Comment:
   I would leave this line unchanged as two assertions just to minimise the 
amount of change.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -2053,8 +2003,7 @@ public void 
testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) {
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
-        assertTrue(partitionRecords.containsKey(tp0));
-        assertFalse(partitionRecords.containsKey(tp1));
+        assertTrue(partitionRecords.containsKey(tp0) && 
!partitionRecords.containsKey(tp1));

Review Comment:
   And here.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -2137,23 +2073,9 @@ void testLeadershipChangeAfterFetchBeforeCommitAsync() {
         assertEquals(2, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
 
-        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData = new LinkedHashMap<>();
-        partitionData.put(tip0,
-                new ShareFetchResponseData.PartitionData()
-                        .setPartitionIndex(tip0.topicPartition().partition())
-                        .setErrorCode(Errors.NONE.code())
-                        .setRecords(records)
-                        
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
-                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData = buildPartitionDataMap(tip0, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);

Review Comment:
   And here.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1958,8 +1927,7 @@ public void 
testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderIn
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
-        assertTrue(partitionRecords.containsKey(tp0));
-        assertFalse(partitionRecords.containsKey(tp1));
+        assertTrue(partitionRecords.containsKey(tp0) && 
!partitionRecords.containsKey(tp1));

Review Comment:
   And here.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -2509,6 +2394,90 @@ void testWhenLeadershipChangedAfterDisconnected() {
         assertEquals(1, fetchedRecords.size());
     }
 
+    @Test
+    public void testFetchOneNodeAtATimeForRecordLimitMode() {
+        // We will simulate two nodes, each with one partition. The first node 
will have more records
+        buildRequestManager(ShareAcquireMode.RECORD_LIMIT);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp0);
+        partitions.add(tp1);
+        subscriptions.assignFromSubscribed(partitions);

Review Comment:
   I don't think this is necessarily a deterministic order. I worry that this 
might be a new flaky test.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1834,14 +1831,7 @@ public void 
testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeade
         assertEquals(2, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
 
-        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData = new LinkedHashMap<>();
-        partitionData.put(tip0,
-            new ShareFetchResponseData.PartitionData()
-                .setPartitionIndex(tip0.topicPartition().partition())
-                .setErrorCode(Errors.NONE.code())
-                .setRecords(records)
-                
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
-                .setAcknowledgeErrorCode(Errors.NONE.code()));
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData = buildPartitionDataMap(tip0, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);

Review Comment:
   nit: This is an excessively long line. Please break it at the `=`.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1997,13 +1958,9 @@ public void 
testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderIn
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
         partitionRecords = fetchRecords();
-        assertTrue(partitionRecords.containsKey(tp0));
-        assertTrue(partitionRecords.containsKey(tp1));
-
-        fetchedRecords = partitionRecords.get(tp0);
-        assertEquals(1, fetchedRecords.size());
-        fetchedRecords = partitionRecords.get(tp1);
-        assertEquals(1, fetchedRecords.size());
+        assertTrue(partitionRecords.containsKey(tp0) && 
partitionRecords.containsKey(tp1));

Review Comment:
   And here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to