Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-12-11 Thread via GitHub


C0urante merged PR #14562:
URL: https://github.com/apache/kafka/pull/14562





Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-12-11 Thread via GitHub


C0urante commented on PR #14562:
URL: https://github.com/apache/kafka/pull/14562#issuecomment-1850942821

   All CI failures appear unrelated, merging.





Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-12-07 Thread via GitHub


C0urante commented on PR #14562:
URL: https://github.com/apache/kafka/pull/14562#issuecomment-1845996495

   Thanks Greg! I've made one final tweak to the integration tests. Hoping this 
doesn't affect test stability; will merge if things look alright on Jenkins.





Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-12-07 Thread via GitHub


C0urante commented on code in PR #14562:
URL: https://github.com/apache/kafka/pull/14562#discussion_r1419525430


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -771,6 +779,114 @@ private Map<String, String> defaultSinkConnectorProps(String topics) {
         return props;
     }
 
+    @Test
+    public void testRequestTimeouts() throws Exception {
+        final String configTopic = "test-request-timeout-configs";
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        // Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to
+        // be spuriously triggered after the group coordinator for a Connect cluster is bounced
+        // Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause
+        // connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay
+        // is set to 0
Review Comment:
   I've just noticed that this part is no longer necessary since we've merged a 
[fix](https://github.com/apache/kafka/pull/14647) for 
[KAFKA-15693](https://issues.apache.org/jira/browse/KAFKA-15693).
   
   I'll remove this comment and change the value to zero. If the test continues to pass on Jenkins, it should be safe to merge.






Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-12-07 Thread via GitHub


C0urante commented on code in PR #14562:
URL: https://github.com/apache/kafka/pull/14562#discussion_r1419175286


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -771,6 +778,104 @@ private Map<String, String> defaultSinkConnectorProps(String topics) {
         return props;
     }
 
+    @Test
+    public void testRequestTimeouts() throws Exception {
+        final String configTopic = "test-request-timeout-configs";
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        // Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to
+        // be spuriously triggered after the group coordinator for a Connect cluster is bounced
+        // Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause
+        // connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay
+        // is set to 0
+        workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1");
+        connect = connectBuilder
+                .numBrokers(1)
+                .numWorkers(1)
+                .build();
+        connect.start();
+        connect.assertions().assertAtLeastNumWorkersAreUp(1,
+                "Worker did not start in time");
+
+        Map<String, String> connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME);
+        Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1);
+        connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1));
+
+        // Create a connector to ensure that the worker has completed startup
+        log.info("Creating initial connector");
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig1);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time"
+        );
+
+        // Bring down Kafka, which should cause some REST requests to fail
+        log.info("Stopping Kafka cluster");
+        connect.kafka().stopOnlyKafka();
+        // Allow for the workers to discover that the coordinator is unavailable, wait is
+        // heartbeat timeout * 2 + 4sec
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        connect.requestTimeout(5_000);
+        // Try to reconfigure the connector, which should fail with a timeout error
+        log.info("Trying to reconfigure connector while Kafka cluster is down");
+        ConnectRestException e = assertThrows(
+                ConnectRestException.class,
+                () -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2)
+        );
+        assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode());
+        assertNotNull(e.getMessage());
+        assertTrue(
+                "Message '" + e.getMessage() + "' does not match expected format",
+                e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic")

Review Comment:
   Ah, good idea! Added assert-with-retry logic to the `assertTimeoutException` utility method.

   I've also tweaked how we track stages when the herder is polling the group coordinator. Previously, when the `WorkerGroupMember` and `WorkerCoordinator` methods only accepted a `Runnable`, those stages would never be closed. Now, those methods accept a `Supplier<UncheckedCloseable>` that, at the moment, always returns a `DistributedHerder.TickThreadStage`, which can be used with a try-with-resources block to automatically register and complete tick thread stages.

   I know that I've used `UncheckedCloseable` incorrectly in the past; I believe this time doesn't suffer from any of the issues my previous usages did. But if the result is still undesirable, we can explore alternatives like introducing a new interface.
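
   To illustrate the pattern (a minimal, hypothetical sketch, not the PR's actual code; the names here are made up):

       import java.util.function.Supplier;

       // An AutoCloseable whose close() throws no checked exception, so it
       // composes cleanly with try-with-resources
       @FunctionalInterface
       interface UncheckedCloseable extends AutoCloseable {
           @Override
           void close();
       }

       class CoordinatorPollSketch {
           // Stands in for a WorkerGroupMember/WorkerCoordinator method that now
           // accepts a stage supplier instead of a Runnable
           static void pollCoordinator(Supplier<UncheckedCloseable> stageStarter) {
               try (UncheckedCloseable stage = stageStarter.get()) {
                   // ... block on the group coordinator here ...
               } // close() completes the stage even if polling throws
           }

           public static void main(String[] args) {
               pollCoordinator(() -> {
                   System.out.println("stage started: ensuring membership in the cluster");
                   return () -> System.out.println("stage completed");
               });
           }
       }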






Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-12-06 Thread via GitHub


gharris1727 commented on code in PR #14562:
URL: https://github.com/apache/kafka/pull/14562#discussion_r1418003736


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -771,6 +778,104 @@ private Map<String, String> defaultSinkConnectorProps(String topics) {
         return props;
     }
 
+    @Test
+    public void testRequestTimeouts() throws Exception {
+        final String configTopic = "test-request-timeout-configs";
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        // Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to
+        // be spuriously triggered after the group coordinator for a Connect cluster is bounced
+        // Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause
+        // connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay
+        // is set to 0
+        workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1");
+        connect = connectBuilder
+                .numBrokers(1)
+                .numWorkers(1)
+                .build();
+        connect.start();
+        connect.assertions().assertAtLeastNumWorkersAreUp(1,
+                "Worker did not start in time");
+
+        Map<String, String> connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME);
+        Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1);
+        connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1));
+
+        // Create a connector to ensure that the worker has completed startup
+        log.info("Creating initial connector");
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig1);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time"
+        );
+
+        // Bring down Kafka, which should cause some REST requests to fail
+        log.info("Stopping Kafka cluster");
+        connect.kafka().stopOnlyKafka();
+        // Allow for the workers to discover that the coordinator is unavailable, wait is
+        // heartbeat timeout * 2 + 4sec
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        connect.requestTimeout(5_000);
+        // Try to reconfigure the connector, which should fail with a timeout error
+        log.info("Trying to reconfigure connector while Kafka cluster is down");
+        ConnectRestException e = assertThrows(
+                ConnectRestException.class,
+                () -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2)
+        );
+        assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode());
+        assertNotNull(e.getMessage());
+        assertTrue(
+                "Message '" + e.getMessage() + "' does not match expected format",
+                e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic")

Review Comment:
   Ah I see. So the `polling the group coordinator for up to ... or until interrupted` stage is only temporary and `flushing updates to the status topic` is permanent, so it'll always eventually get stuck on this stage.

   Do you think it's possible for the preceding `Thread.sleep()` to cause a flake here, if the worker is in the "polling the group coordinator" stage for too long? Perhaps we could replace the sleep with a wait-until-condition that repeatedly makes the request until the flushing status store error appears.
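
   For instance, something along these lines (a sketch only; it assumes Kafka's `TestUtils.waitForCondition` and the names already used in the test):

       // Hypothetical replacement for the fixed Thread.sleep(): keep issuing the
       // request until the status-topic timeout message shows up (or we give up)
       TestUtils.waitForCondition(
           () -> {
               try {
                   connect.configureConnector(CONNECTOR_NAME, connectorConfig2);
                   return false; // request unexpectedly succeeded; keep waiting
               } catch (ConnectRestException e) {
                   return e.getMessage() != null
                           && e.getMessage().contains("flushing updates to the status topic");
               }
           },
           30_000,
           "Worker never reported a timeout while flushing updates to the status topic"
       );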






Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-12-05 Thread via GitHub


C0urante commented on code in PR #14562:
URL: https://github.com/apache/kafka/pull/14562#discussion_r1416090471


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Stage {
+    private final String description;
+    private final long started;
+    private final AtomicLong completed;
+
+    public Stage(String description, long started) {
+        if (started < 0)
+            throw new IllegalArgumentException("Invalid start timestamp " + started + "; cannot be negative");
+
+        this.description = description;
+        this.started = started;
+        this.completed = new AtomicLong(-1);
+    }
+
+    public String description() {
+        return description;
+    }
+
+    public long started() {
+        return started;
+    }
+
+    public Long completed() {
+        long result = completed.get();
+        return result >= 0 ? result : null;
+    }
+
+    public synchronized void complete(long time) {
+        if (time < 0)
+            throw new IllegalArgumentException("Cannot complete stage with negative timestamp " + time);
+        if (time < started)
+            throw new IllegalArgumentException("Cannot complete stage with timestamp " + time + " before its start time " + started);
+
+        this.completed.updateAndGet(l -> {
+            if (l >= 0)
+                throw new IllegalStateException("Stage is already completed");

Review Comment:
   Good call, done. Also changed to logging a warning instead of throwing an exception when the completion time precedes the start time.
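
   For reference, the adjusted method might look roughly like this (an assumed sketch of the change described above; `log` is an assumed SLF4J logger, and whether an out-of-order completion is dropped or clamped is a guess):

       public synchronized void complete(long time) {
           if (time < 0)
               throw new IllegalArgumentException("Cannot complete stage with negative timestamp " + time);
           if (time < started) {
               // Assumed change: warn instead of throwing, since minor clock skew
               // between threads shouldn't fail the whole request
               log.warn("Ignoring completion time {} that precedes start time {}", time, started);
               return;
           }
           // Assumed change: warn on double-completion rather than throwing
           if (!completed.compareAndSet(-1, time))
               log.warn("Stage '{}' was already completed", description);
       }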




Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-12-04 Thread via GitHub


gharris1727 commented on code in PR #14562:
URL: https://github.com/apache/kafka/pull/14562#discussion_r1414514216


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -394,9 +399,16 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback
 
     @Override
     public void validateConnectorConfig(Map<String, String> connectorProps, Callback<ConfigInfos> callback, boolean doLog) {
+        callback.recordStage(new Stage(
+                "waiting for a new thread to become available for connector validation",
+                time.milliseconds()
+        ));
         connectorExecutor.submit(() -> {
+            callback.recordStage(null);

Review Comment:
   Could this instead be a call to Stage::complete, or a use for TemporaryStage?
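
   For context, a sketch of what the suggested `TemporaryStage` shape might look like, using the `Stage` class from this PR (hypothetical, not the actual class; `StageRecorder` stands in for whatever records stages on the callback):

       // Hypothetical: record a Stage on entry and complete it automatically on
       // exit, so call sites can't forget the recordStage(null) cleanup step
       interface StageRecorder {
           void recordStage(Stage stage);
       }

       class TemporaryStage implements AutoCloseable {
           private final Stage stage;

           TemporaryStage(String description, StageRecorder recorder) {
               this.stage = new Stage(description, System.currentTimeMillis());
               recorder.recordStage(stage);
           }

           @Override
           public void close() {
               stage.complete(System.currentTimeMillis());
           }
       }

   The `validateConnectorConfig` snippet above could then wrap the wait in a try-with-resources block instead of recording `null` once the executor picks the task up.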



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -771,6 +778,104 @@ private Map<String, String> defaultSinkConnectorProps(String topics) {
         return props;
     }
 
+    @Test
+    public void testRequestTimeouts() throws Exception {
+        final String configTopic = "test-request-timeout-configs";
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        // Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to
+        // be spuriously triggered after the group coordinator for a Connect cluster is bounced
+        // Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause
+        // connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay
+        // is set to 0
+        workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1");
+        connect = connectBuilder
+                .numBrokers(1)
+                .numWorkers(1)
+                .build();
+        connect.start();
+        connect.assertions().assertAtLeastNumWorkersAreUp(1,
+                "Worker did not start in time");
+
+        Map<String, String> connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME);
+        Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1);
+        connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1));
+
+        // Create a connector to ensure that the worker has completed startup
+        log.info("Creating initial connector");
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig1);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time"
+        );
+
+        // Bring down Kafka, which should cause some REST requests to fail
+        log.info("Stopping Kafka cluster");
+        connect.kafka().stopOnlyKafka();
+        // Allow for the workers to discover that the coordinator is unavailable, wait is
+        // heartbeat timeout * 2 + 4sec
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        connect.requestTimeout(5_000);
+        // Try to reconfigure the connector, which should fail with a timeout error
+        log.info("Trying to reconfigure connector while Kafka cluster is down");
+        ConnectRestException e = assertThrows(
+                ConnectRestException.class,
+                () -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2)
+        );
+        assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode());
+        assertNotNull(e.getMessage());
+        assertTrue(
+                "Message '" + e.getMessage() + "' does not match expected format",
+                e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic")

Review Comment:
   Is this the only error message possible when shutting down Kafka, or the most common?
   I would expect that an error about ensuring membership in the cluster could appear.




Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-12-04 Thread via GitHub


C0urante commented on PR #14562:
URL: https://github.com/apache/kafka/pull/14562#issuecomment-1839473863

   @gharris1727 @yashmayya would either of you have a moment to take a look at 
this one? Hoping to merge in time for 3.7.0 if possible. Thanks!





Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-10-25 Thread via GitHub


C0urante commented on PR #14562:
URL: https://github.com/apache/kafka/pull/14562#issuecomment-1780284540

   Okay, I've pushed a couple new commits that:
   - Introduce the notion of a completion time for a callback stage
   - Add granularity to the callback stages for the distributed herder's tick 
thread
   
   I know that this doesn't cover everything we discussed, but I did give the 
rest a try.
   
   I explored an approach where we defined broader tick thread stages (declaring them in `DistributedHerder::tick` and not in methods it invokes, and following a similar approach for herder requests). This turned out to be infeasible because of the control flow during a rebalance, where the herder invokes `WorkerGroupMember::poll` or `WorkerGroupMember::ensureActive`, which in turn can end up invoking `DistributedHerder.RebalanceListener::onRevoked`, which in turn can perform operations that warrant a distinct tick stage from, e.g., "ensuring membership in the cluster".
   
   Instead, I've tried for an approach where the tick thread stages are defined 
as narrowly as possible, and only around operations that we can reasonably 
anticipate will block. This does slightly increase the odds of a stage being 
completed when a request times out, but since the information about that stage 
isn't lost anymore, the fallout from that scenario is limited.
   
   I also experimented with the `Supplier<Stage>` approach to reduce the runtime complexity of stage tracking for herder requests, but found that this was more difficult to unit test. Instead of being able to track the set of all recorded stages for a callback, we would have to manually query the `Supplier` after each anticipated herder stage update, which is more work and can fail to collect some stages if not queried at the correct time (especially if it's too difficult to query at a specific point in time during a call to `DistributedHerder::tick`). Since we both agree that performance shouldn't be a concern here, I hope this is acceptable.
   
   I've also verified with three consecutive Jenkins runs that the new unit 
test should finally be flake-free.





Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-10-20 Thread via GitHub


gharris1727 commented on PR #14562:
URL: https://github.com/apache/kafka/pull/14562#issuecomment-1773197588

   > it's the first published one :)
   
   This is a much better way to put it, and captures what I was going for :)
   
   > I suppose we could do something similar where we decompose tick()
   
   :+1:
   
   > If there are functional benefits to decomposing operations further, then 
we can and should explore those...
   > but the risk of regression alone outweighs the benefits of doing that 
solely for cosmetic benefits, not to mention the additional implementation 
complexity and hit in readability. 
   
   I don't think there would be a functional change, but I think that 
readability could be improved. I do understand your concern about regressions, 
and I worry if this code has become too difficult and risky to change to 
attempt such refactors. It will only get worse as time goes on.
   
   > there's no guarantee that a request that took too long timed out because 
it was blocked on the herder thread.
   
   Thanks for this clarification. A request can be blocked on various threads during its lifetime, not just the herder tick thread.
   
   > I'm happy to do some benchmarking if we're worried about the performance 
impact of this approach. I'm personally a little skeptical that setting a new 
value for the ConvertingFutureCallback.currentStage field is likely to be 
significant, even if it is marked volatile.
   
   I completely agree, this single operation should be lightweight such that we 
can perform thousands of them and not meaningfully change the runtime of the 
overall request. You could do benchmarking to prove that, but I don't think you 
have to spend the time doing that if you change the asymptotic behavior.
   
   > It's also a little strange to refer to this as a quadratic cost, since I 
can't envision a reasonable version of this approach that records any more than 
a few dozen herder tick thread stages, even with maximum granularity.
   
   I should have clarified: it's quadratic in the number of requests, not the stages per request. Here's the situation:
   Let's say every request is identical and goes through K stages, and that we are processing N requests in total, but they come in two at a time, such that there is only Q=1 element in the queue at a time.

       For each request (N):
           For each stage (K):
               For each element in the queue (1):
                   set the queued stage

   This ultimately does N*K work.

   If the requests all come in at once, and Q=N initially, then this happens:

       For each request (N):
           For each stage (K):
               For each element in the queue (N, N-1, ..., 1):
                   set the queued stage

   This ultimately does 1/2 * N^2 * K work (1/2 because the queue gets shorter after each request).

   So the longer the queue is, the more work the herder has to do. It's a very weak positive feedback loop.
   
   I think you can eliminate it with a layer of indirection. Instead of storing a Stage in the callback, you could store a `Supplier<Stage>` that retrieves the tickThreadStage. You would set the supplier once when the request is added to the queue, and evaluate the supplier when a timeout occurs. setTickThreadStage doesn't need to update each of the individual requests, because if a timeout occurs they will retrieve the latest one via the getter. Once a request leaves the queue, it can change the `Supplier<Stage>` to no longer blame the tick thread for timeouts.
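
   As a rough sketch of that indirection (illustrative names only, not proposed code):

       import java.util.concurrent.atomic.AtomicReference;
       import java.util.function.Supplier;

       class StageIndirectionSketch {
           static class Stage {
               final String description;
               Stage(String description) { this.description = description; }
           }

           // The tick thread updates a single reference as it moves between stages:
           // O(1) per stage change, no matter how many requests are queued
           private final AtomicReference<Stage> tickThreadStage = new AtomicReference<>();

           static class QueuedRequest {
               volatile Supplier<Stage> stageSource; // set on enqueue, read only on timeout
           }

           void enqueue(QueuedRequest request) {
               request.stageSource = tickThreadStage::get; // blame the tick thread while queued
           }

           void beginProcessing(QueuedRequest request, Stage ownStage) {
               request.stageSource = () -> ownStage; // dequeued: stop blaming the tick thread
           }

           String timeoutMessage(QueuedRequest request) {
               Stage stage = request.stageSource.get();
               return stage == null
                       ? "Request timed out"
                       : "Request timed out. The worker is currently " + stage.description;
           }
       }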
   
   > "the last thing the worker was working on was , which 
began at  and ended at "
   
   I think this is reasonable, and I'll leave it up to you if you want to 
implement it. Modeling a "Stage" as an interval of time rather than a single 
instant seems natural, even if the REST error will mostly only see in-progress 
intervals.
   
   > Still, if we want to be even more informative, we could augment the 
currently-proposed error message not just with the current stage, but a history 
of stages and their start (and possibly end) times. IMO this is a bit much for 
the first patch and it'd be nice to wait for user feedback to see if it's 
actually necessary, but I wouldn't be opposed to it if others (including you) 
feel differently.
   
   Sure, we can push full traces to an optional follow-up. Full traces would be 
more resource intensive and possibly too verbose for the REST API.





Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-10-19 Thread via GitHub


C0urante commented on PR #14562:
URL: https://github.com/apache/kafka/pull/14562#issuecomment-1771701339

   Thanks Greg. To be clear, this isn't really a first pass--it's the first 
published one :)
   
   With regards to giving each request a single scope--this would fail to 
capture blocking operations outside of the request (such as other requests, 
ensuring group membership, and reading to the end of the config topic). I 
suppose we could do something similar where we decompose `tick()` into a series 
of either 1) method invocations that have a single scope or 2) herder requests 
that have their own scopes (at least one, possibly more).
   
   As far as breaking down herder requests into several smaller requests goes, 
I don't think it's wise to do this when not absolutely necessary. The only 
reason we added that kind of decomposition in 
https://github.com/apache/kafka/pull/8069 was to avoid blocking on interactions 
with connector plugins that could otherwise disable the herder's work thread. 
If there are functional benefits to decomposing operations further, then we can 
and should explore those, but the risk of regression alone outweighs the 
benefits of doing that solely for cosmetic benefits, not to mention the 
additional implementation complexity and hit in readability. I don't think this 
rules out the possibility of trying to add some more scope-based structure to 
herder requests so that, for example, every request is a series of single-scope 
method invocations, but I'm also not sure that that approach is realistic since 
it may increase the odds of inaccurate error messages.
   
   I'm also trying to optimize for user benefit with our error messages here. 
Telling someone that the worker is blocked reading to the end of the config 
topic is useful; telling someone that the worker is blocked while deleting a 
connector is less useful, especially if that error message is delivered in 
response to the request that triggered connector deletion. If we do opt for a 
different approach, I'd like it if we could add fine-grained error messages 
with the first PR, and not as a follow-up item.
   
   With regards to the so-called "smells":
   
   > I think having the stage be both a property of the callback and the herder 
was a bit odd; perhaps the HTTP request thread can just query the herder 
directly when the timeout occurs.
   
   I explored this approach initially, but it comes with a pretty severe drawback: there's no guarantee that a request that took too long timed out because it was blocked on the herder thread. Connector validation is the biggest example of this, though there are also connector restarts, which currently block on the connector actually completing startup. In cases like these, it could be misleading to users to tell them what the herder thread is doing when that information has no bearing on why their request timed out.
   
   > Related, the linear "set the Stage for all queued requests" in seems bad 
to me. The queue is supposed to be small of course, but if timeouts are 
happening, it may be happening due to high tick thread contention/request load. 
In that situation, having every bit of progress on the tick thread need to 
update the queued requests is ultimately a quadratic cost, potentially making 
tick thread contention worse.
   
   I'm happy to do some benchmarking if we're worried about the performance 
impact of this approach. I'm personally a little skeptical that setting a new 
value for the `ConvertingFutureCallback.currentStage` field is likely to be 
significant, even if it is marked `volatile`. It's also a little strange to 
refer to this as a quadratic cost, since I can't envision a reasonable version 
of this approach that records any more than a few dozen herder tick thread 
stages, even with maximum granularity.
   
   > Setting the stage to null after completing some operation completes leaves 
the opportunity for race conditions to degrade the error messages. Between slow 
operations (presumably when the thread is Running) the error message wouldn't 
have the context for what slow operation just finished, even though if the 
timeout happened a fraction of a second earlier, it would be displayed. This is 
acceptable if the wait times are large, the time spent Running is small, and we 
never forget to add a Stage for a slow operation. But if any of those aren't 
true, the diagnostic power of this feature decreases.
   
   I think the key point here is that we need to be careful to record stages for potentially-slow operations, which I've tried to do here. But it's true that in the future, if we move things around and forget to wrap a blocking operation with, e.g., a `TickThreadStage`, then users will lose insight into operations. I haven't found a good alternative yet that doesn't run the risk of reporting incorrect operations. Perhaps we could augment the error message not just with when the operation started, but also, if applicable, when it completed: "the last thing the worker was working on was <description of stage>, which began at <start time> and ended at <end time>".

   Still, if we want to be even more informative, we could augment the currently-proposed error message not just with the current stage, but a history of stages and their start (and possibly end) times. IMO this is a bit much for the first patch and it'd be nice to wait for user feedback to see if it's actually necessary, but I wouldn't be opposed to it if others (including you) feel differently.

Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-10-18 Thread via GitHub


gharris1727 commented on PR #14562:
URL: https://github.com/apache/kafka/pull/14562#issuecomment-1769350486

   Hey @C0urante thanks for taking this on!
   
   This is certainly an interesting first pass at the problem, and I share your 
concerns about the maintenance burden and SNR.
   
   One of the things that I think may be causing some friction for the 
implementation is that the granularity of the Stages doesn't match the 
granularity of the actual tick thread & herder requests. What I mean to say is 
that one function has multiple different stages associated with it, instead of 
there being a 1:1 relationship.
   
   Perhaps the design could be simpler if each code-block was given a 
description of what it did statically, like a new String argument to 
addRequest. This would be significantly less granular than the current 
implementation, but we could refactor the control flow of current requests into 
multiple smaller requests to improve the granularity. This could also be 
beneficial for the structure of the herder tick thread depending on how we 
refactor it.
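
   As a sketch, that might look something like the following (hypothetical; today's `addRequest` does not take a description argument):

       // Hypothetical: each enqueued request statically declares what it does, so a
       // timeout can report the description of whatever request is currently running
       herder.addRequest(
           "deleting connector " + connName,
           () -> { /* ... existing request body ... */ },
           callback
       );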
   
   We could either wait to implement the improved error messages after such a refactor, or land the low granularity error messages with a plan to improve them. Refactoring the herder requests into smaller sub-units may also be too involved and have even worse ROI, I'm not sure.
   
   Also just some smells that I picked up on:
   * I think having the stage be both a property of the callback and the herder 
was a bit odd; perhaps the HTTP request thread can just query the herder 
directly when the timeout occurs.
   * Related, the linear "set the Stage for all queued requests" in seems bad 
to me. The queue is supposed to be small of course, but if timeouts are 
happening, it may be happening due to high tick thread contention/request load. 
In that situation, having every bit of progress on the tick thread need to 
update the queued requests is ultimately a quadratic cost, potentially making 
tick thread contention worse.
   * Setting the stage to `null` after completing some operation completes 
leaves the opportunity for race conditions to degrade the error messages. 
Between slow operations (presumably when the thread is Running) the error 
message wouldn't have the context for what slow operation just finished, even 
though if the timeout happened a fraction of a second earlier, it would be 
displayed. This is acceptable if the wait times are large, the time spent 
Running is small, and we never forget to add a Stage for a slow operation. But 
if any of those aren't true, the diagnostic power of this feature decreases.
   * Statistically, the operation which is running when the timeout occurs is 
the most likely to have caused the timeout, but all of the actions running 
since the request started will contribute. To use an analogy: If you're 
experiencing slowness on your computer, you can sample the CPU and figure out 
that Program A happens to be running at that moment. But wouldn't it be more 
informative to know that over the past 30 seconds, 99% of the CPU was spent on 
program B, and you just got "unlucky" and observed program A was running? If 
you can only sample the CPU, you'd have to take multiple samples (issue 
multiple REST requests) to learn what the real problem was.





Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-10-18 Thread via GitHub


C0urante commented on PR #14562:
URL: https://github.com/apache/kafka/pull/14562#issuecomment-1769215371

   I've run into some issues with integration tests on Jenkins. I'm 
experimenting now with some potential fixes, and the rest of the PR should 
still be ready for review in the meantime.





Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-10-17 Thread via GitHub


C0urante commented on PR #14562:
URL: https://github.com/apache/kafka/pull/14562#issuecomment-1766723596

   @gharris1727 @yashmayya I'd be interested in your thoughts on this one 
if/when you have time. I've heard several complaints from Kafka Connect users 
about REST requests timing out and want to make that easier on everyone 
involved, but I'm also conscious that if we're not careful about this kind of 
change, it can present a serious maintenance burden on the project.
   
   I've tried to implement this in a way to reduce impact on the 
signal-to-noise ratio in critical parts of the codebase (i.e., 
`DistributedHerder`) as much as possible, but even with that, I'm not certain 
that this is worth the ROI. Would welcome some outside perspectives on whether 
this is worth adding, and if not, alternatives we can explore.





[PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-10-17 Thread via GitHub


C0urante opened a new pull request, #14562:
URL: https://github.com/apache/kafka/pull/14562

   [Jira](https://issues.apache.org/jira/browse/KAFKA-15563)
   
   There are three common scenarios when Kafka Connect REST requests time out:
   
   1. A connector operation (such as a call to `Connector::validate`) has 
blocked
   2. A distributed worker's herder tick thread is blocked (which can be for a 
variety of reasons, including failure to reach the group coordinator, failure 
to complete startup, and failure to reach the config topic)
   3. A distributed worker has to issue its own request to another worker, and 
that request blocks (note that this does not include transparently-forwarded 
requests, and currently only includes requests to submit task configurations 
and perform zombie fencing)
   
   This PR adds error messages to REST requests that help capture which 
operations are causing the worker to be unable to respond in time. The error 
messages include a description of the action the worker is blocked on, and a 
human-readable timestamp of when that action began.
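
   For illustration, a timed-out request might surface an error like the following (the first clause is taken verbatim from the integration test added here; the timestamp clause is a paraphrase, not the PR's exact wording):

       Request timed out. The worker is currently flushing updates to the status topic, which began at 2023-10-17 15:04:05 UTC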
   
   Some rejected alternatives:
   - One or more new JMX metrics (harder for users to access, would require a 
KIP, and would likely be just as difficult to implement, especially when trying 
to accommodate blocks in both connector interactions, which can take place on 
an isolated per-request basis, and in a distributed worker's herder tick 
thread, which can affect multiple REST requests)
   - Exposing the stack trace of the distributed worker's herder tick thread as 
part of the error message for timeouts (does not help with requests that are 
not blocked because of the herder tick thread, and may present a security risk 
by including source code for user-provided plugins such as `ConfigProvider` 
instances)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

