Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante merged PR #14562: URL: https://github.com/apache/kafka/pull/14562 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante commented on PR #14562: URL: https://github.com/apache/kafka/pull/14562#issuecomment-1850942821 All CI failures appear unrelated, merging.
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante commented on PR #14562: URL: https://github.com/apache/kafka/pull/14562#issuecomment-1845996495 Thanks Greg! I've made one final tweak to the integration tests. Hoping this doesn't affect test stability; will merge if things look alright on Jenkins.
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante commented on code in PR #14562: URL: https://github.com/apache/kafka/pull/14562#discussion_r1419525430 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -771,6 +779,114 @@ private Map defaultSinkConnectorProps(String topics) { return props; } +@Test +public void testRequestTimeouts() throws Exception { +final String configTopic = "test-request-timeout-configs"; +workerProps.put(CONFIG_TOPIC_CONFIG, configTopic); +// Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to +// be spuriously triggered after the group coordinator for a Connect cluster is bounced +// Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause +// connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay +// is set to 0 Review Comment: I've just noticed that this part is no longer necessary since we've merged a [fix](https://github.com/apache/kafka/pull/14647) for [KAFKA-15693](https://issues.apache.org/jira/browse/KAFKA-15693). I'll remove this comment and change the value to zero. If the test continues to pass on Jenkins, it should be safe to merge.
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante commented on code in PR #14562: URL: https://github.com/apache/kafka/pull/14562#discussion_r1419175286 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -771,6 +778,104 @@ private Map defaultSinkConnectorProps(String topics) { return props; } +@Test +public void testRequestTimeouts() throws Exception { +final String configTopic = "test-request-timeout-configs"; +workerProps.put(CONFIG_TOPIC_CONFIG, configTopic); +// Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to +// be spuriously triggered after the group coordinator for a Connect cluster is bounced +// Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause +// connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay +// is set to 0 +workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1"); +connect = connectBuilder +.numBrokers(1) +.numWorkers(1) +.build(); +connect.start(); +connect.assertions().assertAtLeastNumWorkersAreUp(1, +"Worker did not start in time"); + +Map connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME); +Map connectorConfig2 = new HashMap<>(connectorConfig1); +connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1)); + +// Create a connector to ensure that the worker has completed startup +log.info("Creating initial connector"); +connect.configureConnector(CONNECTOR_NAME, connectorConfig1); +connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( +CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time" +); + +// Bring down Kafka, which should cause some REST requests to fail +log.info("Stopping Kafka cluster"); +connect.kafka().stopOnlyKafka(); +// Allow for the workers to discover that the coordinator is unavailable, wait is +// heartbeat timeout * 2 + 4sec +Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + +connect.requestTimeout(5_000); +// Try to reconfigure the connector, which should fail with a 
timeout error +log.info("Trying to reconfigure connector while Kafka cluster is down"); +ConnectRestException e = assertThrows( +ConnectRestException.class, +() -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2) +); +assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode()); +assertNotNull(e.getMessage()); +assertTrue( +"Message '" + e.getMessage() + "' does not match expected format", +e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic") Review Comment: Ah, good idea! Added assert-with-retry logic to the `assertTimeoutException` utility method. I've also tweaked how we track stages when the herder is polling the group coordinator. Previously, when the `WorkerGroupMember` and `WorkerCoordinator` methods only accepted a `Runnable`, those stages would never be closed. Now, those methods accept a `Supplier` that, at the moment, always returns a `DistributedHerder.TickThreadStage`, which can be used with a try-with-resources block to automatically register and complete tick thread stages. I know that I've used `UncheckedCloseable` incorrectly in the past; I believe this usage doesn't suffer from any of the issues my previous usages did. But if the result is still undesirable, we can explore alternatives like introducing a new interface.
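Here is a minimal sketch of the `Supplier`-based pattern described above, under a simplified model. `StageScope`, `TickStageTracker`, and `GroupMemberSketch` are hypothetical stand-ins for `UncheckedCloseable`, the herder's tick thread stage bookkeeping, and `WorkerGroupMember`. They are not the PR's actual classes. The sketch shows that a try-with-resources block closes the stage even when the blocking call throws, which a plain `Runnable` callback cannot guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for UncheckedCloseable: an AutoCloseable whose
// close() does not declare any checked exceptions.
interface StageScope extends AutoCloseable {
    @Override
    void close();
}

// Hypothetical stage tracker: opening a scope records the stage as started,
// closing the scope records it as completed.
class TickStageTracker {
    final List<String> events = new ArrayList<>();

    StageScope begin(String description) {
        events.add("start: " + description);
        return () -> events.add("end: " + description);
    }
}

// Hypothetical group member whose poll method accepts a Supplier instead of a
// Runnable, so the stage it opens is always closed, even if polling throws.
class GroupMemberSketch {
    void poll(long timeoutMs, Supplier<StageScope> onPoll) {
        try (StageScope ignored = onPoll.get()) {
            // Placeholder for the potentially blocking call to the group coordinator.
            if (timeoutMs < 0) {
                throw new IllegalArgumentException("negative timeout");
            }
        }
    }
}
```

The caller decides what the stage is called (for example, "polling the group coordinator"), while the callee decides exactly how long the stage stays open.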
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
gharris1727 commented on code in PR #14562: URL: https://github.com/apache/kafka/pull/14562#discussion_r1418003736 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -771,6 +778,104 @@ private Map defaultSinkConnectorProps(String topics) { return props; } +@Test +public void testRequestTimeouts() throws Exception { +final String configTopic = "test-request-timeout-configs"; +workerProps.put(CONFIG_TOPIC_CONFIG, configTopic); +// Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to +// be spuriously triggered after the group coordinator for a Connect cluster is bounced +// Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause +// connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay +// is set to 0 +workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1"); +connect = connectBuilder +.numBrokers(1) +.numWorkers(1) +.build(); +connect.start(); +connect.assertions().assertAtLeastNumWorkersAreUp(1, +"Worker did not start in time"); + +Map connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME); +Map connectorConfig2 = new HashMap<>(connectorConfig1); +connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1)); + +// Create a connector to ensure that the worker has completed startup +log.info("Creating initial connector"); +connect.configureConnector(CONNECTOR_NAME, connectorConfig1); +connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( +CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time" +); + +// Bring down Kafka, which should cause some REST requests to fail +log.info("Stopping Kafka cluster"); +connect.kafka().stopOnlyKafka(); +// Allow for the workers to discover that the coordinator is unavailable, wait is +// heartbeat timeout * 2 + 4sec +Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + +connect.requestTimeout(5_000); +// Try to reconfigure the connector, which should fail with 
a timeout error +log.info("Trying to reconfigure connector while Kafka cluster is down"); +ConnectRestException e = assertThrows( +ConnectRestException.class, +() -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2) +); +assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode()); +assertNotNull(e.getMessage()); +assertTrue( +"Message '" + e.getMessage() + "' does not match expected format", +e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic") Review Comment: Ah, I see. So the `polling the group coordinator for up to ... or until interrupted` stage is only temporary and `flushing updates to the status topic` is permanent, so it'll always eventually get stuck on this stage. Do you think it's possible for the preceding `Thread.sleep()` to cause a flake here, if the worker is in the "polling the group coordinator" stage for too long? Perhaps we could replace the sleep with a wait-until-condition that repeatedly makes the request until the flushing status store error appears.
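One way to implement the wait-until-condition suggestion is sketched below as a standalone helper. `RetryUntilMessage.awaitErrorContaining` is hypothetical and models each REST request as a `Supplier` of its error message. In an actual Kafka test, an existing retry utility such as `TestUtils.waitForCondition` would likely be the better fit.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not part of Kafka): repeatedly issue a request until the
// error message it produces contains the expected text, instead of relying on
// a fixed Thread.sleep(). The request is modeled as a Supplier returning the
// error message, or null if the request unexpectedly succeeded.
class RetryUntilMessage {
    static String awaitErrorContaining(Supplier<String> attempt, String expected,
                                       long timeoutMs, long backoffMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            String message = attempt.get();
            if (message != null && message.contains(expected)) {
                return message;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("Last error message '" + message
                        + "' did not contain '" + expected + "' within " + timeoutMs + " ms");
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for the expected error", e);
            }
        }
    }
}
```

The loop keeps retrying while the worker is still in the temporary "polling the group coordinator" stage. As a result, the test no longer depends on guessing how long that stage lasts.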
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante commented on code in PR #14562: URL: https://github.com/apache/kafka/pull/14562#discussion_r1416090471 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.util.concurrent.atomic.AtomicLong; + +public class Stage { +private final String description; +private final long started; +private final AtomicLong completed; + +public Stage(String description, long started) { +if (started < 0) +throw new IllegalArgumentException("Invalid start timestamp " + started + "; cannot be negative"); + +this.description = description; +this.started = started; +this.completed = new AtomicLong(-1); +} + +public String description() { +return description; +} + +public long started() { +return started; +} + +public Long completed() { +long result = completed.get(); +return result >= 0 ? 
result : null; +} + +public synchronized void complete(long time) { +if (time < 0) +throw new IllegalArgumentException("Cannot complete stage with negative timestamp " + time); +if (time < started) +throw new IllegalArgumentException("Cannot complete stage with timestamp " + time + " before its start time " + started); + +this.completed.updateAndGet(l -> { +if (l >= 0) +throw new IllegalStateException("Stage is already completed"); Review Comment: Good call, done. Also changed to logging a warning instead of throwing an exception when the completion time precedes the start time.
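Below is a sketch of how `Stage::complete` might look after the change described in this reply, assuming the rest of the class is unchanged. Double completion still fails, but an out-of-order completion time only produces a warning. Some details are this sketch's own choices:
- `StageSketch` is a hypothetical name.
- `java.util.logging` stands in for the SLF4J logger Connect normally uses.
- Replacing `synchronized` plus `updateAndGet` with `compareAndSet` is not necessarily what the PR does.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

// Hypothetical variant of the Stage class from the diff: a completion time that
// precedes the start time is logged as a warning, while completing the same
// stage twice is still rejected.
class StageSketch {
    private static final Logger log = Logger.getLogger(StageSketch.class.getName());

    private final String description;
    private final long started;
    private final AtomicLong completed = new AtomicLong(-1);

    StageSketch(String description, long started) {
        if (started < 0)
            throw new IllegalArgumentException("Invalid start timestamp " + started + "; cannot be negative");
        this.description = description;
        this.started = started;
    }

    String description() {
        return description;
    }

    long started() {
        return started;
    }

    // Returns null while the stage is still in progress.
    Long completed() {
        long result = completed.get();
        return result >= 0 ? result : null;
    }

    void complete(long time) {
        if (time < 0)
            throw new IllegalArgumentException("Cannot complete stage with negative timestamp " + time);
        if (time < started)
            log.warning("Completing stage '" + description + "' at " + time + ", before its start time " + started);
        // compareAndSet performs the "already completed" check and the update atomically.
        if (!completed.compareAndSet(-1, time))
            throw new IllegalStateException("Stage is already completed");
    }
}
```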
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
gharris1727 commented on code in PR #14562: URL: https://github.com/apache/kafka/pull/14562#discussion_r1414514216 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -394,9 +399,16 @@ public void validateConnectorConfig(Map connectorProps, Callback @Override public void validateConnectorConfig(Map connectorProps, Callback callback, boolean doLog) { +callback.recordStage(new Stage( +"waiting for a new thread to become available for connector validation", +time.milliseconds() +)); connectorExecutor.submit(() -> { +callback.recordStage(null); Review Comment: Could this instead be a call to Stage::complete, or a use for TemporaryStage? ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -771,6 +778,104 @@ private Map defaultSinkConnectorProps(String topics) { return props; } +@Test +public void testRequestTimeouts() throws Exception { +final String configTopic = "test-request-timeout-configs"; +workerProps.put(CONFIG_TOPIC_CONFIG, configTopic); +// Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to +// be spuriously triggered after the group coordinator for a Connect cluster is bounced +// Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause +// connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay +// is set to 0 +workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1"); +connect = connectBuilder +.numBrokers(1) +.numWorkers(1) +.build(); +connect.start(); +connect.assertions().assertAtLeastNumWorkersAreUp(1, +"Worker did not start in time"); + +Map connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME); +Map connectorConfig2 = new HashMap<>(connectorConfig1); +connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1)); + +// Create a connector to ensure that the worker has completed startup +log.info("Creating initial connector"); 
+connect.configureConnector(CONNECTOR_NAME, connectorConfig1); +connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( +CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time" +); + +// Bring down Kafka, which should cause some REST requests to fail +log.info("Stopping Kafka cluster"); +connect.kafka().stopOnlyKafka(); +// Allow for the workers to discover that the coordinator is unavailable, wait is +// heartbeat timeout * 2 + 4sec +Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + +connect.requestTimeout(5_000); +// Try to reconfigure the connector, which should fail with a timeout error +log.info("Trying to reconfigure connector while Kafka cluster is down"); +ConnectRestException e = assertThrows( +ConnectRestException.class, +() -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2) +); +assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode()); +assertNotNull(e.getMessage()); +assertTrue( +"Message '" + e.getMessage() + "' does not match expected format", +e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic") Review Comment: Is this the only error message possible when shutting down kafka, or the most common? I would expect that an error about ensuring membership in the cluster could appear. ## connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.util.concurrent.atomic.AtomicLong; + +public class Stage { +private final String description; +private final long started; +private final
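The first review comment above suggests calling `Stage::complete` instead of clearing the stage. In this approach, the validation task marks the waiting stage as completed once the executor picks it up, rather than calling `recordStage(null)`. A later timeout can then still report how long the request waited for a thread. `SimpleStage`, `StageRecordingCallback`, and `ValidationSketch` are hypothetical simplifications, not the PR's classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stage record: description, start time, and an
// optional completion time (null while the stage is still in progress).
final class SimpleStage {
    final String description;
    final long started;
    volatile Long completed;

    SimpleStage(String description, long started) {
        this.description = description;
        this.started = started;
    }
}

// Hypothetical callback that remembers the most recently recorded stage.
class StageRecordingCallback {
    final AtomicReference<SimpleStage> currentStage = new AtomicReference<>();

    void recordStage(SimpleStage stage) {
        currentStage.set(stage);
    }
}

class ValidationSketch {
    // Instead of clearing the stage once the executor picks up the task, mark
    // the waiting stage as completed so its timing information survives.
    static Future<?> validate(ExecutorService executor, StageRecordingCallback callback) {
        SimpleStage waiting = new SimpleStage(
                "waiting for a new thread to become available for connector validation",
                System.currentTimeMillis());
        callback.recordStage(waiting);
        return executor.submit(() -> {
            waiting.completed = System.currentTimeMillis();
            // Connector validation itself would run here.
        });
    }
}
```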
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante commented on PR #14562: URL: https://github.com/apache/kafka/pull/14562#issuecomment-1839473863 @gharris1727 @yashmayya would either of you have a moment to take a look at this one? Hoping to merge in time for 3.7.0 if possible. Thanks!
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante commented on PR #14562: URL: https://github.com/apache/kafka/pull/14562#issuecomment-1780284540 Okay, I've pushed a couple of new commits that:
- Introduce the notion of a completion time for a callback stage
- Add granularity to the callback stages for the distributed herder's tick thread

I know that this doesn't cover everything we discussed, but I did give the rest a try. I explored an approach where we defined broader tick thread stages (declaring them in `DistributedHerder::tick` and not in methods it invokes, and following a similar approach for herder requests). This turned out to be infeasible because of the control flow during a rebalance. The herder invokes `WorkerGroupMember::poll` or `WorkerGroupMember::ensureActive`, which can in turn invoke `DistributedHerder.RebalanceListener::onRevoked`, which can in turn perform operations that warrant a tick stage distinct from, e.g., "ensuring membership in the cluster". Instead, I've opted for an approach where the tick thread stages are defined as narrowly as possible, and only around operations that we can reasonably anticipate will block. This does slightly increase the odds of a stage being completed when a request times out, but since the information about that stage isn't lost anymore, the fallout from that scenario is limited. I also experimented with the `Supplier` approach to reduce the runtime complexity of stage tracking for herder requests, but found that this was more difficult to unit test. Instead of being able to track the set of all recorded stages for a callback, we would have to manually query the `Supplier` after each anticipated herder stage update. That is more work, and it can fail to collect some stages if not queried at the correct time (especially if it's too difficult to query at a specific point in time during a call to `DistributedHerder::tick`). Since we both agree that performance shouldn't be a concern here, I hope this is acceptable.
I've also verified with three consecutive Jenkins runs that the new unit test should finally be flake-free.
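Because stages now carry a completion time, a timeout message can distinguish an operation that is still running from one that already finished. The hypothetical formatter below assumes that the in-progress wording mirrors the substring checked by the integration test (`Request timed out. The worker is currently ...`). The completed-stage wording and the use of `Instant` timestamps are assumptions, not the PR's actual format.

```java
import java.time.Instant;

// Hypothetical helper: turn the last recorded stage into a timeout message.
// A stage without a completion time is still in progress; a completed stage
// is reported with both timestamps so that information is not lost.
class TimeoutMessages {
    static String describe(String description, long startedMs, Long completedMs) {
        if (description == null) {
            return "Request timed out";
        }
        if (completedMs == null) {
            return "Request timed out. The worker is currently " + description
                    + ", which began at " + Instant.ofEpochMilli(startedMs);
        }
        return "Request timed out. The last operation the worker performed was " + description
                + ", which began at " + Instant.ofEpochMilli(startedMs)
                + " and completed at " + Instant.ofEpochMilli(completedMs);
    }
}
```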
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
gharris1727 commented on PR #14562: URL: https://github.com/apache/kafka/pull/14562#issuecomment-1773197588 > it's the first published one :) This is a much better way to put it, and captures what I was going for :) > I suppose we could do something similar where we decompose tick() :+1: > If there are functional benefits to decomposing operations further, then we can and should explore those... > but the risk of regression alone outweighs the benefits of doing that solely for cosmetic benefits, not to mention the additional implementation complexity and hit in readability. I don't think there would be a functional change, but I think that readability could be improved. I do understand your concern about regressions, and I worry that this code has become too difficult and risky to change to attempt such refactors. It will only get worse as time goes on. > there's no guarantee that a request that took too long timed out because it was blocked on the herder thread. Thanks for this clarification. A request can be blocked on various threads during its lifetime, not just the herder tick thread. > I'm happy to do some benchmarking if we're worried about the performance impact of this approach. I'm personally a little skeptical that setting a new value for the ConvertingFutureCallback.currentStage field is likely to be significant, even if it is marked volatile. I completely agree: this single operation should be lightweight enough that we can perform thousands of them without meaningfully changing the runtime of the overall request. You could do benchmarking to prove that, but I don't think you have to spend the time doing that if you change the asymptotic behavior. > It's also a little strange to refer to this as a quadratic cost, since I can't envision a reasonable version of this approach that records any more than a few dozen herder tick thread stages, even with maximum granularity.
I should have clarified: it's quadratic in the number of requests, not the stages per request. Here's the situation. Let's say every request is identical and goes through K stages, and that we are processing N requests in total, but they come in two at a time, such that there is only Q=1 element in the queue at a time:
For each request (N):
  For each stage (K):
    For each element in the queue (1): set the queued stage
This ultimately does N*K work. If the requests all come in at once, and Q=N initially, then this happens:
For each request (N):
  For each stage (K):
    For each element in the queue (N, N-1, ..., 1): set the queued stage
This ultimately does roughly 1/2 * N^2 * K work (1/2 because the queue gets shorter after each request). So the longer the queue is, the more work the herder has to do. It's a very weak positive feedback loop. I think you can eliminate it with a layer of indirection. Instead of storing a Stage in the callback, you could store a Supplier that retrieves the tickThreadStage. You would set the supplier once when the request is added to the queue, and evaluate the supplier when a timeout occurs. setTickThreadStage doesn't need to update each of the individual requests, because if a timeout occurs they will retrieve the latest one via the getter. Once a request leaves the queue, it can change the Supplier to no longer blame the tick thread for timeouts. > "the last thing the worker was working on was , which began at and ended at " I think this is reasonable, and I'll leave it up to you if you want to implement it. Modeling a "Stage" as an interval of time rather than a single instant seems natural, even if the REST error will mostly only see in-progress intervals. > Still, if we want to be even more informative, we could augment the currently-proposed error message not just with the current stage, but a history of stages and their start (and possibly end) times.
IMO this is a bit much for the first patch and it'd be nice to wait for user feedback to see if it's actually necessary, but I wouldn't be opposed to it if others (including you) feel differently. Sure, we can push full traces to an optional follow-up. Full traces would be more resource intensive and possibly too verbose for the REST API.
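The cost argument above can be checked with a small model. Suppose all N requests are queued up front. The eager design then performs K·(N + (N-1) + ... + 1) = K·N(N+1)/2 writes, roughly ½·N²·K. With only one request queued at a time, it performs N·K writes. The `Supplier` indirection makes each stage change a single write, and queued requests read the current stage only when they need it. `QueuedRequest` and `HerderModel` are hypothetical and deliberately simplified, not the herder's real data structures.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical queued request: the eager design copies every stage into it,
// the lazy design stores a Supplier that is evaluated only on timeout.
class QueuedRequest {
    volatile String copiedStage;
    volatile Supplier<String> stageSource;
}

class HerderModel {
    private final Deque<QueuedRequest> queue = new ArrayDeque<>();
    private volatile String tickThreadStage;
    long eagerWrites = 0;

    void enqueue(QueuedRequest request) {
        // Set once per request, O(1).
        request.stageSource = () -> tickThreadStage;
        queue.add(request);
    }

    // Eager design: every stage change walks the whole queue.
    void setStageEager(String stage) {
        tickThreadStage = stage;
        for (QueuedRequest r : queue) {
            r.copiedStage = stage;
            eagerWrites++;
        }
    }

    // Lazy design: one write per stage change, regardless of queue length.
    void setStageLazy(String stage) {
        tickThreadStage = stage;
    }

    // Run the head request through k stages, then dequeue it.
    void processNext(int k, boolean eager) {
        for (int i = 0; i < k; i++) {
            if (eager) setStageEager("stage " + i); else setStageLazy("stage " + i);
        }
        QueuedRequest done = queue.poll();
        if (done != null) {
            String finalStage = "handling request";
            // Once dequeued, stop blaming the tick thread for timeouts.
            done.stageSource = () -> finalStage;
        }
    }
}
```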
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante commented on PR #14562: URL: https://github.com/apache/kafka/pull/14562#issuecomment-1771701339 Thanks Greg. To be clear, this isn't really a first pass--it's the first published one :) With regards to giving each request a single scope--this would fail to capture blocking operations outside of the request (such as other requests, ensuring group membership, and reading to the end of the config topic). I suppose we could do something similar where we decompose `tick()` into a series of either 1) method invocations that have a single scope or 2) herder requests that have their own scopes (at least one, possibly more). As far as breaking down herder requests into several smaller requests goes, I don't think it's wise to do this when not absolutely necessary. The only reason we added that kind of decomposition in https://github.com/apache/kafka/pull/8069 was to avoid blocking on interactions with connector plugins that could otherwise disable the herder's work thread. If there are functional benefits to decomposing operations further, then we can and should explore those, but the risk of regression alone outweighs the benefits of doing that solely for cosmetic benefits, not to mention the additional implementation complexity and hit in readability. I don't think this rules out the possibility of trying to add some more scope-based structure to herder requests so that, for example, every request is a series of single-scope method invocations, but I'm also not sure that that approach is realistic since it may increase the odds of inaccurate error messages. I'm also trying to optimize for user benefit with our error messages here. Telling someone that the worker is blocked reading to the end of the config topic is useful; telling someone that the worker is blocked while deleting a connector is less useful, especially if that error message is delivered in response to the request that triggered connector deletion. 
If we do opt for a different approach, I'd like it if we could add fine-grained error messages with the first PR, and not as a follow-up item. With regards to the so-called "smells": > I think having the stage be both a property of the callback and the herder was a bit odd; perhaps the HTTP request thread can just query the herder directly when the timeout occurs. I explored this approach initially, but it comes with a pretty severe drawback: there's no guarantee that a request that took too long timed out because it was blocked on the herder thread. Connector validation is the biggest example of this, though there's also connector restarts, which currently block on the connector actually completing startup. In cases like these, it could be misleading to users to tell them what the herder thread is doing when that information has no bearing on why their request timed out. > Related, the linear "set the Stage for all queued requests" in seems bad to me. The queue is supposed to be small of course, but if timeouts are happening, it may be happening due to high tick thread contention/request load. In that situation, having every bit of progress on the tick thread need to update the queued requests is ultimately a quadratic cost, potentially making tick thread contention worse. I'm happy to do some benchmarking if we're worried about the performance impact of this approach. I'm personally a little skeptical that setting a new value for the `ConvertingFutureCallback.currentStage` field is likely to be significant, even if it is marked `volatile`. It's also a little strange to refer to this as a quadratic cost, since I can't envision a reasonable version of this approach that records any more than a few dozen herder tick thread stages, even with maximum granularity. > Setting the stage to null after completing some operation completes leaves the opportunity for race conditions to degrade the error messages. 
Between slow operations (presumably when the thread is Running) the error message wouldn't have the context for what slow operation just finished, even though if the timeout happened a fraction of a second earlier, it would be displayed. This is acceptable if the wait times are large, the time spent Running is small, and we never forget to add a Stage for a slow operation. But if any of those aren't true, the diagnostic power of this feature decreases. I think the key point here is that we need to be careful to record stages for potentially-slow operations, which I've tried to do here. But it's true that in the future, if we move things around and forget to wrap a blocking operation with, e.g., a `TickThreadStage`, then users will lose insight into operations. I haven't found a good alternative yet that doesn't run the risk of reporting incorrect operations. Perhaps we could augment the error message not just with when the operation started, but also, if applicable,
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
gharris1727 commented on PR #14562: URL: https://github.com/apache/kafka/pull/14562#issuecomment-1769350486 Hey @C0urante, thanks for taking this on! This is certainly an interesting first pass at the problem, and I share your concerns about the maintenance burden and SNR.

One of the things that I think may be causing some friction for the implementation is that the granularity of the Stages doesn't match the granularity of the actual tick thread & herder requests. What I mean is that one function has multiple different stages associated with it, instead of there being a 1:1 relationship. Perhaps the design could be simpler if each code block were given a static description of what it does, like a new String argument to `addRequest`. This would be significantly less granular than the current implementation, but we could refactor the control flow of current requests into multiple smaller requests to improve the granularity. This could also benefit the structure of the herder tick thread, depending on how we refactor it. We could either wait to implement the improved error messages until after such a refactor, or land the low-granularity error messages with a plan to improve them. Refactoring the herder requests into smaller sub-units may also be too involved and have even worse ROI; I'm not sure.

Also, just some smells that I picked up on:

* I think having the stage be both a property of the callback and the herder was a bit odd; perhaps the HTTP request thread can just query the herder directly when the timeout occurs.
* Related, the linear "set the Stage for all queued requests" in seems bad to me. The queue is supposed to be small, of course, but if timeouts are happening, it may be due to high tick thread contention/request load. In that situation, having every bit of progress on the tick thread update the queued requests is ultimately a quadratic cost, potentially making tick thread contention worse.
* Setting the stage to `null` after some operation completes leaves room for race conditions to degrade the error messages. Between slow operations (presumably when the thread is Running), the error message wouldn't have the context of which slow operation just finished, even though that context would have been displayed if the timeout had happened a fraction of a second earlier. This is acceptable if the wait times are large, the time spent Running is small, and we never forget to add a Stage for a slow operation. But if any of those aren't true, the diagnostic power of this feature decreases.
* Statistically, the operation that is running when the timeout occurs is the most likely to have caused the timeout, but all of the actions run since the request started contribute. To use an analogy: if you're experiencing slowness on your computer, you can sample the CPU and find that Program A happens to be running at that moment. But wouldn't it be more informative to know that over the past 30 seconds, 99% of the CPU time was spent on Program B, and you just got "unlucky" and observed Program A running? If you can only sample the CPU, you'd have to take multiple samples (issue multiple REST requests) to learn what the real problem was.
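The cumulative-time idea in the last point could be sketched roughly as follows. This is a minimal, hypothetical sketch, not code from this PR or from Kafka: `StageTimeBreakdown`, its methods, and the stage descriptions are illustrative names, and the clock is injected as a `LongSupplier` so the example is deterministic. Instead of sampling the single stage that happens to be running at timeout, the tick thread accumulates time per stage in one shared tracker; a request takes a snapshot when it starts and diffs it against a second snapshot when it times out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

// Hypothetical tracker (not part of Kafka): accumulates time spent in each
// tick-thread stage so a timed-out request can report a breakdown instead of
// a single sample of whatever stage happens to be running.
public class StageTimeBreakdown {
    private final LongSupplier clockMs;              // injected so the demo is deterministic
    private final Map<String, Long> totalsMs = new LinkedHashMap<>();
    private String currentStage;                     // null when no tracked stage is running
    private long currentStartMs;

    public StageTimeBreakdown(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    // Called by the tick thread when it starts a potentially slow operation.
    public synchronized void enter(String stage) {
        closeCurrent();
        currentStage = stage;
        currentStartMs = clockMs.getAsLong();
    }

    // Called by the tick thread when the current operation finishes.
    public synchronized void exit() {
        closeCurrent();
        currentStage = null;
    }

    // Adds the elapsed time of the in-progress stage (if any) to its running total.
    private void closeCurrent() {
        if (currentStage != null) {
            totalsMs.merge(currentStage, clockMs.getAsLong() - currentStartMs, Long::sum);
        }
    }

    // Cumulative time per stage so far, including the stage still in progress.
    public synchronized Map<String, Long> snapshot() {
        Map<String, Long> copy = new LinkedHashMap<>(totalsMs);
        if (currentStage != null) {
            copy.merge(currentStage, clockMs.getAsLong() - currentStartMs, Long::sum);
        }
        return copy;
    }

    // Time spent per stage between two snapshots, largest first.
    public static String describe(Map<String, Long> before, Map<String, Long> after) {
        return after.entrySet().stream()
                .map(e -> Map.entry(e.getKey(), e.getValue() - before.getOrDefault(e.getKey(), 0L)))
                .filter(e -> e.getValue() > 0)
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .map(e -> e.getKey() + ": " + e.getValue() + " ms")
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        long[] now = {0L};                           // fake clock for the demo
        StageTimeBreakdown tracker = new StageTimeBreakdown(() -> now[0]);
        Map<String, Long> atRequestStart = tracker.snapshot();
        tracker.enter("ensuring group membership");  // illustrative stage names
        now[0] = 29_700L;
        tracker.enter("reading the config topic");
        now[0] = 30_000L;                            // the request times out here
        System.out.println("Time spent since request started: "
                + describe(atRequestStart, tracker.snapshot()));
    }
}
```

In this sketch each stage transition updates one shared map rather than every queued request, and time spent in stages that have already finished still appears in the breakdown, so a request that times out just after a slow stage ends still sees it. Whether the added synchronization on the tick thread is acceptable would be something to measure rather than assume.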
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante commented on PR #14562: URL: https://github.com/apache/kafka/pull/14562#issuecomment-1769215371 I've run into some issues with integration tests on Jenkins. I'm experimenting now with some potential fixes, and the rest of the PR should still be ready for review in the meantime.
Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante commented on PR #14562: URL: https://github.com/apache/kafka/pull/14562#issuecomment-1766723596 @gharris1727 @yashmayya I'd be interested in your thoughts on this one if/when you have time. I've heard several complaints from Kafka Connect users about REST requests timing out and want to make that easier on everyone involved, but I'm also conscious that if we're not careful about this kind of change, it can place a serious maintenance burden on the project. I've tried to implement this in a way that minimizes the impact on the signal-to-noise ratio in critical parts of the codebase (i.e., `DistributedHerder`), but even so, I'm not certain that the ROI is there. I'd welcome some outside perspectives on whether this is worth adding and, if not, what alternatives we could explore.
[PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]
C0urante opened a new pull request, #14562: URL: https://github.com/apache/kafka/pull/14562 [Jira](https://issues.apache.org/jira/browse/KAFKA-15563)

There are three common scenarios in which Kafka Connect REST requests time out:

1. A connector operation (such as a call to `Connector::validate`) has blocked.
2. A distributed worker's herder tick thread is blocked (which can happen for a variety of reasons, including failure to reach the group coordinator, failure to complete startup, and failure to reach the config topic).
3. A distributed worker has to issue its own request to another worker, and that request blocks (note that this does not include transparently forwarded requests, and currently only includes requests to submit task configurations and perform zombie fencing).

This PR adds error messages to REST requests that help capture which operations are preventing the worker from responding in time. The error messages include a description of the action the worker is blocked on and a human-readable timestamp of when that action began.
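To make the description above a little more concrete, here is a minimal sketch of the general idea: a stage made of a description plus a start timestamp, recorded in a `volatile` field that the REST thread reads when its wait times out. All names here (`Stage`, `StageTrackingFuture`, `recordStage`, `getWithContext`) are hypothetical and are not the classes used in this PR; the exception type, the message wording, and the example stage description are likewise assumptions.

```java
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StageAwareTimeout {

    // Hypothetical: what the worker is blocked on, and when that began.
    record Stage(String description, long startedMs) {
        String summarize() {
            // Instant.toString() yields an ISO-8601 timestamp, e.g. 1970-01-01T00:00:00Z
            return description + " (since " + Instant.ofEpochMilli(startedMs) + ")";
        }
    }

    // Hypothetical future: the thread doing the work records its current stage,
    // and the REST thread includes it in the error message if its wait times out.
    static final class StageTrackingFuture<T> extends CompletableFuture<T> {
        private volatile Stage currentStage;        // null when no slow operation is in progress

        void recordStage(Stage stage) {
            currentStage = stage;
        }

        T getWithContext(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
            try {
                return get(timeout, unit);
            } catch (TimeoutException e) {
                Stage stage = currentStage;          // read the volatile field exactly once
                String message = "Request timed out";
                if (stage != null) {
                    message += "; the worker is currently " + stage.summarize();
                }
                throw new IllegalStateException(message, e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StageTrackingFuture<String> future = new StageTrackingFuture<>();
        // Illustrative stage; in practice the thread doing the work would set this.
        future.recordStage(new Stage("reading to the end of the config topic", 0L));
        try {
            future.getWithContext(50, TimeUnit.MILLISECONDS);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Copying `currentStage` into a local before using it avoids a check-then-use race in which another thread clears the field between the `null` check and building the message. The `null` branch corresponds to the case raised in the review comments, where no tracked stage is active at the moment the timeout fires.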
Some rejected alternatives:

- One or more new JMX metrics (harder for users to access, would require a KIP, and would likely be just as difficult to implement, especially when trying to accommodate blocks both in connector interactions, which can take place on an isolated per-request basis, and in a distributed worker's herder tick thread, which can affect multiple REST requests)
- Exposing the stack trace of the distributed worker's herder tick thread as part of the error message for timeouts (does not help with requests that are not blocked by the herder tick thread, and may present a security risk by including source code for user-provided plugins such as `ConfigProvider` instances)

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)