tombentley commented on code in PR #11779: URL: https://github.com/apache/kafka/pull/11779#discussion_r889992019
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -582,6 +588,59 @@ public boolean startTask( } } + /** + * Using the admin principal for this connector, perform a round of zombie fencing that disables transactional producers + * for the specified number of source tasks from sending any more records. + * @param connName the name of the connector + * @param numTasks the number of tasks to fence out + * @param connProps the configuration of the connector; may not be null + * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed + */ + public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps) { + return fenceZombies(connName, numTasks, connProps, Admin::create); + } + + // Allows us to mock out the Admin client for testing + KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps, Function<Map<String, Object>, Admin> adminFactory) { + log.debug("Fencing out {} task producers for source connector {}", numTasks, connName); + try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) { + ClassLoader savedLoader = plugins.currentThreadLoader(); + try { + String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType); + savedLoader = Plugins.compareAndSwapLoaders(connectorLoader); + final SourceConnectorConfig connConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); + final Class<? 
extends Connector> connClass = plugins.connectorClass( + connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + + Map<String, Object> adminConfig = adminConfigs( + connName, + "connector-worker-adminclient-" + connName, + config, + connConfig, + connClass, + connectorClientConfigOverridePolicy, + kafkaClusterId, + ConnectorType.SOURCE); + Admin admin = adminFactory.apply(adminConfig); Review Comment: There's a resource leak if the `whenComplete` never calls the passed lambda. I think you should be able to call `admin.close` in a `catch(Exception)`. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java: ########## @@ -138,6 +138,17 @@ public interface Herder { */ void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback, InternalRequestSignature requestSignature); + /** + * Fence out any older task generations for a source connector, and then write a record to the config topic + * indicating that it is safe to bring up a new generation of tasks. If that record is already present, do nothing + * and invoke the callback successfully. + * @param connName the name of the connector to fence out; must refer to a source connector Review Comment: What happens if it's not a source connector?
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> connectors, Collection<Connect } } + private boolean isSourceConnector(String connName) { + return ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName))); + } + + private boolean requestNotSignedProperly(InternalRequestSignature requestSignature, Callback<?> callback) { + if (internalRequestValidationEnabled()) { + ConnectRestException requestValidationError = null; + if (requestSignature == null) { + requestValidationError = new BadRequestException("Internal request missing required signature"); + } else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) { + requestValidationError = new BadRequestException(String.format( + "This worker does not support the '%s' key signing algorithm used by other workers. " + + "This worker is currently configured to use: %s. " + + "Check that all workers' configuration files permit the same set of signature algorithms, " + + "and correct any misconfigured worker and restart it.", + requestSignature.keyAlgorithm(), + keySignatureVerificationAlgorithms + )); + } else { + if (!requestSignature.isValid(sessionKey)) { + requestValidationError = new ConnectRestException( + Response.Status.FORBIDDEN, + "Internal request contained invalid signature." + ); + } + } + if (requestValidationError != null) { + callback.onCompletion(requestValidationError, null); + return true; + } + } + + return false; + } + + /** + * Represents an active zombie fencing: that is, an in-progress attempt to invoke + * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count + * record to the config topic. 
+ */ + class ZombieFencing { + private final String connName; + private final int tasksToRecord; + private final int taskGen; + private final FutureCallback<Void> fencingFollowup; + private final KafkaFuture<Void> fencingFuture; + + public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) { + this.connName = connName; + this.tasksToRecord = tasksToRecord; + this.taskGen = taskGen; + this.fencingFollowup = new FutureCallback<>(); + this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> { + // This callback will be called on the same thread that invokes KafkaFuture::thenApply if + // the future is already completed. Since that thread is the herder tick thread, we don't need + // to perform follow-up logic through an additional herder request (and if we tried, it would lead + // to deadlock) + addOrRunRequest( + this::onZombieFencingSuccess, + fencingFollowup + ); + awaitFollowup(); + return null; + }); + } + + // Invoked after the worker has successfully fenced out the producers of old task generations using an admin client + // Note that work here will be performed on the herder's tick thread, so it should not block for very long + private Void onZombieFencingSuccess() throws TimeoutException { + configBackingStore.refresh(1, TimeUnit.MINUTES); + configState = configBackingStore.snapshot(); + if (taskGen < configState.taskConfigGeneration(connName)) { + throw new ConnectRestException( + Response.Status.CONFLICT.getStatusCode(), + "Fencing failed because new task configurations were generated for the connector"); + } + if (!writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord))) { Review Comment: This could block indefinitely, since `KafkaConfigBackingStore` calls `configLog.readToEnd().get()`, which seems at odds with the `it should not block for very long` requirement. 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1745,6 +1958,23 @@ private boolean checkRebalanceNeeded(Callback<?> callback) { return false; } + /** + * Execute the given action and subsequent callback immediately if the current thread is the herder's tick thread, + * or use them to create and store a {@link DistributedHerderRequest} on the request queue and return the resulting request + * if not. + * @param action the action that should be run on the herder's tick thread + * @param callback the callback that should be invoked once the action is complete + * @return a new {@link DistributedHerderRequest} if one has been created and added to the request queue, and {@code null} otherwise + */ + DistributedHerderRequest addOrRunRequest(Callable<Void> action, Callback<Void> callback) { Review Comment: I wonder if `runOnTickThread` might be a better name, since it more explicitly describes what it's doing? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> connectors, Collection<Connect } } + private boolean isSourceConnector(String connName) { + return ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName))); + } + + private boolean requestNotSignedProperly(InternalRequestSignature requestSignature, Callback<?> callback) { + if (internalRequestValidationEnabled()) { + ConnectRestException requestValidationError = null; + if (requestSignature == null) { + requestValidationError = new BadRequestException("Internal request missing required signature"); + } else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) { + requestValidationError = new BadRequestException(String.format( + "This worker does not support the '%s' key signing algorithm used by other workers. 
" + + "This worker is currently configured to use: %s. " + + "Check that all workers' configuration files permit the same set of signature algorithms, " + + "and correct any misconfigured worker and restart it.", + requestSignature.keyAlgorithm(), + keySignatureVerificationAlgorithms + )); + } else { + if (!requestSignature.isValid(sessionKey)) { + requestValidationError = new ConnectRestException( + Response.Status.FORBIDDEN, + "Internal request contained invalid signature." + ); + } + } + if (requestValidationError != null) { + callback.onCompletion(requestValidationError, null); + return true; + } + } + + return false; + } + + /** + * Represents an active zombie fencing: that is, an in-progress attempt to invoke + * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count + * record to the config topic. + */ + class ZombieFencing { + private final String connName; + private final int tasksToRecord; + private final int taskGen; + private final FutureCallback<Void> fencingFollowup; + private final KafkaFuture<Void> fencingFuture; + + public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) { + this.connName = connName; + this.tasksToRecord = tasksToRecord; + this.taskGen = taskGen; + this.fencingFollowup = new FutureCallback<>(); + this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> { + // This callback will be called on the same thread that invokes KafkaFuture::thenApply if + // the future is already completed. 
Since that thread is the herder tick thread, we don't need + // to perform follow-up logic through an additional herder request (and if we tried, it would lead + // to deadlock) + addOrRunRequest( + this::onZombieFencingSuccess, + fencingFollowup + ); + awaitFollowup(); + return null; + }); + } + + // Invoked after the worker has successfully fenced out the producers of old task generations using an admin client + // Note that work here will be performed on the herder's tick thread, so it should not block for very long + private Void onZombieFencingSuccess() throws TimeoutException { + configBackingStore.refresh(1, TimeUnit.MINUTES); Review Comment: So we don't consider 1 minute 'very long'? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -582,6 +588,59 @@ public boolean startTask( } } + /** + * Using the admin principal for this connector, perform a round of zombie fencing that disables transactional producers + * for the specified number of source tasks from sending any more records. 
+ * @param connName the name of the connector + * @param numTasks the number of tasks to fence out + * @param connProps the configuration of the connector; may not be null + * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed + */ + public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps) { + return fenceZombies(connName, numTasks, connProps, Admin::create); + } + + // Allows us to mock out the Admin client for testing + KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps, Function<Map<String, Object>, Admin> adminFactory) { + log.debug("Fencing out {} task producers for source connector {}", numTasks, connName); + try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) { + ClassLoader savedLoader = plugins.currentThreadLoader(); + try { + String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType); + savedLoader = Plugins.compareAndSwapLoaders(connectorLoader); Review Comment: This pattern of swapping out class loaders is pretty common, but also a little verbose. Perhaps `Plugins` could expose a `withClassloader(ClassLoader)` method that returns an `AutoCloseable`, so that call sites like this could use try-with-resources and the compiler could warn about leaking resources?
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -836,7 +883,9 @@ public void deleteConnectorConfig(final String connName, final Callback<Created< callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); } else { log.trace("Removing connector config {} {}", connName, configState.connectors()); - configBackingStore.removeConnectorConfig(connName); + if (!writeToConfigTopicAsLeader(() -> configBackingStore.removeConnectorConfig(connName))) { + throw new ConnectException("Failed to remove connector configuration from config topic since worker was fenced out"); + } callback.onCompletion(null, new Created<>(false, null)); Review Comment: Is it OK to not invoke the `callback` in the case where we weren't leader? ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ########## @@ -547,6 +703,27 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier); } + private void maybeSendFencably(String key, byte[] value) { + if (!usesFencableWriter) { + configLog.send(key, value); Review Comment: I find the method name a bit confusing, because it sends in either case. Perhaps something like `sendPossiblyFencibly` would be better, wdyt? 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> connectors, Collection<Connect } } + private boolean isSourceConnector(String connName) { + return ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName))); + } + + private boolean requestNotSignedProperly(InternalRequestSignature requestSignature, Callback<?> callback) { + if (internalRequestValidationEnabled()) { + ConnectRestException requestValidationError = null; + if (requestSignature == null) { + requestValidationError = new BadRequestException("Internal request missing required signature"); + } else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) { + requestValidationError = new BadRequestException(String.format( + "This worker does not support the '%s' key signing algorithm used by other workers. " + + "This worker is currently configured to use: %s. " + + "Check that all workers' configuration files permit the same set of signature algorithms, " + + "and correct any misconfigured worker and restart it.", + requestSignature.keyAlgorithm(), + keySignatureVerificationAlgorithms + )); + } else { + if (!requestSignature.isValid(sessionKey)) { + requestValidationError = new ConnectRestException( + Response.Status.FORBIDDEN, + "Internal request contained invalid signature." + ); + } + } + if (requestValidationError != null) { + callback.onCompletion(requestValidationError, null); + return true; + } + } + + return false; + } + + /** + * Represents an active zombie fencing: that is, an in-progress attempt to invoke + * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count + * record to the config topic. 
+ */ + class ZombieFencing { + private final String connName; + private final int tasksToRecord; + private final int taskGen; + private final FutureCallback<Void> fencingFollowup; + private final KafkaFuture<Void> fencingFuture; + + public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) { + this.connName = connName; + this.tasksToRecord = tasksToRecord; + this.taskGen = taskGen; + this.fencingFollowup = new FutureCallback<>(); + this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> { + // This callback will be called on the same thread that invokes KafkaFuture::thenApply if + // the future is already completed. Since that thread is the herder tick thread, we don't need + // to perform follow-up logic through an additional herder request (and if we tried, it would lead + // to deadlock) + addOrRunRequest( + this::onZombieFencingSuccess, + fencingFollowup + ); + awaitFollowup(); + return null; + }); + } + + // Invoked after the worker has successfully fenced out the producers of old task generations using an admin client + // Note that work here will be performed on the herder's tick thread, so it should not block for very long + private Void onZombieFencingSuccess() throws TimeoutException { + configBackingStore.refresh(1, TimeUnit.MINUTES); + configState = configBackingStore.snapshot(); + if (taskGen < configState.taskConfigGeneration(connName)) { + throw new ConnectRestException( + Response.Status.CONFLICT.getStatusCode(), + "Fencing failed because new task configurations were generated for the connector"); + } + if (!writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord))) { + throw new ConnectException("Failed to write connector task count record to config topic since worker was fenced out"); + } Review Comment: There's a lot of repetition of this `if (!writeToConfigTopicAsLeader()){ throw new ConnectException}` 
pattern. In fact it looks like all invocations of `writeToConfigTopicAsLeader` are of this form. So why not just put the `if/throw` within `writeToConfigTopicAsLeader`? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> connectors, Collection<Connect } } + private boolean isSourceConnector(String connName) { + return ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName))); + } + + private boolean requestNotSignedProperly(InternalRequestSignature requestSignature, Callback<?> callback) { + if (internalRequestValidationEnabled()) { + ConnectRestException requestValidationError = null; + if (requestSignature == null) { + requestValidationError = new BadRequestException("Internal request missing required signature"); + } else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) { + requestValidationError = new BadRequestException(String.format( + "This worker does not support the '%s' key signing algorithm used by other workers. " + + "This worker is currently configured to use: %s. " + + "Check that all workers' configuration files permit the same set of signature algorithms, " + + "and correct any misconfigured worker and restart it.", + requestSignature.keyAlgorithm(), + keySignatureVerificationAlgorithms + )); + } else { + if (!requestSignature.isValid(sessionKey)) { + requestValidationError = new ConnectRestException( + Response.Status.FORBIDDEN, + "Internal request contained invalid signature." + ); + } + } + if (requestValidationError != null) { + callback.onCompletion(requestValidationError, null); Review Comment: Surely we should always invoke the `callback`, even on success, since that's the contract for `Callback`? -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org