yashmayya commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1147181327


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -237,6 +237,17 @@ private synchronized void putConnectorConfig(String connName,
         }
     }
 
+    @Override
+    public synchronized void stopConnector(String connName, Callback<Void> callback) {
+        try {
+            removeConnectorTasks(connName);

Review Comment:
   This calls `TaskStatus.Listener::onDeletion` after the tasks are stopped by 
the worker, which updates their status to `DESTROYED`. That doesn't seem to 
happen in the `DistributedHerder` implementation of `stopConnector`, so there 
the task statuses will be `UNASSIGNED` after shutdown. I guess this shouldn't 
really matter anyway, because we're publishing an empty set of task configs?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1090,6 +1090,40 @@ public void putConnectorConfig(final String connName, final Map<String, String>
         );
     }
 
+    @Override
+    public void stopConnector(final String connName, final Callback<Void> callback) {
+        log.trace("Submitting request to transition connector {} to STOPPED state", connName);
+
+        addRequest(
+                () -> {
+                    if (!configState.contains(connName))
+                        throw new NotFoundException("Unknown connector " + connName);
+
+                    // We only allow the leader to handle this request since it involves writing task configs to the config topic
+                    if (!isLeader()) {
+                        callback.onCompletion(new NotLeaderException("Only the leader can transition connectors to the STOPPED state.", leaderUrl()), null);
+                        return null;
+                    }
+
+                    // TODO: We may want to add a new ConfigBackingStore method for stopping a connector so that
+                    //       these operations can be performed in a single (possibly-atomic) call
+                    // We write the task configs first since, if we fail between then and writing the target state, the
+                    // cluster is still kept in a healthy state. A RUNNING connector with zero tasks is acceptable (although,
+                    // if the connector is reassigned during the ensuing rebalance, it is likely that it will immediately generate
+                    // a non-empty set of task configs). A STOPPED connector with a non-empty set of tasks is less acceptable
+                    // and likely to confuse users.
+                    writeTaskConfigs(connName, Collections.emptyList());

Review Comment:
   I just noticed that in the standalone mode implementation for 
`stopConnector`, a call is made to `ConfigBackingStore::removeTaskConfigs` 
which is implemented by the `MemoryConfigBackingStore` but not the 
`KafkaConfigBackingStore` used in the distributed mode. Would we want to 
implement the method in the `KafkaConfigBackingStore` as well for this sort of 
use case? I don't think it's really necessary because publishing an empty list 
of task configs seems to do the trick, but just curious.
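
   To illustrate why an empty list can stand in for an explicit removal, here 
is a toy model. It is only a sketch: `TaskConfigStoreSketch` and its methods 
are hypothetical and are not the `ConfigBackingStore` API. It assumes that 
readers of the store only care about how many task configs exist for a 
connector.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: a hypothetical in-memory store, not Kafka Connect's ConfigBackingStore.
public class TaskConfigStoreSketch {

    private final Map<String, List<Map<String, String>>> taskConfigs = new HashMap<>();

    // Replaces whatever task configs were stored for the connector.
    void putTaskConfigs(String connector, List<Map<String, String>> configs) {
        taskConfigs.put(connector, new ArrayList<>(configs));
    }

    // Drops the connector's task configs entirely.
    void removeTaskConfigs(String connector) {
        taskConfigs.remove(connector);
    }

    // Number of tasks a reader of the store would see for the connector.
    int taskCount(String connector) {
        return taskConfigs.getOrDefault(connector, Collections.emptyList()).size();
    }

    public static void main(String[] args) {
        List<Map<String, String>> twoTasks = new ArrayList<>();
        twoTasks.add(Collections.singletonMap("task.id", "0"));
        twoTasks.add(Collections.singletonMap("task.id", "1"));

        // Path 1: overwrite with an empty list.
        TaskConfigStoreSketch emptied = new TaskConfigStoreSketch();
        emptied.putTaskConfigs("conn", twoTasks);
        emptied.putTaskConfigs("conn", Collections.emptyList());

        // Path 2: remove the entry outright.
        TaskConfigStoreSketch removed = new TaskConfigStoreSketch();
        removed.putTaskConfigs("conn", twoTasks);
        removed.removeTaskConfigs("conn");

        // Both paths leave the reader with zero tasks.
        System.out.println(emptied.taskCount("conn") + " " + removed.taskCount("conn"));
    }
}
```

   In this model the two paths differ only in whether an (empty) entry remains 
for the connector, which a reader counting tasks cannot observe.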



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -274,6 +274,19 @@ public Response restartConnector(final @PathParam("connector") String connector,
         return Response.accepted().entity(stateInfo).build();
     }
 
+    @PUT
+    @Path("/{connector}/stop")
+    @Operation(summary = "Stop the specified connector",
+               description = "This operation is idempotent and has no effects if the connector is already stopped")
+    public void stopConnector(

Review Comment:
   Ah interesting, I hadn't noticed that. In that case, the current approach 
totally makes sense since we already have a precedent.
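
   For reference, a client call against the new endpoint might look like the 
sketch below. It only builds the request and does not send it. The class and 
method names are hypothetical, the `/connectors` prefix and the 
`localhost:8083` address are assumptions about the worker's REST setup, and 
the connector name is assumed to contain only URL-safe characters.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative sketch; the class and method names are hypothetical and not part of the PR.
public class StopConnectorRequestSketch {

    // Builds (but does not send) a PUT request for the stop endpoint.
    // Assumes connectorName contains only URL-safe characters.
    static HttpRequest buildStopRequest(String workerUrl, String connectorName) {
        URI uri = URI.create(workerUrl + "/connectors/" + connectorName + "/stop");
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // Assumed worker address; adjust to the actual REST listener.
        HttpRequest request = buildStopRequest("http://localhost:8083", "my-connector");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

   Since the operation is described as idempotent, sending the same request 
twice should leave the connector in the same stopped state.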



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -325,6 +325,181 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
         assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES));
     }
 
+    /**
+     * Verify that the target state (started, paused, stopped) of a connector can be updated, with
+     * an emphasis on ensuring that the transitions between each state are correct.
+     * <p>
+     * The transitions we need to cover are:
+     * <ol>
+     *     <li>RUNNING -> PAUSED</li>
+     *     <li>RUNNING -> STOPPED</li>
+     *     <li>PAUSED -> RUNNING</li>
+     *     <li>PAUSED -> STOPPED</li>
+     *     <li>STOPPED -> RUNNING</li>
+     *     <li>STOPPED -> PAUSED</li>
+     * </ol>
+     * With some reordering, we can perform each transition just once:
+     * <ul>
+     *     <li>Start with RUNNING</li>
+     *     <li>Transition to STOPPED (2)</li>
+     *     <li>Transition to RUNNING (5)</li>
+     *     <li>Transition to PAUSED (1)</li>
+     *     <li>Transition to STOPPED (4)</li>
+     *     <li>Transition to PAUSED (6)</li>
+     *     <li>Transition to RUNNING (3)</li>
+     * </ul>
+     */
+    @Test
+    public void testPauseStopResume() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        // Want to make sure to use multiple tasks
+        final int numTasks = 4;
+        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+        props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
+
+        // Start with RUNNING
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not start in time"
+        );
+
+        // Transition to STOPPED
+        connect.stopConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Transition to RUNNING
+        connect.resumeConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not resume in time"
+        );
+
+        // Transition to PAUSED
+        connect.pauseConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.pauseConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector did not pause in time"
+        );
+
+        // Transition to STOPPED
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Transition to PAUSED
+        connect.pauseConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+                CONNECTOR_NAME,
+                0,
+                "Connector did not pause in time"
+        );
+
+        // Transition to RUNNING
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not resume in time"
+        );
+
+        // Delete the connector
+        connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndTasksAreNotRunning(

Review Comment:
   A follow-up Jira ticket for this sounds good to me; I wouldn't block this PR 
on it. Thanks!
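
   As an aside, the ordering in the Javadoc of `testPauseStopResume` can be 
checked mechanically. The sketch below is a standalone check, not part of the 
PR; `TransitionCoverage` and its methods are hypothetical names. It verifies 
that the path RUNNING, STOPPED, RUNNING, PAUSED, STOPPED, PAUSED, RUNNING 
performs each of the six transitions between distinct states exactly once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical standalone check; not part of the PR or of Kafka Connect.
public class TransitionCoverage {

    enum State { RUNNING, PAUSED, STOPPED }

    // Returns the ordered "FROM->TO" transitions produced by visiting the states in order.
    static List<String> transitions(List<State> path) {
        List<String> result = new ArrayList<>();
        for (int i = 1; i < path.size(); i++) {
            result.add(path.get(i - 1) + "->" + path.get(i));
        }
        return result;
    }

    // True if the path performs every transition between two distinct states exactly once.
    static boolean coversEachTransitionOnce(List<State> path) {
        List<String> performed = transitions(path);
        Set<String> expected = new HashSet<>();
        for (State from : State.values()) {
            for (State to : State.values()) {
                if (from != to) {
                    expected.add(from + "->" + to);
                }
            }
        }
        // Same size plus same set means no transition is missing or repeated.
        return performed.size() == expected.size()
                && new HashSet<>(performed).equals(expected);
    }

    public static void main(String[] args) {
        // The order used by the test, as listed in its Javadoc.
        List<State> path = Arrays.asList(
                State.RUNNING, State.STOPPED, State.RUNNING, State.PAUSED,
                State.STOPPED, State.PAUSED, State.RUNNING);
        System.out.println(transitions(path));
        System.out.println(coversEachTransitionOnce(path)); // prints "true"
    }
}
```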



