[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2023-03-16 Thread via GitHub


C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1139465010


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
 private static final Logger log = LoggerFactory.getLogger(RestClient.class);
 private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+private final HttpClient client;
+public RestClient(WorkerConfig config) {
+client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   @imcdo If this change has (or appears to have) broken anything, feel free to 
file a Jira ticket and add example worker configs, testing scenarios, and 
anything else that can help us understand what's going wrong and, if 
applicable, how to fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-17 Thread GitBox


C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1025489778


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   Sounds good. This shouldn't be any less performant than the current `trunk` 
and we can always revisit later.






[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-17 Thread GitBox


C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1025340543


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+@SuppressWarnings("unchecked")
+@Category(IntegrationTest.class)
+public class RestForwardingIntegrationTest {
+
+private Map<String, Object> sslConfig;
+@Mock
+private Plugins plugins;
+private RestClient followerClient;
+private RestServer followerServer;
+@Mock
+private Herder followerHerder;
+private RestClient leaderClient;
+private RestServer leaderServer;
+@Mock
+private Herder leaderHerder;
+
+private SslContextFactory factory;
+private CloseableHttpClient httpClient;
+private Collection<CloseableHttpResponse> responses = new ArrayList<>();
+
+@Before
+public void setUp() throws IOException, GeneralSecurityException {
+sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
+}
+
+@After
+public void tearDown() throws IOException {
+for (CloseableHttpResponse response: responses) {
+response.close();
+}
+AtomicReference<Throwable> firstException = new AtomicReference<>();
+Utils.closeAllQuietly(
+firstException,
+"clientsAndServers",
+httpClient,
+followerServer != null ? followerServer::stop : null,
+leaderServer != null ? leaderServer::stop : null,
+factory != null ? factory::stop : null
+);
+if (firstException.get() != null) {
+ 

[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-17 Thread GitBox


C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1025336265


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java:
##
@@ -0,0 +1,256 @@

[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-16 Thread GitBox


C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1024255803


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1906,6 +1917,8 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb
 if (isLeader()) {
 writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps));
 cb.onCompletion(null, null);
+} else if (restClient == null) {
+throw new NotLeaderException("Request forwarding disabled in distributed MirrorMaker2; reconfiguring tasks must be performed by the leader", leaderUrl());

Review Comment:
   Similar wording suggestions for this message:
   ```suggestion
   // TODO: Update this message if KIP-710 is accepted and merged
   //       (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
   throw new NotLeaderException("This worker is not able to communicate with the leader of the cluster, "
           + "which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 "
           + "in dedicated mode, consider deploying the connectors for MirrorMaker 2 directly onto a "
           + "distributed Kafka Connect cluster.",
           leaderUrl()
   );
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
 private static final Logger log = LoggerFactory.getLogger(RestClient.class);
 private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+private final HttpClient client;
+public RestClient(WorkerConfig config) {
+client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   This new integration test is fantastic, thanks!



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1157,16 +1163,21 @@ void fenceZombieSourceTasks(final ConnectorTaskId id, Callback<Void> callback) {
 if (error == null) {
 callback.onCompletion(null, null);
 } else if (error instanceof NotLeaderException) {
-String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence";
-log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl);
-forwardRequestExecutor.execute(() -> {
-try {
-RestClient.httpRequest(forwardedUrl, "PUT", null, null, null, config, sessionKey, requestSignatureAlgorithm);
-callback.onCompletion(null, null);
-} catch (Throwable t) {
-callback.onCompletion(t, null);
-}
-});
+if (restClient != null) {
+String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence";
+log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl);
+forwardRequestExecutor.execute(() -> {
+try {
+restClient.httpRequest(forwardedUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm);
+callback.onCompletion(null, null);
+} catch (Throwable t) {
+callback.onCompletion(t, null);
+}
+});
+} else {
+error = ConnectUtils.maybeWrap(error, "Request forwarding disabled in distributed MirrorMaker2; fencing zombie source tasks must be performed by the leader");

Review Comment:
   We shouldn't use `maybeWrap` here since we know that the exception is a 
`NotLeaderException`, which extends `ConnectException`, so the error message 
will never actually be used.
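
   As a rough illustration of the point above, here is a minimal sketch of a `maybeWrap`-style helper. The classes below are simplified stand-ins (assumptions for illustration, not the actual Kafka Connect sources); the only behavior assumed is that an exception which is already a `ConnectException` is returned unchanged, so the supplied message is dropped.

   ```java
   // Simplified stand-ins for the Connect exception hierarchy (hypothetical, for illustration only).
   class ConnectException extends RuntimeException {
       ConnectException(String message) { super(message); }
       ConnectException(String message, Throwable cause) { super(message, cause); }
   }

   class NotLeaderException extends ConnectException {
       NotLeaderException(String message) { super(message); }
   }

   class MaybeWrapSketch {
       // Assumed behavior: wrap only exceptions that are not already ConnectExceptions.
       static ConnectException maybeWrap(Throwable t, String message) {
           if (t == null) {
               return null;
           }
           if (t instanceof ConnectException) {
               // The caller-supplied message is ignored on this path.
               return (ConnectException) t;
           }
           return new ConnectException(message, t);
       }

       public static void main(String[] args) {
           Throwable notLeader = new NotLeaderException("not the leader");
           ConnectException result = maybeWrap(notLeader, "Request forwarding disabled");
           // The same instance comes back, carrying its original message.
           System.out.println(result == notLeader);
           System.out.println(result.getMessage());

           // A non-Connect exception is wrapped, and the new message is used.
           Throwable other = new IllegalStateException("boom");
           System.out.println(maybeWrap(other, "Request forwarding disabled").getMessage());
       }
   }
   ```

   Under that assumption, constructing a new exception with the desired message directly would be the way to make the message visible to the caller.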
   
   The message itself is also a little misleading since it implies that it'll 
be possible to bring up source tasks in exactly-once mode on multi-node 
clusters as long as the leader does the fencing, but that's not actually the 
case (it's impossible for a follower to start any source tasks with 
exactly-once enabled if it cannot issue a REST request to the leader's internal 
zombie fencing endpoint). We might consider 

[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-14 Thread GitBox


C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1021729259


##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
##
@@ -138,7 +141,7 @@ public Connect startConnect(Map<String, String> workerProps) {
 // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
 DistributedHerder herder = new DistributedHerder(config, time, worker,
 kafkaClusterId, statusBackingStore, configBackingStore,
-advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);
+advertisedUrl.toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin, restClient);

Review Comment:
   I think it's more natural to close it in the herder, primarily because of 
the difference you've noted (where the shared admin client isn't used directly 
by the herder, but the REST client is).






[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-11 Thread GitBox


C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1020367480


##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
##
@@ -138,7 +141,7 @@ public Connect startConnect(Map<String, String> workerProps) {
 // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
 DistributedHerder herder = new DistributedHerder(config, time, worker,
 kafkaClusterId, statusBackingStore, configBackingStore,
-advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);
+advertisedUrl.toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin, restClient);

Review Comment:
   Isn't it a bit of an antipattern to have to pass in the client as part of 
the `uponShutdown` list? Is there any case where we'd want to instantiate a 
`DistributedHerder` with a client, but not have that client be shut down at the 
same time as the herder?
   
   I'm wondering if we can just automatically close the client in 
`DistributedHerder::stop`.
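
   A minimal sketch of that idea, using hypothetical stand-in classes (`SketchRestClient` and `SketchHerder` are illustrative names, not the real Connect types): the herder takes ownership of the client it is constructed with and closes it in its own `stop` method, so callers no longer need to pass the client in a separate shutdown list.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical stand-in for a REST client that holds closeable resources.
   class SketchRestClient implements AutoCloseable {
       private final AtomicBoolean closed = new AtomicBoolean(false);

       @Override
       public void close() {
           closed.set(true);
       }

       boolean isClosed() {
           return closed.get();
       }
   }

   // Hypothetical stand-in for a herder that owns the client it is given.
   class SketchHerder {
       // May be null when request forwarding is disabled.
       private final SketchRestClient restClient;

       SketchHerder(SketchRestClient restClient) {
           this.restClient = restClient;
       }

       void stop() {
           // Close the client together with the herder; no external shutdown hook needed.
           if (restClient != null) {
               restClient.close();
           }
       }
   }

   class HerderOwnershipSketch {
       public static void main(String[] args) {
           SketchRestClient client = new SketchRestClient();
           SketchHerder herder = new SketchHerder(client);
           herder.stop();
           System.out.println(client.isClosed());
       }
   }
   ```

   The null check mirrors the `restClient == null` branches in this PR, where a herder may be created without a client.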



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -82,16 +82,19 @@ public class ConnectorsResource implements ConnectResource {
 new TypeReference<List<Map<String, String>>>() { };
 
 private final Herder herder;
-private final WorkerConfig config;
+private final RestClient restClient;
 private long requestTimeoutMs;
 @javax.ws.rs.core.Context
 private ServletContext context;
 private final boolean isTopicTrackingDisabled;
 private final boolean isTopicTrackingResetDisabled;
 
 public ConnectorsResource(Herder herder, WorkerConfig config) {

Review Comment:
   It doesn't look like this constructor is used anywhere; can we remove it?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java:
##
@@ -156,15 +154,15 @@ public class ConnectorsResourceTest {
 private UriInfo forward;
 @Mock
 private WorkerConfig workerConfig;
-
-private MockedStatic<RestClient> restClientStatic;
+@Mock
+private RestClient restClient;
 
 @Before
 public void setUp() throws NoSuchMethodException {
-restClientStatic = mockStatic(RestClient.class);
+restClient = mock(RestClient.class);

Review Comment:
   Same thought--is this necessary since we annotate the field with `@Mock`?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -230,6 +231,7 @@ public class DistributedHerderTest {
 public void setUp() throws Exception {
 time = new MockTime();
 metrics = new MockConnectMetrics(time);
+restClient = PowerMock.createMock(RestClient.class);

Review Comment:
   Do we need this? The `restClient` field is already annotated with `@Mock`.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {

Review Comment:
   Should we add a note on thread-safety to this class, since we're creating 
and sharing a single instance per worker?
   
   Also, have you checked and verified that this class is actually thread-safe?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
 private static final Logger log = LoggerFactory.getLogger(RestClient.class);
 private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+private final HttpClient client;
+public RestClient(WorkerConfig config) {
+client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   I see the note in the description that this should be a lateral change, but 
that appears to only cover the case of an SSL-configured client being used to 
make plain HTTP requests.
   
   What about cases where the user has not specified any SSL-related properties 
in their worker config? Will `SSLUtils::createClientSideSslContextFactory` 
essentially create a no-op factory that can still be used for HTTP requests?
   
   If it's not too painful, it'd be nice to see some test coverage for the 
`RestClient` with/without SSL properties in the `WorkerConfig` it's 
instantiated with.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1156,12 +1161,12 @@ void fenceZombieSourceTasks(final ConnectorTaskId id, Callback<Void> callback) {