[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-10586: Intra-cluster communication for Mirror Maker 2
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100587361


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+    @Context
+    private UriInfo uriInfo;
+
+    private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class);
+
+    private final Map<SourceAndTarget, Herder> herders;
+
+    public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
+        super(restClient);
+        this.herders = herders;
+    }
+
+    @Override
+    protected Herder herderForRequest() {
+        String source = pathParam("source");
+        String target = pathParam("target");
+        Herder result = herders.get(new SourceAndTarget(source, target));
+        if (result == null) {
+            log.debug("Failed to find herder for source '{}' and target '{}'", source, target);

Review Comment:
I was under the impression that throwing this would only cause the exception message to be present in the REST response (and presumably the logs for the worker that issued the request), but after revisiting the `ConnectExceptionMapper` class, it appears that we'll also log the exception at debug level on this worker. So yep, we can remove this 👍


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
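The comment relies on a general property of JAX-RS exception mappers. A mapper sits between a resource method and the HTTP response, so anything the mapper logs does not need to be logged again where the exception is thrown. The example below is a minimal plain-Java sketch of that idea. It is not Kafka's `ConnectExceptionMapper`.

Simplifying assumptions:
- All class names (`FlowNotFoundException`, `SketchExceptionMapper`, `HerderLookupSketch`) are hypothetical.
- Herders are represented as strings keyed by a `"source->target"` string instead of `SourceAndTarget`.
- The `FINE` level of `java.util.logging` stands in for SLF4J's debug level.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for javax.ws.rs.NotFoundException.
class FlowNotFoundException extends RuntimeException {
    FlowNotFoundException(String message) {
        super(message);
    }
}

// Hypothetical, simplified analogue of an exception mapper: it logs every
// uncaught exception at a debug-like level and converts it into a status
// code plus message for the REST response.
final class SketchExceptionMapper {
    private static final Logger LOG = Logger.getLogger(SketchExceptionMapper.class.getName());

    static final class Response {
        final int status;
        final String message;

        Response(int status, String message) {
            this.status = status;
            this.message = message;
        }
    }

    static Response toResponse(String path, RuntimeException e) {
        // FINE plays the role of SLF4J's DEBUG level in this sketch.
        LOG.log(Level.FINE, "Uncaught exception in REST call to /" + path, e);
        int status = (e instanceof FlowNotFoundException) ? 404 : 500;
        return new Response(status, e.getMessage());
    }
}

final class HerderLookupSketch {
    // Hypothetical: herders keyed by "source->target" instead of SourceAndTarget.
    private final Map<String, String> herders = new HashMap<>();

    void register(String source, String target, String herder) {
        herders.put(source + "->" + target, herder);
    }

    String herderFor(String source, String target) {
        String herder = herders.get(source + "->" + target);
        if (herder == null) {
            // No explicit debug log here: the mapper already logs the exception.
            throw new FlowNotFoundException(
                    "No replication flow found for source '" + source + "' and target '" + target + "'");
        }
        return herder;
    }
}
```

In this design, the message passed to the exception carries all the detail, so a single log statement in the mapper covers every resource method.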
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100585212


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+    @Context
+    private UriInfo uriInfo;
+
+    private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class);
+
+    private final Map<SourceAndTarget, Herder> herders;
+
+    public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
+        super(restClient);
+        this.herders = herders;
+    }
+
+    @Override
+    protected Herder herderForRequest() {
+        String source = pathParam("source");
+        String target = pathParam("target");
+        Herder result = herders.get(new SourceAndTarget(source, target));
+        if (result == null) {
+            log.debug("Failed to find herder for source '{}' and target '{}'", source, target);
+            throw new NotFoundException("No replication flow found for source '" + source + "' and target '" + target + "'");
+        }
+        return result;
+    }
+
+    private String pathParam(String name) {
+        List<String> result = uriInfo.getPathParameters().get(name);
+        if (result == null || result.isEmpty())
+            throw new NotFoundException();

Review Comment:
Yeah, couldn't hurt. This is almost guaranteed to end up in a log file on another worker, so in the unlikely event that this code path is somehow reached, more detail would be preferable to less.
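The comment above suggests that `pathParam` should fail with a more descriptive message. The example below is a sketch of that. It takes a plain `Map<String, List<String>>`, which is the shape of the JAX-RS `MultivaluedMap` that `UriInfo.getPathParameters()` returns, so it runs without a JAX-RS runtime.

Assumptions: the exception class and the message wording are hypothetical, and returning the first value mirrors common practice rather than the truncated original.

```java
import java.util.List;
import java.util.Map;

final class PathParamSketch {
    // Hypothetical stand-in for javax.ws.rs.NotFoundException.
    static final class MissingPathParamException extends RuntimeException {
        MissingPathParamException(String message) {
            super(message);
        }
    }

    // pathParameters has the same shape as UriInfo.getPathParameters(): name -> decoded values.
    static String pathParam(Map<String, List<String>> pathParameters, String name) {
        List<String> values = pathParameters.get(name);
        if (values == null || values.isEmpty()) {
            // Name the missing parameter and list what was present, since this message
            // is likely to surface in the logs of the worker that forwarded the request.
            throw new MissingPathParamException(
                    "Could not parse '" + name + "' path parameter from request; available path parameters: "
                            + pathParameters.keySet());
        }
        return values.get(0);
    }
}
```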
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090943031


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.mirror.MirrorMaker;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+@Tag("integration")
+public class DedicatedMirrorIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
+
+    private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000;
+    private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000;
+
+    private Map<String, EmbeddedKafkaCluster> kafkaClusters;
+    private Map<String, MirrorMaker> mirrorMakers;
+
+    @BeforeEach
+    public void setup() {
+        kafkaClusters = new HashMap<>();
+        mirrorMakers = new HashMap<>();
+    }
+
+    @AfterEach
+    public void teardown() throws Throwable {
+        AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+        mirrorMakers.forEach((name, mirrorMaker) ->
+            Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+        );
+        kafkaClusters.forEach((name, kafkaCluster) ->
+            Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" + name + "'", shutdownFailure)
+        );
+        if (shutdownFailure.get() != null) {
+            throw shutdownFailure.get();
+        }
+    }
+
+    private EmbeddedKafkaCluster startKafkaCluster(String name, int numBrokers, Properties brokerProperties) {
+        if (kafkaClusters.containsKey(name))
+            throw new IllegalStateException("Cannot register multiple Kafka clusters with the same name");
+
+        EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, brokerProperties);
+        kafkaClusters.put(name, result);
+
+        result.start();
+
+        return result;
+    }
+
+    private MirrorMaker startMirrorMaker(String name, Map<String, String> mmProps) {
+        if (mirrorMakers.containsKey(name))
+            throw new IllegalStateException("Cannot register multiple MirrorMaker nodes with the same name");
+
+        MirrorMaker result = new MirrorMaker(mmProps);
+        mirrorMakers.put(name, result);
+
+        result.start();
+
+        return result;
+    }
+
+    /**
+     * Test that a multi-node dedicated cluster is able to dynamically detect new topics at runtime

Review Comment:
We don't have a guarantee that task configs are relayed from one worker to another, that's true. However, because we enable exactly-once support, we do get a guarantee that intra-cluster communication takes place, since no source tasks can start on a follower node without first issuing a REST request to the cluster's leader.
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090936585


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##
@@ -119,7 +126,16 @@ public class MirrorMaker {
     public MirrorMaker(MirrorMakerConfig config, List<String> clusters, Time time) {
         log.debug("Kafka MirrorMaker instance created");
         this.time = time;
-        this.advertisedBaseUrl = "NOTUSED";
+        if (config.enableInternalRest()) {
+            this.restClient = new RestClient(config);
+            internalServer = new MirrorRestServer(config.originals(), restClient);
+            internalServer.initializeServer();
+            this.advertisedUrl = internalServer.advertisedUrl().toString();
+        } else {
+            internalServer = null;
+            restClient = null;
+            this.advertisedUrl = "NOTUSED";

Review Comment:
It ends up being used in the metadata that workers send to the group coordinator during rebalances, and the schema we use to (de)serialize that data requires a non-null string for the worker URL. I stuck with the `"NOTUSED"` placeholder instead of an empty string since that's what we're using currently, and it doesn't seem worth the risk to try to change it now.
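The example below illustrates why the worker needs a placeholder rather than `null`. It imitates a non-nullable string field with a 2-byte length prefix, loosely modeled on the `STRING` type in the Kafka protocol. It is not the actual Connect rebalance-protocol code. `WorkerMetadataSketch`, `advertisedUrl`, `writeUrl`, and `readUrl` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class WorkerMetadataSketch {
    // Placeholder advertised when this worker exposes no REST API.
    static final String PLACEHOLDER_URL = "NOTUSED";

    static String advertisedUrl(boolean internalRestEnabled, String restServerUrl) {
        return internalRestEnabled ? restServerUrl : PLACEHOLDER_URL;
    }

    // Non-nullable string field: a 2-byte length prefix followed by UTF-8 bytes.
    // There is no encoding for null, so a null URL is rejected outright.
    static ByteBuffer writeUrl(String url) {
        if (url == null)
            throw new IllegalArgumentException("Worker URL must be non-null");
        byte[] bytes = url.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > Short.MAX_VALUE)
            throw new IllegalArgumentException("Worker URL is too long to encode");
        ByteBuffer buffer = ByteBuffer.allocate(2 + bytes.length);
        buffer.putShort((short) bytes.length);
        buffer.put(bytes);
        buffer.flip();
        return buffer;
    }

    static String readUrl(ByteBuffer buffer) {
        short length = buffer.getShort();
        byte[] bytes = new byte[length];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

With internal REST disabled, the worker still has a valid value to serialize, and every peer can deserialize it.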
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090935837


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##
@@ -255,13 +287,26 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
         // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
         // tracking the various shared admin objects in this class.
-        // Do not provide a restClient to the DistributedHerder to indicate that request forwarding is disabled
         Herder herder = new DistributedHerder(distributedConfig, time, worker, kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
+                advertisedUrl, restClient, CLIENT_CONFIG_OVERRIDE_POLICY,
+                restNamespace, sharedAdmin);
         herders.put(sourceAndTarget, herder);
     }
+
+    private static String encodePath(String rawPath) throws UnsupportedEncodingException {
+        return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
+                // Java's out-of-the-box URL encoder encodes spaces (' ') as pluses ('+'),
+                // and pluses as '%2B'
+                // But Jetty doesn't decode pluses at all and leaves them as-are in decoded
+                // URLs
+                // So to get around that, we replace pluses in the encoded URL here with '%20',
+                // which is the encoding that Jetty expects for spaces
+                // Jetty will reverse this transformation when evaluating the path parameters
+                // and will return decoded strings with all special characters as they were.

Review Comment:
It's the result of fitting a square peg (Java's `URLEncoder` class, which, despite the name, is designed for HTML form encoding instead of URL path encoding) into a round hole (URL path encoding). I couldn't find a better alternative than this, and considering the fairly low risk (this is all intended to cover a pretty niche edge case) and integration test coverage, figured it'd be good enough for now.
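The quoted code comments describe an encoding trick: form-encode the path with `URLEncoder`, then replace the remaining `+` characters, which can only have come from spaces, with `%20`. The example below is a minimal, runnable sketch of that trick.

Notes on how this sketch differs from the quoted code:
- The chained call that performs the replacement is cut off in the quoted hunk. The `.replace("+", "%20")` below is an assumption based on those comments.
- The quoted method declares `UnsupportedEncodingException`. This version wraps it instead, because every Java platform supports UTF-8.
- `decodePath` exists only to demonstrate the round trip.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class PathEncodingSketch {
    // Percent-encodes a single path segment, such as a cluster alias.
    static String encodePath(String rawPath) {
        try {
            // URLEncoder performs HTML form encoding: ' ' becomes '+', and '+' becomes "%2B".
            // Any literal '+' left after encoding must therefore have come from a space,
            // so swapping it for "%20" gives the form that path decoding expects.
            return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name()).replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            // Every Java platform supports UTF-8.
            throw new IllegalStateException(e);
        }
    }

    // Demonstration only: URLDecoder is a form decoder, but it is safe here because
    // encodePath never emits a literal '+' (which URLDecoder would turn into a space).
    static String decodePath(String encodedPath) {
        try {
            return URLDecoder.decode(encodedPath, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

For example, `encodePath("us east+1")` yields `us%20east%2B1`. The space and the plus sign remain distinguishable after decoding.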
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1084460160


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -228,12 +229,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
      * @param kafkaClusterId     the identifier of the Kafka cluster to use for internal topics; may not be null
      * @param statusBackingStore the backing store for statuses; may not be null
      * @param configBackingStore the backing store for connector configurations; may not be null
-     * @param restUrl            the URL of this herder's REST API; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be null, but may be an arbitrary placeholder
+     *                           value if this worker does not expose a REST API
+     * @param restClient         a REST client that can be used to issue requests to other workers in the cluster; may
+     *                           be null if inter-worker communication is not enabled
      * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
      *                                            in connector configurations; may not be null
+     * @param restNamespace      zero or more path elements to prepend to the paths of forwarded REST requests; may be empty, but not null
      * @param uponShutdown       any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
      *                           after all services and resources owned by this herder are stopped
      */
+    // TODO: Do we really need two separate public constructors?

Review Comment:
Blegh, this comment was left in from an earlier draft where there was an additional constructor. I refactored it out before opening the PR, but apparently forgot to remove the comment. Took it out now, thanks for catching!
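The `restNamespace` parameter documented above can be illustrated with a small URL builder. The builder prepends each namespace element, percent-encoded, to the path of a forwarded request.

This is a hypothetical sketch: `ForwardingUrlSketch` and `forwardUrl` are not Kafka APIs. It also assumes that MirrorMaker uses the source and target cluster aliases as the namespace. That assumption is inferred only from `InternalMirrorResource` being mounted at `/{source}/{target}/connectors`.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;

final class ForwardingUrlSketch {
    // Builds <leaderUrl>/<ns1>/<ns2>/.../<path> for a forwarded request.
    // Hypothetical helper; restNamespace may be empty, but not null.
    static String forwardUrl(String leaderUrl, List<String> restNamespace, String path) {
        Objects.requireNonNull(restNamespace, "restNamespace may be empty, but not null");
        StringBuilder url = new StringBuilder(
                leaderUrl.endsWith("/") ? leaderUrl.substring(0, leaderUrl.length() - 1) : leaderUrl);
        for (String element : restNamespace) {
            // Each element becomes exactly one path segment, so it is percent-encoded.
            url.append('/').append(encodeSegment(element));
        }
        url.append(path.startsWith("/") ? path : "/" + path);
        return url.toString();
    }

    // Form-encode, then turn the '+' produced for spaces into "%20" for use in a path.
    private static String encodeSegment(String segment) {
        try {
            return URLEncoder.encode(segment, StandardCharsets.UTF_8.name()).replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

With an empty namespace, the path is forwarded unchanged, which matches the "may be empty, but not null" contract. Passing `null` fails fast.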