Re: [PR] KAFKA-15970: Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]
msn-tldr commented on PR #14916: URL: https://github.com/apache/kafka/pull/14916#issuecomment-1848384342 @wcarlson5 thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15970: Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]
wcarlson5 merged PR #14916: URL: https://github.com/apache/kafka/pull/14916
Re: [PR] KAFKA-15970: Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]
msn-tldr commented on PR #14916: URL: https://github.com/apache/kafka/pull/14916#issuecomment-1842857934 @wcarlson5 The failures are unrelated; see https://ge.apache.org/s/ul2ost2737xbg/tests/overview?outcome=FAILED
Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]
wcarlson5 commented on PR #14916: URL: https://github.com/apache/kafka/pull/14916#issuecomment-1841690681 I'll rerun it and see if it gets any further :)
Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]
msn-tldr commented on PR #14916: URL: https://github.com/apache/kafka/pull/14916#issuecomment-1841370385

So the JDK 17 build failed with the following error, and it's unrelated to this PR.
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14916/5/pipeline/11/

```
org.gradle.internal.remote.internal.ConnectException: Could not connect to server [a99ad330-91c1-4b36-98c1-035d6d718e13 port:35031, addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1].
	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:67)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedClient.getConnection(MessageHubBackedClient.java:36)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:103)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.Net.pollConnect(Native Method)
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
	at java.base/sun.nio.ch.SocketChannelImpl.finishTimedConnect(SocketChannelImpl.java:1141)
	at java.base/sun.nio.ch.SocketChannelImpl.blockingConnect(SocketChannelImpl.java:1183)
	at java.base/sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:98)
	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.tryConnect(TcpOutgoingConnector.java:81)
	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:54)
	... 5 more
.
.
* What went wrong:
Execution failed for task ':streams:upgrade-system-tests-10:checkstyleTest'.
> A failure occurred while executing org.gradle.api.plugins.quality.internal.CheckstyleAction
   > Failed to run Gradle Worker Daemon
      > Process 'Gradle Worker Daemon 4' finished with non-zero exit value 1
```
Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]
AndrewJSchofield commented on code in PR #14916: URL: https://github.com/apache/kafka/pull/14916#discussion_r1415479462

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ##
@@ -3496,53 +3501,52 @@ public void testWhenFetchResponseReturnsWithALeaderShipChangeErrorButNoNewLeader
                 FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp1Leader);
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
-
         Map>> partitionRecords = fetchRecords();
         assertTrue(partitionRecords.containsKey(tp0));
         assertTrue(partitionRecords.containsKey(tp1));
-
-        // Verify that preferred read replica is set for both tp0 & tp1.
+        // Validate setup of preferred read replica for tp0 & tp1 is done correctly.
         Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
         assertEquals(nodeId0.id(), selected.id());
         selected = fetcher.selectReadReplica(tp1, Node.noNode(), time.milliseconds());
         assertEquals(nodeId0.id(), selected.id());
-        // Next fetch returns an error(due to leadership change) but new leader info is missing, for tp0.
-        // For tp1 fetch returns with no error.
+        // Send next fetch request.
         assertEquals(1, sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
-        // Verify that metadata-update isn't requested
+        // Verify that metadata-update isn't requested as metadata is considered upto-date.

Review Comment:
"up to date"
Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]
msn-tldr commented on PR #14916: URL: https://github.com/apache/kafka/pull/14916#issuecomment-1840609832 @AndrewJSchofield I have updated the comments in the test to make them more readable; I hope that helps. I have also made the same changes to FetcherTest, where these tests were originally added.
Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]
AndrewJSchofield commented on code in PR #14916: URL: https://github.com/apache/kafka/pull/14916#discussion_r1414431120

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ##
@@ -3126,6 +3127,207 @@ public void testCorruptMessageError() {
         assertThrows(KafkaException.class, this::fetchRecords);
     }
+
+    /**
+     * Test the scenario that FetchResponse returns with an error indicating leadership change for the partition, but it
+     * does not contain new leader info(defined in KIP-951).
+     */
+    @ParameterizedTest
+    @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"})
+    public void testWhenFetchResponseReturnsWithALeaderShipChangeErrorButNoNewLeaderInformation(Errors error) {
+        // The test runs with 2 partitions where 1 partition is fetched without errors, and 2nd partitions faces errors due to leadership changes.
+
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(),
+                new BytesDeserializer(),
+                Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED,
+                Duration.ofMinutes(5).toMillis());
+
+        // Setup so that fetcher is subscribed to tp0 & tp1, and metadata setup with tp0.
+        // assignFromUser(singleton(tp0));
+        subscriptions.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 4),
+                        tp -> validLeaderEpoch, topicIds, false));
+        Node tp0Leader = metadata.fetch().leaderFor(tp0);
+        Node tp1Leader = metadata.fetch().leaderFor(tp1);
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Cluster startingClusterMetadata = metadata.fetch();
+        subscriptions.seek(tp0, 0);
+        subscriptions.seek(tp1, 0);
+
+        // Setup preferred read replica to node=1 by doing a fetch for both partitions.
+        assertEquals(2, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponseFrom(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp0Leader);
+        client.prepareResponseFrom(fullFetchResponse(tidp1, this.records, Errors.NONE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(nodeId0.id())), tp1Leader);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map>> partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        // Verify that preferred read replica is set for both tp0 & tp1.
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+        selected = fetcher.selectReadReplica(tp1, Node.noNode(), time.milliseconds());
+        assertEquals(nodeId0.id(), selected.id());
+
+        // Next fetch returns an error(due to leadership change) but new leader info is missing, for tp0.
+        // For tp1 fetch returns with no error.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // Verify that metadata-update isn't requested
+        assertFalse(metadata.updateRequested());
+
+        LinkedHashMap partitions = new LinkedHashMap<>();
+        partitions.put(tidp0,
+                new FetchResponseData.PartitionData()
+                        .setPartitionIndex(tidp0.topicPartition().partition())
+                        .setErrorCode(error.code()));
+        partitions.put(tidp1,
+                new FetchResponseData.PartitionData()
+                        .setPartitionIndex(tidp1.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setHighWatermark(100L)
+                        .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                        .setLogStartOffset(0)
+                        .setRecords(nextRecords));
+        client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions), nodeId0);
+        networkClientDelegate.poll(time.timer(0));
+        partitionRecords = fetchRecords();
+        assertFalse(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        // Validate metadata is unchanged, as response has missing new leader info.
+        assertEquals(startingClusterMetadata, metadata.fetch());
+
+        // Validate metadata-update is requested due to the error for tp0.
+        assertTrue(metadata.updateRequested());
+
+        // Validate preferred-read-replica is cleared for tp0 due to the error.
+        assertEquals(Optional.empty(),
+                subscriptions.preferredReadReplica(tp0, time.milliseconds()));
+        // Validate
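For readers who do not have the Kafka test harness at hand, the behaviour that the quoted test asserts can be sketched in plain Java. This is only an illustration under stated assumptions: `LeaderChangeSketch`, its `FetchError` enum, and `onPartitionResponse` are hypothetical names invented here, not Kafka classes or APIs, and the handling of a response that does carry new leader info is a simplified reading of KIP-951 rather than the client's actual code path.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified stand-in for the client state the test inspects (not a Kafka class). */
final class LeaderChangeSketch {
    /** Assumed subset of error codes; the names mirror the ones used in the quoted test. */
    enum FetchError { NONE, FENCED_LEADER_EPOCH, NOT_LEADER_OR_FOLLOWER }

    private final Map<String, Integer> leaders = new HashMap<>();
    private final Map<String, Integer> preferredReadReplicas = new HashMap<>();
    private boolean updateRequested = false;

    void setLeader(String partition, int nodeId) {
        leaders.put(partition, nodeId);
    }

    void setPreferredReadReplica(String partition, int nodeId) {
        preferredReadReplicas.put(partition, nodeId);
    }

    /** Applies one partition's result from a fetch response to the local state. */
    void onPartitionResponse(String partition, FetchError error, Optional<Integer> newLeader) {
        if (error == FetchError.NONE) {
            return; // Successful fetch: nothing is invalidated.
        }
        // A leadership-change error clears the preferred read replica for that partition
        // and asks for a metadata refresh.
        preferredReadReplicas.remove(partition);
        updateRequested = true;
        // Assumption: leader info, when present in the response, is applied immediately.
        // When it is absent, the cached leader stays as it was.
        newLeader.ifPresent(nodeId -> leaders.put(partition, nodeId));
    }

    /** Returns a copy of the cached leaders so callers can compare snapshots. */
    Map<String, Integer> leaders() {
        return new HashMap<>(leaders);
    }

    Optional<Integer> preferredReadReplica(String partition) {
        return Optional.ofNullable(preferredReadReplicas.get(partition));
    }

    boolean updateRequested() {
        return updateRequested;
    }

    public static void main(String[] args) {
        LeaderChangeSketch state = new LeaderChangeSketch();
        state.setLeader("tp0", 1);
        state.setLeader("tp1", 1);
        state.setPreferredReadReplica("tp0", 0);
        state.setPreferredReadReplica("tp1", 0);
        Map<String, Integer> before = state.leaders();

        // tp0 fails with a leadership error and no new leader info; tp1 succeeds.
        state.onPartitionResponse("tp0", FetchError.FENCED_LEADER_EPOCH, Optional.empty());
        state.onPartitionResponse("tp1", FetchError.NONE, Optional.empty());

        System.out.println("metadata unchanged: " + before.equals(state.leaders()));
        System.out.println("update requested: " + state.updateRequested());
        System.out.println("tp0 preferred replica: " + state.preferredReadReplica("tp0"));
    }
}
```

The three printed values correspond to the three validations visible in the quoted diff: the cached metadata is unchanged, a metadata update is requested, and the preferred read replica for tp0 is cleared.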
[PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]
msn-tldr opened a new pull request, #14916: URL: https://github.com/apache/kafka/pull/14916

This PR copies the tests added to FetcherTest.java in [PR](https://github.com/apache/kafka/pull/14685/files#diff-2745b4306d343fd2a5abbf6314c0f8d01d07d7140ab989894ca1a0a17100f02a) over to FetchRequestManagerTest.java, so that the KIP-951 changes have test coverage in the new consumer, which is being worked on as described here: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+project+overview

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)