frankgh commented on code in PR #251:
URL: https://github.com/apache/cassandra-sidecar/pull/251#discussion_r2326271432


##########
server/build.gradle:
##########
@@ -148,6 +148,10 @@ dependencies {
     
implementation("io.swagger.core.v3:swagger-models:${project.swaggerVersion}")
     
implementation("io.swagger.core.v3:swagger-jaxrs2:${project.swaggerVersion}")
 
+    implementation(group: "org.apache.cassandra", name: 
"cassandra-analytics-common", version: "${[project.analyticsVersion]}")
+    implementation(group: "org.apache.cassandra", name: 
"cassandra-analytics-cdc_spark3_2.12", version: "${[project.analyticsVersion]}")
+    implementation "com.esotericsoftware:kryo-shaded:${kryoVersion}"

Review Comment:
   Let's omit the kryo version. My fear is that the version number might get 
out of sync with what `cassandra-analytics-cdc_spark3_2.12` brings in 
transitively. I am also uneasy about `cassandra-analytics-cdc_spark3_2.12` 
pulling in guava. I'd love to get rid of guava as an upstream dependency, 
but it is widely used.
   ```suggestion
   ```



##########
server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.base.Preconditions;

Review Comment:
   We should use Sidecar's preconditions in general, but in this case they are 
not needed. We should use `Objects.requireNonNull` instead.
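
   For illustration, a minimal sketch of the `Objects.requireNonNull` form. 
The `jarBaseName` helper below is a hypothetical stand-in for 
`CassandraVersion.jarBaseName()`, assumed to return null for unsupported 
versions; the class name is also made up for the example.

   ```java
   import java.util.Objects;

   public class RequireNonNullSketch
   {
       // Hypothetical stand-in for CassandraVersion.jarBaseName():
       // returns null when the version has no matching bridge jar.
       static String jarBaseName(String version)
       {
           return "4.0".equals(version) ? "four-zero" : null;
       }

       static String checkedJarBaseName(String version)
       {
           String jarBaseName = jarBaseName(version);
           // The Supplier overload builds the message only when the check fails.
           return Objects.requireNonNull(jarBaseName,
                                         () -> "Cassandra version " + version + " is not supported");
       }

       public static void main(String[] args)
       {
           System.out.println(checkedJarBaseName("4.0"));
           try
           {
               checkedJarBaseName("3.0");
           }
           catch (NullPointerException e)
           {
               System.out.println(e.getMessage());
           }
       }
   }
   ```

   Like guava's `Preconditions.checkNotNull`, this throws a 
`NullPointerException` on a null argument, so callers should see the same 
exception type without the guava import.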



##########
server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java:
##########
@@ -35,10 +35,13 @@ public class CdcConfigurationImpl implements 
CdcConfiguration
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcConfigurationImpl.class);
     public static final String IS_ENABLED_PROPERTY = "enabled";
     public static final String CONFIGURATION_REFRESH_TIME_PROPERTY = 
"config_refresh_time";
+    public static final String TABLE_SCHEMA_REFRESH_TIME_PROPERTY = 
"table_schema_refresh_time";

Review Comment:
   Can we add this to sidecar.yaml and document it there as well?



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcStats.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+/**

Review Comment:
   I find the AI-generated javadocs less useful than a concise description 
of what the method does. In general, I don't like these.



##########
server/src/main/java/org/apache/cassandra/sidecar/utils/ByteBufUtils.java:
##########


Review Comment:
   This class needs unit tests.



##########
server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java:
##########
@@ -21,12 +21,67 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
 
 /**
- * Class with utility methods for CDC.
+ * Utility class providing CDC (Change Data Capture) specific operations and 
file handling.

Review Comment:
   These AI-generated javadocs are too much.



##########
server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.base.Preconditions;
+
+import jakarta.inject.Singleton;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.bridge.BaseCassandraBridgeFactory.getCassandraVersion;
+
+/**
+ * Factory class for creating Cassandra bridge instances based on 
version-specific jar files.
+ * <p>
+ * This factory maintains a cache of CassandraBridge instances mapped by 
Cassandra version labels
+ * and provides methods to retrieve bridge instances for specific Cassandra 
versions.
+ * Each bridge is loaded from version-specific JAR resources and instantiated 
using reflection.
+ */
+@Singleton
+public class CassandraBridgeFactory
+{
+    // maps Cassandra version-specific jar name (e.g. 'four-zero') to matching 
CassandraBridge
+    private final Map<String, CassandraBridge> cassandraBridges;
+    
+    public CassandraBridgeFactory()
+    {
+        cassandraBridges = new 
ConcurrentHashMap<>(CassandraVersion.values().length);
+    }
+
+    @NotNull
+    public CassandraBridge get(@NotNull String version)
+    {
+        return get(getCassandraVersion(version));
+    }
+
+    @NotNull
+    public CassandraBridge get(@NotNull CassandraVersionFeatures features)
+    {
+        return get(getCassandraVersion(features));
+    }
+
+    @NotNull
+    public CassandraBridge get(@NotNull CassandraVersion version)
+    {
+        String jarBaseName = version.jarBaseName();
+        Preconditions.checkNotNull(jarBaseName, "Cassandra version " + version 
+ " is not supported");
+        return cassandraBridges.computeIfAbsent(jarBaseName, this::create);
+    }
+
+    @NotNull
+    @SuppressWarnings("unchecked")
+    private CassandraBridge create(@NotNull String label)
+    {
+        try
+        {
+            ClassLoader loader = buildClassLoader(
+            cassandraResourceName(label),
+            bridgeResourceName(label),
+            typesResourceName(label));
+            Class<CassandraBridge> bridge = (Class<CassandraBridge>) 
loader.loadClass(CassandraBridge.IMPLEMENTATION_FQCN);
+            Constructor<CassandraBridge> constructor = bridge.getConstructor();
+            return constructor.newInstance();
+        }
+        catch (ClassNotFoundException | NoSuchMethodException | 
InstantiationException
+               | IllegalAccessException | InvocationTargetException exception)
+        {
+            throw new RuntimeException("Failed to create Cassandra bridge for 
label " + label, exception);
+        }
+    }
+
+    @NotNull
+    String cassandraResourceName(@NotNull String label)
+    {
+        return "/bridges/" + label + ".jar";
+    }
+
+    @NotNull
+    String bridgeResourceName(@NotNull String label)
+    {
+        return jarResourceName(label, "bridge");
+    }
+
+    @NotNull
+    String typesResourceName(@NotNull String label)
+    {
+        return jarResourceName(label, "types");
+    }
+
+    String jarResourceName(String... parts)
+    {
+        return "/bridges/" + String.join("-", parts) + ".jar";
+    }
+
+    public ClassLoader buildClassLoader(String... resourceNames)
+    {
+        URL[] urls = Arrays.stream(resourceNames)
+                           
.map(BaseCassandraBridgeFactory::copyClassResourceToFile)
+                           .map(jar -> {
+                               try
+                               {
+                                   return jar.toURI().toURL();
+                               }
+                               catch (MalformedURLException e)
+                               {
+                                   throw new RuntimeException(e);
+                               }
+                           }).toArray(URL[]::new);
+
+        return AccessController.doPrivileged(new 
PrivilegedAction<ClassLoader>()
+        {
+            public ClassLoader run()
+            {
+                return new PostDelegationClassLoader(urls, 
Thread.currentThread().getContextClassLoader());
+            }
+        });
+    }

Review Comment:
   NIT:
   ```suggestion
                   return 
AccessController.doPrivileged((PrivilegedAction<ClassLoader>) () ->
                                                                                
new PostDelegationClassLoader(urls, 
Thread.currentThread().getContextClassLoader()));
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+import org.apache.cassandra.sidecar.utils.ByteBufUtils;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Database accessor for CDC (Change Data Capture) state management operations.
+ * <p>
+ * This class provides specialized database access functionality for managing 
CDC state persistence
+ * and retrieval in Cassandra Sidecar. It extends {@link DatabaseAccessor} to 
provide CDC-specific
+ * operations including:
+ * <ul>
+ *   <li>Storing CDC state data across token range splits for distributed 
processing</li>
+ *   <li>Loading and merging CDC state from overlapping token ranges</li>
+ *   <li>Managing table schema history for CDC-enabled tables</li>
+ *   <li>Providing partitioner and metadata access for CDC operations</li>
+ * </ul>
+ * <p>
+ * The accessor handles token range splitting to ensure CDC state is properly 
distributed
+ * across multiple database partitions for scalability. It uses {@link 
TokenSplitUtil}
+ * to determine overlapping splits for both storage and retrieval operations.
+ * <p>
+ * Key features:
+ * <ul>
+ *   <li><strong>Token-aware storage:</strong> Automatically distributes CDC 
state across
+ *       appropriate token range splits</li>
+ *   <li><strong>State merging:</strong> Combines overlapping CDC state 
objects into
+ *       canonical views during retrieval</li>
+ *   <li><strong>Schema management:</strong> Tracks table schema versions for 
CDC operations</li>
+ *   <li><strong>Async operations:</strong> Provides asynchronous database 
operations for
+ *       better performance</li>
+ * </ul>
+ * <p>
+ * This class is thread-safe and designed as a singleton for injection into 
CDC components
+ * that require database access functionality.
+ *
+ * @see DatabaseAccessor
+ * @see CdcStatesSchema
+ * @see TokenSplitUtil
+ */
+@SuppressWarnings("resource")
+@Singleton
+public class CdcDatabaseAccessor extends DatabaseAccessor<CdcStatesSchema>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcDatabaseAccessor.class);
+    private final TableHistorySchema tableHistorySchema;
+    private final Provider<TokenSplitUtil> tokenSplitUtilProvider;
+    private volatile TokenSplitUtil tokenSplitUtil = null;
+    private volatile InstanceMetadataFetcher instanceMetadataFetcher;

Review Comment:
   This should not be volatile; we should make it final instead.
   ```suggestion
       private final InstanceMetadataFetcher instanceMetadataFetcher;
   ```
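
   A minimal sketch of the final-field pattern, assuming the fetcher is 
available when the accessor is constructed. `MetadataFetcher` and `Accessor` 
are hypothetical stand-ins for `InstanceMetadataFetcher` and 
`CdcDatabaseAccessor`, not the real classes.

   ```java
   import java.util.Objects;

   public class FinalFieldSketch
   {
       // Hypothetical stand-in for InstanceMetadataFetcher
       interface MetadataFetcher
       {
           String fetch();
       }

       // Hypothetical stand-in for CdcDatabaseAccessor
       static class Accessor
       {
           // Assigned exactly once in the constructor, so volatile is not needed
           private final MetadataFetcher fetcher;

           Accessor(MetadataFetcher fetcher)
           {
               this.fetcher = Objects.requireNonNull(fetcher, "fetcher must not be null");
           }

           String describe()
           {
               return fetcher.fetch();
           }
       }

       public static void main(String[] args)
       {
           Accessor accessor = new Accessor(() -> "instance-1");
           System.out.println(accessor.describe());
       }
   }
   ```

   A final field set in the constructor is visible to other threads once 
construction completes, provided `this` does not escape the constructor, so 
`volatile` adds nothing when the reference never changes.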



##########
server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+import org.apache.cassandra.sidecar.utils.ByteBufUtils;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Database accessor for CDC (Change Data Capture) state management operations.

Review Comment:
   Again, this javadoc is not very useful and is generally distracting.



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ *   <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ *   <li>Automatic detection and tracking of CDC-enabled tables</li>
+ *   <li>Schema change event notification to registered listeners</li>
+ *   <li>Table metadata caching and synchronization with CDC bridges</li>
+ *   <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ *   <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema 
changes by comparing
+ *       full schema snapshots and updates CDC table metadata when changes are 
detected</li>
+ *   <li><strong>Schema Monitor (49s interval):</strong> Validates that 
CDC-enabled tables
+ *       are properly configured in the Cassandra Schema.instance 
singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ *   <li><strong>CDC Table Discovery:</strong> Automatically identifies and 
tracks tables with
+ *       CDC enabled from the cluster schema</li>
+ *   <li><strong>Schema Change Detection:</strong> Compares schema snapshots 
to detect modifications
+ *       and trigger appropriate updates to CDC subsystems</li>
+ *   <li><strong>Bridge Integration:</strong> Synchronizes schema information 
with Cassandra and
+ *       CDC bridges for consistent metadata handling</li>
+ *   <li><strong>Event Notification:</strong> Provides a listener mechanism 
for components that
+ *       need to react to schema changes</li>
+ *   <li><strong>Validation and Monitoring:</strong> Continuously validates 
CDC table configurations
+ *       and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness 
with system load:
+ * <ul>
+ *   <li>60-second refresh interval for schema change detection</li>
+ *   <li>49-second monitor interval (chosen to avoid harmonics with the 60s 
refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC 
consumers always have
+ * up-to-date schema information, enabling proper data serialization, 
deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask

Review Comment:
   I think the name of the class should reflect the fact that this is a 
background task that periodically refreshes the schema.



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ *   <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ *   <li>Automatic detection and tracking of CDC-enabled tables</li>
+ *   <li>Schema change event notification to registered listeners</li>
+ *   <li>Table metadata caching and synchronization with CDC bridges</li>
+ *   <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ *   <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema 
changes by comparing
+ *       full schema snapshots and updates CDC table metadata when changes are 
detected</li>
+ *   <li><strong>Schema Monitor (49s interval):</strong> Validates that 
CDC-enabled tables
+ *       are properly configured in the Cassandra Schema.instance 
singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ *   <li><strong>CDC Table Discovery:</strong> Automatically identifies and 
tracks tables with
+ *       CDC enabled from the cluster schema</li>
+ *   <li><strong>Schema Change Detection:</strong> Compares schema snapshots 
to detect modifications
+ *       and trigger appropriate updates to CDC subsystems</li>
+ *   <li><strong>Bridge Integration:</strong> Synchronizes schema information 
with Cassandra and
+ *       CDC bridges for consistent metadata handling</li>
+ *   <li><strong>Event Notification:</strong> Provides a listener mechanism 
for components that
+ *       need to react to schema changes</li>
+ *   <li><strong>Validation and Monitoring:</strong> Continuously validates 
CDC table configurations
+ *       and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness 
with system load:
+ * <ul>
+ *   <li>60-second refresh interval for schema change detection</li>
+ *   <li>49-second monitor interval (chosen to avoid harmonics with the 60s 
refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC 
consumers always have
+ * up-to-date schema information, enabling proper data serialization, 
deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+    // 49sec least-common multiple with 60sec is 49min so offers best monitor 
frequency without clashing with 60sec
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+    private final AtomicReference<String> currSchemaText = new 
AtomicReference<>("");
+    private final AtomicReference<Set<CqlTable>> cdcTables = new 
AtomicReference<>(Collections.emptySet());
+    private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new 
ConcurrentHashMap<>();
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new 
CopyOnWriteArrayList<>();
+    private final SidecarConfiguration sidecarConfiguration;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final SecondBoundConfiguration tableSchemaRefreshTime;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+                                  CdcDatabaseAccessor databaseAccessor,
+                                  SidecarConfiguration sidecarConfiguration,
+                                  CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+
+        this.instanceFetcher = instanceFetcher;
+        this.databaseAccessor = databaseAccessor;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.tableSchemaRefreshTime = 
sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    public void addSchemaChangeListener(Runnable listener)
+    {
+        schemaChangeListeners.add(listener);
+    }
+
+    public void refresh()
+    {
+        NodeSettings nodeSettings = 
instanceFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+        CassandraBridge cassandraBridge = 
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+        CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+        try
+        {
+            LOGGER.info("Checking for schema changes...");
+            String fullSchemaText = databaseAccessor.fullSchema();
+            if (!fullSchemaText.equals(currSchemaText.get()))
+            {
+                LOGGER.info("Schema change detected, refreshing CDC tables");
+                currSchemaText.set(fullSchemaText);
+                Set<CqlTable> updatedCdcTables = 
buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, 
cassandraBridgeFactory);
+                LOGGER.info("Cdc enabled tables tables='{}'", 
+                            updatedCdcTables.stream()
+                                            .map(m -> String.format("%s.%s", 
m.keyspace(), m.table()))
+                                            .collect(Collectors.joining(",")));
+                cdcTables.set(updatedCdcTables);
+                cdcBridge.updateCdcSchema(updatedCdcTables, 
databaseAccessor.partitioner(),
+                                          ((keyspace, table) -> 
tableIdCache.get(TableIdentifier.of(keyspace, table))));
+                schemaChangeListeners.forEach(Runnable::run);
+            }
+        }
+        catch (IllegalStateException exception)
+        {
+            LOGGER.warn("There was a problem refreshing the schema. Database 
Accessor may not be ready", exception);
+            throw exception;
+        }
+        catch (Throwable t)
+        {
+            LOGGER.error("Unexpected error while refreshing the schema", t);
+            throw t;
+        }
+    }
+
+    public DurationSpec delay()
+    {
+        return tableSchemaRefreshTime;

Review Comment:
   Let's read the refresh time from the configuration on every call, so that 
the task picks up new values once we support dynamic configuration reloading.
   ```suggestion
           return sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
   ```
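
A minimal sketch of why reading the value on every call matters for reloading. It assumes the configuration can be modelled as a `LongSupplier`; `DelaySketch`, `CachedDelayTask`, and `LiveDelayTask` are hypothetical names for illustration and are not part of Sidecar.

```java
import java.util.function.LongSupplier;

// Hypothetical illustration only: none of these types exist in Sidecar.
final class DelaySketch
{
    /** Captures the delay once at construction; later configuration changes are not seen. */
    static final class CachedDelayTask
    {
        private final long delaySeconds;

        CachedDelayTask(LongSupplier config)
        {
            this.delaySeconds = config.getAsLong();
        }

        long delay()
        {
            return delaySeconds;
        }
    }

    /** Re-reads the delay on every call, so a reloaded configuration takes effect. */
    static final class LiveDelayTask
    {
        private final LongSupplier config;

        LiveDelayTask(LongSupplier config)
        {
            this.config = config;
        }

        long delay()
        {
            return config.getAsLong();
        }
    }
}
```

If the backing value changes after both tasks are constructed, only `LiveDelayTask.delay()` reflects the new value. Whether the scheduler re-queries `delay()` between runs is outside the scope of this sketch.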



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ *   <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ *   <li>Automatic detection and tracking of CDC-enabled tables</li>
+ *   <li>Schema change event notification to registered listeners</li>
+ *   <li>Table metadata caching and synchronization with CDC bridges</li>
+ *   <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ *   <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema 
changes by comparing
+ *       full schema snapshots and updates CDC table metadata when changes are 
detected</li>
+ *   <li><strong>Schema Monitor (49s interval):</strong> Validates that 
CDC-enabled tables
+ *       are properly configured in the Cassandra Schema.instance 
singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ *   <li><strong>CDC Table Discovery:</strong> Automatically identifies and 
tracks tables with
+ *       CDC enabled from the cluster schema</li>
+ *   <li><strong>Schema Change Detection:</strong> Compares schema snapshots 
to detect modifications
+ *       and trigger appropriate updates to CDC subsystems</li>
+ *   <li><strong>Bridge Integration:</strong> Synchronizes schema information 
with Cassandra and
+ *       CDC bridges for consistent metadata handling</li>
+ *   <li><strong>Event Notification:</strong> Provides a listener mechanism 
for components that
+ *       need to react to schema changes</li>
+ *   <li><strong>Validation and Monitoring:</strong> Continuously validates 
CDC table configurations
+ *       and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness 
with system load:
+ * <ul>
+ *   <li>60-second refresh interval for schema change detection</li>
+ *   <li>49-second monitor interval (chosen to avoid harmonics with the 60s 
refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC 
consumers always have
+ * up-to-date schema information, enabling proper data serialization, 
deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+    // 49sec least-common multiple with 60sec is 49min so offers best monitor 
frequency without clashing with 60sec
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+    private final AtomicReference<String> currSchemaText = new 
AtomicReference<>("");
+    private final AtomicReference<Set<CqlTable>> cdcTables = new 
AtomicReference<>(Collections.emptySet());
+    private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new 
ConcurrentHashMap<>();
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new 
CopyOnWriteArrayList<>();
+    private final SidecarConfiguration sidecarConfiguration;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final SecondBoundConfiguration tableSchemaRefreshTime;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+                                  CdcDatabaseAccessor databaseAccessor,
+                                  SidecarConfiguration sidecarConfiguration,
+                                  CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+
+        this.instanceFetcher = instanceFetcher;
+        this.databaseAccessor = databaseAccessor;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.tableSchemaRefreshTime = 
sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    public void addSchemaChangeListener(Runnable listener)
+    {
+        schemaChangeListeners.add(listener);
+    }
+
+    public void refresh()
+    {
+        NodeSettings nodeSettings = 
instanceFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+        CassandraBridge cassandraBridge = 
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+        CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+        try
+        {
+            LOGGER.info("Checking for schema changes...");

Review Comment:
   Should this be logged at debug level?
   ```suggestion
               LOGGER.debug("Checking for schema changes...");
   ```



##########
server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.base.Preconditions;
+
+import jakarta.inject.Singleton;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.bridge.BaseCassandraBridgeFactory.getCassandraVersion;
+
+/**
+ * Factory class for creating Cassandra bridge instances based on 
version-specific jar files.
+ * <p>
+ * This factory maintains a cache of CassandraBridge instances mapped by 
Cassandra version labels
+ * and provides methods to retrieve bridge instances for specific Cassandra 
versions.
+ * Each bridge is loaded from version-specific JAR resources and instantiated 
using reflection.
+ */
+@Singleton
+public class CassandraBridgeFactory
+{
+    // maps Cassandra version-specific jar name (e.g. 'four-zero') to matching 
CassandraBridge
+    private final Map<String, CassandraBridge> cassandraBridges;
+    
+    public CassandraBridgeFactory()
+    {
+        cassandraBridges = new 
ConcurrentHashMap<>(CassandraVersion.values().length);
+    }
+
+    @NotNull
+    public CassandraBridge get(@NotNull String version)
+    {
+        return get(getCassandraVersion(version));
+    }
+
+    @NotNull
+    public CassandraBridge get(@NotNull CassandraVersionFeatures features)
+    {
+        return get(getCassandraVersion(features));
+    }
+
+    @NotNull
+    public CassandraBridge get(@NotNull CassandraVersion version)
+    {
+        String jarBaseName = version.jarBaseName();
+        Preconditions.checkNotNull(jarBaseName, "Cassandra version " + version 
+ " is not supported");

Review Comment:
   ```suggestion
           Objects.requireNonNull(jarBaseName, "Cassandra version " + version + " is not supported");
   ```
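
For reference, a small sketch of the JDK-only null check, which avoids the Guava dependency for this call. The `jarBaseName` helper and its version mapping are hypothetical stand-ins for `CassandraVersion#jarBaseName()`. The sketch uses the `Supplier` overload of `Objects.requireNonNull`, which builds the message only on failure. Like `Preconditions.checkNotNull`, it throws `NullPointerException`.

```java
import java.util.Objects;

// Hypothetical illustration only: the version mapping below is made up for the example.
final class NullCheckSketch
{
    /** Stand-in for version.jarBaseName(); returns null for unsupported versions. */
    static String jarBaseName(String version)
    {
        return "4.0".equals(version) ? "four-zero" : null;
    }

    /** Returns the jar base name, or throws NullPointerException with a descriptive message. */
    static String requireSupported(String version)
    {
        // The message is built lazily, only when the value is null
        return Objects.requireNonNull(jarBaseName(version),
                                      () -> "Cassandra version " + version + " is not supported");
    }
}
```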



##########
server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+import org.apache.cassandra.sidecar.utils.ByteBufUtils;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Database accessor for CDC (Change Data Capture) state management operations.
+ * <p>
+ * This class provides specialized database access functionality for managing 
CDC state persistence
+ * and retrieval in Cassandra Sidecar. It extends {@link DatabaseAccessor} to 
provide CDC-specific
+ * operations including:
+ * <ul>
+ *   <li>Storing CDC state data across token range splits for distributed 
processing</li>
+ *   <li>Loading and merging CDC state from overlapping token ranges</li>
+ *   <li>Managing table schema history for CDC-enabled tables</li>
+ *   <li>Providing partitioner and metadata access for CDC operations</li>
+ * </ul>
+ * <p>
+ * The accessor handles token range splitting to ensure CDC state is properly 
distributed
+ * across multiple database partitions for scalability. It uses {@link 
TokenSplitUtil}
+ * to determine overlapping splits for both storage and retrieval operations.
+ * <p>
+ * Key features:
+ * <ul>
+ *   <li><strong>Token-aware storage:</strong> Automatically distributes CDC 
state across
+ *       appropriate token range splits</li>
+ *   <li><strong>State merging:</strong> Combines overlapping CDC state 
objects into
+ *       canonical views during retrieval</li>
+ *   <li><strong>Schema management:</strong> Tracks table schema versions for 
CDC operations</li>
+ *   <li><strong>Async operations:</strong> Provides asynchronous database 
operations for
+ *       better performance</li>
+ * </ul>
+ * <p>
+ * This class is thread-safe and designed as a singleton for injection into 
CDC components
+ * that require database access functionality.
+ *
+ * @see DatabaseAccessor
+ * @see CdcStatesSchema
+ * @see TokenSplitUtil
+ */
+@SuppressWarnings("resource")
+@Singleton
+public class CdcDatabaseAccessor extends DatabaseAccessor<CdcStatesSchema>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcDatabaseAccessor.class);
+    private final TableHistorySchema tableHistorySchema;
+    private final Provider<TokenSplitUtil> tokenSplitUtilProvider;
+    private volatile TokenSplitUtil tokenSplitUtil = null;
+    private volatile InstanceMetadataFetcher instanceMetadataFetcher;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    @Inject
+    public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher,
+                               CdcStatesSchema cdcStatesSchema,
+                               TableHistorySchema tableHistorySchema,
+                               CQLSessionProvider sessionProvider,
+                               Provider<TokenSplitUtil> tokenSplitUtilProvider,
+                               CassandraBridgeFactory cassandraBridgeFactory)
+    {
+        super(cdcStatesSchema, sessionProvider);
+        this.tableHistorySchema = tableHistorySchema;
+        this.tokenSplitUtilProvider = tokenSplitUtilProvider;
+        this.instanceMetadataFetcher = instanceMetadataFetcher;
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    @VisibleForTesting
+    public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher,

Review Comment:
   While this constructor is annotated with `@VisibleForTesting`, I don't see 
it being used anywhere. Maybe tests for this class are missing?



##########
server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+import org.apache.cassandra.sidecar.utils.ByteBufUtils;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Database accessor for CDC (Change Data Capture) state management operations.
+ * <p>
+ * This class provides specialized database access functionality for managing 
CDC state persistence
+ * and retrieval in Cassandra Sidecar. It extends {@link DatabaseAccessor} to 
provide CDC-specific
+ * operations including:
+ * <ul>
+ *   <li>Storing CDC state data across token range splits for distributed 
processing</li>
+ *   <li>Loading and merging CDC state from overlapping token ranges</li>
+ *   <li>Managing table schema history for CDC-enabled tables</li>
+ *   <li>Providing partitioner and metadata access for CDC operations</li>
+ * </ul>
+ * <p>
+ * The accessor handles token range splitting to ensure CDC state is properly 
distributed
+ * across multiple database partitions for scalability. It uses {@link 
TokenSplitUtil}
+ * to determine overlapping splits for both storage and retrieval operations.
+ * <p>
+ * Key features:
+ * <ul>
+ *   <li><strong>Token-aware storage:</strong> Automatically distributes CDC 
state across
+ *       appropriate token range splits</li>
+ *   <li><strong>State merging:</strong> Combines overlapping CDC state 
objects into
+ *       canonical views during retrieval</li>
+ *   <li><strong>Schema management:</strong> Tracks table schema versions for 
CDC operations</li>
+ *   <li><strong>Async operations:</strong> Provides asynchronous database 
operations for
+ *       better performance</li>
+ * </ul>
+ * <p>
+ * This class is thread-safe and designed as a singleton for injection into 
CDC components
+ * that require database access functionality.
+ *
+ * @see DatabaseAccessor
+ * @see CdcStatesSchema
+ * @see TokenSplitUtil
+ */
+@SuppressWarnings("resource")
+@Singleton
+public class CdcDatabaseAccessor extends DatabaseAccessor<CdcStatesSchema>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcDatabaseAccessor.class);
+    private final TableHistorySchema tableHistorySchema;
+    private final Provider<TokenSplitUtil> tokenSplitUtilProvider;
+    private volatile TokenSplitUtil tokenSplitUtil = null;
+    private volatile InstanceMetadataFetcher instanceMetadataFetcher;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    @Inject
+    public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher,
+                               CdcStatesSchema cdcStatesSchema,
+                               TableHistorySchema tableHistorySchema,
+                               CQLSessionProvider sessionProvider,
+                               Provider<TokenSplitUtil> tokenSplitUtilProvider,
+                               CassandraBridgeFactory cassandraBridgeFactory)
+    {
+        super(cdcStatesSchema, sessionProvider);
+        this.tableHistorySchema = tableHistorySchema;
+        this.tokenSplitUtilProvider = tokenSplitUtilProvider;
+        this.instanceMetadataFetcher = instanceMetadataFetcher;
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    @VisibleForTesting
+    public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher,
+                               CdcStatesSchema cdcStatesSchema,
+                               TableHistorySchema tableHistorySchema,
+                               CQLSessionProvider sessionProvider,
+                               TokenSplitUtil tokenSplitUtil,
+                               CassandraBridgeFactory cassandraBridgeFactory)
+    {
+        super(cdcStatesSchema, sessionProvider);
+        this.tableHistorySchema = tableHistorySchema;
+        this.tokenSplitUtilProvider = () -> tokenSplitUtil;
+        this.instanceMetadataFetcher = instanceMetadataFetcher;
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    protected TokenSplitUtil tokenSplitUtil()
+    {
+        // cql connection must be initialized before we can initialize 
TokenSplitUtil class
+        if (this.tokenSplitUtil == null)

Review Comment:
   Do we need to worry about concurrent access to this method?
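
If concurrent access does turn out to be a concern, one common option is double-checked locking on the `volatile` field. This is a sketch under assumptions: the rest of the method is not visible in the hunk above, `LazyInitSketch` is a hypothetical name, and `java.util.function.Supplier` stands in for the Guice `Provider`.

```java
import java.util.function.Supplier;

// Hypothetical illustration only: not the PR's implementation.
final class LazyInitSketch<T>
{
    private final Supplier<T> provider;
    // volatile so that a fully constructed instance is safely published to other threads
    private volatile T instance;

    LazyInitSketch(Supplier<T> provider)
    {
        this.provider = provider;
    }

    T get()
    {
        T local = instance;          // a single volatile read on the fast path
        if (local == null)
        {
            synchronized (this)
            {
                local = instance;    // re-check under the lock
                if (local == null)
                {
                    local = provider.get();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```

With this shape the provider is invoked at most once, even when several threads call `get()` before initialization completes. If creating the object twice is harmless, the simpler unsynchronized check may be acceptable as well.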



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ *   <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ *   <li>Automatic detection and tracking of CDC-enabled tables</li>
+ *   <li>Schema change event notification to registered listeners</li>
+ *   <li>Table metadata caching and synchronization with CDC bridges</li>
+ *   <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ *   <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema 
changes by comparing
+ *       full schema snapshots and updates CDC table metadata when changes are 
detected</li>
+ *   <li><strong>Schema Monitor (49s interval):</strong> Validates that 
CDC-enabled tables
+ *       are properly configured in the Cassandra Schema.instance 
singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ *   <li><strong>CDC Table Discovery:</strong> Automatically identifies and 
tracks tables with
+ *       CDC enabled from the cluster schema</li>
+ *   <li><strong>Schema Change Detection:</strong> Compares schema snapshots 
to detect modifications
+ *       and trigger appropriate updates to CDC subsystems</li>
+ *   <li><strong>Bridge Integration:</strong> Synchronizes schema information 
with Cassandra and
+ *       CDC bridges for consistent metadata handling</li>
+ *   <li><strong>Event Notification:</strong> Provides a listener mechanism 
for components that
+ *       need to react to schema changes</li>
+ *   <li><strong>Validation and Monitoring:</strong> Continuously validates 
CDC table configurations
+ *       and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness 
with system load:
+ * <ul>
+ *   <li>60-second refresh interval for schema change detection</li>
+ *   <li>49-second monitor interval (chosen to avoid harmonics with the 60s 
refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC 
consumers always have
+ * up-to-date schema information, enabling proper data serialization, 
deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+    // 49sec least-common multiple with 60sec is 49min so offers best monitor 
frequency without clashing with 60sec
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+    private final AtomicReference<String> currSchemaText = new 
AtomicReference<>("");
+    private final AtomicReference<Set<CqlTable>> cdcTables = new 
AtomicReference<>(Collections.emptySet());
+    private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new 
ConcurrentHashMap<>();
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new 
CopyOnWriteArrayList<>();
+    private final SidecarConfiguration sidecarConfiguration;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final SecondBoundConfiguration tableSchemaRefreshTime;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+                                  CdcDatabaseAccessor databaseAccessor,
+                                  SidecarConfiguration sidecarConfiguration,
+                                  CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+
+        this.instanceFetcher = instanceFetcher;
+        this.databaseAccessor = databaseAccessor;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.tableSchemaRefreshTime = 
sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    public void addSchemaChangeListener(Runnable listener)
+    {
+        schemaChangeListeners.add(listener);
+    }
+
+    public void refresh()
+    {
+        NodeSettings nodeSettings = 
instanceFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+        CassandraBridge cassandraBridge = 
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+        CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+        try
+        {
+            LOGGER.info("Checking for schema changes...");
+            String fullSchemaText = databaseAccessor.fullSchema();
+            if (!fullSchemaText.equals(currSchemaText.get()))
+            {
+                LOGGER.info("Schema change detected, refreshing CDC tables");
+                currSchemaText.set(fullSchemaText);
+                Set<CqlTable> updatedCdcTables = 
buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, 
cassandraBridgeFactory);
+                LOGGER.info("Cdc enabled tables tables='{}'", 
+                            updatedCdcTables.stream()
+                                            .map(m -> String.format("%s.%s", 
m.keyspace(), m.table()))
+                                            .collect(Collectors.joining(",")));
+                cdcTables.set(updatedCdcTables);
+                cdcBridge.updateCdcSchema(updatedCdcTables, 
databaseAccessor.partitioner(),
+                                          ((keyspace, table) -> 
tableIdCache.get(TableIdentifier.of(keyspace, table))));
+                schemaChangeListeners.forEach(Runnable::run);
+            }
+        }
+        catch (IllegalStateException exception)
+        {
+            LOGGER.warn("There was a problem refreshing the schema. Database 
Accessor may not be ready", exception);
+            throw exception;
+        }
+        catch (Throwable t)
+        {
+            LOGGER.error("Unexpected error while refreshing the schema", t);
+            throw t;
+        }
+    }
+
+    public DurationSpec delay()
+    {
+        return tableSchemaRefreshTime;
+    }
+
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        try
+        {
+            refresh();
+            promise.tryComplete();
+        }
+        catch (Throwable t)
+        {
+            promise.fail(t);
+        }
+    }
+
+    @Override
+    public ScheduleDecision scheduleDecision()
+    {
+        boolean shouldSkip = 
!sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled()
+                             || 
!sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled();
+        return shouldSkip ? ScheduleDecision.SKIP : ScheduleDecision.EXECUTE;

Review Comment:
   NIT: I find the following form much more readable
   ```suggestion
           if (sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled() &&
               sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled())
           {
               return ScheduleDecision.EXECUTE;
           }
           return ScheduleDecision.SKIP;
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/utils/ByteBufUtils.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.function.Supplier;
+
+/**
+ * Utility class providing byte buffer manipulation and conversion operations 
for Cassandra Sidecar.
+ * <p>
+ * This class offers a comprehensive set of static utility methods for working 
with byte buffers,
+ * input streams, and byte arrays in the context of Cassandra data processing 
and CDC operations.
+ * The utilities are designed to handle common byte manipulation tasks 
including:
+ * <ul>
+ *   <li>ByteBuffer to byte array conversions with optimal performance</li>
+ *   <li>Hexadecimal string representation of binary data</li>
+ *   <li>Stream reading operations with full data consumption guarantees</li>
+ *   <li>Cassandra-specific composite key splitting and building</li>
+ *   <li>Length-prefixed byte buffer operations for data serialization</li>
+ *   <li>UTF-8 string encoding and decoding with error handling</li>
+ * </ul>
+ * <p>
+ * Key features include:
+ * <ul>
+ *   <li><strong>Memory efficiency:</strong> Optimized ByteBuffer handling 
that avoids
+ *       unnecessary copying when dealing with heap vs direct buffers</li>
+ *   <li><strong>Thread-safe string decoding:</strong> Thread-local UTF-8 
decoder
+ *       instances to avoid synchronization overhead</li>
+ *   <li><strong>Cassandra compatibility:</strong> Support for Cassandra 
composite key
+ *       formats including static marker handling</li>
+ *   <li><strong>Robust I/O:</strong> Stream reading methods that handle 
partial reads
+ *       and provide complete data consumption guarantees</li>
+ *   <li><strong>Debugging support:</strong> Hexadecimal representation 
methods for
+ *       binary data inspection and logging</li>
+ * </ul>
+ * <p>
+ * The class is particularly important for CDC operations where efficient byte 
manipulation
+ * is crucial for processing large volumes of change data. It provides the 
low-level
+ * building blocks for serialization, deserialization, and data format 
conversion
+ * operations throughout the sidecar system.
+ * <p>
+ * All methods in this class are static and thread-safe unless otherwise noted.
+ * The class maintains thread-local resources (such as charset decoders) to 
ensure
+ * optimal performance in multi-threaded environments.
+ *
+ * @see java.nio.ByteBuffer
+ * @see java.nio.charset.CharsetDecoder
+ */
+public class ByteBufUtils

Review Comment:
   NIT: this class has a lot of unnecessary `final` modifiers, which do not 
conform to the Cassandra style guide



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ *   <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ *   <li>Automatic detection and tracking of CDC-enabled tables</li>
+ *   <li>Schema change event notification to registered listeners</li>
+ *   <li>Table metadata caching and synchronization with CDC bridges</li>
+ *   <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ *   <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema 
changes by comparing
+ *       full schema snapshots and updates CDC table metadata when changes are 
detected</li>
+ *   <li><strong>Schema Monitor (49s interval):</strong> Validates that 
CDC-enabled tables
+ *       are properly configured in the Cassandra Schema.instance 
singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ *   <li><strong>CDC Table Discovery:</strong> Automatically identifies and 
tracks tables with
+ *       CDC enabled from the cluster schema</li>
+ *   <li><strong>Schema Change Detection:</strong> Compares schema snapshots 
to detect modifications
+ *       and trigger appropriate updates to CDC subsystems</li>
+ *   <li><strong>Bridge Integration:</strong> Synchronizes schema information 
with Cassandra and
+ *       CDC bridges for consistent metadata handling</li>
+ *   <li><strong>Event Notification:</strong> Provides a listener mechanism 
for components that
+ *       need to react to schema changes</li>
+ *   <li><strong>Validation and Monitoring:</strong> Continuously validates 
CDC table configurations
+ *       and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness 
with system load:
+ * <ul>
+ *   <li>60-second refresh interval for schema change detection</li>
+ *   <li>49-second monitor interval (chosen to avoid harmonics with the 60s 
refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC 
consumers always have
+ * up-to-date schema information, enabling proper data serialization, 
deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+    // 49sec least-common multiple with 60sec is 49min so offers best monitor 
frequency without clashing with 60sec
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+    private final AtomicReference<String> currSchemaText = new 
AtomicReference<>("");
+    private final AtomicReference<Set<CqlTable>> cdcTables = new 
AtomicReference<>(Collections.emptySet());
+    private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new 
ConcurrentHashMap<>();
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new 
CopyOnWriteArrayList<>();
+    private final SidecarConfiguration sidecarConfiguration;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final SecondBoundConfiguration tableSchemaRefreshTime;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+                                  CdcDatabaseAccessor databaseAccessor,
+                                  SidecarConfiguration sidecarConfiguration,
+                                  CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+
+        this.instanceFetcher = instanceFetcher;
+        this.databaseAccessor = databaseAccessor;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.tableSchemaRefreshTime = 
sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    public void addSchemaChangeListener(Runnable listener)
+    {
+        schemaChangeListeners.add(listener);
+    }
+
+    public void refresh()
+    {
+        NodeSettings nodeSettings = 
instanceFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+        CassandraBridge cassandraBridge = 
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+        CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+        try
+        {
+            LOGGER.info("Checking for schema changes...");
+            String fullSchemaText = databaseAccessor.fullSchema();
+            if (!fullSchemaText.equals(currSchemaText.get()))
+            {
+                LOGGER.info("Schema change detected, refreshing CDC tables");
+                currSchemaText.set(fullSchemaText);
+                Set<CqlTable> updatedCdcTables = 
buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, 
cassandraBridgeFactory);
+                LOGGER.info("Cdc enabled tables tables='{}'", 
+                            updatedCdcTables.stream()
+                                            .map(m -> String.format("%s.%s", 
m.keyspace(), m.table()))
+                                            .collect(Collectors.joining(",")));
+                cdcTables.set(updatedCdcTables);
+                cdcBridge.updateCdcSchema(updatedCdcTables, 
databaseAccessor.partitioner(),
+                                          ((keyspace, table) -> 
tableIdCache.get(TableIdentifier.of(keyspace, table))));
+                schemaChangeListeners.forEach(Runnable::run);
+            }
+        }
+        catch (IllegalStateException exception)
+        {
+            LOGGER.warn("There was a problem refreshing the schema. Database 
Accessor may not be ready", exception);
+            throw exception;
+        }
+        catch (Throwable t)
+        {
+            LOGGER.error("Unexpected error while refreshing the schema", t);
+            throw t;
+        }
+    }
+
+    public DurationSpec delay()

Review Comment:
   ```suggestion
       @Override
       public DurationSpec delay()
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ *   <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ *   <li>Automatic detection and tracking of CDC-enabled tables</li>
+ *   <li>Schema change event notification to registered listeners</li>
+ *   <li>Table metadata caching and synchronization with CDC bridges</li>
+ *   <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ *   <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema 
changes by comparing
+ *       full schema snapshots and updates CDC table metadata when changes are 
detected</li>
+ *   <li><strong>Schema Monitor (49s interval):</strong> Validates that 
CDC-enabled tables
+ *       are properly configured in the Cassandra Schema.instance 
singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ *   <li><strong>CDC Table Discovery:</strong> Automatically identifies and 
tracks tables with
+ *       CDC enabled from the cluster schema</li>
+ *   <li><strong>Schema Change Detection:</strong> Compares schema snapshots 
to detect modifications
+ *       and trigger appropriate updates to CDC subsystems</li>
+ *   <li><strong>Bridge Integration:</strong> Synchronizes schema information 
with Cassandra and
+ *       CDC bridges for consistent metadata handling</li>
+ *   <li><strong>Event Notification:</strong> Provides a listener mechanism 
for components that
+ *       need to react to schema changes</li>
+ *   <li><strong>Validation and Monitoring:</strong> Continuously validates 
CDC table configurations
+ *       and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness 
with system load:
+ * <ul>
+ *   <li>60-second refresh interval for schema change detection</li>
+ *   <li>49-second monitor interval (chosen to avoid harmonics with the 60s 
refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC 
consumers always have
+ * up-to-date schema information, enabling proper data serialization, 
deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+    // 49sec least-common multiple with 60sec is 49min so offers best monitor 
frequency without clashing with 60sec
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+    private final AtomicReference<String> currSchemaText = new 
AtomicReference<>("");
+    private final AtomicReference<Set<CqlTable>> cdcTables = new 
AtomicReference<>(Collections.emptySet());
+    private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new 
ConcurrentHashMap<>();
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new 
CopyOnWriteArrayList<>();
+    private final SidecarConfiguration sidecarConfiguration;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final SecondBoundConfiguration tableSchemaRefreshTime;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+                                  CdcDatabaseAccessor databaseAccessor,
+                                  SidecarConfiguration sidecarConfiguration,
+                                  CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+
+        this.instanceFetcher = instanceFetcher;
+        this.databaseAccessor = databaseAccessor;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.tableSchemaRefreshTime = 
sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    public void addSchemaChangeListener(Runnable listener)
+    {
+        schemaChangeListeners.add(listener);
+    }
+
+    public void refresh()
+    {
+        NodeSettings nodeSettings = 
instanceFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+        CassandraBridge cassandraBridge = 
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+        CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+        try
+        {
+            LOGGER.info("Checking for schema changes...");
+            String fullSchemaText = databaseAccessor.fullSchema();
+            if (!fullSchemaText.equals(currSchemaText.get()))
+            {
+                LOGGER.info("Schema change detected, refreshing CDC tables");
+                currSchemaText.set(fullSchemaText);
+                Set<CqlTable> updatedCdcTables = 
buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, 
cassandraBridgeFactory);
+                LOGGER.info("Cdc enabled tables tables='{}'", 
+                            updatedCdcTables.stream()
+                                            .map(m -> String.format("%s.%s", 
m.keyspace(), m.table()))
+                                            .collect(Collectors.joining(",")));
+                cdcTables.set(updatedCdcTables);
+                cdcBridge.updateCdcSchema(updatedCdcTables, 
databaseAccessor.partitioner(),

Review Comment:
   I don't think `databaseAccessor.partitioner()` is required here at all. We 
already have the partitioner information as a string in 
`nodeSettings.partitioner()`. One caveat: we will need to check whether the 
string contains a `.`, and if it does, apply the same logic we have inside 
`databaseAccessor.partitioner()`.
   ```suggestion
                   cdcBridge.updateCdcSchema(updatedCdcTables, Partitioner.valueOf(nodeSettings.partitioner()),
   ```
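
   To illustrate the caveat about the `.` check, here is a minimal sketch. It 
assumes the logic amounts to dropping the package prefix from a fully 
qualified class name before the enum lookup; I have not confirmed that this is 
exactly what `databaseAccessor.partitioner()` does. The `PartitionerNames` 
class, the `fromNodeSettings` helper, and the local `Partitioner` enum are 
hypothetical stand-ins so the snippet compiles on its own.

   ```java
   class PartitionerNames
   {
       // Hypothetical stand-in for the analytics Partitioner enum (assumption, not the real type)
       enum Partitioner
       {
           Murmur3Partitioner,
           RandomPartitioner
       }

       // Accepts either a simple name or a fully qualified class name
       static Partitioner fromNodeSettings(String partitionerName)
       {
           int lastDot = partitionerName.lastIndexOf('.');
           // Keep only the text after the last '.', if there is one
           String simpleName = lastDot >= 0 ? partitionerName.substring(lastDot + 1) : partitionerName;
           // valueOf throws IllegalArgumentException for names the enum does not define
           return Partitioner.valueOf(simpleName);
       }

       public static void main(String[] args)
       {
           System.out.println(fromNodeSettings("org.apache.cassandra.dht.Murmur3Partitioner"));
           System.out.println(fromNodeSettings("RandomPartitioner"));
       }
   }
   ```

   If something along these lines works, the suggestion above would call the 
helper instead of `Partitioner.valueOf` directly.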



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ *   <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ *   <li>Automatic detection and tracking of CDC-enabled tables</li>
+ *   <li>Schema change event notification to registered listeners</li>
+ *   <li>Table metadata caching and synchronization with CDC bridges</li>
+ *   <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ *   <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema 
changes by comparing
+ *       full schema snapshots and updates CDC table metadata when changes are 
detected</li>
+ *   <li><strong>Schema Monitor (49s interval):</strong> Validates that 
CDC-enabled tables
+ *       are properly configured in the Cassandra Schema.instance 
singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ *   <li><strong>CDC Table Discovery:</strong> Automatically identifies and 
tracks tables with
+ *       CDC enabled from the cluster schema</li>
+ *   <li><strong>Schema Change Detection:</strong> Compares schema snapshots 
to detect modifications
+ *       and trigger appropriate updates to CDC subsystems</li>
+ *   <li><strong>Bridge Integration:</strong> Synchronizes schema information 
with Cassandra and
+ *       CDC bridges for consistent metadata handling</li>
+ *   <li><strong>Event Notification:</strong> Provides a listener mechanism 
for components that
+ *       need to react to schema changes</li>
+ *   <li><strong>Validation and Monitoring:</strong> Continuously validates 
CDC table configurations
+ *       and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness 
with system load:
+ * <ul>
+ *   <li>60-second refresh interval for schema change detection</li>
+ *   <li>49-second monitor interval (chosen to avoid harmonics with the 60s 
refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC 
consumers always have
+ * up-to-date schema information, enabling proper data serialization, 
deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+    // 49sec least-common multiple with 60sec is 49min so offers best monitor 
frequency without clashing with 60sec
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+    private final AtomicReference<String> currSchemaText = new 
AtomicReference<>("");
+    private final AtomicReference<Set<CqlTable>> cdcTables = new 
AtomicReference<>(Collections.emptySet());
+    private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new 
ConcurrentHashMap<>();
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new 
CopyOnWriteArrayList<>();
+    private final SidecarConfiguration sidecarConfiguration;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final SecondBoundConfiguration tableSchemaRefreshTime;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+                                  CdcDatabaseAccessor databaseAccessor,
+                                  SidecarConfiguration sidecarConfiguration,
+                                  CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+
+        this.instanceFetcher = instanceFetcher;
+        this.databaseAccessor = databaseAccessor;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.tableSchemaRefreshTime = 
sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    public void addSchemaChangeListener(Runnable listener)
+    {
+        schemaChangeListeners.add(listener);
+    }
+
+    public void refresh()
+    {
+        NodeSettings nodeSettings = 
instanceFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+        CassandraBridge cassandraBridge = 
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+        CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+        try
+        {
+            LOGGER.info("Checking for schema changes...");
+            String fullSchemaText = databaseAccessor.fullSchema();
+            if (!fullSchemaText.equals(currSchemaText.get()))
+            {
+                LOGGER.info("Schema change detected, refreshing CDC tables");
+                currSchemaText.set(fullSchemaText);
+                Set<CqlTable> updatedCdcTables = 
buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, 
cassandraBridgeFactory);
+                LOGGER.info("Cdc enabled tables tables='{}'", 
+                            updatedCdcTables.stream()
+                                            .map(m -> String.format("%s.%s", 
m.keyspace(), m.table()))
+                                            .collect(Collectors.joining(",")));
+                cdcTables.set(updatedCdcTables);
+                cdcBridge.updateCdcSchema(updatedCdcTables, 
databaseAccessor.partitioner(),
+                                          ((keyspace, table) -> 
tableIdCache.get(TableIdentifier.of(keyspace, table))));
+                schemaChangeListeners.forEach(Runnable::run);
+            }
+        }
+        catch (IllegalStateException exception)
+        {
+            LOGGER.warn("There was a problem refreshing the schema. Database 
Accessor may not be ready", exception);
+            throw exception;
+        }
+        catch (Throwable t)
+        {
+            LOGGER.error("Unexpected error while refreshing the schema", t);
+            throw t;
+        }
+    }
+
+    public DurationSpec delay()
+    {
+        return tableSchemaRefreshTime;
+    }
+
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        try
+        {
+            refresh();
+            promise.tryComplete();
+        }
+        catch (Throwable t)
+        {
+            promise.fail(t);
+        }
+    }
+
+    @Override
+    public ScheduleDecision scheduleDecision()
+    {
+        boolean shouldSkip = 
!sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled()
+                             || 
!sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled();
+        return shouldSkip ? ScheduleDecision.SKIP : ScheduleDecision.EXECUTE;
+    }
+
+    @VisibleForTesting
+    static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor 
cdcDatabaseAccessor,
+                                        ConcurrentHashMap<TableIdentifier, 
UUID> tableIdCache,
+                                        @NotNull InstanceMetadataFetcher 
instanceFetcher,
+                                        @NotNull CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+        return buildCdcTables(cdcDatabaseAccessor.fullSchema(),
+                              cdcDatabaseAccessor.partitioner(),
+                              tableIdCache,
+                              cdcDatabaseAccessor::getTableId,
+                              instanceFetcher,
+                              cassandraBridgeFactory);
+    }
+
+    private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema,
+                                                @NotNull CdcDatabaseAccessor 
cdcDatabaseAccessor,
+                                                @NotNull 
ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+                                                @NotNull 
InstanceMetadataFetcher instanceFetcher,
+                                                @NotNull 
CassandraBridgeFactory cassandraBridgeFactory)
+    {
+        return buildCdcTables(fullSchema,
+                              cdcDatabaseAccessor.partitioner(),
+                              tableIdCache,
+                              cdcDatabaseAccessor::getTableId,
+                              instanceFetcher,
+                              cassandraBridgeFactory);
+    }
+
+    private static Set<CqlTable> buildCdcTables(@NotNull final String 
fullSchema,
+                                                @NotNull final Partitioner 
partitioner,
+                                                @NotNull final 
ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+                                                @NotNull final 
Function<TableIdentifier, UUID> tableIdLoaderFunction,
+                                                @NotNull final 
InstanceMetadataFetcher instanceFetcher,
+                                                @NotNull final 
CassandraBridgeFactory cassandraBridgeFactory)
+    {
+        Map<TableIdentifier, String> createStmts = 
CdcUtil.extractCdcTables(fullSchema);
+        Map<String, Set<String>> udtsPerKeyspace = createStmts.keySet()
+                                                              .stream()
+                                                              
.map(TableIdentifier::keyspace)
+                                                              .distinct() // 
remove duplicated keyspace strings
+                                                              
.collect(Collectors.toMap(Function.identity(),
+                                                                               
         keyspace -> CqlUtils.extractUdts(fullSchema, keyspace)));
+
+        Map<TableIdentifier, UUID> tableIds = createStmts.keySet()
+                                                         .stream()
+                                                         
.collect(Collectors.toMap(Function.identity(),
+                                                                               
    id -> tableIdCache.computeIfAbsent(id, tableIdLoaderFunction)));
+
+        NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings());

Review Comment:
   We already have `nodeSettings` at the call site of this method; let's pass it 
in instead of fetching it again here.



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ *   <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ *   <li>Automatic detection and tracking of CDC-enabled tables</li>
+ *   <li>Schema change event notification to registered listeners</li>
+ *   <li>Table metadata caching and synchronization with CDC bridges</li>
+ *   <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ *   <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema 
changes by comparing
+ *       full schema snapshots and updates CDC table metadata when changes are 
detected</li>
+ *   <li><strong>Schema Monitor (49s interval):</strong> Validates that 
CDC-enabled tables
+ *       are properly configured in the Cassandra Schema.instance 
singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ *   <li><strong>CDC Table Discovery:</strong> Automatically identifies and 
tracks tables with
+ *       CDC enabled from the cluster schema</li>
+ *   <li><strong>Schema Change Detection:</strong> Compares schema snapshots 
to detect modifications
+ *       and trigger appropriate updates to CDC subsystems</li>
+ *   <li><strong>Bridge Integration:</strong> Synchronizes schema information 
with Cassandra and
+ *       CDC bridges for consistent metadata handling</li>
+ *   <li><strong>Event Notification:</strong> Provides a listener mechanism 
for components that
+ *       need to react to schema changes</li>
+ *   <li><strong>Validation and Monitoring:</strong> Continuously validates 
CDC table configurations
+ *       and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness 
with system load:
+ * <ul>
+ *   <li>60-second refresh interval for schema change detection</li>
+ *   <li>49-second monitor interval (chosen to avoid harmonics with the 60s 
refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC 
consumers always have
+ * up-to-date schema information, enabling proper data serialization, 
deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+    // 49sec least-common multiple with 60sec is 49min so offers best monitor frequency without clashing with 60sec
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+    private final AtomicReference<String> currSchemaText = new 
AtomicReference<>("");
+    private final AtomicReference<Set<CqlTable>> cdcTables = new 
AtomicReference<>(Collections.emptySet());
+    private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new 
ConcurrentHashMap<>();
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new 
CopyOnWriteArrayList<>();
+    private final SidecarConfiguration sidecarConfiguration;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final SecondBoundConfiguration tableSchemaRefreshTime;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+                                  CdcDatabaseAccessor databaseAccessor,
+                                  SidecarConfiguration sidecarConfiguration,
+                                  CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+
+        this.instanceFetcher = instanceFetcher;
+        this.databaseAccessor = databaseAccessor;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.tableSchemaRefreshTime = 
sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    public void addSchemaChangeListener(Runnable listener)
+    {
+        schemaChangeListeners.add(listener);
+    }
+
+    public void refresh()
+    {
+        NodeSettings nodeSettings = 
instanceFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+        CassandraBridge cassandraBridge = 
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+        CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+        try
+        {
+            LOGGER.info("Checking for schema changes...");
+            String fullSchemaText = databaseAccessor.fullSchema();
+            if (!fullSchemaText.equals(currSchemaText.get()))
+            {
+                LOGGER.info("Schema change detected, refreshing CDC tables");
+                currSchemaText.set(fullSchemaText);
+                Set<CqlTable> updatedCdcTables = 
buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, 
cassandraBridgeFactory);
+                LOGGER.info("Cdc enabled tables tables='{}'", 
+                            updatedCdcTables.stream()
+                                            .map(m -> String.format("%s.%s", 
m.keyspace(), m.table()))
+                                            .collect(Collectors.joining(",")));
+                cdcTables.set(updatedCdcTables);
+                cdcBridge.updateCdcSchema(updatedCdcTables, databaseAccessor.partitioner(),

Review Comment:
   And of course `Partitioner` is in spark, so we wouldn't be able to change it 
here. We'll need to do the check before calling `valueOf`.
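
   A minimal sketch of what checking before `valueOf` could look like, assuming 
the check guards against null or unrecognised partitioner names. 
`SketchPartitioner` and `parse` are hypothetical stand-ins for illustration, not 
the upstream `Partitioner` API; the real check may need to be different.

   ```java
   import java.util.Arrays;
   import java.util.Optional;

   final class PartitionerLookupSketch
   {
       // Hypothetical stand-in for the upstream enum, which cannot be changed from this repository
       enum SketchPartitioner { Murmur3Partitioner, RandomPartitioner }

       // Returns empty instead of letting Enum.valueOf throw on a null or unknown name
       static Optional<SketchPartitioner> parse(String reportedName)
       {
           if (reportedName == null || reportedName.isEmpty())
           {
               return Optional.empty();
           }
           // Assumption: the name may be a fully-qualified class name, so keep only the simple name
           String simpleName = reportedName.substring(reportedName.lastIndexOf('.') + 1);
           boolean known = Arrays.stream(SketchPartitioner.values())
                                 .anyMatch(p -> p.name().equals(simpleName));
           return known ? Optional.of(SketchPartitioner.valueOf(simpleName)) : Optional.empty();
       }
   }
   ```

   With a guard like this the caller can decide how to handle an unsupported 
partitioner, rather than surfacing an `IllegalArgumentException` from inside 
the periodic task.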



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ *   <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ *   <li>Automatic detection and tracking of CDC-enabled tables</li>
+ *   <li>Schema change event notification to registered listeners</li>
+ *   <li>Table metadata caching and synchronization with CDC bridges</li>
+ *   <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ *   <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema 
changes by comparing
+ *       full schema snapshots and updates CDC table metadata when changes are 
detected</li>
+ *   <li><strong>Schema Monitor (49s interval):</strong> Validates that 
CDC-enabled tables
+ *       are properly configured in the Cassandra Schema.instance 
singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ *   <li><strong>CDC Table Discovery:</strong> Automatically identifies and 
tracks tables with
+ *       CDC enabled from the cluster schema</li>
+ *   <li><strong>Schema Change Detection:</strong> Compares schema snapshots 
to detect modifications
+ *       and trigger appropriate updates to CDC subsystems</li>
+ *   <li><strong>Bridge Integration:</strong> Synchronizes schema information 
with Cassandra and
+ *       CDC bridges for consistent metadata handling</li>
+ *   <li><strong>Event Notification:</strong> Provides a listener mechanism 
for components that
+ *       need to react to schema changes</li>
+ *   <li><strong>Validation and Monitoring:</strong> Continuously validates 
CDC table configurations
+ *       and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness 
with system load:
+ * <ul>
+ *   <li>60-second refresh interval for schema change detection</li>
+ *   <li>49-second monitor interval (chosen to avoid harmonics with the 60s 
refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC 
consumers always have
+ * up-to-date schema information, enabling proper data serialization, 
deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+    // 49sec least-common multiple with 60sec is 49min so offers best monitor frequency without clashing with 60sec
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+    private final AtomicReference<String> currSchemaText = new 
AtomicReference<>("");
+    private final AtomicReference<Set<CqlTable>> cdcTables = new 
AtomicReference<>(Collections.emptySet());
+    private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new 
ConcurrentHashMap<>();
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new 
CopyOnWriteArrayList<>();
+    private final SidecarConfiguration sidecarConfiguration;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final SecondBoundConfiguration tableSchemaRefreshTime;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+                                  CdcDatabaseAccessor databaseAccessor,
+                                  SidecarConfiguration sidecarConfiguration,
+                                  CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+
+        this.instanceFetcher = instanceFetcher;
+        this.databaseAccessor = databaseAccessor;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.tableSchemaRefreshTime = 
sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    public void addSchemaChangeListener(Runnable listener)
+    {
+        schemaChangeListeners.add(listener);
+    }
+
+    public void refresh()
+    {
+        NodeSettings nodeSettings = 
instanceFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+        CassandraBridge cassandraBridge = 
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+        CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+        try
+        {
+            LOGGER.info("Checking for schema changes...");
+            String fullSchemaText = databaseAccessor.fullSchema();
+            if (!fullSchemaText.equals(currSchemaText.get()))
+            {
+                LOGGER.info("Schema change detected, refreshing CDC tables");
+                currSchemaText.set(fullSchemaText);
+                Set<CqlTable> updatedCdcTables = 
buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, 
cassandraBridgeFactory);
+                LOGGER.info("Cdc enabled tables tables='{}'", 
+                            updatedCdcTables.stream()
+                                            .map(m -> String.format("%s.%s", 
m.keyspace(), m.table()))
+                                            .collect(Collectors.joining(",")));
+                cdcTables.set(updatedCdcTables);
+                cdcBridge.updateCdcSchema(updatedCdcTables, 
databaseAccessor.partitioner(),
+                                          ((keyspace, table) -> 
tableIdCache.get(TableIdentifier.of(keyspace, table))));
+                schemaChangeListeners.forEach(Runnable::run);
+            }
+        }
+        catch (IllegalStateException exception)
+        {
+            LOGGER.warn("There was a problem refreshing the schema. Database 
Accessor may not be ready", exception);
+            throw exception;
+        }
+        catch (Throwable t)
+        {
+            LOGGER.error("Unexpected error while refreshing the schema", t);
+            throw t;
+        }
+    }
+
+    public DurationSpec delay()
+    {
+        return tableSchemaRefreshTime;
+    }
+
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        try
+        {
+            refresh();
+            promise.tryComplete();
+        }
+        catch (Throwable t)
+        {
+            promise.fail(t);
+        }
+    }
+
+    @Override
+    public ScheduleDecision scheduleDecision()
+    {
+        boolean shouldSkip = 
!sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled()
+                             || 
!sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled();
+        return shouldSkip ? ScheduleDecision.SKIP : ScheduleDecision.EXECUTE;
+    }
+
+    @VisibleForTesting
+    static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor 
cdcDatabaseAccessor,
+                                        ConcurrentHashMap<TableIdentifier, 
UUID> tableIdCache,
+                                        @NotNull InstanceMetadataFetcher 
instanceFetcher,
+                                        @NotNull CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+        return buildCdcTables(cdcDatabaseAccessor.fullSchema(),
+                              cdcDatabaseAccessor.partitioner(),
+                              tableIdCache,
+                              cdcDatabaseAccessor::getTableId,
+                              instanceFetcher,
+                              cassandraBridgeFactory);
+    }
+
+    private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema,
+                                                @NotNull CdcDatabaseAccessor 
cdcDatabaseAccessor,
+                                                @NotNull 
ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+                                                @NotNull 
InstanceMetadataFetcher instanceFetcher,
+                                                @NotNull 
CassandraBridgeFactory cassandraBridgeFactory)
+    {
+        return buildCdcTables(fullSchema,
+                              cdcDatabaseAccessor.partitioner(),
+                              tableIdCache,
+                              cdcDatabaseAccessor::getTableId,
+                              instanceFetcher,
+                              cassandraBridgeFactory);
+    }
+
+    private static Set<CqlTable> buildCdcTables(@NotNull final String 
fullSchema,
+                                                @NotNull final Partitioner 
partitioner,
+                                                @NotNull final 
ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+                                                @NotNull final 
Function<TableIdentifier, UUID> tableIdLoaderFunction,
+                                                @NotNull final 
InstanceMetadataFetcher instanceFetcher,
+                                                @NotNull final 
CassandraBridgeFactory cassandraBridgeFactory)
+    {
+        Map<TableIdentifier, String> createStmts = 
CdcUtil.extractCdcTables(fullSchema);
+        Map<String, Set<String>> udtsPerKeyspace = createStmts.keySet()
+                                                              .stream()
+                                                              
.map(TableIdentifier::keyspace)
+                                                              .distinct() // 
remove duplicated keyspace strings
+                                                              
.collect(Collectors.toMap(Function.identity(),
+                                                                               
         keyspace -> CqlUtils.extractUdts(fullSchema, keyspace)));
+
+        Map<TableIdentifier, UUID> tableIds = createStmts.keySet()
+                                                         .stream()
+                                                         
.collect(Collectors.toMap(Function.identity(),
+                                                                               
    id -> tableIdCache.computeIfAbsent(id, tableIdLoaderFunction)));
+
+        NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings());

Review Comment:
   As a matter of fact, we already have the bridge at the call site, so we 
should just pass the bridge.
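
   Roughly the shape I have in mind, as a self-contained sketch. `Bridge`, 
`resolveBridge` and the two-argument `buildCdcTables` below are hypothetical 
stand-ins, not the real signatures. The point is only that the bridge is 
resolved once in `refresh()` and handed to the helper, so the helper no longer 
needs the fetcher or the factory.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   final class PassBridgeSketch
   {
       // Hypothetical stand-in for CassandraBridge
       interface Bridge
       {
           String version();
       }

       // Counts how often the (potentially remote) lookup runs
       static final AtomicInteger LOOKUPS = new AtomicInteger();

       // Stand-in for fetching node settings and asking the factory for a bridge
       static Bridge resolveBridge()
       {
           LOOKUPS.incrementAndGet();
           return () -> "4.1";
       }

       // The helper receives the bridge directly; no fetcher or factory parameters
       static String buildCdcTables(String fullSchema, Bridge bridge)
       {
           return bridge.version() + ":" + fullSchema;
       }

       static String refresh(String fullSchema)
       {
           Bridge bridge = resolveBridge();           // resolved once per refresh
           return buildCdcTables(fullSchema, bridge); // reused, not looked up again
       }
   }
   ```

   This would also let us drop the `instanceFetcher` and 
`cassandraBridgeFactory` parameters from the `buildCdcTables` overloads.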


