skaundinya15 commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565772802



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map<?, ?> props) {
     public List<SourceAndTarget> clusterPairs() {
         List<SourceAndTarget> pairs = new ArrayList<>();
         Set<String> clusters = clusters();
+        Map<String, String> originalStrings = originalsStrings();
+        boolean globalHeartbeatsEnabled = MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+        if (originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+            globalHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+        }
+
         for (String source : clusters) {
             for (String target : clusters) {
-                SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
                 if (!source.equals(target)) {
-                    pairs.add(sourceAndTarget);
+                    String clusterPairConfigPrefix = source + "->" + target + ".";
+                    boolean clusterPairEnabled = Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + "enabled", "false"));
+                    boolean clusterPairHeartbeatsEnabled = globalHeartbeatsEnabled;
+                    if (originalStrings.containsKey(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+                        clusterPairHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+                    }
+
+                    if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {

Review comment:
       ```suggestion
                       // By default, all source->target Herder combinations are created even if `x->y.enabled=false`
                       // Unless `emit.heartbeats.enabled=false` or `x->y.emit.heartbeats.enabled=false`
                       // Reason for this behavior: for a given replication flow A->B with heartbeats, 2 herders are required :
                       // B->A for the MirrorHeartbeatConnector (emits heartbeats into A for monitoring replication health)
                       // A->B for the MirrorSourceConnector (actual replication flow)
                       if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
   ```
   Looks good to me, just had a small tweak for the `B->A` comment. Thanks!
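
   For anyone following along, here is a minimal standalone sketch of the pair-selection rule the comment describes. It is not the PR's code: the class `ClusterPairSketch` and the method `enabledPairs` are hypothetical, pairs are returned as plain strings instead of `SourceAndTarget`, and the key `emit.heartbeats.enabled` with a default of `true` is an assumption inferred from the suggested comment rather than read from `MirrorConnectorConfig`.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class ClusterPairSketch {
       // Assumed key and default, inferred from the suggested comment above.
       static final String EMIT_HEARTBEATS_ENABLED = "emit.heartbeats.enabled";
       static final boolean EMIT_HEARTBEATS_ENABLED_DEFAULT = true;

       // Returns "source->target" for every pair that would get a herder.
       static List<String> enabledPairs(Set<String> clusters, Map<String, String> props) {
           // Global heartbeat setting, falling back to the assumed default.
           boolean globalHeartbeats = props.containsKey(EMIT_HEARTBEATS_ENABLED)
               ? Boolean.parseBoolean(props.get(EMIT_HEARTBEATS_ENABLED))
               : EMIT_HEARTBEATS_ENABLED_DEFAULT;

           List<String> pairs = new ArrayList<>();
           for (String source : clusters) {
               for (String target : clusters) {
                   if (source.equals(target)) {
                       continue;
                   }
                   String prefix = source + "->" + target + ".";
                   boolean enabled =
                       Boolean.parseBoolean(props.getOrDefault(prefix + "enabled", "false"));
                   // A per-pair heartbeat setting overrides the global one.
                   boolean heartbeats = props.containsKey(prefix + EMIT_HEARTBEATS_ENABLED)
                       ? Boolean.parseBoolean(props.get(prefix + EMIT_HEARTBEATS_ENABLED))
                       : globalHeartbeats;
                   if (enabled || heartbeats) {
                       pairs.add(source + "->" + target);
                   }
               }
           }
           return pairs;
       }

       public static void main(String[] args) {
           Set<String> clusters = new LinkedHashSet<>(List.of("A", "B"));
           // Heartbeats on by default: both directions are kept.
           System.out.println(enabledPairs(clusters, Map.of("A->B.enabled", "true")));
           // Heartbeats off globally: only the explicitly enabled flow remains.
           System.out.println(enabledPairs(clusters,
               Map.of("A->B.enabled", "true", EMIT_HEARTBEATS_ENABLED, "false")));
       }
   }
   ```

   Under those assumptions, the first call yields both `A->B` and `B->A`, while the second yields only `A->B`, which matches the "unless `emit.heartbeats.enabled=false`" wording in the suggested comment.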




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

