frankgh commented on code in PR #251:
URL: https://github.com/apache/cassandra-sidecar/pull/251#discussion_r2331570652


##########
server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableHistorySchema.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema definition and management for tracking table schema evolution 
history.
+ * <p>
+ * This class extends {@link TableSchema} to provide specialized schema 
management for storing
+ * historical versions of table schemas in Cassandra Sidecar. The table schema 
history tracking
+ * is essential for CDC operations and data consistency, enabling:
+ * <ul>
+ *   <li>Schema version tracking for CDC-enabled tables across time</li>
+ *   <li>Historical schema retrieval for data processing and compatibility 
checks</li>
+ *   <li>Schema evolution auditing and change management</li>
+ *   <li>Version-aware data processing in CDC pipelines</li>
+ * </ul>
+ * <p>
+ * The table schema is designed with the following characteristics:
+ * <ul>
+ *   <li><strong>Partitioning:</strong> Data is partitioned by keyspace 
({@code ks}) and 
+ *       table name ({@code tb}) to organize schemas by table identity</li>
+ *   <li><strong>Clustering:</strong> Ordered by schema version ({@code 
version}) to enable
+ *       chronological access to schema changes</li>
+ *   <li><strong>Versioning:</strong> Uses UUID-based versioning for unique 
schema identification</li>
+ *   <li><strong>Timestamping:</strong> Automatic creation timestamp tracking 
with {@code created_at}</li>
+ * </ul>
+ * <p>
+ * The table structure includes:
+ * <pre>{@code
+ * CREATE TABLE table_schema_history (
+ *   ks text,                    -- Keyspace name
+ *   tb text,                    -- Table name
+ *   version uuid,               -- Schema version identifier
+ *   created_at timeuuid,        -- Schema creation timestamp
+ *   table_schema text,          -- Complete table schema DDL
+ *   PRIMARY KEY ((ks, tb), version)
+ * )
+ * }</pre>
+ * <p>
+ * This schema supports CDC operations by maintaining a complete history of 
table schemas,
+ * allowing CDC consumers to process data with the correct schema version that 
was active
+ * when the data was written. This is crucial for maintaining data integrity 
across schema
+ * evolution in long-running CDC pipelines.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into
+ * components that require table schema history access.
+ *
+ * @see TableSchema
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ */
+@Singleton
+public class TableHistorySchema extends TableSchema

Review Comment:
   This schema should only be initialized on the cluster lease holder.
   ```suggestion
   public class TableHistorySchema extends TableSchema implements ExecuteOnClusterLeaseholderOnly
   ```
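
For context, a marker interface like this is usually checked by whatever component drives schema initialization. The sketch below illustrates that pattern with stand-in types only: the simplified `TableSchema`, the `schemasToInitialize` helper, and the `isLeaseholder` flag are hypothetical and are not Sidecar's actual classes or wiring.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LeaseholderOnlySketch
{
    /** Stand-in for the marker interface named in the suggestion; it declares no methods. */
    interface ExecuteOnClusterLeaseholderOnly
    {
    }

    /** Simplified, hypothetical stand-in for a table schema. */
    static class TableSchema
    {
        private final String name;

        TableSchema(String name)
        {
            this.name = name;
        }

        String name()
        {
            return name;
        }
    }

    /** A schema that opts in to leaseholder-only initialization. */
    static class TableHistorySchema extends TableSchema implements ExecuteOnClusterLeaseholderOnly
    {
        TableHistorySchema()
        {
            super("table_schema_history");
        }
    }

    /** Hypothetical helper: returns the names of the schemas this instance would initialize. */
    static List<String> schemasToInitialize(List<TableSchema> schemas, boolean isLeaseholder)
    {
        List<String> selected = new ArrayList<>();
        for (TableSchema schema : schemas)
        {
            // Skip leaseholder-only schemas when this instance does not hold the lease
            if (schema instanceof ExecuteOnClusterLeaseholderOnly && !isLeaseholder)
            {
                continue;
            }
            selected.add(schema.name());
        }
        return selected;
    }

    public static void main(String[] args)
    {
        List<TableSchema> schemas = Arrays.asList(new TableSchema("some_other_table"),
                                                  new TableHistorySchema());
        System.out.println(schemasToInitialize(schemas, false));
        System.out.println(schemasToInitialize(schemas, true));
    }
}
```

The point of the marker is that the schema class itself needs no extra logic; it only declares the constraint, and the initializer decides whether to act on it.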



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata.
+ */
+@Singleton
+public class CassandraClusterSchemaMonitor implements PeriodicTask
+{
+    // 49sec least-common multiple with 60sec is 49min so offers best monitor 
frequency without clashing with 60sec
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterSchemaMonitor.class);
+
+    private final AtomicReference<String> currSchemaText = new 
AtomicReference<>("");
+    private final AtomicReference<Set<CqlTable>> cdcTables = new 
AtomicReference<>(Collections.emptySet());
+    private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new 
ConcurrentHashMap<>();
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new 
CopyOnWriteArrayList<>();
+    private final SidecarConfiguration sidecarConfiguration;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    public CassandraClusterSchemaMonitor(InstanceMetadataFetcher 
instanceFetcher,
+                                         CdcDatabaseAccessor databaseAccessor,
+                                         SidecarConfiguration 
sidecarConfiguration,
+                                         CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+
+        this.instanceFetcher = instanceFetcher;
+        this.databaseAccessor = databaseAccessor;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    public void addSchemaChangeListener(Runnable listener)
+    {
+        schemaChangeListeners.add(listener);
+    }
+
+    public void refresh()
+    {
+        NodeSettings nodeSettings = 
instanceFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+        CassandraBridge cassandraBridge = 
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+        CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+        try
+        {
+            LOGGER.debug("Checking for schema changes...");
+            String fullSchemaText = databaseAccessor.fullSchema();
+            if (!fullSchemaText.equals(currSchemaText.get()))
+            {
+                LOGGER.info("Schema change detected, refreshing CDC tables");
+                currSchemaText.set(fullSchemaText);
+                Set<CqlTable> updatedCdcTables = 
buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, 
cassandraBridge);
+                LOGGER.info("Cdc enabled tables tables='{}'", 
+                            updatedCdcTables.stream()
+                                            .map(m -> String.format("%s.%s", 
m.keyspace(), m.table()))
+                                            .collect(Collectors.joining(",")));
+                cdcTables.set(updatedCdcTables);
+
+                cdcBridge.updateCdcSchema(updatedCdcTables, 
getPartitioner(nodeSettings),
+                                          ((keyspace, table) -> 
tableIdCache.get(TableIdentifier.of(keyspace, table))));
+                schemaChangeListeners.forEach(Runnable::run);
+            }
+        }
+        catch (IllegalStateException exception)
+        {
+            LOGGER.warn("There was a problem refreshing the schema. Database 
Accessor may not be ready", exception);
+            throw exception;
+        }
+        catch (Throwable t)
+        {
+            LOGGER.error("Unexpected error while refreshing the schema", t);
+            throw t;
+        }
+    }
+
+    private Partitioner getPartitioner(NodeSettings nodeSettings)
+    {
+        if (nodeSettings.partitioner().contains("."))
+        {
+            String[] splitPartitionerName = 
nodeSettings.partitioner().split("\\.");
+            return 
Partitioner.valueOf(splitPartitionerName[splitPartitionerName.length - 1]);
+        }
+        return Partitioner.valueOf(nodeSettings.partitioner());
+    }
+
+    @Override
+    public DurationSpec delay()
+    {
+        return 
sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+    }
+
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        try
+        {
+            refresh();
+            promise.tryComplete();
+        }
+        catch (Throwable t)
+        {
+            promise.fail(t);
+        }
+    }
+
+    @Override
+    public ScheduleDecision scheduleDecision()
+    {
+        if 
(sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled()
 &&
+            
sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled())
+        {
+            return ScheduleDecision.EXECUTE;
+        }
+        return ScheduleDecision.SKIP;
+    }
+
+    @VisibleForTesting
+    static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor 
cdcDatabaseAccessor,
+                                        ConcurrentHashMap<TableIdentifier, 
UUID> tableIdCache,
+                                        @NotNull InstanceMetadataFetcher 
instanceFetcher,
+                                        @NotNull final CassandraBridge 
cassandraBridge)
+    {
+        return buildCdcTables(cdcDatabaseAccessor.fullSchema(),
+                              cdcDatabaseAccessor.partitioner(),
+                              tableIdCache,
+                              cdcDatabaseAccessor::getTableId,
+                              instanceFetcher,
+                              cassandraBridge);
+    }
+
+    private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema,
+                                                @NotNull CdcDatabaseAccessor 
cdcDatabaseAccessor,
+                                                @NotNull 
ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+                                                @NotNull 
InstanceMetadataFetcher instanceFetcher,
+                                                @NotNull final CassandraBridge 
cassandraBridge)

Review Comment:
   `instanceFetcher` is no longer needed here, so the parameter can be removed.
   ```suggestion
       private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema,
                                                   @NotNull CdcDatabaseAccessor cdcDatabaseAccessor,
                                                   @NotNull ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
                                                   @NotNull CassandraBridge cassandraBridge)
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata.
+ */
+@Singleton
+public class CassandraClusterSchemaMonitor implements PeriodicTask
+{
+    // 49sec least-common multiple with 60sec is 49min so offers best monitor 
frequency without clashing with 60sec
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterSchemaMonitor.class);
+
+    private final AtomicReference<String> currSchemaText = new 
AtomicReference<>("");
+    private final AtomicReference<Set<CqlTable>> cdcTables = new 
AtomicReference<>(Collections.emptySet());
+    private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new 
ConcurrentHashMap<>();
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new 
CopyOnWriteArrayList<>();
+    private final SidecarConfiguration sidecarConfiguration;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    public CassandraClusterSchemaMonitor(InstanceMetadataFetcher 
instanceFetcher,
+                                         CdcDatabaseAccessor databaseAccessor,
+                                         SidecarConfiguration 
sidecarConfiguration,
+                                         CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+
+        this.instanceFetcher = instanceFetcher;
+        this.databaseAccessor = databaseAccessor;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    public void addSchemaChangeListener(Runnable listener)
+    {
+        schemaChangeListeners.add(listener);
+    }
+
+    public void refresh()
+    {
+        NodeSettings nodeSettings = 
instanceFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+        CassandraBridge cassandraBridge = 
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+        CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+        try
+        {
+            LOGGER.debug("Checking for schema changes...");
+            String fullSchemaText = databaseAccessor.fullSchema();
+            if (!fullSchemaText.equals(currSchemaText.get()))
+            {
+                LOGGER.info("Schema change detected, refreshing CDC tables");
+                currSchemaText.set(fullSchemaText);
+                Set<CqlTable> updatedCdcTables = 
buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, 
cassandraBridge);
+                LOGGER.info("Cdc enabled tables tables='{}'", 
+                            updatedCdcTables.stream()
+                                            .map(m -> String.format("%s.%s", 
m.keyspace(), m.table()))
+                                            .collect(Collectors.joining(",")));
+                cdcTables.set(updatedCdcTables);
+
+                cdcBridge.updateCdcSchema(updatedCdcTables, 
getPartitioner(nodeSettings),
+                                          ((keyspace, table) -> 
tableIdCache.get(TableIdentifier.of(keyspace, table))));
+                schemaChangeListeners.forEach(Runnable::run);
+            }
+        }
+        catch (IllegalStateException exception)
+        {
+            LOGGER.warn("There was a problem refreshing the schema. Database 
Accessor may not be ready", exception);
+            throw exception;
+        }
+        catch (Throwable t)
+        {
+            LOGGER.error("Unexpected error while refreshing the schema", t);
+            throw t;
+        }
+    }
+
+    private Partitioner getPartitioner(NodeSettings nodeSettings)
+    {
+        if (nodeSettings.partitioner().contains("."))
+        {
+            String[] splitPartitionerName = 
nodeSettings.partitioner().split("\\.");
+            return 
Partitioner.valueOf(splitPartitionerName[splitPartitionerName.length - 1]);
+        }
+        return Partitioner.valueOf(nodeSettings.partitioner());
+    }
+
+    @Override
+    public DurationSpec delay()
+    {
+        return 
sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+    }
+
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        try
+        {
+            refresh();
+            promise.tryComplete();
+        }
+        catch (Throwable t)
+        {
+            promise.fail(t);
+        }
+    }
+
+    @Override
+    public ScheduleDecision scheduleDecision()
+    {
+        if 
(sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled()
 &&
+            
sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled())
+        {
+            return ScheduleDecision.EXECUTE;
+        }
+        return ScheduleDecision.SKIP;
+    }
+
+    @VisibleForTesting
+    static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor 
cdcDatabaseAccessor,
+                                        ConcurrentHashMap<TableIdentifier, 
UUID> tableIdCache,
+                                        @NotNull InstanceMetadataFetcher 
instanceFetcher,
+                                        @NotNull final CassandraBridge 
cassandraBridge)

Review Comment:
   `instanceFetcher` is no longer needed here, so the parameter can be removed.
   ```suggestion
       static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor cdcDatabaseAccessor,
                                           ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
                                           @NotNull CassandraBridge cassandraBridge)
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/db/schema/CdcStatesSchema.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema definition and management for the CDC (Change Data Capture) state 
persistence table.
+ * <p>
+ * This class extends {@link TableSchema} to provide specialized schema 
management for storing
+ * and retrieving CDC state data in Cassandra Sidecar. The CDC state table is 
used to persist
+ * the current processing state of CDC consumers across token ranges, enabling:
+ * <ul>
+ *   <li>State persistence for fault tolerance and resumption after 
restarts</li>
+ *   <li>Distributed processing coordination across multiple CDC consumers</li>
+ *   <li>Token range-aware state storage for scalable CDC operations</li>
+ *   <li>Time-based data retention through TTL configuration</li>
+ * </ul>
+ * <p>
+ * The table schema is designed with the following key characteristics:
+ * <ul>
+ *   <li><strong>Partitioning:</strong> Data is partitioned by {@code job_id} 
and {@code split}
+ *       to distribute CDC state across multiple partitions for 
scalability</li>
+ *   <li><strong>Clustering:</strong> Ordered by token range boundaries 
({@code start}, {@code end})
+ *       to enable efficient range queries</li>
+ *   <li><strong>TTL:</strong> Configurable time-to-live (default 30 days) for 
automatic cleanup
+ *       of old CDC state data</li>
+ *   <li><strong>Compaction:</strong> Uses TimeWindowCompactionStrategy 
optimized for time-series
+ *       data patterns</li>
+ * </ul>
+ * <p>
+ * The table structure includes:
+ * <pre>{@code
+ * CREATE TABLE cdc_state_v2 (
+ *   job_id text,           -- CDC job identifier
+ *   split smallint,        -- Token range split identifier
+ *   start varint,          -- Token range start boundary
+ *   end varint,            -- Token range end boundary
+ *   state blob,            -- Serialized CDC state data
+ *   PRIMARY KEY ((job_id, split), start, end)
+ * )
+ * }</pre>
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into
+ * components that require CDC state table access.
+ *
+ * @see TableSchema
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ */
+@Singleton
+public class CdcStatesSchema extends TableSchema

Review Comment:
   This schema should only be initialized on the cluster lease holder.
   ```suggestion
   public class CdcStatesSchema extends TableSchema implements ExecuteOnClusterLeaseholderOnly
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableHistorySchema.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema definition and management for tracking table schema evolution 
history.
+ * <p>
+ * This class extends {@link TableSchema} to provide specialized schema 
management for storing
+ * historical versions of table schemas in Cassandra Sidecar. The table schema 
history tracking
+ * is essential for CDC operations and data consistency, enabling:
+ * <ul>
+ *   <li>Schema version tracking for CDC-enabled tables across time</li>
+ *   <li>Historical schema retrieval for data processing and compatibility 
checks</li>
+ *   <li>Schema evolution auditing and change management</li>
+ *   <li>Version-aware data processing in CDC pipelines</li>
+ * </ul>
+ * <p>
+ * The table schema is designed with the following characteristics:
+ * <ul>
+ *   <li><strong>Partitioning:</strong> Data is partitioned by keyspace 
({@code ks}) and 
+ *       table name ({@code tb}) to organize schemas by table identity</li>
+ *   <li><strong>Clustering:</strong> Ordered by schema version ({@code 
version}) to enable
+ *       chronological access to schema changes</li>
+ *   <li><strong>Versioning:</strong> Uses UUID-based versioning for unique 
schema identification</li>
+ *   <li><strong>Timestamping:</strong> Automatic creation timestamp tracking 
with {@code created_at}</li>
+ * </ul>
+ * <p>
+ * The table structure includes:
+ * <pre>{@code
+ * CREATE TABLE table_schema_history (
+ *   ks text,                    -- Keyspace name
+ *   tb text,                    -- Table name
+ *   version uuid,               -- Schema version identifier
+ *   created_at timeuuid,        -- Schema creation timestamp
+ *   table_schema text,          -- Complete table schema DDL
+ *   PRIMARY KEY ((ks, tb), version)
+ * )
+ * }</pre>
+ * <p>
+ * This schema supports CDC operations by maintaining a complete history of 
table schemas,
+ * allowing CDC consumers to process data with the correct schema version that 
was active
+ * when the data was written. This is crucial for maintaining data integrity 
across schema
+ * evolution in long-running CDC pipelines.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency 
injection into
+ * components that require table schema history access.
+ *
+ * @see TableSchema
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ */
+@Singleton
+public class TableHistorySchema extends TableSchema
+{
+    private static final String TABLE_SCHEMA_HISTORY = "table_schema_history";
+
+    private final SchemaKeyspaceConfiguration keyspaceConfig;
+
+    // prepared statements
+    private PreparedStatement insertTableSchema;
+    private PreparedStatement selectVersionTableSchema;
+
+    @Inject
+    public TableHistorySchema(ServiceConfiguration configuration)
+    {
+        this.keyspaceConfig = configuration.schemaKeyspaceConfiguration();
+    }
+
+    @Override
+    protected void prepareStatements(@NotNull Session session)
+    {
+        insertTableSchema = prepare(insertTableSchema, session, 
CqlLiterals.insertTableSchema(keyspaceConfig));
+        selectVersionTableSchema = prepare(selectVersionTableSchema, session, 
CqlLiterals.selectVersionTableSchema(keyspaceConfig));
+    }
+
+    @Override
+    protected String keyspaceName()
+    {
+        return keyspaceConfig.keyspace();
+    }
+
+    @Override
+    protected String tableName()
+    {
+        return TABLE_SCHEMA_HISTORY;
+    }
+
+    @Override
+    protected boolean exists(@NotNull Metadata metadata)
+    {
+        KeyspaceMetadata ksMetadata = 
metadata.getKeyspace(keyspaceConfig.keyspace());
+        if (ksMetadata == null)
+        {
+            return false;
+        }
+
+        return ksMetadata.getTable(TABLE_SCHEMA_HISTORY) != null;
+    }
+
+    @Override
+    protected String createSchemaStatement()
+    {
+        return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" +
+                             "  ks text," +
+                             "  tb text," +

Review Comment:
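   NIT: use full column names to stay consistent with Cassandra's own column naming (for example `keyspace_name` and `table_name` in the `system_schema` tables)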
   ```suggestion
                                "  keyspace_name text," +
                                "  table_name text," +
   ```
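
A rename like this also touches the primary key, and presumably the statements built in `CqlLiterals`. As a rough sketch of the full statement after the rename: the columns other than the two renamed ones are taken from the class Javadoc above, since the rest of `createSchemaStatement()` is not visible in this hunk, and the keyspace name in `main` is only an example value.

```java
public class TableHistoryDdlSketch
{
    // Table name taken from the diff above
    private static final String TABLE_SCHEMA_HISTORY = "table_schema_history";

    /**
     * Builds the CREATE TABLE statement with the renamed columns.
     * The remaining columns and the key layout follow the Javadoc in the diff;
     * the actual statement in the PR may differ (assumption).
     */
    static String createSchemaStatement(String keyspace)
    {
        return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" +
                             "  keyspace_name text," +
                             "  table_name text," +
                             "  version uuid," +
                             "  created_at timeuuid," +
                             "  table_schema text," +
                             "  PRIMARY KEY ((keyspace_name, table_name), version))",
                             keyspace, TABLE_SCHEMA_HISTORY);
    }

    public static void main(String[] args)
    {
        // "sidecar_internal" is only an example keyspace name
        System.out.println(createSchemaStatement("sidecar_internal"));
    }
}
```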



##########
server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java:
##########
@@ -85,6 +97,42 @@ PeriodicTask cdcRawDirectorySpaceCleanercPeriodicTask(CdcRawDirectorySpaceCleane
         return cleanerTask;
     }
 
+    @ProvidesIntoMap
+    @KeyClassMapKey(PeriodicTaskMapKeys.CassandraClusterSchemaTaskKey.class)
+    PeriodicTask cassandraClusterSchema(InstanceMetadataFetcher instanceMetadataFetcher,

Review Comment:
   ```suggestion
       PeriodicTask cassandraClusterSchemaMonitor(InstanceMetadataFetcher instanceMetadataFetcher,
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring 
and CDC table tracking.
+ * This class provides comprehensive schema management functionality for 
Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time 
awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata.
+ */
+@Singleton
+public class CassandraClusterSchemaMonitor implements PeriodicTask
+{
+    // 49sec least-common multiple with 60sec is 49min so offers best monitor 
frequency without clashing with 60sec
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterSchemaMonitor.class);
+
+    private final AtomicReference<String> currSchemaText = new 
AtomicReference<>("");
+    private final AtomicReference<Set<CqlTable>> cdcTables = new 
AtomicReference<>(Collections.emptySet());
+    private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new 
ConcurrentHashMap<>();
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new 
CopyOnWriteArrayList<>();
+    private final SidecarConfiguration sidecarConfiguration;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    public CassandraClusterSchemaMonitor(InstanceMetadataFetcher 
instanceFetcher,
+                                         CdcDatabaseAccessor databaseAccessor,
+                                         SidecarConfiguration 
sidecarConfiguration,
+                                         CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+
+        this.instanceFetcher = instanceFetcher;
+        this.databaseAccessor = databaseAccessor;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    public void addSchemaChangeListener(Runnable listener)
+    {
+        schemaChangeListeners.add(listener);
+    }
+
+    public void refresh()
+    {
+        NodeSettings nodeSettings = 
instanceFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+        CassandraBridge cassandraBridge = 
cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+        CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+        try
+        {
+            LOGGER.debug("Checking for schema changes...");
+            String fullSchemaText = databaseAccessor.fullSchema();
+            if (!fullSchemaText.equals(currSchemaText.get()))
+            {
+                LOGGER.info("Schema change detected, refreshing CDC tables");
+                currSchemaText.set(fullSchemaText);
+                Set<CqlTable> updatedCdcTables = 
buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, 
cassandraBridge);
+                LOGGER.info("Cdc enabled tables tables='{}'", 
+                            updatedCdcTables.stream()
+                                            .map(m -> String.format("%s.%s", 
m.keyspace(), m.table()))
+                                            .collect(Collectors.joining(",")));
+                cdcTables.set(updatedCdcTables);
+
+                cdcBridge.updateCdcSchema(updatedCdcTables, 
getPartitioner(nodeSettings),
+                                          ((keyspace, table) -> 
tableIdCache.get(TableIdentifier.of(keyspace, table))));
+                schemaChangeListeners.forEach(Runnable::run);
+            }
+        }
+        catch (IllegalStateException exception)
+        {
+            LOGGER.warn("There was a problem refreshing the schema. Database 
Accessor may not be ready", exception);
+            throw exception;
+        }
+        catch (Throwable t)
+        {
+            LOGGER.error("Unexpected error while refreshing the schema", t);
+            throw t;
+        }
+    }
+
+    private Partitioner getPartitioner(NodeSettings nodeSettings)
+    {
+        if (nodeSettings.partitioner().contains("."))
+        {
+            String[] splitPartitionerName = 
nodeSettings.partitioner().split("\\.");
+            return 
Partitioner.valueOf(splitPartitionerName[splitPartitionerName.length - 1]);
+        }
+        return Partitioner.valueOf(nodeSettings.partitioner());
+    }
+
+    @Override
+    public DurationSpec delay()
+    {
+        return 
sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+    }
+
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        try
+        {
+            refresh();
+            promise.tryComplete();
+        }
+        catch (Throwable t)
+        {
+            promise.fail(t);
+        }
+    }
+
+    @Override
+    public ScheduleDecision scheduleDecision()
+    {
+        if 
(sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled()
 &&
+            
sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled())
+        {
+            return ScheduleDecision.EXECUTE;
+        }
+        return ScheduleDecision.SKIP;
+    }
+
+    @VisibleForTesting
+    static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor 
cdcDatabaseAccessor,
+                                        ConcurrentHashMap<TableIdentifier, 
UUID> tableIdCache,
+                                        @NotNull InstanceMetadataFetcher 
instanceFetcher,
+                                        @NotNull final CassandraBridge 
cassandraBridge)
+    {
+        return buildCdcTables(cdcDatabaseAccessor.fullSchema(),
+                              cdcDatabaseAccessor.partitioner(),
+                              tableIdCache,
+                              cdcDatabaseAccessor::getTableId,
+                              instanceFetcher,
+                              cassandraBridge);
+    }
+
+    private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema,
+                                                @NotNull CdcDatabaseAccessor 
cdcDatabaseAccessor,
+                                                @NotNull 
ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+                                                @NotNull 
InstanceMetadataFetcher instanceFetcher,
+                                                @NotNull final CassandraBridge 
cassandraBridge)
+    {
+        return buildCdcTables(fullSchema,
+                              cdcDatabaseAccessor.partitioner(),
+                              tableIdCache,
+                              cdcDatabaseAccessor::getTableId,
+                              instanceFetcher,
+                              cassandraBridge);
+    }
+
+    private static Set<CqlTable> buildCdcTables(@NotNull final String 
fullSchema,
+                                                @NotNull final Partitioner 
partitioner,
+                                                @NotNull final 
ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+                                                @NotNull final 
Function<TableIdentifier, UUID> tableIdLoaderFunction,
+                                                @NotNull final 
InstanceMetadataFetcher instanceFetcher,
+                                                @NotNull final CassandraBridge 
cassandraBridge)

Review Comment:
   We can omit `final`, and `instanceFetcher` is no longer needed:
   ```suggestion
       private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema,
                                                   @NotNull Partitioner 
partitioner,
                                                   @NotNull 
ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
                                                   @NotNull 
Function<TableIdentifier, UUID> tableIdLoaderFunction,
                                                   @NotNull CassandraBridge 
cassandraBridge)
   ```
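
Dropping the parameter from this overload also affects its callers: in the diff above, the two delegating `buildCdcTables` overloads and the call in `refresh()` all pass `instanceFetcher` through. A self-contained sketch of the trimmed overload chain follows. It uses `String` as a stand-in for `CqlTable` and `TableIdentifier`, and the method bodies (splitting on `;`, deriving a UUID from the name) are placeholders rather than the PR's actual parsing logic.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch only: stand-in types and placeholder bodies, not the PR's implementation.
final class BuildCdcTablesSketch
{
    // Delegating overload: no fetcher parameter to forward any more.
    static Set<String> buildCdcTables(String fullSchema,
                                      ConcurrentHashMap<String, UUID> tableIdCache)
    {
        // Hypothetical loader; the PR uses cdcDatabaseAccessor::getTableId here.
        Function<String, UUID> loader =
            name -> UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8));
        return buildCdcTables(fullSchema, tableIdCache, loader);
    }

    // Innermost overload: non-final parameters and no unused fetcher argument.
    static Set<String> buildCdcTables(String fullSchema,
                                      ConcurrentHashMap<String, UUID> tableIdCache,
                                      Function<String, UUID> tableIdLoaderFunction)
    {
        Set<String> tables = new LinkedHashSet<>();
        for (String name : fullSchema.split(";"))
        {
            String trimmed = name.trim();
            if (trimmed.isEmpty())
            {
                continue;
            }
            // Load the table id once and reuse it on later refreshes.
            tableIdCache.computeIfAbsent(trimmed, tableIdLoaderFunction);
            tables.add(trimmed);
        }
        return tables;
    }

    public static void main(String[] args)
    {
        ConcurrentHashMap<String, UUID> cache = new ConcurrentHashMap<>();
        System.out.println(buildCdcTables("ks.a; ks.b", cache));
    }
}
```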



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

