gharris1727 commented on code in PR #14562:
URL: https://github.com/apache/kafka/pull/14562#discussion_r1414514216


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -394,9 +399,16 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback
 
     @Override
     public void validateConnectorConfig(Map<String, String> connectorProps, Callback<ConfigInfos> callback, boolean doLog) {
+        callback.recordStage(new Stage(
+                "waiting for a new thread to become available for connector validation",
+                time.milliseconds()
+        ));
         connectorExecutor.submit(() -> {
+            callback.recordStage(null);

Review Comment:
   Could this instead be a call to Stage::complete, or a use for TemporaryStage?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -771,6 +778,104 @@ private Map<String, String> defaultSinkConnectorProps(String topics) {
         return props;
     }
 
+    @Test
+    public void testRequestTimeouts() throws Exception {
+        final String configTopic = "test-request-timeout-configs";
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        // Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to
+        // be spuriously triggered after the group coordinator for a Connect cluster is bounced
+        // Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause
+        // connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay
+        // is set to 0
+        workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1");
+        connect = connectBuilder
+                .numBrokers(1)
+                .numWorkers(1)
+                .build();
+        connect.start();
+        connect.assertions().assertAtLeastNumWorkersAreUp(1,
+                "Worker did not start in time");
+
+        Map<String, String> connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME);
+        Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1);
+        connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1));
+
+        // Create a connector to ensure that the worker has completed startup
+        log.info("Creating initial connector");
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig1);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time"
+        );
+
+        // Bring down Kafka, which should cause some REST requests to fail
+        log.info("Stopping Kafka cluster");
+        connect.kafka().stopOnlyKafka();
+        // Allow for the workers to discover that the coordinator is unavailable, wait is
+        // heartbeat timeout * 2 + 4sec
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        connect.requestTimeout(5_000);
+        // Try to reconfigure the connector, which should fail with a timeout error
+        log.info("Trying to reconfigure connector while Kafka cluster is down");
+        ConnectRestException e = assertThrows(
+                ConnectRestException.class,
+                () -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2)
+        );
+        assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode());
+        assertNotNull(e.getMessage());
+        assertTrue(
+                "Message '" + e.getMessage() + "' does not match expected format",
+                e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic")

Review Comment:
   Is this the only error message possible when shutting down Kafka, or just the most common one?
   I would expect that an error about ensuring membership in the cluster could also appear.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Stage {
+    private final String description;
+    private final long started;
+    private final AtomicLong completed;
+
+    public Stage(String description, long started) {
+        if (started < 0)
+            throw new IllegalArgumentException("Invalid start timestamp " + started + "; cannot be negative");

Review Comment:
   excited for someone to take Kafka back in time to 1955 and open a KAFKA ticket via Western Union 😄
   
   On a serious note, but still a nit: negative times are technically valid, if a bit unexpected in the modern era. I think you could allow all start/stop times if you use an AtomicReference<Long> and use `null` as the sentinel for an incomplete Stage.
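
   To make the suggestion concrete, here is a rough sketch of what I have in mind. It is not a drop-in replacement: the class name `NullSentinelStage` is hypothetical, and I have kept the existing "already completed" exception so that only the sentinel handling differs from the current version.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical variant of Stage for illustration; the name is not part of the PR.
class NullSentinelStage {
    private final String description;
    private final long started;
    // null means "not completed yet", so every long value remains a usable timestamp
    private final AtomicReference<Long> completed = new AtomicReference<>(null);

    NullSentinelStage(String description, long started) {
        this.description = description;
        this.started = started;
    }

    String description() {
        return description;
    }

    long started() {
        return started;
    }

    // Returns null until complete(long) has been called successfully
    Long completed() {
        return completed.get();
    }

    void complete(long time) {
        if (time < started)
            throw new IllegalArgumentException("Cannot complete stage with timestamp "
                    + time + " before its start time " + started);
        // Succeeds only if the stage has not been completed yet
        if (!completed.compareAndSet(null, time))
            throw new IllegalStateException("Stage is already completed");
    }
}
```

   With this shape the `started < 0` and `time < 0` checks go away, and `completed()` can return the stored reference directly instead of translating a negative value into `null`.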



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Stage {
+    private final String description;
+    private final long started;
+    private final AtomicLong completed;
+
+    public Stage(String description, long started) {
+        if (started < 0)
+            throw new IllegalArgumentException("Invalid start timestamp " + started + "; cannot be negative");
+
+        this.description = description;
+        this.started = started;
+        this.completed = new AtomicLong(-1);
+    }
+
+    public String description() {
+        return description;
+    }
+
+    public long started() {
+        return started;
+    }
+
+    public Long completed() {
+        long result = completed.get();
+        return result >= 0 ? result : null;
+    }
+
+    public synchronized void complete(long time) {
+        if (time < 0)
+            throw new IllegalArgumentException("Cannot complete stage with negative timestamp " + time);
+        if (time < started)
+            throw new IllegalArgumentException("Cannot complete stage with timestamp " + time + " before its start time " + started);
+
+        this.completed.updateAndGet(l -> {
+            if (l >= 0)
+                throw new IllegalStateException("Stage is already completed");

Review Comment:
   This seems like a nasty error for a user to receive if we make a mistake and 
double-complete one of these stages. Could the second completion be a no-op 
instead?
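
   Something along these lines is what I mean; treat it as a sketch only. `IdempotentStage` and `NOT_COMPLETED` are made-up names, the description field is omitted for brevity, and the boolean return value is my own addition so callers could log a duplicate completion if they care.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical variant of Stage for illustration; the name is not part of the PR.
class IdempotentStage {
    private static final long NOT_COMPLETED = -1L;

    private final long started;
    private final AtomicLong completed = new AtomicLong(NOT_COMPLETED);

    IdempotentStage(long started) {
        if (started < 0)
            throw new IllegalArgumentException("Invalid start timestamp " + started + "; cannot be negative");
        this.started = started;
    }

    // Returns null until the stage has been completed
    Long completed() {
        long result = completed.get();
        return result >= 0 ? result : null;
    }

    // Returns true if this call completed the stage, false if it was already
    // completed; in the latter case the first completion time is kept.
    boolean complete(long time) {
        if (time < 0)
            throw new IllegalArgumentException("Cannot complete stage with negative timestamp " + time);
        if (time < started)
            throw new IllegalArgumentException("Cannot complete stage with timestamp " + time
                    + " before its start time " + started);
        return completed.compareAndSet(NOT_COMPLETED, time);
    }
}
```

   Since `compareAndSet` is atomic on its own, this version would also not need `synchronized` on `complete`.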



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java:
##########
@@ -165,7 +164,7 @@ public class AbstractHerderTest {
     private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
     private final int generation = 5;
     private final String connectorName = "connector";
-    private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+    private final SampleConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new SampleConnectorClientConfigOverridePolicy();

Review Comment:
   nit: the variable name is stale.
   
   Also, what is the importance of this change? Could it be omitted?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
