C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1024255803


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1906,6 +1917,8 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb
                 if (isLeader()) {
                     writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps));
                     cb.onCompletion(null, null);
+                } else if (restClient == null) {
+                    throw new NotLeaderException("Request forwarding disabled in distributed MirrorMaker2; reconfiguring tasks must be performed by the leader", leaderUrl());

Review Comment:
   Similar wording suggestions for this message:
   ```suggestion
                       // TODO: Update this message if KIP-710 is accepted and merged
                       //       (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
                       throw new NotLeaderException("This worker is not able to communicate with the leader of the cluster, "
                                       + "which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 "
                                       + "in dedicated mode, consider deploying the connectors for MirrorMaker 2 directly onto a "
                                       + "distributed Kafka Connect cluster.",
                               leaderUrl()
                       );
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+    private final HttpClient client;
+    public RestClient(WorkerConfig config) {
+        client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   This new integration test is fantastic, thanks!



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1157,16 +1163,21 @@ void fenceZombieSourceTasks(final ConnectorTaskId id, Callback<Void> callback) {
             if (error == null) {
                 callback.onCompletion(null, null);
             } else if (error instanceof NotLeaderException) {
-                String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence";
-                log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl);
-                forwardRequestExecutor.execute(() -> {
-                    try {
-                        RestClient.httpRequest(forwardedUrl, "PUT", null, null, null, config, sessionKey, requestSignatureAlgorithm);
-                        callback.onCompletion(null, null);
-                    } catch (Throwable t) {
-                        callback.onCompletion(t, null);
-                    }
-                });
+                if (restClient != null) {
+                    String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence";
+                    log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl);
+                    forwardRequestExecutor.execute(() -> {
+                        try {
+                            restClient.httpRequest(forwardedUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm);
+                            callback.onCompletion(null, null);
+                        } catch (Throwable t) {
+                            callback.onCompletion(t, null);
+                        }
+                    });
+                } else {
+                    error = ConnectUtils.maybeWrap(error, "Request forwarding disabled in distributed MirrorMaker2; fencing zombie source tasks must be performed by the leader");

Review Comment:
   We shouldn't use `maybeWrap` here: we know that the exception is a `NotLeaderException`, which extends `ConnectException`, so `maybeWrap` returns it unchanged and the new error message will never actually be used.
   
   The message itself is also a little misleading. It implies that it'll be possible to bring up source tasks in exactly-once mode on multi-node clusters as long as the leader does the fencing, but that's not the case: a follower cannot start any source tasks with exactly-once enabled if it cannot issue a REST request to the leader's internal zombie fencing endpoint. We might consider altering that behavior in the future, but that's definitely out of scope for this PR.
   
   ```suggestion
                       callback.onCompletion(
                               new ConnectException(
                                       // TODO: Update this message if KIP-710 is accepted and merged
                                       //       (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
                                       "This worker is not able to communicate with the leader of the cluster, "
                                               + "which is required for exactly-once source tasks. If running MirrorMaker 2 "
                                               + "in dedicated mode, consider either disabling exactly-once support, or deploying "
                                               + "the connectors for MirrorMaker 2 directly onto a distributed Kafka Connect cluster."
                               ),
                               null
                       );
   ```
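
To illustrate the `maybeWrap` point above, here is a minimal, self-contained sketch. The classes are simplified stand-ins rather than the real Connect types, and the `maybeWrap` body is an assumption about how `ConnectUtils.maybeWrap` behaves (returning an existing `ConnectException` unchanged), which is what the comment relies on:

```java
// Simplified stand-ins for the Connect exception hierarchy (hypothetical, for illustration only).
class ConnectException extends RuntimeException {
    ConnectException(String message) { super(message); }
    ConnectException(String message, Throwable cause) { super(message, cause); }
}

class NotLeaderException extends ConnectException {
    NotLeaderException(String message) { super(message); }
}

final class MaybeWrapSketch {
    // Assumed behavior of ConnectUtils.maybeWrap: only wrap throwables that
    // are not already ConnectExceptions; otherwise return them as-is.
    static ConnectException maybeWrap(Throwable t, String message) {
        if (t == null) {
            return null;
        }
        if (t instanceof ConnectException) {
            return (ConnectException) t; // the supplied message is ignored here
        }
        return new ConnectException(message, t);
    }

    public static void main(String[] args) {
        Throwable error = new NotLeaderException("Not leader");
        ConnectException wrapped = maybeWrap(error, "Request forwarding disabled");
        System.out.println(wrapped == error);     // true: same instance comes back
        System.out.println(wrapped.getMessage()); // Not leader
    }
}
```

Under that assumption, the replacement message is only ever used for non-`ConnectException` throwables, which is why constructing a new `ConnectException` explicitly (as in the suggestion) is what actually surfaces the new wording.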



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@SuppressWarnings("unchecked")
+@Category(IntegrationTest.class)
+public class RestForwardingIntegrationTest {
+
+    @Mock
+    private Plugins plugins;
+    private RestClient followerClient;
+    private RestServer followerServer;
+    @Mock
+    private Herder followerHerder;
+    private RestClient leaderClient;
+    private RestServer leaderServer;
+    @Mock
+    private Herder leaderHerder;
+
+    private SslContextFactory factory;
+    private CloseableHttpClient httpClient;
+    private Collection<CloseableHttpResponse> responses = new ArrayList<>();
+
+    @After
+    public void tearDown() throws IOException {
+        for (CloseableHttpResponse response: responses) {
+            response.close();
+        }
+        AtomicReference<Throwable> firstException = new AtomicReference<>();
+        Utils.closeAllQuietly(
+                firstException,
+                "clientsAndServers",
+                httpClient,
+                followerClient,
+                followerServer != null ? followerServer::stop : null,
+                leaderClient,
+                leaderServer != null ? leaderServer::stop : null,
+                factory != null ? factory::stop : null
+        );
+        if (firstException.get() != null) {
+            throw new RuntimeException("Unable to cleanly close resources", firstException.get());
+        }
+    }
+
+    @Test
+    public void testRestForwardNoSsl() throws Exception {
+        testRestForwardToLeader(false, false);
+    }
+
+    @Test
+    public void testRestForwardSsl() throws Exception {
+        testRestForwardToLeader(true, true);
+    }
+
+    @Test
+    public void testRestForwardLeaderSsl() throws Exception {
+        testRestForwardToLeader(false, true);
+    }
+
+    @Test
+    public void testRestForwardFollowerSsl() throws Exception {
+        testRestForwardToLeader(true, false);
+    }
+
+    public void testRestForwardToLeader(boolean followerSsl, boolean leaderSsl) throws Exception {
+        DistributedConfig followerConfig = new DistributedConfig(baseWorkerProps(followerSsl));
+        DistributedConfig leaderConfig = new DistributedConfig(baseWorkerProps(leaderSsl));
+
+        // Follower worker setup
+        followerClient = new RestClient(followerConfig);
+        followerServer = new RestServer(followerConfig, followerClient);
+        followerServer.initializeServer();
+        when(followerHerder.plugins()).thenReturn(plugins);
+        followerServer.initializeResources(followerHerder);
+
+        // Leader worker setup
+        leaderClient = new RestClient(leaderConfig);
+        leaderServer = new RestServer(leaderConfig, leaderClient);
+        leaderServer.initializeServer();
+        when(leaderHerder.plugins()).thenReturn(plugins);
+        leaderServer.initializeResources(leaderHerder);
+
+        // External client setup
+        factory = SSLUtils.createClientSideSslContextFactory(followerConfig);
+        factory.start();
+        SSLContext ssl = factory.getSslContext();
+        httpClient = HttpClients.custom()
+                .setSSLContext(ssl)
+                .build();
+
+        // Follower will forward to the leader
+        URI leaderUrl = leaderServer.advertisedUrl();
+        RequestTargetException forwardException = new NotLeaderException("Not leader", leaderUrl.toString());
+        ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> followerCallbackCaptor = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            followerCallbackCaptor.getValue().onCompletion(forwardException, null);
+            return null;
+        }).when(followerHerder)
+                .putConnectorConfig(any(), any(), anyBoolean(), followerCallbackCaptor.capture());
+
+        // Leader will reply
+        Herder.Created<ConnectorInfo> leaderAnswer = new Herder.Created<>(true, null);
+        ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> leaderCallbackCaptor = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            leaderCallbackCaptor.getValue().onCompletion(null, leaderAnswer);
+            return null;
+        }).when(followerHerder)

Review Comment:
   Should this be the leader?
   ```suggestion
           }).when(leaderHerder)
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   Do we know that the components we use (such as the SSL factory and the Jetty `HttpClient`) are themselves thread-safe?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
