erdody commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r649771532
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ########## @@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback }); } + /** + * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request + * and the current status of the connector and task instances. + * + * @param request the restart request; may not be null + * @return the restart plan, or empty this worker has no status for the connector named in the request and therefore the Review comment: Nit: or empty **if** this worker .... ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ########## @@ -255,12 +257,29 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto @POST @Path("/{connector}/restart") - public void restartConnector(final @PathParam("connector") String connector, + public Response restartConnector(final @PathParam("connector") String connector, final @Context HttpHeaders headers, + final @DefaultValue ("false") @QueryParam("includeTasks") Boolean includeTasks, + final @DefaultValue ("false") @QueryParam("onlyFailed") Boolean onlyFailed, final @QueryParam("forward") Boolean forward) throws Throwable { - FutureCallback<Void> cb = new FutureCallback<>(); - herder.restartConnector(connector, cb); - completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers, null, forward); + RestartRequest restartRequest = new RestartRequest(connector, onlyFailed, includeTasks); + if (restartRequest.forciblyRestartConnectorOnly()) { + // For backward compatibility, just restart the connector instance and return OK with no body + FutureCallback<Void> cb = new FutureCallback<>(); + herder.restartConnector(connector, cb); + completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers, null, forward); + return 
Response.ok().build(); + } + + FutureCallback<ConnectorStateInfo> cb = new FutureCallback<>(); + herder.restartConnectorAndTasks(restartRequest, cb); + Map<String, String> queryParameters = new HashMap<>(); + queryParameters.put("includeTasks", String.valueOf(includeTasks)); + queryParameters.put("onlyFailed", String.valueOf(onlyFailed)); + String forwardingPath = "/connectors/" + connector + "/restart"; Review comment: Nit: Move this up so the path string can be reused on line 270? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -186,6 +192,10 @@ private short currentProtocolVersion; private short backoffRetries; + // visible for testing + // The pending restart requests for the connectors; + final NavigableSet<RestartRequest> pendingRestartRequests = new TreeSet<>(); Review comment: There are a few comments in different places explaining the special equality implementation in RestartRequest. Have we considered making this a Map<String, RestartRequest> to make it explicit that we keep the latest per connector, have a more typical equals/hashCode and avoid all the warnings? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -186,6 +192,10 @@ private short currentProtocolVersion; private short backoffRetries; + // visible for testing + // The pending restart requests for the connectors; + final NavigableSet<RestartRequest> pendingRestartRequests = new TreeSet<>(); Review comment: Just out of curiosity, any particular reason why we want to process these in connectorName order?
(instead of FIFO) ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { + final String connectorName = request.connectorName(); + addRequest( + () -> { + if (checkRebalanceNeeded(callback)) { + return null; + } + if (!configState.connectors().contains(request.connectorName())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); + return null; + } + if (isLeader()) { + // Write a restart request to the config backing store, to be executed asynchronously in tick() + configBackingStore.putRestartRequest(request); + // Compute and send the response that this was accepted + Optional<RestartPlan> maybePlan = buildRestartPlanFor(request); + if (!maybePlan.isPresent()) { + callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); + } else { + RestartPlan plan = maybePlan.get(); + callback.onCompletion(null, plan.restartConnectorStateInfo()); + } + } else { + callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null); Review comment: Nit: Use a message similar to the one you corrected in line 1030? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.ConnectorTaskId; + +/** + * An immutable restart plan. + */ +public class RestartPlan { + + private final RestartRequest request; + private final ConnectorStateInfo stateInfo; + private final Collection<ConnectorTaskId> idsToRestart; + + /** + * Create a new request to restart a connector and optionally its tasks. 
+ * + * @param request the restart request; may not be null + * @param restartStateInfo the current state info for the connector; may not be null + */ + public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) { + this.request = Objects.requireNonNull(request, "RestartRequest name may not be null"); + this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo name may not be null"); + idsToRestart = Collections.unmodifiableList( + stateInfo.tasks() + .stream() + .filter(taskState -> isRestarting(taskState)) Review comment: Nit: can use a method reference ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ########## @@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback }); } + /** + * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request + * and the current status of the connector and task instances. 
+ * + * @param request the restart request; may not be null + * @return the restart plan, or empty this worker has no status for the connector named in the request and therefore the + * connector cannot be restarted + */ + public Optional<RestartPlan> buildRestartPlanFor(RestartRequest request) { + String connectorName = request.connectorName(); + ConnectorStatus connectorStatus = statusBackingStore.get(connectorName); + if (connectorStatus == null) { + return Optional.empty(); + } + + // If requested, mark the connector as restarting + AbstractStatus.State connectorState; + if (request.includeConnector(connectorStatus)) { + connectorState = AbstractStatus.State.RESTARTING; + } else { + connectorState = connectorStatus.state(); + } + ConnectorStateInfo.ConnectorState connectorInfoState = new ConnectorStateInfo.ConnectorState( + connectorState.toString(), + connectorStatus.workerId(), + connectorStatus.trace() + ); + + // Collect the task IDs to stop and restart (may be none) Review comment: Nit: This actually collects the state of all tasks, so it's only empty if there are no tasks, right? 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { + final String connectorName = request.connectorName(); + addRequest( + () -> { + if (checkRebalanceNeeded(callback)) { + return null; + } + if (!configState.connectors().contains(request.connectorName())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); + return null; + } + if (isLeader()) { + // Write a restart request to the config backing store, to be executed asynchronously in tick() + configBackingStore.putRestartRequest(request); + // Compute and send the response that this was accepted + Optional<RestartPlan> maybePlan = buildRestartPlanFor(request); + if (!maybePlan.isPresent()) { + callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); + } else { + RestartPlan plan = maybePlan.get(); + callback.onCompletion(null, plan.restartConnectorStateInfo()); + } + } else { + callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null); + } + + return null; + }, + forwardErrorCallback(callback) + ); + } + + /** + * Process all pending restart requests. There can be at most one request per connector, because of how + * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name. + * + * <p>This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests + * are processed at once before any additional requests are added. 
+ */ + private synchronized void processRestartRequests() { + RestartRequest request; + while ((request = pendingRestartRequests.pollFirst()) != null) { + doRestartConnectorAndTasks(request); + } + } + + protected synchronized boolean doRestartConnectorAndTasks(RestartRequest request) { + final String connectorName = request.connectorName(); + Optional<RestartPlan> maybePlan = buildRestartPlanFor(request); + if (!maybePlan.isPresent()) { + log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request); + return false; + } + RestartPlan plan = maybePlan.get(); + log.info("Executing {}", plan); + + + // If requested, stop the connector and any tasks, marking each as restarting + final ExtendedAssignment currentAssignments = assignment; + final Collection<ConnectorTaskId> assignedIdsToRestart = plan.taskIdsToRestart() + .stream() + .filter(taskId -> currentAssignments.tasks().contains(taskId)) + .collect(Collectors.toList()); + final boolean restartConnector = plan.restartConnector() && currentAssignments.connectors().contains(connectorName); + final boolean restartTasks = !assignedIdsToRestart.isEmpty(); + if (restartConnector) { + worker.stopAndAwaitConnector(connectorName); + recordRestarting(connectorName); + } + if (restartTasks) { + // Stop the tasks and mark as restarting + worker.stopAndAwaitTasks(assignedIdsToRestart); + assignedIdsToRestart.forEach(this::recordRestarting); + } + + // Now restart the connector and tasks + if (restartConnector) { + startConnector(connectorName, (error, targetState) -> { + if (error == null) { + log.info("Connector {} successfully restarted", connectorName); + } else { + log.error("Failed to restart connector '" + connectorName + "'", error); + } + }); + } + if (restartTasks) { + log.debug("Restarting {} of {} tasks for {}", plan.restartTaskCount(), plan.totalTaskCount(), request); + plan.taskIdsToRestart().forEach(this::startTask); + log.debug("Restarted {} of {} tasks for {} as 
requested", plan.restartTaskCount(), plan.totalTaskCount(), request); + } + log.info("Completed {}", plan); + return restartConnector || restartTasks; Review comment: The return value is only used by tests. Can we assert based on other method calls instead? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { + final String connectorName = request.connectorName(); + addRequest( + () -> { + if (checkRebalanceNeeded(callback)) { + return null; + } + if (!configState.connectors().contains(request.connectorName())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); + return null; + } + if (isLeader()) { + // Write a restart request to the config backing store, to be executed asynchronously in tick() + configBackingStore.putRestartRequest(request); + // Compute and send the response that this was accepted + Optional<RestartPlan> maybePlan = buildRestartPlanFor(request); + if (!maybePlan.isPresent()) { + callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); + } else { + RestartPlan plan = maybePlan.get(); + callback.onCompletion(null, plan.restartConnectorStateInfo()); + } + } else { + callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null); + } + + return null; + }, + forwardErrorCallback(callback) + ); + } + + /** + * Process all pending restart requests. There can be at most one request per connector, because of how + * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name. + * + * <p>This method is called from within the {@link #tick()} method.
It is synchronized so that all pending restart requests + * are processed at once before any additional requests are added. + */ + private synchronized void processRestartRequests() { Review comment: Just wondering: could blocking the addition of new entries be a problem, considering that this method can take some time? Would it be worth creating a copy of the collection to minimize the synchronized time? ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java ########## @@ -58,6 +58,9 @@ public void start(Map<String, String> props) { commonConfigs = props; log.info("Started {} connector {}", this.getClass().getSimpleName(), connectorName); connectorHandle.recordConnectorStart(); + if ("true".equalsIgnoreCase(props.getOrDefault("connector.start.inject.error", "false"))) { Review comment: Nit: `Boolean.parseBoolean()` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,104 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { + final String connectorName = request.connectorName(); + addRequest( + () -> { + if (checkRebalanceNeeded(callback)) { + return null; + } + if (!configState.connectors().contains(request.connectorName())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); + return null; + } + if (isLeader()) { + // Write a restart request to the config backing store, to be executed asynchronously in tick() + configBackingStore.putRestartRequest(request); + // Compute and send the response that this was accepted + Optional<RestartPlan> maybePlan = buildRestartPlanFor(request); + if (!maybePlan.isPresent()) { + callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); + } else { + 
RestartPlan plan = maybePlan.get(); + callback.onCompletion(null, plan.restartConnectorStateInfo()); + } + } else { + callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null); + } + + return null; + }, + forwardErrorCallback(callback) + ); + } + + /** + * Process all pending restart requests. There can be at most one request per connector, because of how + * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name. + * + * <p>This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests + * are processed at once before any additional requests are added. + */ + private synchronized void processRestartRequests() { + RestartRequest request; + while ((request = pendingRestartRequests.pollFirst()) != null) { + doRestartConnectorAndTasks(request); + } + } + + protected synchronized boolean doRestartConnectorAndTasks(RestartRequest request) { + final String connectorName = request.connectorName(); + Optional<RestartPlan> maybePlan = buildRestartPlanFor(request); + if (!maybePlan.isPresent()) { + log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request); + return false; + } + RestartPlan plan = maybePlan.get(); + log.info("Executing {}", plan); + + + // If requested, stop the connector and any tasks, marking each as restarting + final ExtendedAssignment currentAssignments = assignment; + final Collection<ConnectorTaskId> assignedIdsToRestart = plan.taskIdsToRestart() + .stream() + .filter(taskId -> currentAssignments.tasks().contains(taskId)) + .collect(Collectors.toList()); + final boolean restartConnector = plan.restartConnector() && currentAssignments.connectors().contains(connectorName); + final boolean restartTasks = !assignedIdsToRestart.isEmpty(); + if (restartConnector) { + worker.stopAndAwaitConnector(connectorName); + 
recordRestarting(connectorName); + } + if (restartTasks) { + // Stop the tasks and mark as restarting + worker.stopAndAwaitTasks(assignedIdsToRestart); + assignedIdsToRestart.forEach(this::recordRestarting); + } + + // Now restart the connector and tasks + if (restartConnector) { + startConnector(connectorName, (error, targetState) -> { + if (error == null) { + log.info("Connector {} successfully restarted", connectorName); Review comment: Nit: quotes around connectorName, like in the line below -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org