tombentley commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r889992019


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -582,6 +588,59 @@ public boolean startTask(
         }
     }
 
+    /**
+     * Using the admin principal for this connector, perform a round of zombie fencing that prevents the transactional producers
+     * for the specified number of source tasks from sending any more records.
+     * @param connName the name of the connector
+     * @param numTasks the number of tasks to fence out
+     * @param connProps the configuration of the connector; may not be null
+     * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed
+     */
+    public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps) {
+        return fenceZombies(connName, numTasks, connProps, Admin::create);
+    }
+
+    // Allows us to mock out the Admin client for testing
+    KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps, Function<Map<String, Object>, Admin> adminFactory) {
+        log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
+        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+            ClassLoader savedLoader = plugins.currentThreadLoader();
+            try {
+                String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+                ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+                final SourceConnectorConfig connConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
+                final Class<? extends Connector> connClass = plugins.connectorClass(
+                        connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        connConfig,
+                        connClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SOURCE);
+                Admin admin = adminFactory.apply(adminConfig);

Review Comment:
   There's a resource leak if the `whenComplete` never calls the passed lambda. 
I think you should be able to call `admin.close` in a `catch(Exception)`.
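
   For illustration, a minimal sketch of the shape I have in mind (this assumes the elided remainder of the method ends up calling `Admin::fenceProducers`; `transactionalIds` is a hypothetical helper that builds the transactional IDs for the connector's tasks):
   ```java
   Admin admin = adminFactory.apply(adminConfig);
   try {
       // Happy path: ownership of the client passes to the future's completion callback
       FenceProducersResult result = admin.fenceProducers(transactionalIds(connName, numTasks));
       return result.all().whenComplete((ignored, error) -> admin.close());
   } catch (Exception e) {
       // A synchronous failure means whenComplete was never registered, so close here
       admin.close();
       throw e;
   }
   ```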



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -138,6 +138,17 @@ public interface Herder {
      */
     void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback, InternalRequestSignature requestSignature);
 
+    /**
+     * Fence out any older task generations for a source connector, and then write a record to the config topic
+     * indicating that it is safe to bring up a new generation of tasks. If that record is already present, do nothing
+     * and invoke the callback successfully.
+     * @param connName the name of the connector to fence out; must refer to a source connector

Review Comment:
   what happens if it's not a source connector?
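
   If the answer is "undefined behavior", an explicit guard may be worth adding. A sketch using the `isSourceConnector` helper that appears later in this PR:
   ```java
   // Fail fast with a clear error rather than attempting a fencing round
   // that only makes sense for source connectors
   if (!isSourceConnector(connName)) {
       callback.onCompletion(new BadRequestException(
               "Connector " + connName + " is not a source connector"), null);
       return;
   }
   ```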



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> connectors, Collection<Connect
         }
     }
 
+    private boolean isSourceConnector(String connName) {
+        return ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName)));
+    }
+
+    private boolean requestNotSignedProperly(InternalRequestSignature requestSignature, Callback<?> callback) {
+        if (internalRequestValidationEnabled()) {
+            ConnectRestException requestValidationError = null;
+            if (requestSignature == null) {
+                requestValidationError = new BadRequestException("Internal request missing required signature");
+            } else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+                requestValidationError = new BadRequestException(String.format(
+                        "This worker does not support the '%s' key signing algorithm used by other workers. "
+                                + "This worker is currently configured to use: %s. "
+                                + "Check that all workers' configuration files permit the same set of signature algorithms, "
+                                + "and correct any misconfigured worker and restart it.",
+                        requestSignature.keyAlgorithm(),
+                        keySignatureVerificationAlgorithms
+                ));
+            } else {
+                if (!requestSignature.isValid(sessionKey)) {
+                    requestValidationError = new ConnectRestException(
+                            Response.Status.FORBIDDEN,
+                            "Internal request contained invalid signature."
+                    );
+                }
+            }
+            if (requestValidationError != null) {
+                callback.onCompletion(requestValidationError, null);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Represents an active zombie fencing: that is, an in-progress attempt to invoke
+     * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
+     * record to the config topic.
+     */
+    class ZombieFencing {
+        private final String connName;
+        private final int tasksToRecord;
+        private final int taskGen;
+        private final FutureCallback<Void> fencingFollowup;
+        private final KafkaFuture<Void> fencingFuture;
+
+        public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) {
+            this.connName = connName;
+            this.tasksToRecord = tasksToRecord;
+            this.taskGen = taskGen;
+            this.fencingFollowup = new FutureCallback<>();
+            this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> {
+                // This callback will be called on the same thread that invokes KafkaFuture::thenApply if
+                // the future is already completed. Since that thread is the herder tick thread, we don't need
+                // to perform follow-up logic through an additional herder request (and if we tried, it would lead
+                // to deadlock)
+                addOrRunRequest(
+                        this::onZombieFencingSuccess,
+                        fencingFollowup
+                );
+                awaitFollowup();
+                return null;
+            });
+        }
+
+        // Invoked after the worker has successfully fenced out the producers of old task generations using an admin client
+        // Note that work here will be performed on the herder's tick thread, so it should not block for very long
+        private Void onZombieFencingSuccess() throws TimeoutException {
+            configBackingStore.refresh(1, TimeUnit.MINUTES);
+            configState = configBackingStore.snapshot();
+            if (taskGen < configState.taskConfigGeneration(connName)) {
+                throw new ConnectRestException(
+                    Response.Status.CONFLICT.getStatusCode(),
+                    "Fencing failed because new task configurations were generated for the connector");
+            }
+            if (!writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord))) {

Review Comment:
   This could block indefinitely, since `KafkaConfigBackingStore` calls 
`configLog.readToEnd().get()`, which seems at odds with the `it should not 
block for very long` requirement.
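
   A sketch of the kind of bound I mean (`readToEndTimeoutMs` is a hypothetical value, and the `ConnectException` wrapping is illustrative, not taken from the PR):
   ```java
   // Bound the wait instead of blocking the herder tick thread indefinitely
   try {
       configLog.readToEnd().get(readToEndTimeoutMs, TimeUnit.MILLISECONDS);
   } catch (InterruptedException | ExecutionException | TimeoutException e) {
       throw new ConnectException("Failed to read to the end of the config topic", e);
   }
   ```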



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1745,6 +1958,23 @@ private boolean checkRebalanceNeeded(Callback<?> callback) {
         return false;
     }
 
+    /**
+     * Execute the given action and subsequent callback immediately if the current thread is the herder's tick thread,
+     * or use them to create and store a {@link DistributedHerderRequest} on the request queue and return the resulting request
+     * if not.
+     * @param action the action that should be run on the herder's tick thread
+     * @param callback the callback that should be invoked once the action is complete
+     * @return a new {@link DistributedHerderRequest} if one has been created and added to the request queue, and {@code null} otherwise
+     */
+    DistributedHerderRequest addOrRunRequest(Callable<Void> action, Callback<Void> callback) {

Review Comment:
   I wonder if `runOnTickThread` might be a better name, since it more 
explicitly describes what it's doing?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> connectors, Collection<Connect
         }
     }
 
+    private boolean isSourceConnector(String connName) {
+        return ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName)));
+    }
+
+    private boolean requestNotSignedProperly(InternalRequestSignature requestSignature, Callback<?> callback) {
+        if (internalRequestValidationEnabled()) {
+            ConnectRestException requestValidationError = null;
+            if (requestSignature == null) {
+                requestValidationError = new BadRequestException("Internal request missing required signature");
+            } else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+                requestValidationError = new BadRequestException(String.format(
+                        "This worker does not support the '%s' key signing algorithm used by other workers. "
+                                + "This worker is currently configured to use: %s. "
+                                + "Check that all workers' configuration files permit the same set of signature algorithms, "
+                                + "and correct any misconfigured worker and restart it.",
+                        requestSignature.keyAlgorithm(),
+                        keySignatureVerificationAlgorithms
+                ));
+            } else {
+                if (!requestSignature.isValid(sessionKey)) {
+                    requestValidationError = new ConnectRestException(
+                            Response.Status.FORBIDDEN,
+                            "Internal request contained invalid signature."
+                    );
+                }
+            }
+            if (requestValidationError != null) {
+                callback.onCompletion(requestValidationError, null);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Represents an active zombie fencing: that is, an in-progress attempt to invoke
+     * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
+     * record to the config topic.
+     */
+    class ZombieFencing {
+        private final String connName;
+        private final int tasksToRecord;
+        private final int taskGen;
+        private final FutureCallback<Void> fencingFollowup;
+        private final KafkaFuture<Void> fencingFuture;
+
+        public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) {
+            this.connName = connName;
+            this.tasksToRecord = tasksToRecord;
+            this.taskGen = taskGen;
+            this.fencingFollowup = new FutureCallback<>();
+            this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> {
+                // This callback will be called on the same thread that invokes KafkaFuture::thenApply if
+                // the future is already completed. Since that thread is the herder tick thread, we don't need
+                // to perform follow-up logic through an additional herder request (and if we tried, it would lead
+                // to deadlock)
+                addOrRunRequest(
+                        this::onZombieFencingSuccess,
+                        fencingFollowup
+                );
+                awaitFollowup();
+                return null;
+            });
+        }
+
+        // Invoked after the worker has successfully fenced out the producers of old task generations using an admin client
+        // Note that work here will be performed on the herder's tick thread, so it should not block for very long
+        private Void onZombieFencingSuccess() throws TimeoutException {
+            configBackingStore.refresh(1, TimeUnit.MINUTES);

Review Comment:
   So we don't consider 1 minute 'very long'?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -582,6 +588,59 @@ public boolean startTask(
         }
     }
 
+    /**
+     * Using the admin principal for this connector, perform a round of zombie fencing that prevents the transactional producers
+     * for the specified number of source tasks from sending any more records.
+     * @param connName the name of the connector
+     * @param numTasks the number of tasks to fence out
+     * @param connProps the configuration of the connector; may not be null
+     * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed
+     */
+    public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps) {
+        return fenceZombies(connName, numTasks, connProps, Admin::create);
+    }
+
+    // Allows us to mock out the Admin client for testing
+    KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps, Function<Map<String, Object>, Admin> adminFactory) {
+        log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
+        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+            ClassLoader savedLoader = plugins.currentThreadLoader();
+            try {
+                String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+                ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);

Review Comment:
   This pattern of swapping out class loaders is pretty common, but also a little verbose. Perhaps `Plugins` could expose a `withClassloader(ClassLoader)` method that returned an `AutoCloseable`, so that call sites like this could use try-with-resources and the compiler could warn about leaking resources?
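
   Roughly something like this (`LoaderSwap` is a hypothetical name, not an existing class):
   ```java
   // Hypothetical helper on Plugins: swap in a class loader and restore the
   // previous one automatically via try-with-resources
   public LoaderSwap withClassLoader(ClassLoader loader) {
       return new LoaderSwap(compareAndSwapLoaders(loader));
   }

   public static class LoaderSwap implements AutoCloseable {
       private final ClassLoader savedLoader;

       LoaderSwap(ClassLoader savedLoader) {
           this.savedLoader = savedLoader;
       }

       @Override
       public void close() {
           // Restore whatever loader was active before the swap
           compareAndSwapLoaders(savedLoader);
       }
   }
   ```
   Call sites like this one could then shrink to:
   ```java
   try (Plugins.LoaderSwap swap = plugins.withClassLoader(connectorLoader)) {
       // fencing logic runs with the connector's class loader
   }
   ```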



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -836,7 +883,9 @@ public void deleteConnectorConfig(final String connName, final Callback<Created<
                     callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
                 } else {
                     log.trace("Removing connector config {} {}", connName, configState.connectors());
-                    configBackingStore.removeConnectorConfig(connName);
+                    if (!writeToConfigTopicAsLeader(() -> configBackingStore.removeConnectorConfig(connName))) {
+                        throw new ConnectException("Failed to remove connector configuration from config topic since worker was fenced out");
+                    }
                     callback.onCompletion(null, new Created<>(false, null));

Review Comment:
   Is it OK to not invoke the `callback` in the case where we weren't leader?
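
   If it isn't, a sketch of reporting the failure through the callback instead (whether the surrounding herder request machinery already converts the thrown exception into a callback invocation is the thing to check):
   ```java
   if (!writeToConfigTopicAsLeader(() -> configBackingStore.removeConnectorConfig(connName))) {
       // Surface the failure to the caller rather than leaving the callback dangling
       callback.onCompletion(new ConnectException(
               "Failed to remove connector configuration from config topic since worker was fenced out"), null);
       return null;
   }
   callback.onCompletion(null, new Created<>(false, null));
   ```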



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -547,6 +703,27 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
+    private void maybeSendFencably(String key, byte[] value) {
+        if (!usesFencableWriter) {
+            configLog.send(key, value);

Review Comment:
   I find the method name a bit confusing, because it sends in either case. 
Perhaps something like `sendPossiblyFencibly` would be better, wdyt?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> connectors, Collection<Connect
         }
     }
 
+    private boolean isSourceConnector(String connName) {
+        return ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName)));
+    }
+
+    private boolean requestNotSignedProperly(InternalRequestSignature requestSignature, Callback<?> callback) {
+        if (internalRequestValidationEnabled()) {
+            ConnectRestException requestValidationError = null;
+            if (requestSignature == null) {
+                requestValidationError = new BadRequestException("Internal request missing required signature");
+            } else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+                requestValidationError = new BadRequestException(String.format(
+                        "This worker does not support the '%s' key signing algorithm used by other workers. "
+                                + "This worker is currently configured to use: %s. "
+                                + "Check that all workers' configuration files permit the same set of signature algorithms, "
+                                + "and correct any misconfigured worker and restart it.",
+                        requestSignature.keyAlgorithm(),
+                        keySignatureVerificationAlgorithms
+                ));
+            } else {
+                if (!requestSignature.isValid(sessionKey)) {
+                    requestValidationError = new ConnectRestException(
+                            Response.Status.FORBIDDEN,
+                            "Internal request contained invalid signature."
+                    );
+                }
+            }
+            if (requestValidationError != null) {
+                callback.onCompletion(requestValidationError, null);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Represents an active zombie fencing: that is, an in-progress attempt to invoke
+     * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
+     * record to the config topic.
+     */
+    class ZombieFencing {
+        private final String connName;
+        private final int tasksToRecord;
+        private final int taskGen;
+        private final FutureCallback<Void> fencingFollowup;
+        private final KafkaFuture<Void> fencingFuture;
+
+        public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) {
+            this.connName = connName;
+            this.tasksToRecord = tasksToRecord;
+            this.taskGen = taskGen;
+            this.fencingFollowup = new FutureCallback<>();
+            this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> {
+                // This callback will be called on the same thread that invokes KafkaFuture::thenApply if
+                // the future is already completed. Since that thread is the herder tick thread, we don't need
+                // to perform follow-up logic through an additional herder request (and if we tried, it would lead
+                // to deadlock)
+                addOrRunRequest(
+                        this::onZombieFencingSuccess,
+                        fencingFollowup
+                );
+                awaitFollowup();
+                return null;
+            });
+        }
+
+        // Invoked after the worker has successfully fenced out the producers of old task generations using an admin client
+        // Note that work here will be performed on the herder's tick thread, so it should not block for very long
+        private Void onZombieFencingSuccess() throws TimeoutException {
+            configBackingStore.refresh(1, TimeUnit.MINUTES);
+            configState = configBackingStore.snapshot();
+            if (taskGen < configState.taskConfigGeneration(connName)) {
+                throw new ConnectRestException(
+                    Response.Status.CONFLICT.getStatusCode(),
+                    "Fencing failed because new task configurations were generated for the connector");
+            }
+            if (!writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord))) {
+                throw new ConnectException("Failed to write connector task count record to config topic since worker was fenced out");
+            }

Review Comment:
   There's a lot of repetition of this `if (!writeToConfigTopicAsLeader()) { throw new ConnectException }` pattern. In fact, it looks like all invocations of `writeToConfigTopicAsLeader` are of this form, so why not just put the `if/throw` within `writeToConfigTopicAsLeader`?
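
   i.e. something like the following, where the current boolean-returning method gets renamed (the `Runnable` parameter type is an assumption about the existing helper's signature):
   ```java
   // Centralize the repeated if/throw so each call site just states its failure message
   private void writeToConfigTopicAsLeader(Runnable write, String failureMessage) {
       if (!tryWriteToConfigTopicAsLeader(write)) {  // the existing boolean-returning method, renamed
           throw new ConnectException(failureMessage);
       }
   }
   ```
   so that call sites become:
   ```java
   writeToConfigTopicAsLeader(
           () -> configBackingStore.putTaskCountRecord(connName, tasksToRecord),
           "Failed to write connector task count record to config topic since worker was fenced out");
   ```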



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> 
connectors, Collection<Connect
         }
     }
 
+    private boolean isSourceConnector(String connName) {
+        return 
ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName)));
+    }
+
+    private boolean requestNotSignedProperly(InternalRequestSignature 
requestSignature, Callback<?> callback) {
+        if (internalRequestValidationEnabled()) {
+            ConnectRestException requestValidationError = null;
+            if (requestSignature == null) {
+                requestValidationError = new BadRequestException("Internal 
request missing required signature");
+            } else if 
(!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) 
{
+                requestValidationError = new BadRequestException(String.format(
+                        "This worker does not support the '%s' key signing 
algorithm used by other workers. "
+                                + "This worker is currently configured to use: 
%s. "
+                                + "Check that all workers' configuration files 
permit the same set of signature algorithms, "
+                                + "and correct any misconfigured worker and 
restart it.",
+                        requestSignature.keyAlgorithm(),
+                        keySignatureVerificationAlgorithms
+                ));
+            } else {
+                if (!requestSignature.isValid(sessionKey)) {
+                    requestValidationError = new ConnectRestException(
+                            Response.Status.FORBIDDEN,
+                            "Internal request contained invalid signature."
+                    );
+                }
+            }
+            if (requestValidationError != null) {
+                callback.onCompletion(requestValidationError, null);

Review Comment:
   Surely we should always invoke the `callback`, even on success, since that's 
the contract for `Callback`?


