Re: [PR] KAFKA-15970: Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]

2023-12-09 Thread via GitHub


msn-tldr commented on PR #14916:
URL: https://github.com/apache/kafka/pull/14916#issuecomment-1848384342

   @wcarlson5 thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15970: Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]

2023-12-06 Thread via GitHub


wcarlson5 merged PR #14916:
URL: https://github.com/apache/kafka/pull/14916


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15970: Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]

2023-12-06 Thread via GitHub


msn-tldr commented on PR #14916:
URL: https://github.com/apache/kafka/pull/14916#issuecomment-1842857934

   @wcarlson5 The failures are unrelated check here
   https://ge.apache.org/s/ul2ost2737xbg/tests/overview?outcome=FAILED


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]

2023-12-05 Thread via GitHub


wcarlson5 commented on PR #14916:
URL: https://github.com/apache/kafka/pull/14916#issuecomment-1841690681

   I'll rerun it and see if it gets any further :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]

2023-12-05 Thread via GitHub


msn-tldr commented on PR #14916:
URL: https://github.com/apache/kafka/pull/14916#issuecomment-1841370385

   So JDK 17 build failed with following and its unrelated.
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14916/5/pipeline/11/
   
   ```
   org.gradle.internal.remote.internal.ConnectException: Could not connect to 
server [a99ad330-91c1-4b36-98c1-035d6d718e13 port:35031, 
addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1].
   
at 
org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:67)
   
at 
org.gradle.internal.remote.internal.hub.MessageHubBackedClient.getConnection(MessageHubBackedClient.java:36)
   
at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:103)
   
at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
   
at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
   
at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
   
   Caused by: java.net.ConnectException: Connection refused
   
at java.base/sun.nio.ch.Net.pollConnect(Native Method)
   
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
   
at 
java.base/sun.nio.ch.SocketChannelImpl.finishTimedConnect(SocketChannelImpl.java:1141)
   
at 
java.base/sun.nio.ch.SocketChannelImpl.blockingConnect(SocketChannelImpl.java:1183)
   
at java.base/sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:98)
   
at 
org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.tryConnect(TcpOutgoingConnector.java:81)
   
at 
org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:54)
   
... 5 more
   .
   .
   
   * What went wrong:
   
   Execution failed for task ':streams:upgrade-system-tests-10:checkstyleTest'.
   
   > A failure occurred while executing 
org.gradle.api.plugins.quality.internal.CheckstyleAction
   
  > Failed to run Gradle Worker Daemon
   
 > Process 'Gradle Worker Daemon 4' finished with non-zero exit value 1
   ```
 
 
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]

2023-12-05 Thread via GitHub


AndrewJSchofield commented on code in PR #14916:
URL: https://github.com/apache/kafka/pull/14916#discussion_r1415479462


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -3496,53 +3501,52 @@ public void 
testWhenFetchResponseReturnsWithALeaderShipChangeErrorButNoNewLeader
 FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 
Optional.of(nodeId0.id())), tp1Leader);
 consumerClient.poll(time.timer(0));
 assertTrue(fetcher.hasCompletedFetches());
-
 Map>> 
partitionRecords = fetchRecords();
 assertTrue(partitionRecords.containsKey(tp0));
 assertTrue(partitionRecords.containsKey(tp1));
-
-// Verify that preferred read replica is set for both tp0 & tp1.
+// Validate setup of preferred read replica for tp0 & tp1 is done 
correctly.
 Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
 assertEquals(nodeId0.id(), selected.id());
 selected = fetcher.selectReadReplica(tp1, Node.noNode(), 
time.milliseconds());
 assertEquals(nodeId0.id(), selected.id());
 
-// Next fetch returns an error(due to leadership change) but new 
leader info is missing, for tp0.
-// For tp1 fetch returns with no error.
+// Send next fetch request.
 assertEquals(1, sendFetches());
 assertFalse(fetcher.hasCompletedFetches());
-// Verify that metadata-update isn't requested
+// Verify that metadata-update isn't requested as metadata is 
considered upto-date.

Review Comment:
   "up to date"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]

2023-12-05 Thread via GitHub


msn-tldr commented on PR #14916:
URL: https://github.com/apache/kafka/pull/14916#issuecomment-1840609832

   @AndrewJSchofield I have update the comments in the test to make it more 
readable, hope it helps.
   
   Also i have made same changes to FetcherTest where these tests were 
originally added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]

2023-12-04 Thread via GitHub


AndrewJSchofield commented on code in PR #14916:
URL: https://github.com/apache/kafka/pull/14916#discussion_r1414431120


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -3126,6 +3127,207 @@ public void testCorruptMessageError() {
 assertThrows(KafkaException.class, this::fetchRecords);
 }
 
+
+/**
+ * Test the scenario that FetchResponse returns with an error indicating 
leadership change for the partition, but it
+ * does not contain new leader info(defined in KIP-951).
+ */
+@ParameterizedTest
+@EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", 
"NOT_LEADER_OR_FOLLOWER"})
+public void 
testWhenFetchResponseReturnsWithALeaderShipChangeErrorButNoNewLeaderInformation(Errors
 error) {
+// The test runs with 2 partitions where 1 partition is fetched 
without errors, and 2nd partitions faces errors due to leadership changes.
+
+buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new 
BytesDeserializer(),
+new BytesDeserializer(),
+Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED,
+Duration.ofMinutes(5).toMillis());
+
+// Setup so that fetcher is subscribed to tp0 & tp1, and metadata 
setup with tp0.
+// assignFromUser(singleton(tp0));
+subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
+client.updateMetadata(
+RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 
4),
+tp -> validLeaderEpoch, topicIds, false));
+Node tp0Leader = metadata.fetch().leaderFor(tp0);
+Node tp1Leader = metadata.fetch().leaderFor(tp1);
+Node nodeId0 = metadata.fetch().nodeById(0);
+Cluster startingClusterMetadata = metadata.fetch();
+subscriptions.seek(tp0, 0);
+subscriptions.seek(tp1, 0);
+
+// Setup preferred read replica to node=1 by doing a fetch for both 
partitions.
+assertEquals(2, sendFetches());
+assertFalse(fetcher.hasCompletedFetches());
+client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, 
Errors.NONE, 100L,
+FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 
Optional.of(nodeId0.id())), tp0Leader);
+client.prepareResponseFrom(fullFetchResponse(tidp1, this.records, 
Errors.NONE, 100L,
+FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 
Optional.of(nodeId0.id())), tp1Leader);
+networkClientDelegate.poll(time.timer(0));
+assertTrue(fetcher.hasCompletedFetches());
+
+Map>> 
partitionRecords = fetchRecords();
+assertTrue(partitionRecords.containsKey(tp0));
+assertTrue(partitionRecords.containsKey(tp1));
+
+// Verify that preferred read replica is set for both tp0 & tp1.
+Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), 
time.milliseconds());
+assertEquals(nodeId0.id(), selected.id());
+selected = fetcher.selectReadReplica(tp1, Node.noNode(), 
time.milliseconds());
+assertEquals(nodeId0.id(), selected.id());
+
+// Next fetch returns an error(due to leadership change) but new 
leader info is missing, for tp0.
+// For tp1 fetch returns with no error.
+assertEquals(1, sendFetches());
+assertFalse(fetcher.hasCompletedFetches());
+// Verify that metadata-update isn't requested
+assertFalse(metadata.updateRequested());
+
+LinkedHashMap 
partitions = new LinkedHashMap<>();
+partitions.put(tidp0,
+new FetchResponseData.PartitionData()
+.setPartitionIndex(tidp0.topicPartition().partition())
+.setErrorCode(error.code()));
+partitions.put(tidp1,
+new FetchResponseData.PartitionData()
+.setPartitionIndex(tidp1.topicPartition().partition())
+.setErrorCode(Errors.NONE.code())
+.setHighWatermark(100L)
+.setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+.setLogStartOffset(0)
+.setRecords(nextRecords));
+client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, partitions), nodeId0);
+networkClientDelegate.poll(time.timer(0));
+partitionRecords = fetchRecords();
+assertFalse(partitionRecords.containsKey(tp0));
+assertTrue(partitionRecords.containsKey(tp1));
+
+// Validate metadata is unchanged, as response has missing new leader 
info.
+assertEquals(startingClusterMetadata, metadata.fetch());
+
+// Validate metadata-update is requested due to the error for tp0.
+assertTrue(metadata.updateRequested());
+
+// Validate preferred-read-replica is cleared for tp0 due to the error.
+assertEquals(Optional.empty(),
+subscriptions.preferredReadReplica(tp0, time.milliseconds()));
+// Validate 

[PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]

2023-12-04 Thread via GitHub


msn-tldr opened a new pull request, #14916:
URL: https://github.com/apache/kafka/pull/14916

   Copying over tests added to FetcherTest.java in 
[PR](https://github.com/apache/kafka/pull/14685/files#diff-2745b4306d343fd2a5abbf6314c0f8d01d07d7140ab989894ca1a0a17100f02a)
 to FetchRequestManagerTest.java, so KIP-951 changes have coverage with new 
consumer being worked as described here 
   
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+project+overview
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org