C0urante commented on code in PR #12828: URL: https://github.com/apache/kafka/pull/12828#discussion_r1024255803
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1906,6 +1917,8 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb if (isLeader()) { writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps)); cb.onCompletion(null, null); + } else if (restClient == null) { + throw new NotLeaderException("Request forwarding disabled in distributed MirrorMaker2; reconfiguring tasks must be performed by the leader", leaderUrl()); Review Comment: Similar wording suggestions for this message: ```suggestion // TODO: Update this message if KIP-710 is accepted and merged // (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters) throw new NotLeaderException("This worker is not able to communicate with the leader of the cluster, " + "which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 " + "in dedicated mode, consider deploying the connectors for MirrorMaker 2 directly onto a " + "distributed Kafka Connect cluster.", leaderUrl() ); ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java: ########## @@ -43,10 +43,38 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class RestClient { +public class RestClient implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(RestClient.class); private static final ObjectMapper JSON_SERDE = new ObjectMapper(); + private final HttpClient client; + public RestClient(WorkerConfig config) { + client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config)); Review Comment: This new integration test is fantastic, thanks! ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1157,16 +1163,21 @@ void fenceZombieSourceTasks(final ConnectorTaskId id, Callback<Void> callback) { if (error == null) { callback.onCompletion(null, null); } else if (error instanceof NotLeaderException) { - String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence"; - log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl); - forwardRequestExecutor.execute(() -> { - try { - RestClient.httpRequest(forwardedUrl, "PUT", null, null, null, config, sessionKey, requestSignatureAlgorithm); - callback.onCompletion(null, null); - } catch (Throwable t) { - callback.onCompletion(t, null); - } - }); + if (restClient != null) { + String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence"; + log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl); + forwardRequestExecutor.execute(() -> { + try { + restClient.httpRequest(forwardedUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm); + callback.onCompletion(null, null); + } catch (Throwable t) { + callback.onCompletion(t, null); + } + }); + } else { + error = ConnectUtils.maybeWrap(error, "Request forwarding disabled in distributed MirrorMaker2; fencing zombie source tasks must be performed by the leader"); Review Comment: We shouldn't use `maybeWrap` here since we know that the exception is a `NotLeaderException`, which extends `ConnectException`, so the error message will never actually be used. The message itself is also a little misleading since it implies that it'll be possible to bring up source tasks in exactly-once mode on multi-node clusters as long as the leader does the fencing, but that's not actually the case (it's impossible for a follower to start any source tasks with exactly-once enabled if it cannot issue a REST request to the leader's internal zombie fencing endpoint). We might consider altering that behavior in the future, but that's definitely out of scope for this PR. ```suggestion callback.onCompletion( new ConnectException( // TODO: Update this message if KIP-710 is accepted and merged // (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters) "This worker is not able to communicate with the leader of the cluster, " + "which is required for exactly-once source tasks. If running MirrorMaker 2 " + "in dedicated mode, consider either disabling exactly-once support, or deploying " + "the connectors for MirrorMaker 2 directly onto a distributed Kafka Connect cluster." ), null ); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.distributed.NotLeaderException; +import org.apache.kafka.connect.runtime.distributed.RequestTargetException; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.RestServer; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.util.SSLUtils; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestSslUtils; +import org.apache.kafka.test.TestUtils; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +@SuppressWarnings("unchecked") +@Category(IntegrationTest.class) +public class RestForwardingIntegrationTest { + + @Mock + private Plugins plugins; + private RestClient followerClient; + private RestServer followerServer; + @Mock + private Herder followerHerder; + private RestClient leaderClient; + private RestServer leaderServer; + @Mock + private Herder leaderHerder; + + private SslContextFactory factory; + private CloseableHttpClient httpClient; + private Collection<CloseableHttpResponse> responses = new ArrayList<>(); + + @After + public void tearDown() throws IOException { + for (CloseableHttpResponse response: responses) { + response.close(); + } + AtomicReference<Throwable> firstException = new AtomicReference<>(); + Utils.closeAllQuietly( + firstException, + "clientsAndServers", + httpClient, + followerClient, + followerServer != null ? followerServer::stop : null, + leaderClient, + leaderServer != null ? leaderServer::stop : null, + factory != null ? factory::stop : null + ); + if (firstException.get() != null) { + throw new RuntimeException("Unable to cleanly close resources", firstException.get()); + } + } + + @Test + public void testRestForwardNoSsl() throws Exception { + testRestForwardToLeader(false, false); + } + + @Test + public void testRestForwardSsl() throws Exception { + testRestForwardToLeader(true, true); + } + + @Test + public void testRestForwardLeaderSsl() throws Exception { + testRestForwardToLeader(false, true); + } + + @Test + public void testRestForwardFollowerSsl() throws Exception { + testRestForwardToLeader(true, false); + } + + public void testRestForwardToLeader(boolean followerSsl, boolean leaderSsl) throws Exception { + DistributedConfig followerConfig = new DistributedConfig(baseWorkerProps(followerSsl)); + DistributedConfig leaderConfig = new DistributedConfig(baseWorkerProps(leaderSsl)); + + // Follower worker setup + followerClient = new RestClient(followerConfig); + followerServer = new RestServer(followerConfig, followerClient); + followerServer.initializeServer(); + when(followerHerder.plugins()).thenReturn(plugins); + followerServer.initializeResources(followerHerder); + + // Leader worker setup + leaderClient = new RestClient(leaderConfig); + leaderServer = new RestServer(leaderConfig, leaderClient); + leaderServer.initializeServer(); + when(leaderHerder.plugins()).thenReturn(plugins); + leaderServer.initializeResources(leaderHerder); + + // External client setup + factory = SSLUtils.createClientSideSslContextFactory(followerConfig); + factory.start(); + SSLContext ssl = factory.getSslContext(); + httpClient = HttpClients.custom() + .setSSLContext(ssl) + .build(); + + // Follower will forward to the leader + URI leaderUrl = leaderServer.advertisedUrl(); + RequestTargetException forwardException = new NotLeaderException("Not leader", leaderUrl.toString()); + ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> followerCallbackCaptor = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { + followerCallbackCaptor.getValue().onCompletion(forwardException, null); + return null; + }).when(followerHerder) + .putConnectorConfig(any(), any(), anyBoolean(), followerCallbackCaptor.capture()); + + // Leader will reply + Herder.Created<ConnectorInfo> leaderAnswer = new Herder.Created<>(true, null); + ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> leaderCallbackCaptor = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { + leaderCallbackCaptor.getValue().onCompletion(null, leaderAnswer); + return null; + }).when(followerHerder) Review Comment: Should this be the leader? ```suggestion }).when(leaderHerder) ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java: ########## @@ -43,10 +43,38 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class RestClient { +public class RestClient implements AutoCloseable { Review Comment: Do we know that the components we use (such as the SSL factory and the Jetty `HttpClient`) are themselves thread-safe? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org