maedhroz commented on code in PR #4652:
URL: https://github.com/apache/cassandra/pull/4652#discussion_r2927112373


##########
src/java/org/apache/cassandra/locator/SatelliteDatacenterReplicationStrategy.java:
##########
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.locator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
+import 
org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo;
+import 
org.apache.cassandra.service.replication.migration.MutationTrackingMigrationState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
+
+import static java.lang.String.format;
+
+/**
+ * Replication strategy for  CEP-58: Satellite Datacenters
+ * allows pairing full datacenters with smaller satellite DCs * that use 
witness replicas for mutation tracking and
+ * enable transactionally consistent failover without requiring 3 full 
datacenters.
+ *
+ * Configuration syntax uses dot notation:
+ *
+ * CREATE KEYSPACE ks WITH replication = {
+ *   'class': 'SatelliteDatacenterReplicationStrategy',
+ *   'DC1': '3',                    // Full datacenter with RF=3
+ *   'DC1.satellite.ST1': '3/3',    // Satellite ST1 for DC1 with 3 witness 
replicas
+ *   'DC2': '3',                    // Second full DC
+ *   'DC2.satellite.ST2': '3/3',    // Satellite ST2 for DC2
+ *   'primary': 'DC1'               // Primary datacenter designation
+ *   'DC1.disabled': 'true',        // Replication to DC1 disabled (disabled 
DC can't be primary, use for down DCs)
+ * } AND replication_type = tracked;
+ *
+ * Requirements:
+ * - Keyspace must use tracked replication (replication_type = tracked)
+ * - Cluster must have transient replication enabled 
(transient_replication_enabled: true)
+ * - Satellites must use witness replica format 'N/N' where all replicas are 
transient
+ *
+ * Uses NetworkTopologyStrategy replica selection algorithm for compatibility 
with existing NTS keyspaces
+ */
+public class SatelliteDatacenterReplicationStrategy extends 
AbstractReplicationStrategy
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(SatelliteDatacenterReplicationStrategy.class);
+
+    private static final String PRIMARY_DC_KEY = "primary";
+    private static final String SATELLITE_KEY_PATTERN = ".satellite.";
+    private static final String DISABLED_KEY_SUFFIX = ".disabled";
+
+    private final Map<String, ReplicationFactor> fullDCs;
+
+    private final Map<String, SatelliteInfo> satellites;
+
+    private final Map<String, ReplicationFactor> allDCs;
+
+    private final String primaryDC;
+
+    private final Set<String> disabledDCs;
+
+    private final ReplicationFactor aggregateRf;
+
+    private static class SatelliteInfo
+    {
+        public final String parentDC;
+
+        public final String name;
+
+        public final ReplicationFactor rf;
+
+        SatelliteInfo(String parentDC, String name, ReplicationFactor rf)
+        {
+            this.parentDC = parentDC;
+            this.name = name;
+            this.rf = rf;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            SatelliteInfo that = (SatelliteInfo) o;
+            return parentDC.equals(that.parentDC) &&
+                   name.equals(that.name) &&
+                   rf.equals(that.rf);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            int result = parentDC.hashCode();
+            result = 31 * result + name.hashCode();
+            result = 31 * result + rf.hashCode();
+            return result;
+        }
+    }
+
+    private static void cfgEx(String format, Object... args)
+    {
+        throw new ConfigurationException(format(format, args));
+    }
+
+    private static class StrategyOptions
+    {
+        Map<String, ReplicationFactor> fullDatacenters = new HashMap<>();
+        Map<String, SatelliteInfo> satellites = new HashMap<>();
+        Map<String, ReplicationFactor> allDatacenters = new HashMap<>();
+        Set<String> disabledDCs = new HashSet<>();
+        String primary = null;
+
+        StrategyOptions(Map<String, String> configOptions)
+        {
+
+            if (configOptions != null)
+            {
+                for (Map.Entry<String, String> entry : 
configOptions.entrySet())
+                {
+                    String key = entry.getKey();
+                    String value = entry.getValue();
+
+                    if (setPrimary(key, value))
+                        continue;
+
+                    if (setSatellite(key, value))
+                        continue;
+
+                    if (setDisabled(key, value))
+                        continue;
+
+                    if (key.contains("."))
+                        cfgEx("Datacenter names cannot contain dots: '%s'. Use 
'<DC>.satellite.<SAT>' for satellites", key);
+
+                    ReplicationFactor rf = ReplicationFactor.fromString(value);
+                    fullDatacenters.put(key, rf);
+                }
+            }
+
+            allDatacenters.putAll(fullDatacenters);
+            satellites.forEach((dc, info) -> allDatacenters.put(dc, info.rf));
+        }
+
+        boolean setPrimary(String key, String value)
+        {
+            if (!key.equalsIgnoreCase(PRIMARY_DC_KEY))
+                return false;
+
+            if (primary != null)
+                cfgEx("'primary' option specified multiple times");
+
+            primary = value;
+            return true;
+        }
+
+        boolean setSatellite(String key, String value)
+        {
+            if (!key.contains(SATELLITE_KEY_PATTERN))
+                return false;
+
+            String[] parts = key.split("\\" + SATELLITE_KEY_PATTERN);
+            if (parts.length != 2)
+                cfgEx("Invalid satellite configuration key '%s'. Expected 
format: '<DC>.satellite.<SAT>'", key);
+
+            String parentDc = parts[0];
+            String satelliteName = parts[1];
+
+            if (parentDc.contains(".") || satelliteName.contains("."))
+                cfgEx("Datacenter names cannot contain dots: '%s'", key);
+
+            // Parse RF - satellites must use witness format (e.g., '3/3')
+            if (!value.contains("/"))
+                cfgEx("Satellite replicas must be specified as witness 
replicas using format 'N/N' (e.g., '3/3'). Got '%s'", value);
+
+            String[] rfParts = value.split("/");
+            if (rfParts.length != 2)
+                cfgEx("Invalid replication factor format for satellite '%s': 
'%s'", satelliteName, value);
+
+            try
+            {
+                int total = Integer.parseInt(rfParts[0]);
+                int transientCount = Integer.parseInt(rfParts[1]);
+
+                if (transientCount != total)
+                    cfgEx("Satellite replicas must all be witnesses. Expected 
'%s/%s' but got '%s'", total, total, value);

Review Comment:
   ```suggestion
                       cfgEx("Satellite replicas must all be witnesses. 
Expected '%d/%d' but got '%s'", total, total, value);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to