This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0772144e510 MINOR: Add javadoc for Connect public packages/classes 
(#16404)
0772144e510 is described below

commit 0772144e510b490fbfbd7fa96e926bdba95c8b00
Author: Mickael Maison <mimai...@users.noreply.github.com>
AuthorDate: Fri Jun 21 10:23:35 2024 +0200

    MINOR: Add javadoc for Connect public packages/classes (#16404)
    
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../apache/kafka/connect/mirror/Checkpoint.java    |  4 +-
 .../connect/mirror/DefaultReplicationPolicy.java   |  7 ++-
 .../org/apache/kafka/connect/mirror/Heartbeat.java |  4 +-
 .../connect/mirror/IdentityReplicationPolicy.java  | 47 +++++++++--------
 .../apache/kafka/connect/mirror/MirrorClient.java  | 61 +++++++++++++---------
 .../kafka/connect/mirror/MirrorClientConfig.java   | 50 ++++++++----------
 .../kafka/connect/mirror/RemoteClusterUtils.java   | 58 ++++++++++++--------
 .../kafka/connect/mirror/ReplicationPolicy.java    | 59 ++++++++++++++-------
 .../kafka/connect/mirror/SourceAndTarget.java      |  4 +-
 .../{SourceAndTarget.java => package-info.java}    | 39 ++------------
 .../apache/kafka/connect/tools/MockSinkTask.java   |  3 ++
 .../apache/kafka/connect/tools/MockSourceTask.java |  3 ++
 .../kafka/connect/tools/SchemaSourceTask.java      |  3 ++
 .../connect/tools/VerifiableSinkConnector.java     |  1 +
 .../connect/tools/VerifiableSourceConnector.java   |  1 +
 .../apache/kafka/connect/tools/package-info.java}  | 39 ++------------
 16 files changed, 192 insertions(+), 191 deletions(-)

diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
index 603f09df84c..3e0a2ee6177 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
@@ -29,7 +29,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-/** Checkpoint records emitted from MirrorCheckpointConnector. Encodes remote 
consumer group state. */
+/**
+ * Checkpoint records emitted by MirrorCheckpointConnector.
+ */
 public class Checkpoint {
     public static final String TOPIC_KEY = "topic";
     public static final String PARTITION_KEY = "partition";
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
index fa2c5a75b24..7733ccf3fd5 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -24,7 +24,12 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.regex.Pattern;
 
-/** Defines remote topics like "us-west.topic1". The separator is customizable 
and defaults to a period. */
+/**
+ * Default implementation of {@link ReplicationPolicy} which prepends the 
source cluster alias to
+ * remote topic names.
+ * For example, if the source cluster alias is "us-west", topics created in 
the target cluster will be named
+ * us-west.&lt;TOPIC&gt;. The separator is customizable by setting {@link 
#SEPARATOR_CONFIG} and defaults to a period.
+ */
 public class DefaultReplicationPolicy implements ReplicationPolicy, 
Configurable {
     
     private static final Logger log = 
LoggerFactory.getLogger(DefaultReplicationPolicy.class);
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java
index ab88e60439a..d63dfa70ff8 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java
@@ -26,7 +26,9 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
-/** Heartbeat message sent from MirrorHeartbeatTask to target cluster. 
Heartbeats are always replicated. */
+/**
+ * Heartbeat records emitted by MirrorHeartbeatConnector.
+ */
 public class Heartbeat {
     public static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias";
     public static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias";
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
index 16a3dfa11ff..1206becd5ee 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
@@ -21,13 +21,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
-/** IdentityReplicationPolicy does not rename remote topics. This is useful 
for migrating
-  * from legacy MM1, or for any use-case involving one-way replication.
-  * <p>
-  * N.B. MirrorMaker is not able to prevent cycles when using this class, so 
take care that
-  * your replication topology is acyclic. If migrating from MirrorMaker v1, 
this will likely
-  * already be the case.
-  */
+/**
+ * Alternative implementation of {@link ReplicationPolicy} that does not 
rename remote topics.
+ * This is useful for migrating from legacy MirrorMaker, or for any use-case 
involving one-way replication.
+ * <p>
+ * N.B. MirrorMaker is not able to prevent cycles when using this replication 
policy, so take care that
+ * your replication topology is acyclic. If migrating from legacy MirrorMaker, 
this will likely already be the case.
+ */
 public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
     private static final Logger log = 
LoggerFactory.getLogger(IdentityReplicationPolicy.class);
 
@@ -44,11 +44,12 @@ public class IdentityReplicationPolicy extends 
DefaultReplicationPolicy {
         }
     }
 
-    /** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy does not 
include the source
-      * cluster alias in the remote topic name. Instead, topic names are 
unchanged.
-      * <p>
-      * In the special case of heartbeats, we defer to 
DefaultReplicationPolicy.
-      */
+    /**
+     * Unlike {@link DefaultReplicationPolicy}, IdentityReplicationPolicy does 
not include the source
+     * cluster alias in the remote topic name. Instead, topic names are 
unchanged.
+     * <p>
+     * In the special case of heartbeats, we defer to {@link 
DefaultReplicationPolicy#formatRemoteTopic(String, String)}.
+     */
     @Override
     public String formatRemoteTopic(String sourceClusterAlias, String topic) {
         if (looksLikeHeartbeat(topic)) {
@@ -58,12 +59,13 @@ public class IdentityReplicationPolicy extends 
DefaultReplicationPolicy {
         }
     }
 
-    /** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy cannot know 
the source of
-      * a remote topic based on its name alone. If `source.cluster.alias` is 
provided,
-      * `topicSource` will return that.
-      * <p>
-      * In the special case of heartbeats, we defer to 
DefaultReplicationPolicy.
-      */
+    /**
+     * Unlike {@link DefaultReplicationPolicy}, IdentityReplicationPolicy 
cannot know the source of
+     * a remote topic based on its name alone. If 
<code>source.cluster.alias</code> is provided,
+     * this method will return that.
+     * <p>
+     * In the special case of heartbeats, we defer to {@link 
DefaultReplicationPolicy#topicSource(String)}.
+     */
     @Override
     public String topicSource(String topic) {
         if (looksLikeHeartbeat(topic)) {
@@ -73,10 +75,11 @@ public class IdentityReplicationPolicy extends 
DefaultReplicationPolicy {
         }
     }
 
-    /** Since any topic may be a "remote topic", this just returns `topic`.
-      * <p>
-      * In the special case of heartbeats, we defer to 
DefaultReplicationPolicy.
-      */
+    /**
+     * Since any topic may be a remote topic, this just returns <code>topic</code>.
+     * <p>
+     * In the special case of heartbeats, we defer to {@link 
DefaultReplicationPolicy#upstreamTopic(String)}.
+     */
     @Override
     public String upstreamTopic(String topic) {
         if (looksLikeHeartbeat(topic)) {
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
index a071521aa88..0b74b64ebbb 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
@@ -42,16 +42,9 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-/** Interprets MM2's internal topics (checkpoints, heartbeats) on a given 
cluster.
- *  <p> 
- *  Given a top-level "mm2.properties" configuration file, MirrorClients can 
be constructed
- *  for individual clusters as follows:
- *  </p> 
- *  <pre>
- *    MirrorMakerConfig mmConfig = new MirrorMakerConfig(props);
- *    MirrorClientConfig mmClientConfig = 
mmConfig.clientConfig("some-cluster");
- *    MirrorClient mmClient = new Mirrorclient(mmClientConfig);
- *  </pre>
+/**
+ * Client to interact with MirrorMaker internal topics (checkpoints, 
heartbeats) on a given cluster.
+ * Whenever possible use the methods from {@link RemoteClusterUtils} instead 
of directly using MirrorClient.
  */
 public class MirrorClient implements AutoCloseable {
     private static final Logger log = 
LoggerFactory.getLogger(MirrorClient.class);
@@ -78,20 +71,25 @@ public class MirrorClient implements AutoCloseable {
         this.consumerConfig = consumerConfig;
     }
 
-    /** Close internal clients. */
+    /**
+     * Closes internal clients.
+     */
     public void close() {
         adminClient.close();
     }
 
-    /** Get the ReplicationPolicy instance used to interpret remote topics. 
This instance is constructed based on
-     *  relevant configuration properties, including {@code 
replication.policy.class}. */
+    /**
+     * Gets the {@link ReplicationPolicy} instance used to interpret remote 
topics. This instance is constructed based on
+     * relevant configuration properties, including {@code 
replication.policy.class}.
+     */
     public ReplicationPolicy replicationPolicy() {
         return replicationPolicy;
     }
 
-    /** Compute shortest number of hops from an upstream source cluster.
-     *  For example, given replication flow A-&gt;B-&gt;C, there are two hops 
from A to C.
-     *  Returns -1 if upstream cluster is unreachable.
+    /**
+     * Computes the shortest number of hops from an upstream source cluster.
+     * For example, given replication flow A-&gt;B-&gt;C, there are two hops 
from A to C.
+     * Returns -1 if the upstream cluster is unreachable.
      */
     public int replicationHops(String upstreamClusterAlias) throws 
InterruptedException {
         return heartbeatTopics().stream()
@@ -102,21 +100,27 @@ public class MirrorClient implements AutoCloseable {
             .orElse(-1);
     }
 
-    /** Find all heartbeat topics on this cluster. Heartbeat topics are 
replicated from other clusters. */
+    /**
+     * Finds all heartbeats topics on this cluster. Heartbeats topics are 
replicated from other clusters.
+     */
     public Set<String> heartbeatTopics() throws InterruptedException {
         return listTopics().stream()
             .filter(this::isHeartbeatTopic)
             .collect(Collectors.toSet());
     }
 
-    /** Find all checkpoint topics on this cluster. */
+    /**
+     * Finds all checkpoints topics on this cluster.
+     */
     public Set<String> checkpointTopics() throws InterruptedException {
         return listTopics().stream()
             .filter(this::isCheckpointTopic)
             .collect(Collectors.toSet());
     }
 
-    /** Find upstream clusters, which may be multiple hops away, based on 
incoming heartbeats. */
+    /**
+     * Finds upstream clusters, which may be multiple hops away, based on 
incoming heartbeats.
+     */
     public Set<String> upstreamClusters() throws InterruptedException {
         return listTopics().stream()
             .filter(this::isHeartbeatTopic)
@@ -124,14 +128,18 @@ public class MirrorClient implements AutoCloseable {
             .collect(Collectors.toSet());
     }
 
-    /** Find all remote topics on this cluster. This does not include internal 
topics (heartbeats, checkpoints). */
+    /**
+     * Finds all remote topics on this cluster. This does not include internal 
topics (heartbeats, checkpoints).
+     */
     public Set<String> remoteTopics() throws InterruptedException {
         return listTopics().stream()
             .filter(this::isRemoteTopic)
             .collect(Collectors.toSet());
     }
 
-    /** Find all remote topics that have been replicated directly from the 
given source cluster. */
+    /**
+     * Finds all remote topics that have been replicated directly from the 
given source cluster.
+     */
     public Set<String> remoteTopics(String source) throws InterruptedException 
{
         return listTopics().stream()
             .filter(this::isRemoteTopic)
@@ -139,11 +147,12 @@ public class MirrorClient implements AutoCloseable {
             .collect(Collectors.toSet());
     }
 
-    /** Translate a remote consumer group's offsets into corresponding local 
offsets. Topics are automatically
-     *  renamed according to the ReplicationPolicy.
-     *  @param consumerGroupId group ID of remote consumer group
-     *  @param remoteClusterAlias alias of remote cluster
-     *  @param timeout timeout
+    /**
+     * Translates a remote consumer group's offsets into corresponding local 
offsets. Topics are automatically
+     * renamed according to the ReplicationPolicy.
+     * @param consumerGroupId The group ID of remote consumer group
+     * @param remoteClusterAlias The alias of remote cluster
+     * @param timeout The maximum time to block when consuming from the 
checkpoints topic
      */
     public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String 
consumerGroupId,
             String remoteClusterAlias, Duration timeout) {
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index b21b7f72463..53a4f9f5f05 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ForwardingAdmin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
@@ -31,24 +33,19 @@ import java.util.Map;
 
 import static 
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
 
-/** Configuration required for MirrorClient to talk to a given target cluster.
- *  <p>
- *  Generally, these properties come from an mm2.properties configuration file
- *  (@see MirrorMakerConfig.clientConfig):
- *  </p>
- *  <pre>
- *    MirrorMakerConfig mmConfig = new MirrorMakerConfig(props);
- *    MirrorClientConfig mmClientConfig = 
mmConfig.clientConfig("some-cluster");
- *  </pre>
- *  <p>
- *  In addition to the properties defined here, sub-configs are supported for 
Admin, Consumer, and Producer clients.
- *  For example:
- *  </p>
- *  <pre>
+/**
+ * Configuration required for {@link MirrorClient} to talk to a given target 
cluster.
+ * <p>
+ *   This needs to contain at least the connection details for the target 
cluster (<code>bootstrap.servers</code> and
+ *   any required TLS/SASL configuration), as well as {@link 
#REPLICATION_POLICY_CLASS} when not using the default
+ *   replication policy. It can also include {@link AdminClientConfig} and 
{@link ConsumerConfig} to customize the
+ *   internal clients this uses. For example:
+ *   <pre>
  *      bootstrap.servers = host1:9092
  *      consumer.client.id = mm2-client
  *      replication.policy.separator = __
- *  </pre>
+ *   </pre>
+ * </p>
  */
 public class MirrorClientConfig extends AbstractConfig {
     public static final String REPLICATION_POLICY_CLASS = 
"replication.policy.class";
@@ -110,8 +107,7 @@ public class MirrorClientConfig extends AbstractConfig {
     }
     
     private Map<String, Object> clientConfig(String prefix) {
-        Map<String, Object> props = new HashMap<>();
-        props.putAll(valuesWithPrefixOverride(prefix));
+        Map<String, Object> props = new 
HashMap<>(valuesWithPrefixOverride(prefix));
         props.keySet().retainAll(CLIENT_CONFIG_DEF.names());
         props.entrySet().removeIf(x -> x.getValue() == null);
         return props;
@@ -159,17 +155,17 @@ public class MirrorClientConfig extends AbstractConfig {
             ConfigDef.Importance.LOW,
             INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC)
         .define(
-                FORWARDING_ADMIN_CLASS,
-                ConfigDef.Type.CLASS,
-                FORWARDING_ADMIN_CLASS_DEFAULT,
-                ConfigDef.Importance.LOW,
-                FORWARDING_ADMIN_CLASS_DOC)
+            FORWARDING_ADMIN_CLASS,
+            ConfigDef.Type.CLASS,
+            FORWARDING_ADMIN_CLASS_DEFAULT,
+            ConfigDef.Importance.LOW,
+            FORWARDING_ADMIN_CLASS_DOC)
         .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
-                Type.STRING,
-                CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-                in(Utils.enumOptions(SecurityProtocol.class)),
-                Importance.MEDIUM,
-                CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+            Type.STRING,
+            CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+            in(Utils.enumOptions(SecurityProtocol.class)),
+            Importance.MEDIUM,
+            CommonClientConfigs.SECURITY_PROTOCOL_DOC)
         .withClientSslSupport()
         .withClientSaslSupport();
 }
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
index f67a81e3559..bd1bed1a547 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
@@ -26,26 +28,29 @@ import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
 
-/** Convenience methods for multi-cluster environments. Wraps {@link 
MirrorClient}
- *  <p>
- *  Properties passed to these methods are used to construct internal Admin 
and Consumer clients.
- *  Sub-configs like "admin.xyz" are also supported. For example:
- *  </p>
- *  <pre>
- *      bootstrap.servers = host1:9092
- *      consumer.client.id = mm2-client
- *  </pre>
- *  <p>
- *  @see MirrorClientConfig for additional properties used by the internal 
MirrorClient.
- *  </p>
+/**
+ * Convenience tool for multi-cluster environments. Wraps {@link MirrorClient}
+ * <p>
+ * Properties passed to these methods are used to construct internal {@link 
Admin} and {@link Consumer} clients.
+ * Sub-configs like "admin.xyz" are also supported. For example:
+ * </p>
+ * <pre>
+ *     bootstrap.servers = host1:9092
+ *     consumer.client.id = mm2-client
+ * </pre>
+ * <p>
+ * See {@link MirrorClientConfig} for additional properties used by the internal 
MirrorClient.
+ * </p>
  */
 public final class RemoteClusterUtils {
 
     // utility class
     private RemoteClusterUtils() {}
 
-    /** Find shortest number of hops from an upstream cluster.
-     *  Returns -1 if the cluster is unreachable */ 
+    /**
+     * Finds the shortest number of hops from an upstream cluster.
+     * Returns -1 if the cluster is unreachable.
+     */
     public static int replicationHops(Map<String, Object> properties, String 
upstreamClusterAlias)
             throws InterruptedException, TimeoutException {
         try (MirrorClient client = new MirrorClient(properties)) {
@@ -53,7 +58,9 @@ public final class RemoteClusterUtils {
         }
     }
 
-    /** Find all heartbeat topics */
+    /**
+     * Finds all heartbeats topics.
+     */
     public static Set<String> heartbeatTopics(Map<String, Object> properties)
             throws InterruptedException, TimeoutException {
         try (MirrorClient client = new MirrorClient(properties)) {
@@ -61,7 +68,9 @@ public final class RemoteClusterUtils {
         }
     }
 
-    /** Find all checkpoint topics */
+    /**
+     * Finds all checkpoints topics.
+     */
     public static Set<String> checkpointTopics(Map<String, Object> properties)
             throws InterruptedException, TimeoutException {
         try (MirrorClient client = new MirrorClient(properties)) {
@@ -69,7 +78,9 @@ public final class RemoteClusterUtils {
         }
     }
 
-    /** Find all upstream clusters */
+    /**
+     * Finds all upstream clusters.
+     */
     public static Set<String> upstreamClusters(Map<String, Object> properties)
             throws InterruptedException, TimeoutException {
         try (MirrorClient client = new MirrorClient(properties)) {
@@ -77,12 +88,13 @@ public final class RemoteClusterUtils {
         }
     }
 
-    /** Translate a remote consumer group's offsets into corresponding local 
offsets. Topics are automatically
-     *  renamed according to the ReplicationPolicy.
-     *  @param properties {@link MirrorClientConfig} properties to instantiate 
a {@link MirrorClient}
-     *  @param consumerGroupId group ID of remote consumer group
-     *  @param remoteClusterAlias alias of remote cluster
-     *  @param timeout timeout
+    /**
+     * Translates a remote consumer group's offsets into corresponding local 
offsets. Topics are automatically
+     *  renamed according to the configured {@link ReplicationPolicy}.
+     *  @param properties Map of properties to instantiate a {@link 
MirrorClient}
+     *  @param remoteClusterAlias The alias of the remote cluster
+     *  @param consumerGroupId The group ID of remote consumer group
+     *  @param timeout The maximum time to block when consuming from the 
checkpoints topic
      */
     public static Map<TopicPartition, OffsetAndMetadata> 
translateOffsets(Map<String, Object> properties,
             String remoteClusterAlias, String consumerGroupId, Duration 
timeout)
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
index ea65aa9705e..fbd8725eb64 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
@@ -19,29 +19,35 @@ package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-/** Defines which topics are "remote topics". e.g. "us-west.topic1". */
+/**
+ * An interface used by the MirrorMaker connectors to manage topic names 
between source and target clusters.
+ */
 @InterfaceStability.Evolving
 public interface ReplicationPolicy {
 
-    /** How to rename remote topics; generally should be like us-west.topic1. 
*/
+    /**
+     * Returns the remote topic name for the given topic and source cluster 
alias.
+     */
     String formatRemoteTopic(String sourceClusterAlias, String topic);
 
-    /** Source cluster alias of given remote topic, e.g. "us-west" for 
"us-west.topic1".
-     *  Returns null if not a remote topic.
+    /**
+     * Returns the source cluster alias of given topic.
+     * Returns null if the given topic is not a remote topic.
      */
     String topicSource(String topic);
 
-    /** Name of topic on the source cluster, e.g. "topic1" for 
"us-west.topic1".
-     *  <p>
-     *  Topics may be replicated multiple hops, so the immediately upstream 
topic
-     *  may itself be a remote topic.
-     *  <p>
-     *  Returns null if not a remote topic.
+    /**
+     * Returns the name of the given topic on the source cluster.
+     * <p>
+     * Topics may be replicated multiple hops, so the immediately upstream 
topic may itself be a remote topic.
+     * <p>
+     * Returns null if the given topic is not a remote topic.
      */
     String upstreamTopic(String topic); 
 
-    /** The name of the original source-topic, which may have been replicated 
multiple hops.
-     *  Returns the topic if it is not a remote topic.
+    /**
+     * Returns the name of the original topic, which may have been replicated 
multiple hops.
+     * Returns the topic if it is not a remote topic.
      */
     default String originalTopic(String topic) {
         String upstream = upstreamTopic(topic);
@@ -52,37 +58,52 @@ public interface ReplicationPolicy {
         }
     }
 
-    /** Returns heartbeats topic name.*/
+    /**
+     * Returns the name of the heartbeats topic.
+     */
     default String heartbeatsTopic() {
         return "heartbeats";
     }
 
-    /** Returns the offset-syncs topic for given cluster alias. */
+    /**
+     * Returns the name of the offset-syncs topic for given cluster alias.
+     */
     default String offsetSyncsTopic(String clusterAlias) {
         return "mm2-offset-syncs." + clusterAlias + ".internal";
     }
 
-    /** Returns the name checkpoint topic for given cluster alias. */
+    /**
+     * Returns the name of the checkpoints topic for given cluster alias.
+     */
     default String checkpointsTopic(String clusterAlias) {
         return clusterAlias + ".checkpoints.internal";
     }
 
-    /** check if topic is a heartbeat topic, e.g heartbeats, 
us-west.heartbeats. */
+    /**
+     * Returns true if the topic is a heartbeats topic.
+     */
     default boolean isHeartbeatsTopic(String topic) {
         return heartbeatsTopic().equals(originalTopic(topic));
     }
 
-    /** check if topic is a checkpoint topic. */
+    /**
+     * Returns true if the topic is a checkpoints topic.
+     */
     default boolean isCheckpointsTopic(String topic) {
         return  topic.endsWith(".checkpoints.internal");
     }
 
-    /** Check topic is one of MM2 internal topic, this is used to make sure 
the topic doesn't need to be replicated.*/
+    /**
+     * Returns true if the topic is one of MirrorMaker internal topics.
+     * This is used to make sure the topic doesn't need to be replicated.
+     */
     default boolean isMM2InternalTopic(String topic) {
         return  topic.endsWith(".internal");
     }
 
-    /** Internal topics are never replicated. */
+    /**
+     * Returns true if the topic is considered an internal topic.
+     */
     default boolean isInternalTopic(String topic) {
         boolean isKafkaInternalTopic = topic.startsWith("__") || 
topic.startsWith(".");
         boolean isDefaultConnectTopic =  topic.endsWith("-internal") ||  
topic.endsWith(".internal");
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
index f9793aceed9..7bafd52244e 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.connect.mirror;
 
-/** Directional pair of clusters, where source is replicated to target. */
+/**
+ * Directional pair of clusters, where source is mirrored to target.
+ */
 public class SourceAndTarget {
     private final String source;
     private final String target;
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java
similarity index 50%
copy from 
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
copy to 
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java
index f9793aceed9..48ed522ae34 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java
@@ -14,38 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.mirror;
-
-/** Directional pair of clusters, where source is replicated to target. */
-public class SourceAndTarget {
-    private final String source;
-    private final String target;
-
-    public SourceAndTarget(String source, String target) {
-        this.source = source;
-        this.target = target;
-    }
-
-    public String source() {
-        return source;
-    }
-
-    public String target() {
-        return target;
-    }
-
-    @Override
-    public String toString() {
-        return source + "->" + target;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        return other != null && toString().equals(other.toString());
-    }
-}
+/**
+ * Provides APIs for the MirrorMaker connectors and utilities to manage 
MirrorMaker resources.
+ */
+package org.apache.kafka.connect.mirror;
\ No newline at end of file
diff --git 
a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
index 392c58fb01f..1c97ced61ef 100644
--- 
a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
+++ 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
@@ -26,6 +26,9 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.Map;
 
+/**
+ * Task implementation for {@link MockSinkConnector}.
+ */
 public class MockSinkTask extends SinkTask {
     private static final Logger log = 
LoggerFactory.getLogger(MockSinkTask.class);
 
diff --git 
a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
index c09fa6a5319..f69c58b99ab 100644
--- 
a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
+++ 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
@@ -27,6 +27,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Task implementation for {@link MockSourceConnector}.
+ */
 public class MockSourceTask extends SourceTask {
     private static final Logger log = 
LoggerFactory.getLogger(MockSourceTask.class);
 
diff --git 
a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
index 7895dbbef4f..c40e0932e53 100644
--- 
a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
+++ 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
@@ -31,6 +31,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Task implementation for {@link SchemaSourceConnector}.
+ */
 public class SchemaSourceTask extends SourceTask {
 
     private static final Logger log = 
LoggerFactory.getLogger(SchemaSourceTask.class);
diff --git 
a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
index fbe29e13ed4..b198dafc535 100644
--- 
a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
+++ 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
+ * A connector primarily intended for system tests.
  * @see VerifiableSinkTask
  */
 public class VerifiableSinkConnector extends SinkConnector {
diff --git 
a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
index 6262cc3b0bb..e23992dfe1b 100644
--- 
a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
+++ 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
+ * A connector primarily intended for system tests.
  * @see VerifiableSourceTask
  */
 public class VerifiableSourceConnector extends SourceConnector {
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java
similarity index 50%
copy from 
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
copy to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java
index f9793aceed9..7a5ef74c51a 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
+++ 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java
@@ -14,38 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.mirror;
-
-/** Directional pair of clusters, where source is replicated to target. */
-public class SourceAndTarget {
-    private final String source;
-    private final String target;
-
-    public SourceAndTarget(String source, String target) {
-        this.source = source;
-        this.target = target;
-    }
-
-    public String source() {
-        return source;
-    }
-
-    public String target() {
-        return target;
-    }
-
-    @Override
-    public String toString() {
-        return source + "->" + target;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        return other != null && toString().equals(other.toString());
-    }
-}
+/**
+ * Provides source and sink connector implementations used for testing.
+ */
+package org.apache.kafka.connect.tools;
\ No newline at end of file


Reply via email to