[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-02-08 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100587361


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+@Context
+private UriInfo uriInfo;
+
+private static final Logger log = 
LoggerFactory.getLogger(InternalMirrorResource.class);
+
+private final Map<SourceAndTarget, Herder> herders;
+
+public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, 
RestClient restClient) {
+super(restClient);
+this.herders = herders;
+}
+
+@Override
+protected Herder herderForRequest() {
+String source = pathParam("source");
+String target = pathParam("target");
+Herder result = herders.get(new SourceAndTarget(source, target));
+if (result == null) {
+log.debug("Failed to find herder for source '{}' and target '{}'", 
source, target);

Review Comment:
   I was under the impression that throwing this would only cause the exception 
message to appear in the REST response (and presumably in the logs of the 
worker that issued the request). After revisiting the `ConnectExceptionMapper` 
class, though, it appears that we'll also log the exception at debug level on 
this worker. So yep, we can remove this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-02-08 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100585212


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+@Context
+private UriInfo uriInfo;
+
+private static final Logger log = 
LoggerFactory.getLogger(InternalMirrorResource.class);
+
+private final Map<SourceAndTarget, Herder> herders;
+
+public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, 
RestClient restClient) {
+super(restClient);
+this.herders = herders;
+}
+
+@Override
+protected Herder herderForRequest() {
+String source = pathParam("source");
+String target = pathParam("target");
+Herder result = herders.get(new SourceAndTarget(source, target));
+if (result == null) {
+log.debug("Failed to find herder for source '{}' and target '{}'", 
source, target);
+throw new NotFoundException("No replication flow found for source 
'" + source + "' and target '" + target + "'");
+}
+return result;
+}
+
+private String pathParam(String name) {
+List<String> result = uriInfo.getPathParameters().get(name);
+if (result == null || result.isEmpty())
+throw new NotFoundException();

Review Comment:
   Yeah, couldn't hurt. This is almost guaranteed to end up in a log file on 
another worker, so in the unlikely event that this code path is somehow 
reached, more detail would be preferable to less.
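
As a rough illustration of the "more detail" option, the sketch below looks up a path parameter and names the missing parameter in the failure message. It is not the PR's code: `PathParamSketch` is a made-up class, a plain `Map<String, List<String>>` stands in for `uriInfo.getPathParameters()`, `NoSuchElementException` stands in for JAX-RS's `NotFoundException` so that the example runs on the JDK alone, and returning the first value is an assumption, since the quoted diff stops at the `throw`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical class, for illustration only; not part of Kafka.
final class PathParamSketch {

    // Returns the first value of the named path parameter, or fails with a
    // message that says which parameter was missing.
    static String pathParam(Map<String, List<String>> pathParameters, String name) {
        List<String> values = pathParameters.get(name);
        if (values == null || values.isEmpty()) {
            // Stand-in for: throw new NotFoundException("...") in a JAX-RS resource
            throw new NoSuchElementException(
                    "Could not find required path parameter '" + name + "'");
        }
        return values.get(0);
    }

    public static void main(String[] args) {
        Map<String, List<String>> params =
                Collections.singletonMap("source", Collections.singletonList("A"));

        System.out.println(pathParam(params, "source"));
        try {
            pathParam(params, "target");
        } catch (NoSuchElementException e) {
            // The message now identifies the missing parameter
            System.out.println(e.getMessage());
        }
    }
}
```

With a message like this, whoever reads the log on the worker that issued the request can tell which parameter was absent without having to reproduce the request.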






[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-01-30 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090943031


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.mirror.MirrorMaker;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+@Tag("integration")
+public class DedicatedMirrorIntegrationTest {
+
+private static final Logger log = 
LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
+
+private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000;
+private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000;
+
+private Map<String, EmbeddedKafkaCluster> kafkaClusters;
+private Map<String, MirrorMaker> mirrorMakers;
+
+@BeforeEach
+public void setup() {
+kafkaClusters = new HashMap<>();
+mirrorMakers = new HashMap<>();
+}
+
+@AfterEach
+public void teardown() throws Throwable {
+AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+mirrorMakers.forEach((name, mirrorMaker) ->
+Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + 
name + "'", shutdownFailure)
+);
+kafkaClusters.forEach((name, kafkaCluster) ->
+Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" 
+ name + "'", shutdownFailure)
+);
+if (shutdownFailure.get() != null) {
+throw shutdownFailure.get();
+}
+}
+
+private EmbeddedKafkaCluster startKafkaCluster(String name, int 
numBrokers, Properties brokerProperties) {
+if (kafkaClusters.containsKey(name))
+throw new IllegalStateException("Cannot register multiple Kafka 
clusters with the same name");
+
+EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, 
brokerProperties);
+kafkaClusters.put(name, result);
+
+result.start();
+
+return result;
+}
+
+private MirrorMaker startMirrorMaker(String name, Map<String, String> 
mmProps) {
+if (mirrorMakers.containsKey(name))
+throw new IllegalStateException("Cannot register multiple 
MirrorMaker nodes with the same name");
+
+MirrorMaker result = new MirrorMaker(mmProps);
+mirrorMakers.put(name, result);
+
+result.start();
+
+return result;
+}
+
+/**
+ * Test that a multi-node dedicated cluster is able to dynamically detect 
new topics at runtime

Review Comment:
   It's true that we don't have a guarantee that task configs are relayed from 
one worker to another. However, because we enable exactly-once support, we do 
get a guarantee that intra-cluster communication takes place, since no source 
tasks can start on a follower node without first issuing a REST request to the 
cluster's leader.






[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-01-30 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090936585


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##
@@ -119,7 +126,16 @@ public class MirrorMaker {
 public MirrorMaker(MirrorMakerConfig config, List<String> clusters, Time 
time) {
 log.debug("Kafka MirrorMaker instance created");
 this.time = time;
-this.advertisedBaseUrl = "NOTUSED";
+if (config.enableInternalRest()) {
+this.restClient = new RestClient(config);
+internalServer = new MirrorRestServer(config.originals(), 
restClient);
+internalServer.initializeServer();
+this.advertisedUrl = internalServer.advertisedUrl().toString();
+} else {
+internalServer = null;
+restClient = null;
+this.advertisedUrl = "NOTUSED";

Review Comment:
   It ends up being used in the metadata workers send to the group coordinator 
during rebalances, and the schema we use to (de)serialize that data requires a 
non-null string for the worker URL. I stuck with the `"NOTUSED"` placeholder 
instead of an empty string since that's what we're using currently, and it 
doesn't seem worth the risk to try to change it now.






[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-01-30 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090935837


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##
@@ -255,13 +287,26 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
 // Pass the shared admin to the distributed herder as an additional 
AutoCloseable object that should be closed when the
 // herder is stopped. MirrorMaker has multiple herders, and having the 
herder own the close responsibility is much easier than
 // tracking the various shared admin objects in this class.
-// Do not provide a restClient to the DistributedHerder to indicate 
that request forwarding is disabled
 Herder herder = new DistributedHerder(distributedConfig, time, worker,
 kafkaClusterId, statusBackingStore, configBackingStore,
-advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, 
sharedAdmin);
+advertisedUrl, restClient, CLIENT_CONFIG_OVERRIDE_POLICY,
+restNamespace, sharedAdmin);
 herders.put(sourceAndTarget, herder);
 }
 
+private static String encodePath(String rawPath) throws 
UnsupportedEncodingException {
+return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
+// Java's out-of-the-box URL encoder encodes spaces (' ') as 
pluses ('+'),
+// and pluses as '%2B'
+// But Jetty doesn't decode pluses at all and leaves them 
as-are in decoded
+// URLs
+// So to get around that, we replace pluses in the encoded URL 
here with '%20',
+// which is the encoding that Jetty expects for spaces
+// Jetty will reverse this transformation when evaluating the 
path parameters
+// and will return decoded strings with all special characters 
as they were.

Review Comment:
   It's the result of fitting a square peg (Java's `URLEncoder` class, which, 
despite the name, is designed for HTML form encoding instead of URL path 
encoding) into a round hole (URL path encoding). I couldn't find a better 
alternative than this, and considering the fairly low risk (this is all 
intended to cover a pretty niche edge case) and integration test coverage, 
figured it'd be good enough for now.
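
To make the workaround concrete, here is a minimal, self-contained sketch of the same idea (not the code from the PR). The class name `PathEncodingSketch`, the `decode` helper, and the sample input are made up for illustration; only `URLEncoder` and `URLDecoder` come from the JDK. `URLDecoder` is used purely to check the round trip. How Jetty decodes path parameters is as described in the diff comment above, and this sketch does not verify it.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical class, for illustration only; not part of Kafka.
final class PathEncodingSketch {

    // Form-encode the segment, then rewrite the '+' that URLEncoder emits for
    // spaces as '%20'. Literal pluses in the input have already become '%2B'
    // by the time the replacement runs, so they are left untouched.
    static String encodePath(String rawPath) {
        try {
            return URLEncoder.encode(rawPath, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 should always be available", e);
        }
    }

    // Illustrative helper: decodes with the JDK's form decoder to check that
    // the encoded segment maps back to the original string.
    static String decode(String encoded) {
        try {
            return URLDecoder.decode(encoded, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 should always be available", e);
        }
    }

    public static void main(String[] args) {
        String raw = "primary cluster+backup";
        String encoded = encodePath(raw);
        System.out.println(encoded);                     // primary%20cluster%2Bbackup
        System.out.println(decode(encoded).equals(raw)); // true
    }
}
```

The order matters: the replacement is safe only because it runs after encoding, when every remaining `+` is known to stand for a space.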






[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-01-23 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1084460160


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -228,12 +229,17 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
  * @param kafkaClusterId the identifier of the Kafka cluster to use 
for internal topics; may not be null
  * @param statusBackingStore the backing store for statuses; may not be 
null
  * @param configBackingStore the backing store for connector 
configurations; may not be null
- * @param restUrl the URL of this herder's REST API; may not be 
null
+ * @param restUrl the URL of this herder's REST API; may not be 
null, but may be an arbitrary placeholder
+ *   value if this worker does not expose a REST 
API
+ * @param restClient a REST client that can be used to issue 
requests to other workers in the cluster; may
+ *   be null if inter-worker communication is not 
enabled
  * @param connectorClientConfigOverridePolicy the policy specifying the 
client configuration properties that may be overridden
  *in connector configurations; 
may not be null
+ * @param restNamespace  zero or more path elements to prepend to the 
paths of forwarded REST requests; may be empty, but not null
  * @param uponShutdown   any {@link AutoCloseable} objects that should 
be closed when this herder is {@link #stop() stopped},
  *   after all services and resources owned by 
this herder are stopped
  */
+// TODO: Do we really need two separate public constructors?

Review Comment:
   Blegh, this comment was left in from an earlier draft where there was an 
additional constructor. I refactored it out before opening the PR, but 
apparently forgot to remove the comment. Took it out now, thanks for catching!


