frankgh commented on code in PR #251: URL: https://github.com/apache/cassandra-sidecar/pull/251#discussion_r2331570652
########## server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableHistorySchema.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.sidecar.db.schema; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.jetbrains.annotations.NotNull; + +/** + * Schema definition and management for tracking table schema evolution history. + * <p> + * This class extends {@link TableSchema} to provide specialized schema management for storing + * historical versions of table schemas in Cassandra Sidecar. 
The table schema history tracking + * is essential for CDC operations and data consistency, enabling: + * <ul> + * <li>Schema version tracking for CDC-enabled tables across time</li> + * <li>Historical schema retrieval for data processing and compatibility checks</li> + * <li>Schema evolution auditing and change management</li> + * <li>Version-aware data processing in CDC pipelines</li> + * </ul> + * <p> + * The table schema is designed with the following characteristics: + * <ul> + * <li><strong>Partitioning:</strong> Data is partitioned by keyspace ({@code ks}) and + * table name ({@code tb}) to organize schemas by table identity</li> + * <li><strong>Clustering:</strong> Ordered by schema version ({@code version}) to enable + * chronological access to schema changes</li> + * <li><strong>Versioning:</strong> Uses UUID-based versioning for unique schema identification</li> + * <li><strong>Timestamping:</strong> Automatic creation timestamp tracking with {@code created_at}</li> + * </ul> + * <p> + * The table structure includes: + * <pre>{@code + * CREATE TABLE table_schema_history ( + * ks text, -- Keyspace name + * tb text, -- Table name + * version uuid, -- Schema version identifier + * created_at timeuuid, -- Schema creation timestamp + * table_schema text, -- Complete table schema DDL + * PRIMARY KEY ((ks, tb), version) + * ) + * }</pre> + * <p> + * This schema supports CDC operations by maintaining a complete history of table schemas, + * allowing CDC consumers to process data with the correct schema version that was active + * when the data was written. This is crucial for maintaining data integrity across schema + * evolution in long-running CDC pipelines. + * <p> + * This class is thread-safe and designed as a singleton for dependency injection into + * components that require table schema history access. 
+ * + * @see TableSchema + * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor + */ +@Singleton +public class TableHistorySchema extends TableSchema Review Comment: This schema should only be initialized on the lease holder ```suggestion public class TableHistorySchema extends TableSchema implements ExecuteOnClusterLeaseholderOnly ``` ########## server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java: ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.sidecar.tasks; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import com.google.inject.Singleton; +import io.vertx.core.Promise; +import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.bridge.CdcBridge; +import org.apache.cassandra.bridge.CdcBridgeFactory; +import org.apache.cassandra.sidecar.common.response.NodeSettings; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; +import org.apache.cassandra.sidecar.utils.CdcUtil; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.utils.CqlUtils; +import org.apache.cassandra.spark.utils.TableIdentifier; +import org.jetbrains.annotations.NotNull; + +/** + * Central schema management component for Cassandra cluster schema monitoring and CDC table tracking. + * This class provides comprehensive schema management functionality for Cassandra Sidecar, specifically + * focused on CDC (Change Data Capture) operations. It maintains real-time awareness of schema changes + * in the Cassandra cluster and manages CDC-enabled table metadata. 
+ */ +@Singleton +public class CassandraClusterSchemaMonitor implements PeriodicTask +{ + // 49sec least-common multiple with 60sec is 49min so offers best monitor frequency without clashing with 60sec + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterSchemaMonitor.class); + + private final AtomicReference<String> currSchemaText = new AtomicReference<>(""); + private final AtomicReference<Set<CqlTable>> cdcTables = new AtomicReference<>(Collections.emptySet()); + private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new ConcurrentHashMap<>(); + private final CdcDatabaseAccessor databaseAccessor; + private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new CopyOnWriteArrayList<>(); + private final SidecarConfiguration sidecarConfiguration; + private final InstanceMetadataFetcher instanceFetcher; + private final CassandraBridgeFactory cassandraBridgeFactory; + + public CassandraClusterSchemaMonitor(InstanceMetadataFetcher instanceFetcher, + CdcDatabaseAccessor databaseAccessor, + SidecarConfiguration sidecarConfiguration, + CassandraBridgeFactory cassandraBridgeFactory) + { + + this.instanceFetcher = instanceFetcher; + this.databaseAccessor = databaseAccessor; + this.sidecarConfiguration = sidecarConfiguration; + this.cassandraBridgeFactory = cassandraBridgeFactory; + } + + public void addSchemaChangeListener(Runnable listener) + { + schemaChangeListeners.add(listener); + } + + public void refresh() + { + NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings()); + CassandraBridge cassandraBridge = cassandraBridgeFactory.get(nodeSettings.releaseVersion()); + CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge); + + try + { + LOGGER.debug("Checking for schema changes..."); + String fullSchemaText = databaseAccessor.fullSchema(); + if (!fullSchemaText.equals(currSchemaText.get())) + { + LOGGER.info("Schema change detected, refreshing 
CDC tables"); + currSchemaText.set(fullSchemaText); + Set<CqlTable> updatedCdcTables = buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, cassandraBridge); + LOGGER.info("Cdc enabled tables tables='{}'", + updatedCdcTables.stream() + .map(m -> String.format("%s.%s", m.keyspace(), m.table())) + .collect(Collectors.joining(","))); + cdcTables.set(updatedCdcTables); + + cdcBridge.updateCdcSchema(updatedCdcTables, getPartitioner(nodeSettings), + ((keyspace, table) -> tableIdCache.get(TableIdentifier.of(keyspace, table)))); + schemaChangeListeners.forEach(Runnable::run); + } + } + catch (IllegalStateException exception) + { + LOGGER.warn("There was a problem refreshing the schema. Database Accessor may not be ready", exception); + throw exception; + } + catch (Throwable t) + { + LOGGER.error("Unexpected error while refreshing the schema", t); + throw t; + } + } + + private Partitioner getPartitioner(NodeSettings nodeSettings) + { + if (nodeSettings.partitioner().contains(".")) + { + String[] splitPartitionerName = nodeSettings.partitioner().split("\\."); + return Partitioner.valueOf(splitPartitionerName[splitPartitionerName.length - 1]); + } + return Partitioner.valueOf(nodeSettings.partitioner()); + } + + @Override + public DurationSpec delay() + { + return sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime(); + } + + @Override + public void execute(Promise<Void> promise) + { + try + { + refresh(); + promise.tryComplete(); + } + catch (Throwable t) + { + promise.fail(t); + } + } + + @Override + public ScheduleDecision scheduleDecision() + { + if (sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled() && + sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled()) + { + return ScheduleDecision.EXECUTE; + } + return ScheduleDecision.SKIP; + } + + @VisibleForTesting + static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor cdcDatabaseAccessor, + 
ConcurrentHashMap<TableIdentifier, UUID> tableIdCache, + @NotNull InstanceMetadataFetcher instanceFetcher, + @NotNull final CassandraBridge cassandraBridge) + { + return buildCdcTables(cdcDatabaseAccessor.fullSchema(), + cdcDatabaseAccessor.partitioner(), + tableIdCache, + cdcDatabaseAccessor::getTableId, + instanceFetcher, + cassandraBridge); + } + + private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema, + @NotNull CdcDatabaseAccessor cdcDatabaseAccessor, + @NotNull ConcurrentHashMap<TableIdentifier, UUID> tableIdCache, + @NotNull InstanceMetadataFetcher instanceFetcher, + @NotNull final CassandraBridge cassandraBridge) Review Comment: instanceFetcher is no longer needed ```suggestion private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema, @NotNull CdcDatabaseAccessor cdcDatabaseAccessor, @NotNull ConcurrentHashMap<TableIdentifier, UUID> tableIdCache, @NotNull CassandraBridge cassandraBridge) ``` ########## server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java: ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.sidecar.tasks; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import com.google.inject.Singleton; +import io.vertx.core.Promise; +import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.bridge.CdcBridge; +import org.apache.cassandra.bridge.CdcBridgeFactory; +import org.apache.cassandra.sidecar.common.response.NodeSettings; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; +import org.apache.cassandra.sidecar.utils.CdcUtil; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.utils.CqlUtils; +import org.apache.cassandra.spark.utils.TableIdentifier; +import org.jetbrains.annotations.NotNull; + +/** + * Central schema management component for Cassandra cluster schema monitoring and CDC table tracking. + * This class provides comprehensive schema management functionality for Cassandra Sidecar, specifically + * focused on CDC (Change Data Capture) operations. It maintains real-time awareness of schema changes + * in the Cassandra cluster and manages CDC-enabled table metadata. 
+ */ +@Singleton +public class CassandraClusterSchemaMonitor implements PeriodicTask +{ + // 49sec least-common multiple with 60sec is 49min so offers best monitor frequency without clashing with 60sec + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterSchemaMonitor.class); + + private final AtomicReference<String> currSchemaText = new AtomicReference<>(""); + private final AtomicReference<Set<CqlTable>> cdcTables = new AtomicReference<>(Collections.emptySet()); + private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new ConcurrentHashMap<>(); + private final CdcDatabaseAccessor databaseAccessor; + private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new CopyOnWriteArrayList<>(); + private final SidecarConfiguration sidecarConfiguration; + private final InstanceMetadataFetcher instanceFetcher; + private final CassandraBridgeFactory cassandraBridgeFactory; + + public CassandraClusterSchemaMonitor(InstanceMetadataFetcher instanceFetcher, + CdcDatabaseAccessor databaseAccessor, + SidecarConfiguration sidecarConfiguration, + CassandraBridgeFactory cassandraBridgeFactory) + { + + this.instanceFetcher = instanceFetcher; + this.databaseAccessor = databaseAccessor; + this.sidecarConfiguration = sidecarConfiguration; + this.cassandraBridgeFactory = cassandraBridgeFactory; + } + + public void addSchemaChangeListener(Runnable listener) + { + schemaChangeListeners.add(listener); + } + + public void refresh() + { + NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings()); + CassandraBridge cassandraBridge = cassandraBridgeFactory.get(nodeSettings.releaseVersion()); + CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge); + + try + { + LOGGER.debug("Checking for schema changes..."); + String fullSchemaText = databaseAccessor.fullSchema(); + if (!fullSchemaText.equals(currSchemaText.get())) + { + LOGGER.info("Schema change detected, refreshing 
CDC tables"); + currSchemaText.set(fullSchemaText); + Set<CqlTable> updatedCdcTables = buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, cassandraBridge); + LOGGER.info("Cdc enabled tables tables='{}'", + updatedCdcTables.stream() + .map(m -> String.format("%s.%s", m.keyspace(), m.table())) + .collect(Collectors.joining(","))); + cdcTables.set(updatedCdcTables); + + cdcBridge.updateCdcSchema(updatedCdcTables, getPartitioner(nodeSettings), + ((keyspace, table) -> tableIdCache.get(TableIdentifier.of(keyspace, table)))); + schemaChangeListeners.forEach(Runnable::run); + } + } + catch (IllegalStateException exception) + { + LOGGER.warn("There was a problem refreshing the schema. Database Accessor may not be ready", exception); + throw exception; + } + catch (Throwable t) + { + LOGGER.error("Unexpected error while refreshing the schema", t); + throw t; + } + } + + private Partitioner getPartitioner(NodeSettings nodeSettings) + { + if (nodeSettings.partitioner().contains(".")) + { + String[] splitPartitionerName = nodeSettings.partitioner().split("\\."); + return Partitioner.valueOf(splitPartitionerName[splitPartitionerName.length - 1]); + } + return Partitioner.valueOf(nodeSettings.partitioner()); + } + + @Override + public DurationSpec delay() + { + return sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime(); + } + + @Override + public void execute(Promise<Void> promise) + { + try + { + refresh(); + promise.tryComplete(); + } + catch (Throwable t) + { + promise.fail(t); + } + } + + @Override + public ScheduleDecision scheduleDecision() + { + if (sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled() && + sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled()) + { + return ScheduleDecision.EXECUTE; + } + return ScheduleDecision.SKIP; + } + + @VisibleForTesting + static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor cdcDatabaseAccessor, + 
ConcurrentHashMap<TableIdentifier, UUID> tableIdCache, + @NotNull InstanceMetadataFetcher instanceFetcher, + @NotNull final CassandraBridge cassandraBridge) Review Comment: instanceFetcher is no longer needed ```suggestion static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor cdcDatabaseAccessor, ConcurrentHashMap<TableIdentifier, UUID> tableIdCache, @NotNull CassandraBridge cassandraBridge) ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/schema/CdcStatesSchema.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.sidecar.db.schema; + +import java.util.concurrent.TimeUnit; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.jetbrains.annotations.NotNull; + +/** + * Schema definition and management for the CDC (Change Data Capture) state persistence table. 
+ * <p> + * This class extends {@link TableSchema} to provide specialized schema management for storing + * and retrieving CDC state data in Cassandra Sidecar. The CDC state table is used to persist + * the current processing state of CDC consumers across token ranges, enabling: + * <ul> + * <li>State persistence for fault tolerance and resumption after restarts</li> + * <li>Distributed processing coordination across multiple CDC consumers</li> + * <li>Token range-aware state storage for scalable CDC operations</li> + * <li>Time-based data retention through TTL configuration</li> + * </ul> + * <p> + * The table schema is designed with the following key characteristics: + * <ul> + * <li><strong>Partitioning:</strong> Data is partitioned by {@code job_id} and {@code split} + * to distribute CDC state across multiple partitions for scalability</li> + * <li><strong>Clustering:</strong> Ordered by token range boundaries ({@code start}, {@code end}) + * to enable efficient range queries</li> + * <li><strong>TTL:</strong> Configurable time-to-live (default 30 days) for automatic cleanup + * of old CDC state data</li> + * <li><strong>Compaction:</strong> Uses TimeWindowCompactionStrategy optimized for time-series + * data patterns</li> + * </ul> + * <p> + * The table structure includes: + * <pre>{@code + * CREATE TABLE cdc_state_v2 ( + * job_id text, -- CDC job identifier + * split smallint, -- Token range split identifier + * start varint, -- Token range start boundary + * end varint, -- Token range end boundary + * state blob, -- Serialized CDC state data + * PRIMARY KEY ((job_id, split), start, end) + * ) + * }</pre> + * <p> + * This class is thread-safe and designed as a singleton for dependency injection into + * components that require CDC state table access. 
+ * + * @see TableSchema + * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor + */ +@Singleton +public class CdcStatesSchema extends TableSchema Review Comment: should only be initialized in the cluster lease holder ```suggestion public class CdcStatesSchema extends TableSchema implements ExecuteOnClusterLeaseholderOnly ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableHistorySchema.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.sidecar.db.schema; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.jetbrains.annotations.NotNull; + +/** + * Schema definition and management for tracking table schema evolution history. 
+ * <p> + * This class extends {@link TableSchema} to provide specialized schema management for storing + * historical versions of table schemas in Cassandra Sidecar. The table schema history tracking + * is essential for CDC operations and data consistency, enabling: + * <ul> + * <li>Schema version tracking for CDC-enabled tables across time</li> + * <li>Historical schema retrieval for data processing and compatibility checks</li> + * <li>Schema evolution auditing and change management</li> + * <li>Version-aware data processing in CDC pipelines</li> + * </ul> + * <p> + * The table schema is designed with the following characteristics: + * <ul> + * <li><strong>Partitioning:</strong> Data is partitioned by keyspace ({@code ks}) and + * table name ({@code tb}) to organize schemas by table identity</li> + * <li><strong>Clustering:</strong> Ordered by schema version ({@code version}) to enable + * chronological access to schema changes</li> + * <li><strong>Versioning:</strong> Uses UUID-based versioning for unique schema identification</li> + * <li><strong>Timestamping:</strong> Automatic creation timestamp tracking with {@code created_at}</li> + * </ul> + * <p> + * The table structure includes: + * <pre>{@code + * CREATE TABLE table_schema_history ( + * ks text, -- Keyspace name + * tb text, -- Table name + * version uuid, -- Schema version identifier + * created_at timeuuid, -- Schema creation timestamp + * table_schema text, -- Complete table schema DDL + * PRIMARY KEY ((ks, tb), version) + * ) + * }</pre> + * <p> + * This schema supports CDC operations by maintaining a complete history of table schemas, + * allowing CDC consumers to process data with the correct schema version that was active + * when the data was written. This is crucial for maintaining data integrity across schema + * evolution in long-running CDC pipelines. 
+ * <p> + * This class is thread-safe and designed as a singleton for dependency injection into + * components that require table schema history access. + * + * @see TableSchema + * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor + */ +@Singleton +public class TableHistorySchema extends TableSchema +{ + private static final String TABLE_SCHEMA_HISTORY = "table_schema_history"; + + private final SchemaKeyspaceConfiguration keyspaceConfig; + + // prepared statements + private PreparedStatement insertTableSchema; + private PreparedStatement selectVersionTableSchema; + + @Inject + public TableHistorySchema(ServiceConfiguration configuration) + { + this.keyspaceConfig = configuration.schemaKeyspaceConfiguration(); + } + + @Override + protected void prepareStatements(@NotNull Session session) + { + insertTableSchema = prepare(insertTableSchema, session, CqlLiterals.insertTableSchema(keyspaceConfig)); + selectVersionTableSchema = prepare(selectVersionTableSchema, session, CqlLiterals.selectVersionTableSchema(keyspaceConfig)); + } + + @Override + protected String keyspaceName() + { + return keyspaceConfig.keyspace(); + } + + @Override + protected String tableName() + { + return TABLE_SCHEMA_HISTORY; + } + + @Override + protected boolean exists(@NotNull Metadata metadata) + { + KeyspaceMetadata ksMetadata = metadata.getKeyspace(keyspaceConfig.keyspace()); + if (ksMetadata == null) + { + return false; + } + + return ksMetadata.getTable(TABLE_SCHEMA_HISTORY) != null; + } + + @Override + protected String createSchemaStatement() + { + return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" + + " ks text," + + " tb text," + Review Comment: NIT: to maintain consistency with Cassandra column naming ```suggestion " keyspace_name text," + " table_name text," + ``` ########## server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java: ########## @@ -85,6 +97,42 @@ PeriodicTask cdcRawDirectorySpaceCleanercPeriodicTask(CdcRawDirectorySpaceCleane return 
cleanerTask; } + @ProvidesIntoMap + @KeyClassMapKey(PeriodicTaskMapKeys.CassandraClusterSchemaTaskKey.class) + PeriodicTask cassandraClusterSchema(InstanceMetadataFetcher instanceMetadataFetcher, Review Comment: ```suggestion PeriodicTask cassandraClusterSchemaMonitor(InstanceMetadataFetcher instanceMetadataFetcher, ``` ########## server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java: ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.sidecar.tasks; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import com.google.inject.Singleton; +import io.vertx.core.Promise; +import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.bridge.CdcBridge; +import org.apache.cassandra.bridge.CdcBridgeFactory; +import org.apache.cassandra.sidecar.common.response.NodeSettings; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; +import org.apache.cassandra.sidecar.utils.CdcUtil; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.utils.CqlUtils; +import org.apache.cassandra.spark.utils.TableIdentifier; +import org.jetbrains.annotations.NotNull; + +/** + * Central schema management component for Cassandra cluster schema monitoring and CDC table tracking. + * This class provides comprehensive schema management functionality for Cassandra Sidecar, specifically + * focused on CDC (Change Data Capture) operations. It maintains real-time awareness of schema changes + * in the Cassandra cluster and manages CDC-enabled table metadata. 
+ */ +@Singleton +public class CassandraClusterSchemaMonitor implements PeriodicTask +{ + // 49sec least-common multiple with 60sec is 49min so offers best monitor frequency without clashing with 60sec + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterSchemaMonitor.class); + + private final AtomicReference<String> currSchemaText = new AtomicReference<>(""); + private final AtomicReference<Set<CqlTable>> cdcTables = new AtomicReference<>(Collections.emptySet()); + private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new ConcurrentHashMap<>(); + private final CdcDatabaseAccessor databaseAccessor; + private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new CopyOnWriteArrayList<>(); + private final SidecarConfiguration sidecarConfiguration; + private final InstanceMetadataFetcher instanceFetcher; + private final CassandraBridgeFactory cassandraBridgeFactory; + + public CassandraClusterSchemaMonitor(InstanceMetadataFetcher instanceFetcher, + CdcDatabaseAccessor databaseAccessor, + SidecarConfiguration sidecarConfiguration, + CassandraBridgeFactory cassandraBridgeFactory) + { + + this.instanceFetcher = instanceFetcher; + this.databaseAccessor = databaseAccessor; + this.sidecarConfiguration = sidecarConfiguration; + this.cassandraBridgeFactory = cassandraBridgeFactory; + } + + public void addSchemaChangeListener(Runnable listener) + { + schemaChangeListeners.add(listener); + } + + public void refresh() + { + NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings()); + CassandraBridge cassandraBridge = cassandraBridgeFactory.get(nodeSettings.releaseVersion()); + CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge); + + try + { + LOGGER.debug("Checking for schema changes..."); + String fullSchemaText = databaseAccessor.fullSchema(); + if (!fullSchemaText.equals(currSchemaText.get())) + { + LOGGER.info("Schema change detected, refreshing 
CDC tables"); + currSchemaText.set(fullSchemaText); + Set<CqlTable> updatedCdcTables = buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, cassandraBridge); + LOGGER.info("Cdc enabled tables tables='{}'", + updatedCdcTables.stream() + .map(m -> String.format("%s.%s", m.keyspace(), m.table())) + .collect(Collectors.joining(","))); + cdcTables.set(updatedCdcTables); + + cdcBridge.updateCdcSchema(updatedCdcTables, getPartitioner(nodeSettings), + ((keyspace, table) -> tableIdCache.get(TableIdentifier.of(keyspace, table)))); + schemaChangeListeners.forEach(Runnable::run); + } + } + catch (IllegalStateException exception) + { + LOGGER.warn("There was a problem refreshing the schema. Database Accessor may not be ready", exception); + throw exception; + } + catch (Throwable t) + { + LOGGER.error("Unexpected error while refreshing the schema", t); + throw t; + } + } + + private Partitioner getPartitioner(NodeSettings nodeSettings) + { + if (nodeSettings.partitioner().contains(".")) + { + String[] splitPartitionerName = nodeSettings.partitioner().split("\\."); + return Partitioner.valueOf(splitPartitionerName[splitPartitionerName.length - 1]); + } + return Partitioner.valueOf(nodeSettings.partitioner()); + } + + @Override + public DurationSpec delay() + { + return sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime(); + } + + @Override + public void execute(Promise<Void> promise) + { + try + { + refresh(); + promise.tryComplete(); + } + catch (Throwable t) + { + promise.fail(t); + } + } + + @Override + public ScheduleDecision scheduleDecision() + { + if (sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled() && + sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled()) + { + return ScheduleDecision.EXECUTE; + } + return ScheduleDecision.SKIP; + } + + @VisibleForTesting + static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor cdcDatabaseAccessor, + 
ConcurrentHashMap<TableIdentifier, UUID> tableIdCache, + @NotNull InstanceMetadataFetcher instanceFetcher, + @NotNull final CassandraBridge cassandraBridge) + { + return buildCdcTables(cdcDatabaseAccessor.fullSchema(), + cdcDatabaseAccessor.partitioner(), + tableIdCache, + cdcDatabaseAccessor::getTableId, + instanceFetcher, + cassandraBridge); + } + + private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema, + @NotNull CdcDatabaseAccessor cdcDatabaseAccessor, + @NotNull ConcurrentHashMap<TableIdentifier, UUID> tableIdCache, + @NotNull InstanceMetadataFetcher instanceFetcher, + @NotNull final CassandraBridge cassandraBridge) + { + return buildCdcTables(fullSchema, + cdcDatabaseAccessor.partitioner(), + tableIdCache, + cdcDatabaseAccessor::getTableId, + instanceFetcher, + cassandraBridge); + } + + private static Set<CqlTable> buildCdcTables(@NotNull final String fullSchema, + @NotNull final Partitioner partitioner, + @NotNull final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache, + @NotNull final Function<TableIdentifier, UUID> tableIdLoaderFunction, + @NotNull final InstanceMetadataFetcher instanceFetcher, + @NotNull final CassandraBridge cassandraBridge) Review Comment: We can omit the `final` modifiers, and the `instanceFetcher` parameter is no longer needed: ```suggestion private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema, @NotNull Partitioner partitioner, @NotNull ConcurrentHashMap<TableIdentifier, UUID> tableIdCache, @NotNull Function<TableIdentifier, UUID> tableIdLoaderFunction, @NotNull CassandraBridge cassandraBridge) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

