[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls
C0urante commented on code in PR #12828: URL: https://github.com/apache/kafka/pull/12828#discussion_r1139465010 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java: ## @@ -43,10 +43,38 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class RestClient { +public class RestClient implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(RestClient.class); private static final ObjectMapper JSON_SERDE = new ObjectMapper(); +private final HttpClient client; +public RestClient(WorkerConfig config) { +client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config)); Review Comment: @imcdo If this change has (or appears to have) broken anything, feel free to file a Jira ticket and add example worker configs, testing scenarios, and anything else that can help us understand what's going wrong and, if applicable, how to fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls
C0urante commented on code in PR #12828: URL: https://github.com/apache/kafka/pull/12828#discussion_r1025489778 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java: ## @@ -43,10 +43,38 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class RestClient { +public class RestClient implements AutoCloseable { Review Comment: Sounds good. This shouldn't be any less performant than the current `trunk` and we can always revisit later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls
C0urante commented on code in PR #12828: URL: https://github.com/apache/kafka/pull/12828#discussion_r1025340543 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java: ## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.distributed.NotLeaderException; +import org.apache.kafka.connect.runtime.distributed.RequestTargetException; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.RestServer; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; +import org.apache.kafka.connect.runtime.rest.util.SSLUtils; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestSslUtils; +import org.apache.kafka.test.TestUtils; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +@SuppressWarnings("unchecked") +@Category(IntegrationTest.class) +public class RestForwardingIntegrationTest { + +private Map sslConfig; +@Mock +private Plugins plugins; +private RestClient followerClient; +private RestServer followerServer; +@Mock +private Herder followerHerder; +private RestClient leaderClient; +private RestServer leaderServer; +@Mock +private Herder leaderHerder; + +private SslContextFactory factory; +private CloseableHttpClient httpClient; +private Collection responses = new ArrayList<>(); + +@Before +public void setUp() throws IOException, GeneralSecurityException { +sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert"); +} + +@After +public void tearDown() throws IOException { +for (CloseableHttpResponse response: responses) { +response.close(); +} +AtomicReference firstException = new AtomicReference<>(); +Utils.closeAllQuietly( +firstException, +"clientsAndServers", +httpClient, +followerServer != null ? followerServer::stop : null, +leaderServer != null ? leaderServer::stop : null, +factory != null ? factory::stop : null +); +if (firstException.get() != null) { +
[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls
C0urante commented on code in PR #12828: URL: https://github.com/apache/kafka/pull/12828#discussion_r1025336265 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java: ## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.distributed.NotLeaderException; +import org.apache.kafka.connect.runtime.distributed.RequestTargetException; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.RestServer; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; +import org.apache.kafka.connect.runtime.rest.util.SSLUtils; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestSslUtils; +import org.apache.kafka.test.TestUtils; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +@SuppressWarnings("unchecked") +@Category(IntegrationTest.class) +public class RestForwardingIntegrationTest { + +private Map sslConfig; +@Mock +private Plugins plugins; +private RestClient followerClient; +private RestServer followerServer; +@Mock +private Herder followerHerder; +private RestClient leaderClient; +private RestServer leaderServer; +@Mock +private Herder leaderHerder; + +private SslContextFactory factory; +private CloseableHttpClient httpClient; +private Collection responses = new ArrayList<>(); + +@Before +public void setUp() throws IOException, GeneralSecurityException { +sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert"); +} + +@After +public void tearDown() throws IOException { +for (CloseableHttpResponse response: responses) { +response.close(); +} +AtomicReference firstException = new AtomicReference<>(); +Utils.closeAllQuietly( +firstException, +"clientsAndServers", +httpClient, +followerServer != null ? followerServer::stop : null, +leaderServer != null ? leaderServer::stop : null, +factory != null ? factory::stop : null +); +if (firstException.get() != null) { +
[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls
C0urante commented on code in PR #12828: URL: https://github.com/apache/kafka/pull/12828#discussion_r1024255803 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1906,6 +1917,8 @@ private void reconfigureConnector(final String connName, final Callback cb if (isLeader()) { writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps)); cb.onCompletion(null, null); +} else if (restClient == null) { +throw new NotLeaderException("Request forwarding disabled in distributed MirrorMaker2; reconfiguring tasks must be performed by the leader", leaderUrl()); Review Comment: Similar wording suggestions for this message: ```suggestion // TODO: Update this message if KIP-710 is accepted and merged // (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters) throw new NotLeaderException("This worker is not able to communicate with the leader of the cluster, " + "which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 " + "in dedicated mode, consider deploying the connectors for MirrorMaker 2 directly onto a " + "distributed Kafka Connect cluster.", leaderUrl() ); ``` ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java: ## @@ -43,10 +43,38 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class RestClient { +public class RestClient implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(RestClient.class); private static final ObjectMapper JSON_SERDE = new ObjectMapper(); +private final HttpClient client; +public RestClient(WorkerConfig config) { +client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config)); Review Comment: This new integration test is fantastic, thanks! ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1157,16 +1163,21 @@ void fenceZombieSourceTasks(final ConnectorTaskId id, Callback callback) { if (error == null) { callback.onCompletion(null, null); } else if (error instanceof NotLeaderException) { -String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence"; -log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl); -forwardRequestExecutor.execute(() -> { -try { -RestClient.httpRequest(forwardedUrl, "PUT", null, null, null, config, sessionKey, requestSignatureAlgorithm); -callback.onCompletion(null, null); -} catch (Throwable t) { -callback.onCompletion(t, null); -} -}); +if (restClient != null) { +String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence"; +log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl); +forwardRequestExecutor.execute(() -> { +try { +restClient.httpRequest(forwardedUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm); +callback.onCompletion(null, null); +} catch (Throwable t) { +callback.onCompletion(t, null); +} +}); +} else { +error = ConnectUtils.maybeWrap(error, "Request forwarding disabled in distributed MirrorMaker2; fencing zombie source tasks must be performed by the leader"); Review Comment: We shouldn't use `maybeWrap` here since we know that the exception is a `NotLeaderException`, which extends `ConnectException`, so the error message will never actually be used. The message itself is also a little misleading since it implies that it'll be possible to bring up source tasks in exactly-once mode on multi-node clusters as long as the leader does the fencing, but that's not actually the case (it's impossible for a follower to start any source tasks with exactly-once enabled if it cannot issue a REST request to the leader's internal zombie fencing endpoint). We might consider
[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls
C0urante commented on code in PR #12828: URL: https://github.com/apache/kafka/pull/12828#discussion_r1021729259 ## connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java: ## @@ -138,7 +141,7 @@ public Connect startConnect(Map workerProps) { // herder is stopped. This is easier than having to track and own the lifecycle ourselves. DistributedHerder herder = new DistributedHerder(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, -advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin); +advertisedUrl.toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin, restClient); Review Comment: I think it's more natural to close it in the herder, primarily because of the difference you've noted (where the shared admin client isn't used directly by the herder, but the REST client is). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls
C0urante commented on code in PR #12828: URL: https://github.com/apache/kafka/pull/12828#discussion_r1020367480 ## connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java: ## @@ -138,7 +141,7 @@ public Connect startConnect(Map workerProps) { // herder is stopped. This is easier than having to track and own the lifecycle ourselves. DistributedHerder herder = new DistributedHerder(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, -advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin); +advertisedUrl.toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin, restClient); Review Comment: Isn't it a bit of an antipattern to have to pass in the client as part of the `uponShutdown` list? Is there any case where we'd want to instantiate a `DistributedHerder` with a client, but not have that client be shut down at the same time as the herder? I'm wondering if we can just automatically close the client in `DistributedHerder::stop`. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ## @@ -82,16 +82,19 @@ public class ConnectorsResource implements ConnectResource { new TypeReference>>() { }; private final Herder herder; -private final WorkerConfig config; +private final RestClient restClient; private long requestTimeoutMs; @javax.ws.rs.core.Context private ServletContext context; private final boolean isTopicTrackingDisabled; private final boolean isTopicTrackingResetDisabled; public ConnectorsResource(Herder herder, WorkerConfig config) { Review Comment: It doesn't look like this constructor is used anywhere; can we remove it? ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ## @@ -156,15 +154,15 @@ public class ConnectorsResourceTest { private UriInfo forward; @Mock private WorkerConfig workerConfig; - -private MockedStatic restClientStatic; +@Mock +private RestClient restClient; @Before public void setUp() throws NoSuchMethodException { -restClientStatic = mockStatic(RestClient.class); +restClient = mock(RestClient.class); Review Comment: Same thought--is this necessary since we annotate the field with `@Mock`? ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -230,6 +231,7 @@ public class DistributedHerderTest { public void setUp() throws Exception { time = new MockTime(); metrics = new MockConnectMetrics(time); +restClient = PowerMock.createMock(RestClient.class); Review Comment: Do we need this? The `restClient` field is already annotated with `@Mock`. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java: ## @@ -43,10 +43,38 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class RestClient { +public class RestClient implements AutoCloseable { Review Comment: Should we add a note on thread-safety to this class, since we're creating and sharing a single instance per worker? Also, have you checked and verified that this class is actually thread-safe? ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java: ## @@ -43,10 +43,38 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class RestClient { +public class RestClient implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(RestClient.class); private static final ObjectMapper JSON_SERDE = new ObjectMapper(); +private final HttpClient client; +public RestClient(WorkerConfig config) { +client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config)); Review Comment: I see the note in the description that this should be a lateral change, but that appears to only cover the case of an SSL-configured client being used to make plain HTTP requests. What about cases where the user has not specified any SSL-related properties in their worker config? Will `SSLUtils::createClientSideSslContextFactory` essentially create a no-op factory that can still be used for HTTP requests? If it's not too painful, it'd be nice to see some test coverage for the `RestClient` with/without SSL properties in the `WorkerConfig` it's instantiated with. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1156,12 +1161,12 @@ void fenceZombieSourceTasks(final ConnectorTaskId id, Callback callback) {