[GitHub] [kafka] erdody commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-15 Thread GitBox


erdody commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r651509086



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -193,8 +194,8 @@
     private short backoffRetries;
 
     // visible for testing
-    // The pending restart requests for the connectors;
-    final NavigableSet<RestartRequest> pendingRestartRequests = new TreeSet<>();
+    // The latest pending restart requests for the connectors;
+    final Map<String, RestartRequest> pendingRestartRequests = new ConcurrentHashMap<>();

Review comment:
   AFAICS, since all accesses are synchronized, this doesn't need to be 
concurrent.
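
   As an illustration of the point above, here is a minimal sketch, assuming
   every read and write of the map goes through synchronized methods on the
   same object. The class `GuardedPendingRestarts` and its String-valued
   requests are hypothetical stand-ins, not the DistributedHerder code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the herder's bookkeeping; not the actual Kafka code.
final class GuardedPendingRestarts {
    // A plain HashMap suffices because every access below holds this object's monitor.
    private final Map<String, String> pending = new HashMap<>();

    // Record the latest request for a connector, replacing any earlier one.
    synchronized void put(String connectorName, String request) {
        pending.put(connectorName, request);
    }

    // Take all pending requests and clear the map in one critical section.
    synchronized List<String> drain() {
        List<String> drained = new ArrayList<>(pending.values());
        pending.clear();
        return drained;
    }

    synchronized int size() {
        return pending.size();
    }
}
```

   If any access happened outside a synchronized block, this reasoning would
   no longer hold and a concurrent map (or additional locking) would be needed.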

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,104 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if (!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+                        // Write a restart request to the config backing store, to be executed asynchronously in tick()
+                        configBackingStore.putRestartRequest(request);
+                        // Compute and send the response that this was accepted
+                        Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+                        if (!maybePlan.isPresent()) {
+                            callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
+                        } else {
+                            RestartPlan plan = maybePlan.get();
+                            callback.onCompletion(null, plan.restartConnectorStateInfo());
+                        }
+                    } else {
+                        callback.onCompletion(new NotLeaderException("Cannot process restart request since it is not assigned to this member", leaderUrl()), null);
+                    }
+
+                    return null;
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    /**
+     * Process all pending restart requests. There can be at most one request per connector, because of how
+     * {@link RestartRequest#equals(Object)} and {@link RestartRequest#hashCode()} are based only on the connector name.
+     *
+     * This method is called from within the {@link #tick()} method. It is synchronized so that all pending restart requests
+     * are processed at once before any additional requests are added.
+     */
+    private synchronized void processRestartRequests() {
+        RestartRequest request;
+        while ((request = pendingRestartRequests.pollFirst()) != null) {
+            doRestartConnectorAndTasks(request);
+        }
+    }
+
+    protected synchronized boolean doRestartConnectorAndTasks(RestartRequest request) {
+        final String connectorName = request.connectorName();
+        Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+        if (!maybePlan.isPresent()) {
+            log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request);
+            return false;
+        }
+        RestartPlan plan = maybePlan.get();
+        log.info("Executing {}", plan);
+
+
+        // If requested, stop the connector and any tasks, marking each as restarting
+        final ExtendedAssignment currentAssignments = assignment;
+        final Collection assignedIdsToRestart = plan.taskIdsToRestart()
+                .stream()
+                .filter(taskId -> currentAssignments.tasks().contains(taskId))
+                .collect(Collectors.toList());
+        final boolean restartConnector = plan.restartConnector() && currentAssignments.connectors().contains(connectorName);
+        final boolean restartTasks = !assignedIdsToRestart.isEmpty();
+        if (restartConnector) {
+            worker.stopAndAwaitConnector(connectorName);
+            recordRestarting(connectorName);
+        }
+        if (restartTasks) {
+            // Stop the tasks and mark as restarting
+            worker.stopAndAwaitTasks(assignedIdsToRestart);
+            assignedIdsToRestart.forEach(this::recordRest

[GitHub] [kafka] erdody commented on a change in pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-14 Thread GitBox


erdody commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r649771532



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##
@@ -357,6 +367,62 @@ public void validateConnectorConfig(Map connectorProps, Callback
         });
     }
 
+    /**
+     * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request
+     * and the current status of the connector and task instances.
+     *
+     * @param request the restart request; may not be null
+     * @return the restart plan, or empty this worker has no status for the connector named in the request and therefore the

Review comment:
   Nit: or empty **if** this worker 

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##
@@ -255,12 +257,29 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto
 
     @POST
     @Path("/{connector}/restart")
-    public void restartConnector(final @PathParam("connector") String connector,
+    public Response restartConnector(final @PathParam("connector") String connector,
                                  final @Context HttpHeaders headers,
+                                 final @DefaultValue ("false") @QueryParam("includeTasks") Boolean includeTasks,
+                                 final @DefaultValue ("false") @QueryParam("onlyFailed") Boolean onlyFailed,
                                  final @QueryParam("forward") Boolean forward) throws Throwable {
-        FutureCallback cb = new FutureCallback<>();
-        herder.restartConnector(connector, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers, null, forward);
+        RestartRequest restartRequest = new RestartRequest(connector, onlyFailed, includeTasks);
+        if (restartRequest.forciblyRestartConnectorOnly()) {
+            // For backward compatibility, just restart the connector instance and return OK with no body
+            FutureCallback cb = new FutureCallback<>();
+            herder.restartConnector(connector, cb);
+            completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers, null, forward);
+            return Response.ok().build();
+        }
+
+        FutureCallback cb = new FutureCallback<>();
+        herder.restartConnectorAndTasks(restartRequest, cb);
+        Map queryParameters = new HashMap<>();
+        queryParameters.put("includeTasks", String.valueOf(includeTasks));
+        queryParameters.put("onlyFailed", String.valueOf(onlyFailed));
+        String forwardingPath = "/connectors/" + connector + "/restart";

Review comment:
   Nit: Move up so you can share with 270?
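
   A rough sketch of what this nit suggests, assuming the path is built once
   before the branch and reused by both code paths. `RestartForwardingSketch`
   and its `forward` helper are hypothetical; `forward` only stands in for
   completeOrForwardRequest and records the path it was given.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the ConnectorsResource implementation.
final class RestartForwardingSketch {
    // Records every path handed to the stand-in forwarding step.
    final List<String> forwardedPaths = new ArrayList<>();

    // Stand-in for completeOrForwardRequest; it only remembers the path.
    private void forward(String path) {
        forwardedPaths.add(path);
    }

    void restart(String connector, boolean connectorOnly) {
        // Built once, before the branch, so both code paths share it.
        final String forwardingPath = "/connectors/" + connector + "/restart";
        if (connectorOnly) {
            forward(forwardingPath); // backward-compatible branch
            return;
        }
        forward(forwardingPath); // connector-and-tasks branch
    }
}
```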

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -186,6 +192,10 @@
     private short currentProtocolVersion;
     private short backoffRetries;
 
+    // visible for testing
+    // The pending restart requests for the connectors;
+    final NavigableSet<RestartRequest> pendingRestartRequests = new TreeSet<>();

Review comment:
   There are a few comments in different places explaining the special 
equality implementation in RestartRequest.
   Have we considered making this a Map keyed by connector name, to make it 
explicit that we keep the latest request per connector, have a more typical 
equals/hashCode, and avoid all the warnings?
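
   A minimal sketch of that idea, assuming the map key is the connector name.
   `SimpleRestartRequest` is a simplified, hypothetical stand-in for
   RestartRequest, and `LatestRestartRequests` is likewise invented for
   illustration. Here equals/hashCode cover all fields, and the map key is
   what enforces "latest request per connector".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Simplified, hypothetical stand-in for Connect's RestartRequest.
final class SimpleRestartRequest {
    final String connectorName;
    final boolean onlyFailed;
    final boolean includeTasks;

    SimpleRestartRequest(String connectorName, boolean onlyFailed, boolean includeTasks) {
        this.connectorName = Objects.requireNonNull(connectorName);
        this.onlyFailed = onlyFailed;
        this.includeTasks = includeTasks;
    }

    // Conventional equality over every field, not just the connector name.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleRestartRequest)) return false;
        SimpleRestartRequest other = (SimpleRestartRequest) o;
        return connectorName.equals(other.connectorName)
                && onlyFailed == other.onlyFailed
                && includeTasks == other.includeTasks;
    }

    @Override
    public int hashCode() {
        return Objects.hash(connectorName, onlyFailed, includeTasks);
    }
}

// Hypothetical holder that keeps only the most recent request per connector.
final class LatestRestartRequests {
    private final Map<String, SimpleRestartRequest> latest = new HashMap<>();

    // A later request for the same connector overwrites the earlier one.
    synchronized void add(SimpleRestartRequest request) {
        latest.put(request.connectorName, request);
    }

    synchronized SimpleRestartRequest get(String connectorName) {
        return latest.get(connectorName);
    }

    synchronized int size() {
        return latest.size();
    }
}
```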

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -186,6 +192,10 @@
     private short currentProtocolVersion;
     private short backoffRetries;
 
+    // visible for testing
+    // The pending restart requests for the connectors;
+    final NavigableSet<RestartRequest> pendingRestartRequests = new TreeSet<>();

Review comment:
   Just out of curiosity, any particular reason why we want to process 
these in connectorName order? (instead of FIFO)
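
   To make the difference concrete, here is a small sketch contrasting the
   two orders. It uses plain connector-name strings rather than
   RestartRequest objects, and the class `RestartOrdering` is hypothetical.
   A TreeSet drained with pollFirst() yields sorted order, while a
   LinkedHashSet keeps first-arrival (FIFO) order with one entry per name.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeSet;

// Hypothetical illustration of the two processing orders discussed above.
final class RestartOrdering {
    // Drains names in sorted order, as a TreeSet with pollFirst() does.
    static List<String> sortedOrder(List<String> arrivals) {
        TreeSet<String> set = new TreeSet<>(arrivals);
        List<String> out = new ArrayList<>();
        String next;
        while ((next = set.pollFirst()) != null) {
            out.add(next);
        }
        return out;
    }

    // Keeps one entry per name in the order each name first arrived.
    static List<String> fifoOrder(List<String> arrivals) {
        return new ArrayList<>(new LinkedHashSet<>(arrivals));
    }
}
```

   With arrivals "zeta", "alpha", "zeta", the sorted variant processes
   "alpha" before "zeta", while the FIFO variant processes "zeta" first.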

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1063,6 +1076,104 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if (!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+