Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
stanislavkozlovski merged PR #15130: URL: https://github.com/apache/kafka/pull/15130 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
jolshan commented on PR #15130: URL: https://github.com/apache/kafka/pull/15130#issuecomment-1883672884 ^ Those are issues I see frequently and are likely unrelated.
Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
ditac commented on PR #15130: URL: https://github.com/apache/kafka/pull/15130#issuecomment-1881622932 The following tests failed across runs, but they don't seem related to this change:
```
testReplicateFromLatest() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (45s)
testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest (2m 0s)
testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest (1m 52s)
testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest (1m 51s)
testBumpTransactionalEpoch(String).quorum=kraft+kip848 – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest (1m 28s)
```
Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
jolshan commented on PR #15130: URL: https://github.com/apache/kafka/pull/15130#issuecomment-1879485562 Don't worry about `**/build/test-results/**/TEST-*.xml`; it hasn't worked for as long as I can remember. I always just go to the Tests tab: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15130/2/tests
Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
ditac commented on PR #15130: URL: https://github.com/apache/kafka/pull/15130#issuecomment-1879371419
> Some `ZkMigrationIntegrationTest` tests failed. Can we run a rebuild?

The tests seem to have passed in the re-run. The `**/build/test-results/**/TEST-*.xml` step seems to have issues, but it doesn't seem like a blocker.
Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
stanislavkozlovski commented on PR #15130: URL: https://github.com/apache/kafka/pull/15130#issuecomment-1878479652 Some `ZkMigrationIntegrationTest` tests failed. Can we run a rebuild?
Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
stanislavkozlovski commented on code in PR #15130: URL: https://github.com/apache/kafka/pull/15130#discussion_r1442734095
##
clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java:
##
@@ -560,6 +560,38 @@ public void testConnectionSetupTimeout() {
                 "Expected the connections to fail due to the socket connection setup timeout");
     }
 
+    @Test
+    public void testConnectionTimeoutAfterThrottling() {
+        awaitReady(client, node);
+        short requestVersion = PRODUCE.latestVersion();
+        int timeoutMs = 1000;
+        ProduceRequest.Builder builder = new ProduceRequest.Builder(
+                requestVersion,
+                requestVersion,
+                new ProduceRequestData()
+                        .setAcks((short) 1)
+                        .setTimeoutMs(timeoutMs));
+        TestCallbackHandler handler = new TestCallbackHandler();
+        ClientRequest r1 = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
+                defaultRequestTimeoutMs, handler);
+
+        client.send(r1, time.milliseconds());
+        client.poll(0, time.milliseconds());
+
+        // Throttle long enough to ensure other inFlight requests timeout.
+        ProduceResponse pr = new ProduceResponse(new ProduceResponseData().setThrottleTimeMs(timeoutMs));
+        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(pr, requestVersion, r1.correlationId());
+        selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
+        ClientRequest r2 = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
+                defaultRequestTimeoutMs, handler);
+        client.send(r2, time.milliseconds());
+        time.sleep(timeoutMs);
+        client.poll(0, time.milliseconds());
+
+        assertEquals(1, client.inFlightRequestCount(node.idString()));
+        assertFalse(client.connectionFailed(node), "Connection failed after throttling.");

Review Comment:
```suggestion
        assertFalse(client.connectionFailed(node), "Connection should not have failed due to the extra time spent throttling.");
```
Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
stanislavkozlovski commented on code in PR #15130: URL: https://github.com/apache/kafka/pull/15130#discussion_r1442732922
##
clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java:
##
@@ -158,7 +158,9 @@ public Iterable<NetworkClient.InFlightRequest> clearAll(String node) {
     private Boolean hasExpiredRequest(long now, Deque<NetworkClient.InFlightRequest> deque) {
         for (NetworkClient.InFlightRequest request : deque) {
-            if (request.timeElapsedSinceSendMs(now) > request.requestTimeoutMs)
+            // We exclude throttle time here because we want to ensure that we don't expire requests while
+            // they are throttled. The request timeout should take effect only after the throttle time has elapsed.
+            if (request.timeElapsedSinceSendMs(now) - request.throttleTimeMs() > request.requestTimeoutMs)
                 return true;

Review Comment:
probably because we attach the timeoutMs to the request?
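The changed condition can be illustrated with a minimal, self-contained sketch. `InFlight` below is a hypothetical stand-in for `NetworkClient.InFlightRequest` that keeps only the three values the check reads; the hunk does not show how the real client populates `throttleTimeMs()`, so the sketch simply takes it as a constructor argument. The two methods mirror the old and new conditions from the diff.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ExpiryCheckSketch {

    // Hypothetical stand-in for NetworkClient.InFlightRequest: only the fields the check needs.
    static final class InFlight {
        final long sendTimeMs;
        final long throttleTimeMs;   // assumed: time this request spent throttled
        final int requestTimeoutMs;

        InFlight(long sendTimeMs, long throttleTimeMs, int requestTimeoutMs) {
            this.sendTimeMs = sendTimeMs;
            this.throttleTimeMs = throttleTimeMs;
            this.requestTimeoutMs = requestTimeoutMs;
        }

        // This sketch clamps at zero in case the clock reading goes backwards.
        long timeElapsedSinceSendMs(long now) {
            return Math.max(0, now - sendTimeMs);
        }
    }

    // Old condition: all time since send counts against the request timeout.
    static boolean hasExpiredRequestOld(long now, Deque<InFlight> deque) {
        for (InFlight request : deque) {
            if (request.timeElapsedSinceSendMs(now) > request.requestTimeoutMs)
                return true;
        }
        return false;
    }

    // New condition: time spent throttled is subtracted before comparing.
    static boolean hasExpiredRequestNew(long now, Deque<InFlight> deque) {
        for (InFlight request : deque) {
            if (request.timeElapsedSinceSendMs(now) - request.throttleTimeMs > request.requestTimeoutMs)
                return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Deque<InFlight> deque = new ArrayDeque<>();
        // Sent at t=0, throttled for 800 ms, 1000 ms request timeout.
        deque.add(new InFlight(0, 800, 1000));
        long now = 1500;
        System.out.println("old: " + hasExpiredRequestOld(now, deque)); // old: true  (1500 > 1000)
        System.out.println("new: " + hasExpiredRequestNew(now, deque)); // new: false (1500 - 800 = 700)
    }
}
```

With the old rule the request is considered expired 500 ms after its timeout, even though 800 ms of that wait was the broker's throttle; with the new rule it has only used 700 ms of its 1000 ms budget.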
Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
stanislavkozlovski commented on code in PR #15130: URL: https://github.com/apache/kafka/pull/15130#discussion_r1442732279
##
clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java:
##
@@ -158,7 +158,9 @@ public Iterable<NetworkClient.InFlightRequest> clearAll(String node) {
     private Boolean hasExpiredRequest(long now, Deque<NetworkClient.InFlightRequest> deque) {
         for (NetworkClient.InFlightRequest request : deque) {
-            if (request.timeElapsedSinceSendMs(now) > request.requestTimeoutMs)
+            // We exclude throttle time here because we want to ensure that we don't expire requests while
+            // they are throttled. The request timeout should take effect only after the throttle time has elapsed.
+            if (request.timeElapsedSinceSendMs(now) - request.throttleTimeMs() > request.requestTimeoutMs)
                 return true;

Review Comment:
One minor tricky thing here is that we never have a guarantee that we actually throttled for exactly `throttleTimeMs`, because:
- `time.milliseconds()` isn't monotonic
- we use `time.milliseconds()` to compute the end time for throttling

But the other two things here (`request.timeElapsedSinceSendMs(now)` and `request.requestTimeoutMs`) are just as imprecise, so I guess that's fine. Not sure why Kafka doesn't use `hiResClockMs()` here.
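The clock concern above can be sketched with a hypothetical `ManualClock` (illustrative only, not a Kafka class) that tracks a wall-clock reading and a monotonic reading side by side. When the wall clock is stepped backwards, for example by an NTP correction, an elapsed time computed from wall-clock readings comes out short, while the monotonic reading still reflects the real wait. In plain Java the monotonic source would be `System.nanoTime()`; Kafka's `Time` interface also exposes `hiResClockMs()`, which the comment refers to.

```java
public class MonotonicElapsedSketch {

    // Hypothetical clock abstraction: one wall-clock reading (like Time#milliseconds())
    // and one monotonic reading (like Time#hiResClockMs()).
    interface Clock {
        long wallClockMs();
        long monotonicMs();
    }

    // Manual clock for illustration: both readings advance together
    // unless the wall clock is explicitly stepped.
    static final class ManualClock implements Clock {
        long wall;
        long mono;

        ManualClock(long wall) {
            this.wall = wall;
        }

        void advance(long ms) {
            wall += ms;
            mono += ms;
        }

        // Simulates a wall-clock adjustment such as an NTP step; monotonic time is unaffected.
        void stepWallClock(long deltaMs) {
            wall += deltaMs;
        }

        public long wallClockMs() { return wall; }
        public long monotonicMs() { return mono; }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock(1_000_000);
        long sendWall = clock.wallClockMs();
        long sendMono = clock.monotonicMs();

        clock.advance(1200);        // 1200 ms really pass
        clock.stepWallClock(-500);  // wall clock is corrected backwards by 500 ms

        long elapsedWall = clock.wallClockMs() - sendWall;  // 700: underestimates the wait
        long elapsedMono = clock.monotonicMs() - sendMono;  // 1200: actual elapsed time
        System.out.println(elapsedWall + " " + elapsedMono); // prints "700 1200"
    }
}
```

The same skew would affect a throttle end time computed from the wall clock, which is why the comment notes that the subtraction in `hasExpiredRequest` is only as precise as the clock behind it.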
[PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
ditac opened a new pull request, #15130: URL: https://github.com/apache/kafka/pull/15130

When expiring in-flight requests, the network client does not take throttle time into account. If a connection has multiple in-flight requests (up to 5 by default, per `max.in.flight.requests.per.connection`) and each request is throttled, some of the requests can be incorrectly marked as expired. The client then closes the connection and establishes a new one to the broker. This causes unnecessary reconnections to the broker, can lead to connection storms, and increases latency.

### Testing

Unit tests

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
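The failure mode described above can be modeled with a short sketch. It assumes, hypothetically, that each in-flight request knows how long it spent throttled; the `Request` class and `countExpired` helper are illustrative and not part of Kafka. With a 1000 ms request timeout and five pipelined requests sent 100 ms apart, each held back by a 900 ms throttle, a check at t = 1300 ms flags three requests under the old rule (any one of which is enough for the client to drop the connection) and none once throttle time is excluded.

```java
import java.util.ArrayList;
import java.util.List;

public class ThrottledConnectionSketch {

    // Hypothetical model of one in-flight request on a connection.
    static final class Request {
        final long sendTimeMs;
        final long throttleTimeMs; // assumed: time this request spent waiting out a broker throttle

        Request(long sendTimeMs, long throttleTimeMs) {
            this.sendTimeMs = sendTimeMs;
            this.throttleTimeMs = throttleTimeMs;
        }
    }

    // Counts the requests the expiry check would flag; a single flagged
    // request is enough for the client to close the connection.
    static int countExpired(List<Request> inFlight, long now, long requestTimeoutMs, boolean excludeThrottle) {
        int expired = 0;
        for (Request r : inFlight) {
            long elapsed = now - r.sendTimeMs;
            if (excludeThrottle)
                elapsed -= r.throttleTimeMs;
            if (elapsed > requestTimeoutMs)
                expired++;
        }
        return expired;
    }

    public static void main(String[] args) {
        long requestTimeoutMs = 1000;
        List<Request> inFlight = new ArrayList<>();
        // Five pipelined requests (the default max.in.flight.requests.per.connection),
        // sent 100 ms apart, each delayed by a 900 ms throttle.
        for (int i = 0; i < 5; i++)
            inFlight.add(new Request(i * 100L, 900));

        long now = 1300;
        System.out.println(countExpired(inFlight, now, requestTimeoutMs, false)); // 3
        System.out.println(countExpired(inFlight, now, requestTimeoutMs, true));  // 0
    }
}
```

Excluding throttle time does not disable expiry: in this model, if the broker still has not responded at t = 2500 ms, all five requests exceed their timeout even after subtracting the 900 ms throttle.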