This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0772144e510 MINOR: Add javadoc for Connect public packages/classes (#16404) 0772144e510 is described below commit 0772144e510b490fbfbd7fa96e926bdba95c8b00 Author: Mickael Maison <mimai...@users.noreply.github.com> AuthorDate: Fri Jun 21 10:23:35 2024 +0200 MINOR: Add javadoc for Connect public packages/classes (#16404) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../apache/kafka/connect/mirror/Checkpoint.java | 4 +- .../connect/mirror/DefaultReplicationPolicy.java | 7 ++- .../org/apache/kafka/connect/mirror/Heartbeat.java | 4 +- .../connect/mirror/IdentityReplicationPolicy.java | 47 +++++++++-------- .../apache/kafka/connect/mirror/MirrorClient.java | 61 +++++++++++++--------- .../kafka/connect/mirror/MirrorClientConfig.java | 50 ++++++++---------- .../kafka/connect/mirror/RemoteClusterUtils.java | 58 ++++++++++++-------- .../kafka/connect/mirror/ReplicationPolicy.java | 59 ++++++++++++++------- .../kafka/connect/mirror/SourceAndTarget.java | 4 +- .../{SourceAndTarget.java => package-info.java} | 39 ++------------ .../apache/kafka/connect/tools/MockSinkTask.java | 3 ++ .../apache/kafka/connect/tools/MockSourceTask.java | 3 ++ .../kafka/connect/tools/SchemaSourceTask.java | 3 ++ .../connect/tools/VerifiableSinkConnector.java | 1 + .../connect/tools/VerifiableSourceConnector.java | 1 + .../apache/kafka/connect/tools/package-info.java} | 39 ++------------ 16 files changed, 192 insertions(+), 191 deletions(-) diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java index 603f09df84c..3e0a2ee6177 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java @@ -29,7 +29,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -/** Checkpoint records emitted from MirrorCheckpointConnector. Encodes remote consumer group state. */ +/** + * Checkpoint records emitted by MirrorCheckpointConnector. + */ public class Checkpoint { public static final String TOPIC_KEY = "topic"; public static final String PARTITION_KEY = "partition"; diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java index fa2c5a75b24..7733ccf3fd5 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java @@ -24,7 +24,12 @@ import org.slf4j.LoggerFactory; import java.util.Map; import java.util.regex.Pattern; -/** Defines remote topics like "us-west.topic1". The separator is customizable and defaults to a period. */ +/** + * Default implementation of {@link ReplicationPolicy} which prepends the source cluster alias to + * remote topic names. + * For example, if the source cluster alias is "us-west", topics created in the target cluster will be named + * us-west.<TOPIC>. The separator is customizable by setting {@link #SEPARATOR_CONFIG} and defaults to a period. + */ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable { private static final Logger log = LoggerFactory.getLogger(DefaultReplicationPolicy.class); diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java index ab88e60439a..d63dfa70ff8 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java @@ -26,7 +26,9 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; -/** Heartbeat message sent from MirrorHeartbeatTask to target cluster. Heartbeats are always replicated. */ +/** + * Heartbeat records emitted by MirrorHeartbeatConnector. + */ public class Heartbeat { public static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias"; public static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias"; diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java index 16a3dfa11ff..1206becd5ee 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java @@ -21,13 +21,13 @@ import org.slf4j.LoggerFactory; import java.util.Map; -/** IdentityReplicationPolicy does not rename remote topics. This is useful for migrating - * from legacy MM1, or for any use-case involving one-way replication. - * <p> - * N.B. MirrorMaker is not able to prevent cycles when using this class, so take care that - * your replication topology is acyclic. If migrating from MirrorMaker v1, this will likely - * already be the case. - */ +/** + * Alternative implementation of {@link ReplicationPolicy} that does not rename remote topics. + * This is useful for migrating from legacy MirrorMaker, or for any use-case involving one-way replication. + * <p> + * N.B. MirrorMaker is not able to prevent cycles when using this replication policy, so take care that + * your replication topology is acyclic. If migrating from legacy MirrorMaker, this will likely already be the case. + */ public class IdentityReplicationPolicy extends DefaultReplicationPolicy { private static final Logger log = LoggerFactory.getLogger(IdentityReplicationPolicy.class); @@ -44,11 +44,12 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy { } } - /** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy does not include the source - * cluster alias in the remote topic name. Instead, topic names are unchanged. - * <p> - * In the special case of heartbeats, we defer to DefaultReplicationPolicy. - */ + /** + * Unlike {@link DefaultReplicationPolicy}, IdentityReplicationPolicy does not include the source + * cluster alias in the remote topic name. Instead, topic names are unchanged. + * <p> + * In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#formatRemoteTopic(String, String)}. + */ @Override public String formatRemoteTopic(String sourceClusterAlias, String topic) { if (looksLikeHeartbeat(topic)) { @@ -58,12 +59,13 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy { } } - /** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy cannot know the source of - * a remote topic based on its name alone. If `source.cluster.alias` is provided, - * `topicSource` will return that. - * <p> - * In the special case of heartbeats, we defer to DefaultReplicationPolicy. - */ + /** + * Unlike {@link DefaultReplicationPolicy}, IdentityReplicationPolicy cannot know the source of + * a remote topic based on its name alone. If <code>source.cluster.alias</code> is provided, + * this method will return that. + * <p> + * In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#topicSource(String)}. + */ @Override public String topicSource(String topic) { if (looksLikeHeartbeat(topic)) { @@ -73,10 +75,11 @@ public class IdentityReplicationPolicy extends DefaultReplicationPolicy { } } - /** Since any topic may be a "remote topic", this just returns `topic`. - * <p> - * In the special case of heartbeats, we defer to DefaultReplicationPolicy. - */ + /** + * Since any topic may be a remote topic, this just returns `topic`. + * <p> + * In the special case of heartbeats, we defer to {@link DefaultReplicationPolicy#upstreamTopic(String)}. + */ @Override public String upstreamTopic(String topic) { if (looksLikeHeartbeat(topic)) { diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java index a071521aa88..0b74b64ebbb 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java @@ -42,16 +42,9 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -/** Interprets MM2's internal topics (checkpoints, heartbeats) on a given cluster. - * <p> - * Given a top-level "mm2.properties" configuration file, MirrorClients can be constructed - * for individual clusters as follows: - * </p> - * <pre> - * MirrorMakerConfig mmConfig = new MirrorMakerConfig(props); - * MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster"); - * MirrorClient mmClient = new Mirrorclient(mmClientConfig); - * </pre> +/** + * Client to interact with MirrorMaker internal topics (checkpoints, heartbeats) on a given cluster. + * Whenever possible use the methods from {@link RemoteClusterUtils} instead of directly using MirrorClient. */ public class MirrorClient implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(MirrorClient.class); @@ -78,20 +71,25 @@ public class MirrorClient implements AutoCloseable { this.consumerConfig = consumerConfig; } - /** Close internal clients. */ + /** + * Closes internal clients. + */ public void close() { adminClient.close(); } - /** Get the ReplicationPolicy instance used to interpret remote topics. This instance is constructed based on - * relevant configuration properties, including {@code replication.policy.class}. */ + /** + * Gets the {@link ReplicationPolicy} instance used to interpret remote topics. This instance is constructed based on + * relevant configuration properties, including {@code replication.policy.class}. + */ public ReplicationPolicy replicationPolicy() { return replicationPolicy; } - /** Compute shortest number of hops from an upstream source cluster. - * For example, given replication flow A->B->C, there are two hops from A to C. - * Returns -1 if upstream cluster is unreachable. + /** + * Computes the shortest number of hops from an upstream source cluster. + * For example, given replication flow A->B->C, there are two hops from A to C. + * Returns -1 if the upstream cluster is unreachable. */ public int replicationHops(String upstreamClusterAlias) throws InterruptedException { return heartbeatTopics().stream() @@ -102,21 +100,27 @@ public class MirrorClient implements AutoCloseable { .orElse(-1); } - /** Find all heartbeat topics on this cluster. Heartbeat topics are replicated from other clusters. */ + /** + * Finds all heartbeats topics on this cluster. Heartbeats topics are replicated from other clusters. + */ public Set<String> heartbeatTopics() throws InterruptedException { return listTopics().stream() .filter(this::isHeartbeatTopic) .collect(Collectors.toSet()); } - /** Find all checkpoint topics on this cluster. */ + /** + * Finds all checkpoints topics on this cluster. + */ public Set<String> checkpointTopics() throws InterruptedException { return listTopics().stream() .filter(this::isCheckpointTopic) .collect(Collectors.toSet()); } - /** Find upstream clusters, which may be multiple hops away, based on incoming heartbeats. */ + /** + * Finds upstream clusters, which may be multiple hops away, based on incoming heartbeats. + */ public Set<String> upstreamClusters() throws InterruptedException { return listTopics().stream() .filter(this::isHeartbeatTopic) @@ -124,14 +128,18 @@ public class MirrorClient implements AutoCloseable { .collect(Collectors.toSet()); } - /** Find all remote topics on this cluster. This does not include internal topics (heartbeats, checkpoints). */ + /** + * Finds all remote topics on this cluster. This does not include internal topics (heartbeats, checkpoints). + */ public Set<String> remoteTopics() throws InterruptedException { return listTopics().stream() .filter(this::isRemoteTopic) .collect(Collectors.toSet()); } - /** Find all remote topics that have been replicated directly from the given source cluster. */ + /** + * Finds all remote topics that have been replicated directly from the given source cluster. + */ public Set<String> remoteTopics(String source) throws InterruptedException { return listTopics().stream() .filter(this::isRemoteTopic) @@ -139,11 +147,12 @@ public class MirrorClient implements AutoCloseable { .collect(Collectors.toSet()); } - /** Translate a remote consumer group's offsets into corresponding local offsets. Topics are automatically - * renamed according to the ReplicationPolicy. - * @param consumerGroupId group ID of remote consumer group - * @param remoteClusterAlias alias of remote cluster - * @param timeout timeout + /** + * Translates a remote consumer group's offsets into corresponding local offsets. Topics are automatically + * renamed according to the ReplicationPolicy. + * @param consumerGroupId The group ID of remote consumer group + * @param remoteClusterAlias The alias of remote cluster + * @param timeout The maximum time to block when consuming from the checkpoints topic */ public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId, String remoteClusterAlias, Duration timeout) { diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java index b21b7f72463..53a4f9f5f05 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java @@ -17,7 +17,9 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ForwardingAdmin; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -31,24 +33,19 @@ import java.util.Map; import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in; -/** Configuration required for MirrorClient to talk to a given target cluster. - * <p> - * Generally, these properties come from an mm2.properties configuration file - * (@see MirrorMakerConfig.clientConfig): - * </p> - * <pre> - * MirrorMakerConfig mmConfig = new MirrorMakerConfig(props); - * MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster"); - * </pre> - * <p> - * In addition to the properties defined here, sub-configs are supported for Admin, Consumer, and Producer clients. - * For example: - * </p> - * <pre> +/** + * Configuration required for {@link MirrorClient} to talk to a given target cluster. + * <p> + * This needs to contain at least the connection details for the target cluster (<code>bootstrap.servers</code> and + * any required TLS/SASL configuration), as well as {@link #REPLICATION_POLICY_CLASS} when not using the default + * replication policy. It can also include {@link AdminClientConfig} and {@link ConsumerConfig} to customize the + * internal clients this uses. For example: + * <pre> * bootstrap.servers = host1:9092 * consumer.client.id = mm2-client * replication.policy.separator = __ - * </pre> + * </pre> + * </p> */ public class MirrorClientConfig extends AbstractConfig { public static final String REPLICATION_POLICY_CLASS = "replication.policy.class"; @@ -110,8 +107,7 @@ public class MirrorClientConfig extends AbstractConfig { } private Map<String, Object> clientConfig(String prefix) { - Map<String, Object> props = new HashMap<>(); - props.putAll(valuesWithPrefixOverride(prefix)); + Map<String, Object> props = new HashMap<>(valuesWithPrefixOverride(prefix)); props.keySet().retainAll(CLIENT_CONFIG_DEF.names()); props.entrySet().removeIf(x -> x.getValue() == null); return props; @@ -159,17 +155,17 @@ public class MirrorClientConfig extends AbstractConfig { ConfigDef.Importance.LOW, INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC) .define( - FORWARDING_ADMIN_CLASS, - ConfigDef.Type.CLASS, - FORWARDING_ADMIN_CLASS_DEFAULT, - ConfigDef.Importance.LOW, - FORWARDING_ADMIN_CLASS_DOC) + FORWARDING_ADMIN_CLASS, + ConfigDef.Type.CLASS, + FORWARDING_ADMIN_CLASS_DEFAULT, + ConfigDef.Importance.LOW, + FORWARDING_ADMIN_CLASS_DOC) .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, - Type.STRING, - CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, - in(Utils.enumOptions(SecurityProtocol.class)), - Importance.MEDIUM, - CommonClientConfigs.SECURITY_PROTOCOL_DOC) + Type.STRING, + CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, + in(Utils.enumOptions(SecurityProtocol.class)), + Importance.MEDIUM, + CommonClientConfigs.SECURITY_PROTOCOL_DOC) .withClientSslSupport() .withClientSaslSupport(); } diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java index f67a81e3559..bd1bed1a547 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java @@ -17,6 +17,8 @@ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -26,26 +28,29 @@ import java.util.Set; import java.util.concurrent.TimeoutException; -/** Convenience methods for multi-cluster environments. Wraps {@link MirrorClient} - * <p> - * Properties passed to these methods are used to construct internal Admin and Consumer clients. - * Sub-configs like "admin.xyz" are also supported. For example: - * </p> - * <pre> - * bootstrap.servers = host1:9092 - * consumer.client.id = mm2-client - * </pre> - * <p> - * @see MirrorClientConfig for additional properties used by the internal MirrorClient. - * </p> +/** + * Convenience tool for multi-cluster environments. Wraps {@link MirrorClient} + * <p> + * Properties passed to these methods are used to construct internal {@link Admin} and {@link Consumer} clients. + * Sub-configs like "admin.xyz" are also supported. For example: + * </p> + * <pre> + * bootstrap.servers = host1:9092 + * consumer.client.id = mm2-client + * </pre> + * <p> + * @see MirrorClientConfig for additional properties used by the internal MirrorClient. + * </p> */ public final class RemoteClusterUtils { // utility class private RemoteClusterUtils() {} - /** Find shortest number of hops from an upstream cluster. - * Returns -1 if the cluster is unreachable */ + /** + * Finds the shortest number of hops from an upstream cluster. + * Returns -1 if the cluster is unreachable. + */ public static int replicationHops(Map<String, Object> properties, String upstreamClusterAlias) throws InterruptedException, TimeoutException { try (MirrorClient client = new MirrorClient(properties)) { @@ -53,7 +58,9 @@ public final class RemoteClusterUtils { } } - /** Find all heartbeat topics */ + /** + * Finds all heartbeats topics + */ public static Set<String> heartbeatTopics(Map<String, Object> properties) throws InterruptedException, TimeoutException { try (MirrorClient client = new MirrorClient(properties)) { @@ -61,7 +68,9 @@ public final class RemoteClusterUtils { } } - /** Find all checkpoint topics */ + /** + * Finds all checkpoints topics + */ public static Set<String> checkpointTopics(Map<String, Object> properties) throws InterruptedException, TimeoutException { try (MirrorClient client = new MirrorClient(properties)) { @@ -69,7 +78,9 @@ public final class RemoteClusterUtils { } } - /** Find all upstream clusters */ + /** + * Finds all upstream clusters + */ public static Set<String> upstreamClusters(Map<String, Object> properties) throws InterruptedException, TimeoutException { try (MirrorClient client = new MirrorClient(properties)) { @@ -77,12 +88,13 @@ public final class RemoteClusterUtils { } } - /** Translate a remote consumer group's offsets into corresponding local offsets. Topics are automatically - * renamed according to the ReplicationPolicy. - * @param properties {@link MirrorClientConfig} properties to instantiate a {@link MirrorClient} - * @param consumerGroupId group ID of remote consumer group - * @param remoteClusterAlias alias of remote cluster - * @param timeout timeout + /** + * Translates a remote consumer group's offsets into corresponding local offsets. Topics are automatically + * renamed according to the configured {@link ReplicationPolicy}. + * @param properties Map of properties to instantiate a {@link MirrorClient} + * @param remoteClusterAlias The alias of the remote cluster + * @param consumerGroupId The group ID of remote consumer group + * @param timeout The maximum time to block when consuming from the checkpoints topic */ public static Map<TopicPartition, OffsetAndMetadata> translateOffsets(Map<String, Object> properties, String remoteClusterAlias, String consumerGroupId, Duration timeout) diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java index ea65aa9705e..fbd8725eb64 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java @@ -19,29 +19,35 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.common.annotation.InterfaceStability; -/** Defines which topics are "remote topics". e.g. "us-west.topic1". */ +/** + * An interface used by the MirrorMaker connectors to manage topics names between source and target clusters. + */ @InterfaceStability.Evolving public interface ReplicationPolicy { - /** How to rename remote topics; generally should be like us-west.topic1. */ + /** + * Returns the remote topic name for the given topic and source cluster alias. + */ String formatRemoteTopic(String sourceClusterAlias, String topic); - /** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1". - * Returns null if not a remote topic. + /** + * Returns the source cluster alias of given topic. + * Returns null if the given topic is not a remote topic. */ String topicSource(String topic); - /** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1". - * <p> - * Topics may be replicated multiple hops, so the immediately upstream topic - * may itself be a remote topic. - * <p> - * Returns null if not a remote topic. + /** + * Return the name of the given topic on the source cluster. + * <p> + * Topics may be replicated multiple hops, so the immediately upstream topic may itself be a remote topic. + * <p> + * Returns null if the given topic is not a remote topic. */ String upstreamTopic(String topic); - /** The name of the original source-topic, which may have been replicated multiple hops. - * Returns the topic if it is not a remote topic. + /** + * Returns the name of the original topic, which may have been replicated multiple hops. + * Returns the topic if it is not a remote topic. */ default String originalTopic(String topic) { String upstream = upstreamTopic(topic); @@ -52,37 +58,52 @@ public interface ReplicationPolicy { } } - /** Returns heartbeats topic name.*/ + /** + * Returns the name of heartbeats topic. + */ default String heartbeatsTopic() { return "heartbeats"; } - /** Returns the offset-syncs topic for given cluster alias. */ + /** + * Returns the name of the offset-syncs topic for given cluster alias. + */ default String offsetSyncsTopic(String clusterAlias) { return "mm2-offset-syncs." + clusterAlias + ".internal"; } - /** Returns the name checkpoint topic for given cluster alias. */ + /** + * Returns the name of the checkpoints topic for given cluster alias. + */ default String checkpointsTopic(String clusterAlias) { return clusterAlias + ".checkpoints.internal"; } - /** check if topic is a heartbeat topic, e.g heartbeats, us-west.heartbeats. */ + /** + * Returns true if the topic is a heartbeats topic + */ default boolean isHeartbeatsTopic(String topic) { return heartbeatsTopic().equals(originalTopic(topic)); } - /** check if topic is a checkpoint topic. */ + /** + * Returns true if the topic is a checkpoints topic. + */ default boolean isCheckpointsTopic(String topic) { return topic.endsWith(".checkpoints.internal"); } - /** Check topic is one of MM2 internal topic, this is used to make sure the topic doesn't need to be replicated.*/ + /** + * Returns true if the topic is one of MirrorMaker internal topics. + * This is used to make sure the topic doesn't need to be replicated. + */ default boolean isMM2InternalTopic(String topic) { return topic.endsWith(".internal"); } - /** Internal topics are never replicated. */ + /** + * Returns true if the topic is considered an internal topic. + */ default boolean isInternalTopic(String topic) { boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith("."); boolean isDefaultConnectTopic = topic.endsWith("-internal") || topic.endsWith(".internal"); diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java index f9793aceed9..7bafd52244e 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.connect.mirror; -/** Directional pair of clusters, where source is replicated to target. */ +/** + * Directional pair of clusters, where source is mirrored to target. + */ public class SourceAndTarget { private final String source; private final String target; diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java similarity index 50% copy from connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java copy to connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java index f9793aceed9..48ed522ae34 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/package-info.java @@ -14,38 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.connect.mirror; - -/** Directional pair of clusters, where source is replicated to target. */ -public class SourceAndTarget { - private final String source; - private final String target; - - public SourceAndTarget(String source, String target) { - this.source = source; - this.target = target; - } - - public String source() { - return source; - } - - public String target() { - return target; - } - - @Override - public String toString() { - return source + "->" + target; - } - - @Override - public int hashCode() { - return toString().hashCode(); - } - - @Override - public boolean equals(Object other) { - return other != null && toString().equals(other.toString()); - } -} +/** + * Provides APIs for the MirrorMaker connectors and utilities to manage MirrorMaker resources. + */ +package org.apache.kafka.connect.mirror; \ No newline at end of file diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java index 392c58fb01f..1c97ced61ef 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java @@ -26,6 +26,9 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Map; +/** + * Task implementation for {@link MockSinkConnector}. + */ public class MockSinkTask extends SinkTask { private static final Logger log = LoggerFactory.getLogger(MockSinkTask.class); diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java index c09fa6a5319..f69c58b99ab 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java @@ -27,6 +27,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +/** + * Task implementation for {@link MockSourceConnector}. + */ public class MockSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(MockSourceTask.class); diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java index 7895dbbef4f..c40e0932e53 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java @@ -31,6 +31,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +/** + * Task implementation for {@link SchemaSourceConnector}. + */ public class SchemaSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(SchemaSourceTask.class); diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java index fbe29e13ed4..b198dafc535 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; /** + * A connector primarily intended for system tests. * @see VerifiableSinkTask */ public class VerifiableSinkConnector extends SinkConnector { diff --git a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java index 6262cc3b0bb..e23992dfe1b 100644 --- a/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; /** + * A connector primarily intended for system tests. * @see VerifiableSourceTask */ public class VerifiableSourceConnector extends SourceConnector { diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java similarity index 50% copy from connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java copy to connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java index f9793aceed9..7a5ef74c51a 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java +++ b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/package-info.java @@ -14,38 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.connect.mirror; - -/** Directional pair of clusters, where source is replicated to target. */ -public class SourceAndTarget { - private final String source; - private final String target; - - public SourceAndTarget(String source, String target) { - this.source = source; - this.target = target; - } - - public String source() { - return source; - } - - public String target() { - return target; - } - - @Override - public String toString() { - return source + "->" + target; - } - - @Override - public int hashCode() { - return toString().hashCode(); - } - - @Override - public boolean equals(Object other) { - return other != null && toString().equals(other.toString()); - } -} +/** + * Provides source and sink connector implementations used for testing + */ +package org.apache.kafka.connect.tools; \ No newline at end of file