This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 1c7c93b KAFKA-10710; MM2 - Create herders only if
source->target.enabled=true and heartbeats are disabled (#9589)
1c7c93b is described below
commit 1c7c93ba37308f4c3c3282b455b5263c0e455f09
Author: Julien Chanaud <[email protected]>
AuthorDate: Thu Jan 28 23:52:51 2021 +0100
KAFKA-10710; MM2 - Create herders only if source->target.enabled=true and
heartbeats are disabled (#9589)
By default, MirrorMaker 2 creates herders for all possible source->target
combinations, even if the corresponding "links" are not enabled.
This is because heartbeats are emitted from the "opposite" herder.
If there is a replication flow from A to B and heartbeats are required, two
herders are needed:
- A->B for the MirrorSourceConnector
- B->A for the MirrorHeartbeatConnector
The MirrorHeartbeatConnector on B->A emits heartbeats into the heartbeats
topic on cluster A.
The MirrorSourceConnector on A->B then replicates the configured topics as
well as the heartbeats topic.
In deployments with many clusters (10 or more), this leads to a very large
number of unnecessary connections, file descriptors and configuration topics
being created in every target cluster.
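With this code change, we leverage the top-level property
"emit.heartbeats.enabled", which defaults to "true" and can be overridden per
link with "A->B.emit.heartbeats.enabled".
We skip creating the A->B herder only when both A->B.enabled=false (the
default) and A->B.emit.heartbeats.enabled=false (which falls back to the
top-level "emit.heartbeats.enabled" value when it is not set).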
Existing users will not see any change, and if they depend on these
"opposite" herders for their monitoring, it will still work.
New users with more complex use cases can change this property to fine-tune
their heartbeat generation.
Reviewers: Ryanne Dolan <[email protected]>, Sanjana Kaundinya
<[email protected]>, Jason Gustafson <[email protected]>
---
.../kafka/connect/mirror/MirrorMakerConfig.java | 23 ++++++-
.../connect/mirror/MirrorMakerConfigTest.java | 75 ++++++++++++++++++++++
2 files changed, 96 insertions(+), 2 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 059ab78..b5c361c 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -87,11 +87,30 @@ public class MirrorMakerConfig extends AbstractConfig {
public List<SourceAndTarget> clusterPairs() {
List<SourceAndTarget> pairs = new ArrayList<>();
Set<String> clusters = clusters();
+ Map<String, String> originalStrings = originalsStrings();
+ boolean globalHeartbeatsEnabled =
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+ if
(originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+ globalHeartbeatsEnabled =
Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+ }
+
for (String source : clusters) {
for (String target : clusters) {
- SourceAndTarget sourceAndTarget = new SourceAndTarget(source,
target);
if (!source.equals(target)) {
- pairs.add(sourceAndTarget);
+ String clusterPairConfigPrefix = source + "->" + target +
".";
+ boolean clusterPairEnabled =
Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix +
"enabled", "false"));
+ boolean clusterPairHeartbeatsEnabled =
globalHeartbeatsEnabled;
+ if (originalStrings.containsKey(clusterPairConfigPrefix +
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+ clusterPairHeartbeatsEnabled =
Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix +
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+ }
+
+ // By default, all source->target Herder combinations are created even if `x->y.enabled=false`
+ // Unless `emit.heartbeats.enabled=false` or `x->y.emit.heartbeats.enabled=false`
+ // Reason for this behavior: for a given replication flow A->B with heartbeats, 2 herders are required :
+ // B->A for the MirrorHeartbeatConnector (emits heartbeats into A for monitoring replication health)
+ // A->B for the MirrorSourceConnector (actual replication flow)
+ if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
+ pairs.add(new SourceAndTarget(source, target));
+ }
}
}
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index 1cba87f..c223acc 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.Collections;
import java.util.HashMap;
import java.util.Arrays;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -256,6 +257,80 @@ public class MirrorMakerConfigTest {
"secret2", bProps.get("producer.ssl.key.password"));
}
+ @Test
+ public void testClusterPairsWithDefaultSettings() {
+ MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+ "clusters", "a, b, c"));
+ // implicit configuration associated
+ // a->b.enabled=false
+ // a->b.emit.heartbeats.enabled=true
+ // a->c.enabled=false
+ // a->c.emit.heartbeats.enabled=true
+ // b->a.enabled=false
+ // b->a.emit.heartbeats.enabled=true
+ // b->c.enabled=false
+ // b->c.emit.heartbeats.enabled=true
+ // c->a.enabled=false
+ // c->a.emit.heartbeats.enabled=true
+ // c->b.enabled=false
+ // c->b.emit.heartbeats.enabled=true
+ List<SourceAndTarget> clusterPairs = mirrorConfig.clusterPairs();
+ assertEquals("clusterPairs count should match all combinations count",
+ 6, clusterPairs.size());
+ }
+
+ @Test
+ public void testEmptyClusterPairsWithGloballyDisabledHeartbeats() {
+ MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+ "clusters", "a, b, c",
+ "emit.heartbeats.enabled", "false"));
+ assertEquals("clusterPairs count should be 0",
+ 0, mirrorConfig.clusterPairs().size());
+ }
+
+ @Test
+ public void testClusterPairsWithTwoDisabledHeartbeats() {
+ MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+ "clusters", "a, b, c",
+ "a->b.emit.heartbeats.enabled", "false",
+ "a->c.emit.heartbeats.enabled", "false"));
+ List<SourceAndTarget> clusterPairs = mirrorConfig.clusterPairs();
+ assertEquals("clusterPairs count should match all combinations count
except x->y.emit.heartbeats.enabled=false",
+ 4, clusterPairs.size());
+ }
+
+ @Test
+ public void testClusterPairsWithGloballyDisabledHeartbeats() {
+ MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+ "clusters", "a, b, c, d, e, f",
+ "emit.heartbeats.enabled", "false",
+ "a->b.enabled", "true",
+ "a->c.enabled", "true",
+ "a->d.enabled", "true",
+ "a->e.enabled", "false",
+ "a->f.enabled", "false"));
+ List<SourceAndTarget> clusterPairs = mirrorConfig.clusterPairs();
+ assertEquals("clusterPairs count should match (x->y.enabled=true or
x->y.emit.heartbeats.enabled=true) count",
+ 3, clusterPairs.size());
+
+ // Link b->a.enabled doesn't exist therefore it must not be in
clusterPairs
+ SourceAndTarget sourceAndTarget = new SourceAndTarget("b", "a");
+ assertFalse("disabled/unset link x->y should not be in clusterPairs",
clusterPairs.contains(sourceAndTarget));
+ }
+
+ @Test
+ public void testClusterPairsWithGloballyDisabledHeartbeatsCentralLocal() {
+ MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+ "clusters", "central, local_one, local_two, beats_emitter",
+ "emit.heartbeats.enabled", "false",
+ "central->local_one.enabled", "true",
+ "central->local_two.enabled", "true",
+ "beats_emitter->central.emit.heartbeats.enabled", "true"));
+
+ assertEquals("clusterPairs count should match (x->y.enabled=true or
x->y.emit.heartbeats.enabled=true) count",
+ 3, mirrorConfig.clusterPairs().size());
+ }
+
public static class FakeConfigProvider implements ConfigProvider {
Map<String, String> secrets = Collections.singletonMap("password",
"secret2");