C0urante commented on code in PR #14562: URL: https://github.com/apache/kafka/pull/14562#discussion_r1416090471
########## connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.util.concurrent.atomic.AtomicLong; + +public class Stage { + private final String description; + private final long started; + private final AtomicLong completed; + + public Stage(String description, long started) { + if (started < 0) + throw new IllegalArgumentException("Invalid start timestamp " + started + "; cannot be negative"); + + this.description = description; + this.started = started; + this.completed = new AtomicLong(-1); + } + + public String description() { + return description; + } + + public long started() { + return started; + } + + public Long completed() { + long result = completed.get(); + return result >= 0 ? 
result : null; + } + + public synchronized void complete(long time) { + if (time < 0) + throw new IllegalArgumentException("Cannot complete stage with negative timestamp " + time); + if (time < started) + throw new IllegalArgumentException("Cannot complete stage with timestamp " + time + " before its start time " + started); + + this.completed.updateAndGet(l -> { + if (l >= 0) + throw new IllegalStateException("Stage is already completed"); Review Comment: Good call, done 👍 Also changed to logging a warning instead of throwing an exception when the completion time precedes the start time. ########## connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.util; + +import java.util.concurrent.atomic.AtomicLong; + +public class Stage { + private final String description; + private final long started; + private final AtomicLong completed; + + public Stage(String description, long started) { + if (started < 0) + throw new IllegalArgumentException("Invalid start timestamp " + started + "; cannot be negative"); + + this.description = description; + this.started = started; + this.completed = new AtomicLong(-1); + } + + public String description() { + return description; + } + + public long started() { + return started; + } + + public Long completed() { + long result = completed.get(); + return result >= 0 ? result : null; + } + + public synchronized void complete(long time) { + if (time < 0) + throw new IllegalArgumentException("Cannot complete stage with negative timestamp " + time); + if (time < started) + throw new IllegalArgumentException("Cannot complete stage with timestamp " + time + " before its start time " + started); + + this.completed.updateAndGet(l -> { + if (l >= 0) + throw new IllegalStateException("Stage is already completed"); Review Comment: Good call, done 👍 Also changed to logging a warning instead of throwing an exception when the completion time precedes the start time. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java: ########## @@ -165,7 +164,7 @@ public class AbstractHerderTest { private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA"; private final int generation = 5; private final String connectorName = "connector"; - private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy(); + private final SampleConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy(); Review Comment: Left in from a previous iteration; reverted. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ########## @@ -771,6 +778,104 @@ private Map<String, String> defaultSinkConnectorProps(String topics) { return props; } + @Test + public void testRequestTimeouts() throws Exception { + final String configTopic = "test-request-timeout-configs"; + workerProps.put(CONFIG_TOPIC_CONFIG, configTopic); + // Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to + // be spuriously triggered after the group coordinator for a Connect cluster is bounced + // Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause + // connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay + // is set to 0 + workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1"); + connect = connectBuilder + .numBrokers(1) + .numWorkers(1) + .build(); + connect.start(); + connect.assertions().assertAtLeastNumWorkersAreUp(1, + "Worker did not start in time"); + + Map<String, String> connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME); + Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1); + connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1)); + + // Create a connector to ensure that the worker has completed startup + log.info("Creating initial connector"); + connect.configureConnector(CONNECTOR_NAME, connectorConfig1); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time" + ); + + // Bring down Kafka, which should cause some REST requests to fail + log.info("Stopping Kafka cluster"); + connect.kafka().stopOnlyKafka(); + // Allow for the workers to discover that the coordinator is unavailable, wait is + // heartbeat timeout * 2 + 4sec + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + + connect.requestTimeout(5_000); + // Try to reconfigure the connector, which should fail with a timeout error + 
log.info("Trying to reconfigure connector while Kafka cluster is down"); + ConnectRestException e = assertThrows( + ConnectRestException.class, + () -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2) + ); + assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode()); + assertNotNull(e.getMessage()); + assertTrue( + "Message '" + e.getMessage() + "' does not match expected format", + e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic") Review Comment: Surprisingly, this is actually the first operation that will block indefinitely if the Kafka cluster goes down. We [proactively revoke](https://github.com/apache/kafka/blob/2b99d0e45027c88ae2347fa8f7d1ff4b2b919089/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java#L141) all connectors and tasks on the worker if we're unable to reach the Kafka group coordinator for long enough, which in turn causes the worker to [flush the status store](https://github.com/apache/kafka/blob/2b99d0e45027c88ae2347fa8f7d1ff4b2b919089/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2596), which blocks until Kafka becomes available again. I'd be worried about this (the herder thread getting blocked, which is usually pretty awful), but for a system designed to integrate with Kafka, it doesn't seem so bad or so unreasonable for our availability to be tied to the targeted Kafka cluster's. I've augmented the stage description to hopefully shed a little more light on the issue. ########## connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.util.concurrent.atomic.AtomicLong; + +public class Stage { + private final String description; + private final long started; + private final AtomicLong completed; + + public Stage(String description, long started) { + if (started < 0) + throw new IllegalArgumentException("Invalid start timestamp " + started + "; cannot be negative"); Review Comment: Fair enough 😄 I've removed the prohibition on negative times. I believe all that we need for thread safety here is a `volatile Long` (no need for `AtomicReference` or `AtomicLong`); let me know if I'm missing something, though. ########## connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import java.util.concurrent.atomic.AtomicLong; + +public class Stage { + private final String description; + private final long started; + private final AtomicLong completed; + + public Stage(String description, long started) { + if (started < 0) + throw new IllegalArgumentException("Invalid start timestamp " + started + "; cannot be negative"); Review Comment: Fair enough 😄 I've removed the prohibition on negative times. I believe all that we need for thread safety here is a `volatile Long` (no need for `AtomicReference` or `AtomicLong`); let me know if I'm missing something, though. 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -394,9 +399,16 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback @Override public void validateConnectorConfig(Map<String, String> connectorProps, Callback<ConfigInfos> callback, boolean doLog) { + callback.recordStage(new Stage( + "waiting for a new thread to become available for connector validation", + time.milliseconds() + )); connectorExecutor.submit(() -> { + callback.recordStage(null); Review Comment: Good call, done 👍 ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -394,9 +399,16 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback @Override public void validateConnectorConfig(Map<String, String> connectorProps, Callback<ConfigInfos> callback, boolean doLog) { + callback.recordStage(new Stage( + "waiting for a new thread to become available for connector validation", + time.milliseconds() + )); connectorExecutor.submit(() -> { + callback.recordStage(null); Review Comment: Good call, done 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org