frankgh commented on code in PR #251:
URL: https://github.com/apache/cassandra-sidecar/pull/251#discussion_r2326271432
##########
server/build.gradle:
##########
@@ -148,6 +148,10 @@ dependencies {
implementation("io.swagger.core.v3:swagger-models:${project.swaggerVersion}")
implementation("io.swagger.core.v3:swagger-jaxrs2:${project.swaggerVersion}")
+ implementation(group: "org.apache.cassandra", name: "cassandra-analytics-common", version: "${[project.analyticsVersion]}")
+ implementation(group: "org.apache.cassandra", name: "cassandra-analytics-cdc_spark3_2.12", version: "${[project.analyticsVersion]}")
+ implementation "com.esotericsoftware:kryo-shaded:${kryoVersion}"
Review Comment:
Let's omit the explicit kryo dependency, along with its version. My fear is that
the version number might get out of sync with what
`cassandra-analytics-cdc_spark3_2.12` brings in transitively. I am also uneasy
about `cassandra-analytics-cdc_spark3_2.12` pulling in Guava. I'd love to remove
Guava as an upstream dependency, but it is widely used.
```suggestion
```
##########
server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.base.Preconditions;
Review Comment:
We should use Sidecar's preconditions in general, but they aren't needed here;
`Objects.requireNonNull` is enough.
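A minimal sketch of what the check in `get(CassandraVersion)` could look like with the JDK call. `DemoVersion` and `DemoBridgeFactory` are hypothetical stand-ins for `CassandraVersion` and `CassandraBridgeFactory`, and the "bridges" are plain strings. Guava's `Preconditions.checkNotNull` also throws `NullPointerException`, so the swap should not change behavior; the `Supplier` overload of `Objects.requireNonNull` only builds the message when the check fails. The `computeIfAbsent` cache mirrors the per-jar-label caching in the diff.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for CassandraVersion: a null jarBaseName marks an unsupported version
enum DemoVersion
{
    FOUR_ZERO("four-zero"),
    UNSUPPORTED(null);

    private final String jarBaseName;

    DemoVersion(String jarBaseName)
    {
        this.jarBaseName = jarBaseName;
    }

    String jarBaseName()
    {
        return jarBaseName;
    }
}

// Hypothetical stand-in for CassandraBridgeFactory; bridges are plain strings here
class DemoBridgeFactory
{
    private final Map<String, String> bridges = new ConcurrentHashMap<>();
    int created = 0; // counts how many times create() ran

    String get(DemoVersion version)
    {
        // JDK-only null check; the message is built lazily, only when jarBaseName is null
        String jarBaseName = Objects.requireNonNull(version.jarBaseName(),
                () -> "Cassandra version " + version + " is not supported");
        // one bridge per jar label, created on first use and cached afterwards
        return bridges.computeIfAbsent(jarBaseName, this::create);
    }

    private String create(String label)
    {
        created++;
        return "bridge:" + label;
    }
}
```

Calling `get` twice with the same version returns the cached instance, while an unsupported version fails fast with the formatted message.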
##########
server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java:
##########
@@ -35,10 +35,13 @@ public class CdcConfigurationImpl implements CdcConfiguration
private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigurationImpl.class);
public static final String IS_ENABLED_PROPERTY = "enabled";
public static final String CONFIGURATION_REFRESH_TIME_PROPERTY = "config_refresh_time";
+ public static final String TABLE_SCHEMA_REFRESH_TIME_PROPERTY = "table_schema_refresh_time";
Review Comment:
Can we add this to sidecar.yaml and document it there as well?
##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcStats.java:
##########
@@ -0,0 +1,303 @@
+package org.apache.cassandra.sidecar.cdc;
+
+/**
Review Comment:
I find these AI-generated javadocs less useful than a concise description of
what the method does. In general, I don't like them.
##########
server/src/main/java/org/apache/cassandra/sidecar/utils/ByteBufUtils.java:
##########
Review Comment:
This class needs unit tests.
##########
server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java:
##########
@@ -21,12 +21,67 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
/**
- * Class with utility methods for CDC.
+ * Utility class providing CDC (Change Data Capture) specific operations and file handling.
Review Comment:
These AI-generated javadocs are excessive.
##########
server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java:
##########
@@ -0,0 +1,145 @@
+import jakarta.inject.Singleton;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.bridge.BaseCassandraBridgeFactory.getCassandraVersion;
+
+/**
+ * Factory class for creating Cassandra bridge instances based on version-specific jar files.
+ * <p>
+ * This factory maintains a cache of CassandraBridge instances mapped by Cassandra version labels
+ * and provides methods to retrieve bridge instances for specific Cassandra versions.
+ * Each bridge is loaded from version-specific JAR resources and instantiated using reflection.
+ */
+@Singleton
+public class CassandraBridgeFactory
+{
+ // maps Cassandra version-specific jar name (e.g. 'four-zero') to matching CassandraBridge
+ private final Map<String, CassandraBridge> cassandraBridges;
+
+ public CassandraBridgeFactory()
+ {
+ cassandraBridges = new ConcurrentHashMap<>(CassandraVersion.values().length);
+ }
+
+ @NotNull
+ public CassandraBridge get(@NotNull String version)
+ {
+ return get(getCassandraVersion(version));
+ }
+
+ @NotNull
+ public CassandraBridge get(@NotNull CassandraVersionFeatures features)
+ {
+ return get(getCassandraVersion(features));
+ }
+
+ @NotNull
+ public CassandraBridge get(@NotNull CassandraVersion version)
+ {
+ String jarBaseName = version.jarBaseName();
+ Preconditions.checkNotNull(jarBaseName, "Cassandra version " + version + " is not supported");
+ return cassandraBridges.computeIfAbsent(jarBaseName, this::create);
+ }
+
+ @NotNull
+ @SuppressWarnings("unchecked")
+ private CassandraBridge create(@NotNull String label)
+ {
+ try
+ {
+ ClassLoader loader = buildClassLoader(
+ cassandraResourceName(label),
+ bridgeResourceName(label),
+ typesResourceName(label));
+ Class<CassandraBridge> bridge = (Class<CassandraBridge>) loader.loadClass(CassandraBridge.IMPLEMENTATION_FQCN);
+ Constructor<CassandraBridge> constructor = bridge.getConstructor();
+ return constructor.newInstance();
+ }
+ catch (ClassNotFoundException | NoSuchMethodException | InstantiationException
+ | IllegalAccessException | InvocationTargetException exception)
+ {
+ throw new RuntimeException("Failed to create Cassandra bridge for label " + label, exception);
+ }
+ }
+
+ @NotNull
+ String cassandraResourceName(@NotNull String label)
+ {
+ return "/bridges/" + label + ".jar";
+ }
+
+ @NotNull
+ String bridgeResourceName(@NotNull String label)
+ {
+ return jarResourceName(label, "bridge");
+ }
+
+ @NotNull
+ String typesResourceName(@NotNull String label)
+ {
+ return jarResourceName(label, "types");
+ }
+
+ String jarResourceName(String... parts)
+ {
+ return "/bridges/" + String.join("-", parts) + ".jar";
+ }
+
+ public ClassLoader buildClassLoader(String... resourceNames)
+ {
+ URL[] urls = Arrays.stream(resourceNames)
+ .map(BaseCassandraBridgeFactory::copyClassResourceToFile)
+ .map(jar -> {
+ try
+ {
+ return jar.toURI().toURL();
+ }
+ catch (MalformedURLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }).toArray(URL[]::new);
+
+ return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+ {
+ public ClassLoader run()
+ {
+ return new PostDelegationClassLoader(urls, Thread.currentThread().getContextClassLoader());
+ }
+ });
+ }
Review Comment:
NIT:
```suggestion
return AccessController.doPrivileged((PrivilegedAction<ClassLoader>) () ->
    new PostDelegationClassLoader(urls, Thread.currentThread().getContextClassLoader()));
```
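The cast in the suggestion is needed: `AccessController.doPrivileged` is overloaded for both `PrivilegedAction` and `PrivilegedExceptionAction`, and a zero-argument lambda that returns a value fits both, so javac rejects the uncast call as ambiguous. A self-contained sketch of the same shape, with `URLClassLoader` standing in for `PostDelegationClassLoader` (a hypothetical substitution that does not reproduce whatever delegation order the real loader implements). `AccessController` is deprecated for removal since Java 17, so recent JDKs emit a warning, hence the suppression.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.security.AccessController;
import java.security.PrivilegedAction;

class ClassLoaderSketch
{
    // URLClassLoader stands in for PostDelegationClassLoader (hypothetical substitution)
    @SuppressWarnings("removal") // AccessController is deprecated for removal since Java 17
    static ClassLoader buildClassLoader(URL[] urls)
    {
        // The cast selects doPrivileged(PrivilegedAction); without it the lambda also fits
        // doPrivileged(PrivilegedExceptionAction) and the call does not compile (ambiguous).
        return AccessController.doPrivileged((PrivilegedAction<ClassLoader>) () ->
            new URLClassLoader(urls, Thread.currentThread().getContextClassLoader()));
    }
}
```

The anonymous-class form in the diff and the cast lambda behave the same; the lambda just drops the boilerplate.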
##########
server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java:
##########
@@ -0,0 +1,298 @@
+package org.apache.cassandra.sidecar.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+import org.apache.cassandra.sidecar.utils.ByteBufUtils;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Database accessor for CDC (Change Data Capture) state management operations.
+ * <p>
+ * This class provides specialized database access functionality for managing CDC state persistence
+ * and retrieval in Cassandra Sidecar. It extends {@link DatabaseAccessor} to provide CDC-specific
+ * operations including:
+ * <ul>
+ * <li>Storing CDC state data across token range splits for distributed processing</li>
+ * <li>Loading and merging CDC state from overlapping token ranges</li>
+ * <li>Managing table schema history for CDC-enabled tables</li>
+ * <li>Providing partitioner and metadata access for CDC operations</li>
+ * </ul>
+ * <p>
+ * The accessor handles token range splitting to ensure CDC state is properly distributed
+ * across multiple database partitions for scalability. It uses {@link TokenSplitUtil}
+ * to determine overlapping splits for both storage and retrieval operations.
+ * <p>
+ * Key features:
+ * <ul>
+ * <li><strong>Token-aware storage:</strong> Automatically distributes CDC state across
+ * appropriate token range splits</li>
+ * <li><strong>State merging:</strong> Combines overlapping CDC state objects into
+ * canonical views during retrieval</li>
+ * <li><strong>Schema management:</strong> Tracks table schema versions for CDC operations</li>
+ * <li><strong>Async operations:</strong> Provides asynchronous database operations for
+ * better performance</li>
+ * </ul>
+ * <p>
+ * This class is thread-safe and designed as a singleton for injection into CDC components
+ * that require database access functionality.
+ *
+ * @see DatabaseAccessor
+ * @see CdcStatesSchema
+ * @see TokenSplitUtil
+ */
+@SuppressWarnings("resource")
+@Singleton
+public class CdcDatabaseAccessor extends DatabaseAccessor<CdcStatesSchema>
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(CdcDatabaseAccessor.class);
+ private final TableHistorySchema tableHistorySchema;
+ private final Provider<TokenSplitUtil> tokenSplitUtilProvider;
+ private volatile TokenSplitUtil tokenSplitUtil = null;
+ private volatile InstanceMetadataFetcher instanceMetadataFetcher;
Review Comment:
This should not be volatile; make it final instead.
```suggestion
private final InstanceMetadataFetcher instanceMetadataFetcher;
```
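A minimal sketch of the suggested shape, assuming the fetcher is passed in through the constructor and never reassigned; `MetadataFetcher` and `DemoAccessor` are hypothetical names. Under the Java Memory Model, a `final` field assigned in the constructor is seen fully initialized by any thread that later obtains a reference to the object, provided `this` does not escape during construction. So `volatile` adds nothing here, and `final` also documents that the reference never changes.

```java
import java.util.Objects;

// Hypothetical stand-in for InstanceMetadataFetcher
interface MetadataFetcher
{
    String fetch();
}

class DemoAccessor
{
    // Assigned exactly once in the constructor: final gives safe publication
    // (JMM final-field semantics), so volatile is unnecessary
    private final MetadataFetcher instanceMetadataFetcher;

    DemoAccessor(MetadataFetcher instanceMetadataFetcher)
    {
        this.instanceMetadataFetcher = Objects.requireNonNull(instanceMetadataFetcher);
    }

    String metadata()
    {
        return instanceMetadataFetcher.fetch();
    }
}
```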
##########
server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java:
##########
@@ -0,0 +1,298 @@
+/**
+ * Database accessor for CDC (Change Data Capture) state management operations.
Review Comment:
Again, this javadoc is not very useful and is generally distracting.
##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ * <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ * <li>Automatic detection and tracking of CDC-enabled tables</li>
+ * <li>Schema change event notification to registered listeners</li>
+ * <li>Table metadata caching and synchronization with CDC bridges</li>
+ * <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ * <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema changes by comparing
+ * full schema snapshots and updates CDC table metadata when changes are detected</li>
+ * <li><strong>Schema Monitor (49s interval):</strong> Validates that CDC-enabled tables
+ * are properly configured in the Cassandra Schema.instance singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ * <li><strong>CDC Table Discovery:</strong> Automatically identifies and tracks tables with
+ * CDC enabled from the cluster schema</li>
+ * <li><strong>Schema Change Detection:</strong> Compares schema snapshots to detect modifications
+ * and trigger appropriate updates to CDC subsystems</li>
+ * <li><strong>Bridge Integration:</strong> Synchronizes schema information with Cassandra and
+ * CDC bridges for consistent metadata handling</li>
+ * <li><strong>Event Notification:</strong> Provides a listener mechanism for components that
+ * need to react to schema changes</li>
+ * <li><strong>Validation and Monitoring:</strong> Continuously validates CDC table configurations
+ * and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness with system load:
+ * <ul>
+ * <li>60-second refresh interval for schema change detection</li>
+ * <li>49-second monitor interval (chosen to avoid harmonics with the 60s refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC consumers always have
+ * up-to-date schema information, enabling proper data serialization, deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
Review Comment:
I think the class name should reflect that this is a background task that
periodically refreshes the schema.
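The `CassandraClusterSchema` javadoc and field comment justify the 49-second monitor interval by its least common multiple with the 60-second refresh. A quick check of that arithmetic, assuming both timers start together and fire at a fixed rate (real scheduling jitter will shift the actual alignment); `IntervalMath` is a hypothetical helper, not part of the PR.

```java
class IntervalMath
{
    // Euclid's algorithm
    static long gcd(long a, long b)
    {
        return b == 0 ? a : gcd(b, a % b);
    }

    // Two fixed-rate timers started together coincide again after lcm(a, b) seconds
    static long lcm(long a, long b)
    {
        return a / gcd(a, b) * b;
    }

    public static void main(String[] args)
    {
        long seconds = lcm(49, 60);
        System.out.println(seconds + "s = " + seconds / 60 + "min"); // prints "2940s = 49min"
    }
}
```

Because 49 and 60 share no common factor, the two ticks only realign every 49 minutes; a 50-second interval, by contrast, would realign every lcm(50, 60) = 300 seconds.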
##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+ // 49sec least-common multiple with 60sec is 49min so offers best monitor frequency without clashing with 60sec
+ private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+ private final AtomicReference<String> currSchemaText = new AtomicReference<>("");
+ private final AtomicReference<Set<CqlTable>> cdcTables = new AtomicReference<>(Collections.emptySet());
+ private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new ConcurrentHashMap<>();
+ private final CdcDatabaseAccessor databaseAccessor;
+ private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new CopyOnWriteArrayList<>();
+ private final SidecarConfiguration sidecarConfiguration;
+ private final InstanceMetadataFetcher instanceFetcher;
+ private final SecondBoundConfiguration tableSchemaRefreshTime;
+ private final CassandraBridgeFactory cassandraBridgeFactory;
+
+ public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+ CdcDatabaseAccessor databaseAccessor,
+ SidecarConfiguration sidecarConfiguration,
+ CassandraBridgeFactory cassandraBridgeFactory)
+ {
+
+ this.instanceFetcher = instanceFetcher;
+ this.databaseAccessor = databaseAccessor;
+ this.sidecarConfiguration = sidecarConfiguration;
+ this.tableSchemaRefreshTime = sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+ this.cassandraBridgeFactory = cassandraBridgeFactory;
+ }
+
+ public void addSchemaChangeListener(Runnable listener)
+ {
+ schemaChangeListeners.add(listener);
+ }
+
+ public void refresh()
+ {
+ NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance -> instance.delegate().nodeSettings());
+ CassandraBridge cassandraBridge = cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+ CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+ try
+ {
+ LOGGER.info("Checking for schema changes...");
+ String fullSchemaText = databaseAccessor.fullSchema();
+ if (!fullSchemaText.equals(currSchemaText.get()))
+ {
+ LOGGER.info("Schema change detected, refreshing CDC tables");
+ currSchemaText.set(fullSchemaText);
+ Set<CqlTable> updatedCdcTables = buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, cassandraBridgeFactory);
+ LOGGER.info("Cdc enabled tables tables='{}'",
+ updatedCdcTables.stream()
+ .map(m -> String.format("%s.%s", m.keyspace(), m.table()))
+ .collect(Collectors.joining(",")));
+ cdcTables.set(updatedCdcTables);
+ cdcBridge.updateCdcSchema(updatedCdcTables, databaseAccessor.partitioner(),
+ ((keyspace, table) -> tableIdCache.get(TableIdentifier.of(keyspace, table))));
+ schemaChangeListeners.forEach(Runnable::run);
+ }
+ }
+ catch (IllegalStateException exception)
+ {
+ LOGGER.warn("There was a problem refreshing the schema. Database Accessor may not be ready", exception);
+ throw exception;
+ }
+ catch (Throwable t)
+ {
+ LOGGER.error("Unexpected error while refreshing the schema", t);
+ throw t;
+ }
+ }
+
+ public DurationSpec delay()
+ {
+ return tableSchemaRefreshTime;
Review Comment:
Let's read the value from the configuration on every call instead of caching it
in a field, so this keeps working when we support dynamic configuration
reloading in the future.
```suggestion
return sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
```
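Why reading through matters: a value copied into a field in the constructor is frozen at construction time, while a getter that reads from the configuration object sees every later update. The sketch below assumes a hypothetical mutable `RefreshConfig` holder standing in for `SidecarConfiguration` (which does not expose a setter today), and assumes the scheduler calls `delay()` again each time it reschedules the task:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for SidecarConfiguration with a simulated runtime reload
class RefreshConfig
{
    private final AtomicReference<Duration> tableSchemaRefreshTime;

    RefreshConfig(Duration initial)
    {
        this.tableSchemaRefreshTime = new AtomicReference<>(initial);
    }

    Duration tableSchemaRefreshTime()
    {
        return tableSchemaRefreshTime.get();
    }

    // Simulates a future dynamic configuration reload
    void reload(Duration updated)
    {
        tableSchemaRefreshTime.set(updated);
    }
}

// Mirrors the code under review: the value is copied once in the constructor
class CachedDelayTask
{
    private final Duration delay;

    CachedDelayTask(RefreshConfig config)
    {
        this.delay = config.tableSchemaRefreshTime();
    }

    Duration delay()
    {
        return delay; // never observes later reloads
    }
}

// Mirrors the suggestion: the value is read through on every call
class LiveDelayTask
{
    private final RefreshConfig config;

    LiveDelayTask(RefreshConfig config)
    {
        this.config = config;
    }

    Duration delay()
    {
        return config.tableSchemaRefreshTime(); // picks up the latest reloaded value
    }
}
```

After `config.reload(Duration.ofSeconds(30))`, the cached task still reports the original 60 seconds while the live task reports 30.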
##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ * <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ * <li>Automatic detection and tracking of CDC-enabled tables</li>
+ * <li>Schema change event notification to registered listeners</li>
+ * <li>Table metadata caching and synchronization with CDC bridges</li>
+ * <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ * <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema changes by comparing
+ * full schema snapshots and updates CDC table metadata when changes are detected</li>
+ * <li><strong>Schema Monitor (49s interval):</strong> Validates that CDC-enabled tables
+ * are properly configured in the Cassandra Schema.instance singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ * <li><strong>CDC Table Discovery:</strong> Automatically identifies and tracks tables with
+ * CDC enabled from the cluster schema</li>
+ * <li><strong>Schema Change Detection:</strong> Compares schema snapshots to detect modifications
+ * and trigger appropriate updates to CDC subsystems</li>
+ * <li><strong>Bridge Integration:</strong> Synchronizes schema information with Cassandra and
+ * CDC bridges for consistent metadata handling</li>
+ * <li><strong>Event Notification:</strong> Provides a listener mechanism for components that
+ * need to react to schema changes</li>
+ * <li><strong>Validation and Monitoring:</strong> Continuously validates CDC table configurations
+ * and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness with system load:
+ * <ul>
+ * <li>60-second refresh interval for schema change detection</li>
+ * <li>49-second monitor interval (chosen to avoid harmonics with the 60s refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC consumers always have
+ * up-to-date schema information, enabling proper data serialization, deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+ // 49sec least-common multiple with 60sec is 49min so offers best monitor frequency without clashing with 60sec
+ private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+ private final AtomicReference<String> currSchemaText = new AtomicReference<>("");
+ private final AtomicReference<Set<CqlTable>> cdcTables = new AtomicReference<>(Collections.emptySet());
+ private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new ConcurrentHashMap<>();
+ private final CdcDatabaseAccessor databaseAccessor;
+ private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new CopyOnWriteArrayList<>();
+ private final SidecarConfiguration sidecarConfiguration;
+ private final InstanceMetadataFetcher instanceFetcher;
+ private final SecondBoundConfiguration tableSchemaRefreshTime;
+ private final CassandraBridgeFactory cassandraBridgeFactory;
+
+ public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+ CdcDatabaseAccessor databaseAccessor,
+ SidecarConfiguration sidecarConfiguration,
+ CassandraBridgeFactory cassandraBridgeFactory)
+ {
+
+ this.instanceFetcher = instanceFetcher;
+ this.databaseAccessor = databaseAccessor;
+ this.sidecarConfiguration = sidecarConfiguration;
+ this.tableSchemaRefreshTime = sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+ this.cassandraBridgeFactory = cassandraBridgeFactory;
+ }
+
+ public void addSchemaChangeListener(Runnable listener)
+ {
+ schemaChangeListeners.add(listener);
+ }
+
+ public void refresh()
+ {
+ NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings());
+ CassandraBridge cassandraBridge = cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+ CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+ try
+ {
+ LOGGER.info("Checking for schema changes...");
Review Comment:
Should this be logged at debug level instead?
```suggestion
LOGGER.debug("Checking for schema changes...");
```
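The reasoning behind the suggestion: the "Checking for schema changes..." message fires on every refresh interval, whereas the "Schema change detected" branch only runs when the schema text actually differs, so only the latter is worth keeping at info level. A minimal sketch of that compare-and-notify loop, using `java.util.logging` with `FINE` standing in for SLF4J's debug level; `SchemaPoller` and its methods are hypothetical simplifications, not the PR's implementation:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical, simplified model of CassandraClusterSchema.refresh()
class SchemaPoller
{
    private static final Logger LOGGER = Logger.getLogger(SchemaPoller.class.getName());

    private final AtomicReference<String> currentSchema = new AtomicReference<>("");
    private final List<Runnable> listeners = new CopyOnWriteArrayList<>();

    void addSchemaChangeListener(Runnable listener)
    {
        listeners.add(listener);
    }

    /**
     * @return true if the schema text changed and listeners were notified
     */
    boolean refresh(String fullSchemaText)
    {
        // Emitted on every poll, so it stays below INFO
        LOGGER.log(Level.FINE, "Checking for schema changes...");
        // getAndSet swaps atomically, so concurrent refreshes with the same new text notify only once
        String previous = currentSchema.getAndSet(fullSchemaText);
        if (fullSchemaText.equals(previous))
        {
            return false;
        }
        // Emitted only when something actually changed
        LOGGER.info("Schema change detected, refreshing CDC tables");
        listeners.forEach(Runnable::run);
        return true;
    }
}
```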
##########
server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.base.Preconditions;
+
+import jakarta.inject.Singleton;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.bridge.BaseCassandraBridgeFactory.getCassandraVersion;
+
+/**
+ * Factory class for creating Cassandra bridge instances based on version-specific jar files.
+ * <p>
+ * This factory maintains a cache of CassandraBridge instances mapped by Cassandra version labels
+ * and provides methods to retrieve bridge instances for specific Cassandra versions.
+ * Each bridge is loaded from version-specific JAR resources and instantiated using reflection.
+ */
+@Singleton
+public class CassandraBridgeFactory
+{
+ // maps Cassandra version-specific jar name (e.g. 'four-zero') to matching CassandraBridge
+ private final Map<String, CassandraBridge> cassandraBridges;
+
+ public CassandraBridgeFactory()
+ {
+ cassandraBridges = new ConcurrentHashMap<>(CassandraVersion.values().length);
+ }
+
+ @NotNull
+ public CassandraBridge get(@NotNull String version)
+ {
+ return get(getCassandraVersion(version));
+ }
+
+ @NotNull
+ public CassandraBridge get(@NotNull CassandraVersionFeatures features)
+ {
+ return get(getCassandraVersion(features));
+ }
+
+ @NotNull
+ public CassandraBridge get(@NotNull CassandraVersion version)
+ {
+ String jarBaseName = version.jarBaseName();
+ Preconditions.checkNotNull(jarBaseName, "Cassandra version " + version + " is not supported");
Review Comment:
```suggestion
Objects.requireNonNull(jarBaseName, "Cassandra version " + version + " is not supported");
```
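`java.util.Objects.requireNonNull(T, String)` throws a `NullPointerException` carrying the given message, the same contract as Guava's `Preconditions.checkNotNull`, so the switch removes one Guava call from this class (the file would then need `import java.util.Objects;`, and the `Preconditions` import would become unused). The sketch below shows the suggested check inside a version-keyed cache built on `ConcurrentHashMap.computeIfAbsent`; `BridgeCache`, `SupportedVersion`, and the `String` placeholder for a bridge are hypothetical stand-ins, not the analytics library's `CassandraVersion` or `CassandraBridge`, and the real factory may populate its cache differently:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for CassandraVersion: jarBaseName() is null for unsupported versions
enum SupportedVersion
{
    V4_0("four-zero"),
    V2_2(null);

    private final String jarBaseName;

    SupportedVersion(String jarBaseName)
    {
        this.jarBaseName = jarBaseName;
    }

    String jarBaseName()
    {
        return jarBaseName;
    }
}

class BridgeCache
{
    private final Map<String, String> bridges = new ConcurrentHashMap<>();
    private final AtomicInteger loads = new AtomicInteger();

    String get(SupportedVersion version)
    {
        String jarBaseName = version.jarBaseName();
        // Same contract as Guava's checkNotNull: NPE with this message when null
        Objects.requireNonNull(jarBaseName, "Cassandra version " + version + " is not supported");
        // computeIfAbsent applies the loader at most once per key, even under concurrent calls
        return bridges.computeIfAbsent(jarBaseName, this::load);
    }

    private String load(String jarBaseName)
    {
        loads.incrementAndGet();
        // Placeholder for loading the version-specific jar and instantiating the bridge reflectively
        return "bridge-for-" + jarBaseName;
    }

    int loadCount()
    {
        return loads.get();
    }
}
```

If the eager string concatenation is a concern, the `Objects.requireNonNull(T, Supplier<String>)` overload (Java 8+) builds the message only when the check fails.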
##########
server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+import org.apache.cassandra.sidecar.utils.ByteBufUtils;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Database accessor for CDC (Change Data Capture) state management operations.
+ * <p>
+ * This class provides specialized database access functionality for managing CDC state persistence
+ * and retrieval in Cassandra Sidecar. It extends {@link DatabaseAccessor} to provide CDC-specific
+ * operations including:
+ * <ul>
+ * <li>Storing CDC state data across token range splits for distributed processing</li>
+ * <li>Loading and merging CDC state from overlapping token ranges</li>
+ * <li>Managing table schema history for CDC-enabled tables</li>
+ * <li>Providing partitioner and metadata access for CDC operations</li>
+ * </ul>
+ * <p>
+ * The accessor handles token range splitting to ensure CDC state is properly distributed
+ * across multiple database partitions for scalability. It uses {@link TokenSplitUtil}
+ * to determine overlapping splits for both storage and retrieval operations.
+ * <p>
+ * Key features:
+ * <ul>
+ * <li><strong>Token-aware storage:</strong> Automatically distributes CDC state across
+ * appropriate token range splits</li>
+ * <li><strong>State merging:</strong> Combines overlapping CDC state objects into
+ * canonical views during retrieval</li>
+ * <li><strong>Schema management:</strong> Tracks table schema versions for CDC operations</li>
+ * <li><strong>Async operations:</strong> Provides asynchronous database operations for
+ * better performance</li>
+ * </ul>
+ * <p>
+ * This class is thread-safe and designed as a singleton for injection into CDC components
+ * that require database access functionality.
+ *
+ * @see DatabaseAccessor
+ * @see CdcStatesSchema
+ * @see TokenSplitUtil
+ */
+@SuppressWarnings("resource")
+@Singleton
+public class CdcDatabaseAccessor extends DatabaseAccessor<CdcStatesSchema>
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(CdcDatabaseAccessor.class);
+ private final TableHistorySchema tableHistorySchema;
+ private final Provider<TokenSplitUtil> tokenSplitUtilProvider;
+ private volatile TokenSplitUtil tokenSplitUtil = null;
+ private volatile InstanceMetadataFetcher instanceMetadataFetcher;
+ private final CassandraBridgeFactory cassandraBridgeFactory;
+
+ @Inject
+ public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher,
+ CdcStatesSchema cdcStatesSchema,
+ TableHistorySchema tableHistorySchema,
+ CQLSessionProvider sessionProvider,
+ Provider<TokenSplitUtil> tokenSplitUtilProvider,
+ CassandraBridgeFactory cassandraBridgeFactory)
+ {
+ super(cdcStatesSchema, sessionProvider);
+ this.tableHistorySchema = tableHistorySchema;
+ this.tokenSplitUtilProvider = tokenSplitUtilProvider;
+ this.instanceMetadataFetcher = instanceMetadataFetcher;
+ this.cassandraBridgeFactory = cassandraBridgeFactory;
+ }
+
+ @VisibleForTesting
+ public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher,
Review Comment:
While this ctor is annotated with `@VisibleForTesting`, I don't see it being used anywhere. Maybe you are missing tests for this class?
##########
server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+import org.apache.cassandra.sidecar.utils.ByteBufUtils;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Database accessor for CDC (Change Data Capture) state management operations.
+ * <p>
+ * This class provides specialized database access functionality for managing CDC state persistence
+ * and retrieval in Cassandra Sidecar. It extends {@link DatabaseAccessor} to provide CDC-specific
+ * operations including:
+ * <ul>
+ * <li>Storing CDC state data across token range splits for distributed processing</li>
+ * <li>Loading and merging CDC state from overlapping token ranges</li>
+ * <li>Managing table schema history for CDC-enabled tables</li>
+ * <li>Providing partitioner and metadata access for CDC operations</li>
+ * </ul>
+ * <p>
+ * The accessor handles token range splitting to ensure CDC state is properly distributed
+ * across multiple database partitions for scalability. It uses {@link TokenSplitUtil}
+ * to determine overlapping splits for both storage and retrieval operations.
+ * <p>
+ * Key features:
+ * <ul>
+ * <li><strong>Token-aware storage:</strong> Automatically distributes CDC state across
+ * appropriate token range splits</li>
+ * <li><strong>State merging:</strong> Combines overlapping CDC state objects into
+ * canonical views during retrieval</li>
+ * <li><strong>Schema management:</strong> Tracks table schema versions for CDC operations</li>
+ * <li><strong>Async operations:</strong> Provides asynchronous database operations for
+ * better performance</li>
+ * </ul>
+ * <p>
+ * This class is thread-safe and designed as a singleton for injection into CDC components
+ * that require database access functionality.
+ *
+ * @see DatabaseAccessor
+ * @see CdcStatesSchema
+ * @see TokenSplitUtil
+ */
+@SuppressWarnings("resource")
+@Singleton
+public class CdcDatabaseAccessor extends DatabaseAccessor<CdcStatesSchema>
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(CdcDatabaseAccessor.class);
+ private final TableHistorySchema tableHistorySchema;
+ private final Provider<TokenSplitUtil> tokenSplitUtilProvider;
+ private volatile TokenSplitUtil tokenSplitUtil = null;
+ private volatile InstanceMetadataFetcher instanceMetadataFetcher;
+ private final CassandraBridgeFactory cassandraBridgeFactory;
+
+ @Inject
+ public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher,
+ CdcStatesSchema cdcStatesSchema,
+ TableHistorySchema tableHistorySchema,
+ CQLSessionProvider sessionProvider,
+ Provider<TokenSplitUtil> tokenSplitUtilProvider,
+ CassandraBridgeFactory cassandraBridgeFactory)
+ {
+ super(cdcStatesSchema, sessionProvider);
+ this.tableHistorySchema = tableHistorySchema;
+ this.tokenSplitUtilProvider = tokenSplitUtilProvider;
+ this.instanceMetadataFetcher = instanceMetadataFetcher;
+ this.cassandraBridgeFactory = cassandraBridgeFactory;
+ }
+
+ @VisibleForTesting
+ public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher,
+ CdcStatesSchema cdcStatesSchema,
+ TableHistorySchema tableHistorySchema,
+ CQLSessionProvider sessionProvider,
+ TokenSplitUtil tokenSplitUtil,
+ CassandraBridgeFactory cassandraBridgeFactory)
+ {
+ super(cdcStatesSchema, sessionProvider);
+ this.tableHistorySchema = tableHistorySchema;
+ this.tokenSplitUtilProvider = () -> tokenSplitUtil;
+ this.instanceMetadataFetcher = instanceMetadataFetcher;
+ this.cassandraBridgeFactory = cassandraBridgeFactory;
+ }
+
+ protected TokenSplitUtil tokenSplitUtil()
+ {
+ // cql connection must be initialized before we can initialize TokenSplitUtil class
+ if (this.tokenSplitUtil == null)
Review Comment:
Do we need to worry about concurrent access to this method?
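If `tokenSplitUtil()` can be reached from more than one thread, a single null check on the `volatile` field lets two threads both observe `null` and each call `tokenSplitUtilProvider.get()`, which is harmless only if the provider is cheap and idempotent. One common remedy is double-checked locking on a `volatile` field. A minimal sketch, where `LazyValue` is a hypothetical helper and a plain `java.util.function.Supplier` stands in for Guice's `Provider`:

```java
import java.util.function.Supplier;

// Hypothetical helper: lazily initializes a value once, safely across threads
final class LazyValue<T>
{
    private final Supplier<T> supplier;
    // volatile ensures other threads see a fully constructed value once it is published
    private volatile T value;

    LazyValue(Supplier<T> supplier)
    {
        this.supplier = supplier;
    }

    T get()
    {
        T result = value; // first, unsynchronized check: fast path after initialization
        if (result == null)
        {
            synchronized (this)
            {
                result = value; // second check: another thread may have initialized it meanwhile
                if (result == null)
                {
                    result = supplier.get();
                    value = result;
                }
            }
        }
        return result;
    }
}
```

If the supplier throws, for instance because the CQL session is not yet available, `value` stays `null` and the next call retries, which keeps the deferred-initialization behavior the original comment describes. A supplier that returns `null` would likewise be retried on every call.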
##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ * <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ * <li>Automatic detection and tracking of CDC-enabled tables</li>
+ * <li>Schema change event notification to registered listeners</li>
+ * <li>Table metadata caching and synchronization with CDC bridges</li>
+ * <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ * <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema changes by comparing
+ * full schema snapshots and updates CDC table metadata when changes are detected</li>
+ * <li><strong>Schema Monitor (49s interval):</strong> Validates that CDC-enabled tables
+ * are properly configured in the Cassandra Schema.instance singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ * <li><strong>CDC Table Discovery:</strong> Automatically identifies and tracks tables with
+ * CDC enabled from the cluster schema</li>
+ * <li><strong>Schema Change Detection:</strong> Compares schema snapshots to detect modifications
+ * and trigger appropriate updates to CDC subsystems</li>
+ * <li><strong>Bridge Integration:</strong> Synchronizes schema information with Cassandra and
+ * CDC bridges for consistent metadata handling</li>
+ * <li><strong>Event Notification:</strong> Provides a listener mechanism for components that
+ * need to react to schema changes</li>
+ * <li><strong>Validation and Monitoring:</strong> Continuously validates CDC table configurations
+ * and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness with system load:
+ * <ul>
+ * <li>60-second refresh interval for schema change detection</li>
+ * <li>49-second monitor interval (chosen to avoid harmonics with the 60s refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC consumers always have
+ * up-to-date schema information, enabling proper data serialization, deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+ // 49sec least-common multiple with 60sec is 49min so offers best monitor frequency without clashing with 60sec
+ private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+ private final AtomicReference<String> currSchemaText = new AtomicReference<>("");
+ private final AtomicReference<Set<CqlTable>> cdcTables = new AtomicReference<>(Collections.emptySet());
+ private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new ConcurrentHashMap<>();
+ private final CdcDatabaseAccessor databaseAccessor;
+ private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new CopyOnWriteArrayList<>();
+ private final SidecarConfiguration sidecarConfiguration;
+ private final InstanceMetadataFetcher instanceFetcher;
+ private final SecondBoundConfiguration tableSchemaRefreshTime;
+ private final CassandraBridgeFactory cassandraBridgeFactory;
+
+ public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+ CdcDatabaseAccessor databaseAccessor,
+ SidecarConfiguration sidecarConfiguration,
+ CassandraBridgeFactory cassandraBridgeFactory)
+ {
+
+ this.instanceFetcher = instanceFetcher;
+ this.databaseAccessor = databaseAccessor;
+ this.sidecarConfiguration = sidecarConfiguration;
+ this.tableSchemaRefreshTime = sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+ this.cassandraBridgeFactory = cassandraBridgeFactory;
+ }
+
+ public void addSchemaChangeListener(Runnable listener)
+ {
+ schemaChangeListeners.add(listener);
+ }
+
+ public void refresh()
+ {
+ NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings());
+ CassandraBridge cassandraBridge = cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+ CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+ try
+ {
+ LOGGER.info("Checking for schema changes...");
+ String fullSchemaText = databaseAccessor.fullSchema();
+ if (!fullSchemaText.equals(currSchemaText.get()))
+ {
+ LOGGER.info("Schema change detected, refreshing CDC tables");
+ currSchemaText.set(fullSchemaText);
+ Set<CqlTable> updatedCdcTables = buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, cassandraBridgeFactory);
+ LOGGER.info("Cdc enabled tables tables='{}'",
+ updatedCdcTables.stream()
+ .map(m -> String.format("%s.%s", m.keyspace(), m.table()))
+ .collect(Collectors.joining(",")));
+ cdcTables.set(updatedCdcTables);
+ cdcBridge.updateCdcSchema(updatedCdcTables, databaseAccessor.partitioner(),
+ ((keyspace, table) -> tableIdCache.get(TableIdentifier.of(keyspace, table))));
+ schemaChangeListeners.forEach(Runnable::run);
+ }
+ }
+ catch (IllegalStateException exception)
+ {
+ LOGGER.warn("There was a problem refreshing the schema. Database Accessor may not be ready", exception);
+ throw exception;
+ }
+ catch (Throwable t)
+ {
+ LOGGER.error("Unexpected error while refreshing the schema", t);
+ throw t;
+ }
+ }
+
+ public DurationSpec delay()
+ {
+ return tableSchemaRefreshTime;
+ }
+
+ @Override
+ public void execute(Promise<Void> promise)
+ {
+ try
+ {
+ refresh();
+ promise.tryComplete();
+ }
+ catch (Throwable t)
+ {
+ promise.fail(t);
+ }
+ }
+
+ @Override
+ public ScheduleDecision scheduleDecision()
+ {
+ boolean shouldSkip = !sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled()
+ || !sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled();
+ return shouldSkip ? ScheduleDecision.SKIP : ScheduleDecision.EXECUTE;
Review Comment:
NIT: I find this way more readable
```suggestion
if (sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled()
    && sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled())
{
return ScheduleDecision.EXECUTE;
}
return ScheduleDecision.SKIP;
```
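The two forms are equivalent by De Morgan's law: `!a || !b` is `!(a && b)`, so the task executes only when both the schema keyspace and CDC are enabled. A small exhaustive check over the four input combinations; the local `ScheduleDecision` enum and the `original`/`suggested` methods are hypothetical stand-ins for the Sidecar types:

```java
// Local stand-in for the Sidecar ScheduleDecision enum
enum ScheduleDecision
{
    EXECUTE,
    SKIP
}

final class ScheduleDecisionCheck
{
    // Shape of the code under review
    static ScheduleDecision original(boolean schemaKeyspaceEnabled, boolean cdcEnabled)
    {
        boolean shouldSkip = !schemaKeyspaceEnabled || !cdcEnabled;
        return shouldSkip ? ScheduleDecision.SKIP : ScheduleDecision.EXECUTE;
    }

    // Shape of the suggested rewrite
    static ScheduleDecision suggested(boolean schemaKeyspaceEnabled, boolean cdcEnabled)
    {
        if (schemaKeyspaceEnabled && cdcEnabled)
        {
            return ScheduleDecision.EXECUTE;
        }
        return ScheduleDecision.SKIP;
    }

    // Returns true if both forms agree for every combination of inputs
    static boolean equivalent()
    {
        boolean[] values = {false, true};
        for (boolean a : values)
        {
            for (boolean b : values)
            {
                if (original(a, b) != suggested(a, b))
                {
                    return false;
                }
            }
        }
        return true;
    }
}
```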
##########
server/src/main/java/org/apache/cassandra/sidecar/utils/ByteBufUtils.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.function.Supplier;
+
+/**
+ * Utility class providing byte buffer manipulation and conversion operations for Cassandra Sidecar.
+ * <p>
+ * This class offers a comprehensive set of static utility methods for working with byte buffers,
+ * input streams, and byte arrays in the context of Cassandra data processing and CDC operations.
+ * The utilities are designed to handle common byte manipulation tasks including:
+ * <ul>
+ * <li>ByteBuffer to byte array conversions with optimal performance</li>
+ * <li>Hexadecimal string representation of binary data</li>
+ * <li>Stream reading operations with full data consumption guarantees</li>
+ * <li>Cassandra-specific composite key splitting and building</li>
+ * <li>Length-prefixed byte buffer operations for data serialization</li>
+ * <li>UTF-8 string encoding and decoding with error handling</li>
+ * </ul>
+ * <p>
+ * Key features include:
+ * <ul>
+ * <li><strong>Memory efficiency:</strong> Optimized ByteBuffer handling that avoids
+ * unnecessary copying when dealing with heap vs direct buffers</li>
+ * <li><strong>Thread-safe string decoding:</strong> Thread-local UTF-8 decoder
+ * instances to avoid synchronization overhead</li>
+ * <li><strong>Cassandra compatibility:</strong> Support for Cassandra composite key
+ * formats including static marker handling</li>
+ * <li><strong>Robust I/O:</strong> Stream reading methods that handle partial reads
+ * and provide complete data consumption guarantees</li>
+ * <li><strong>Debugging support:</strong> Hexadecimal representation methods for
+ * binary data inspection and logging</li>
+ * </ul>
+ * <p>
+ * The class is particularly important for CDC operations where efficient byte manipulation
+ * is crucial for processing large volumes of change data. It provides the low-level
+ * building blocks for serialization, deserialization, and data format conversion
+ * operations throughout the sidecar system.
+ * <p>
+ * All methods in this class are static and thread-safe unless otherwise noted.
+ * The class maintains thread-local resources (such as charset decoders) to ensure
+ * optimal performance in multi-threaded environments.
+ *
+ * @see java.nio.ByteBuffer
+ * @see java.nio.charset.CharsetDecoder
+ */
+public class ByteBufUtils
Review Comment:
NIT: this class has a lot of `final` modifiers that are not necessary and do not conform to the Cassandra style guide.
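The Javadoc above names two techniques: avoiding copies for heap buffers and using thread-local UTF-8 decoders. A minimal sketch of both, assuming hypothetical method names (`getArray`, `utf8`) rather than the PR's actual `ByteBufUtils` API, and keeping `final` only on the static field rather than on parameters or locals:

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class ByteBufSketch
{
    // CharsetDecoder is stateful and not thread-safe, so each thread gets its own instance
    private static final ThreadLocal<CharsetDecoder> UTF8_DECODER =
        ThreadLocal.withInitial(() -> StandardCharsets.UTF_8.newDecoder()
                                                             .onMalformedInput(CodingErrorAction.REPORT)
                                                             .onUnmappableCharacter(CodingErrorAction.REPORT));

    private ByteBufSketch()
    {
    }

    // Returns the buffer's remaining bytes without moving its position. When a heap buffer's
    // backing array holds exactly those bytes, the array itself is returned (no copy).
    static byte[] getArray(ByteBuffer buffer)
    {
        int length = buffer.remaining();
        if (buffer.hasArray())
        {
            byte[] array = buffer.array();
            int offset = buffer.arrayOffset() + buffer.position();
            if (offset == 0 && length == array.length)
            {
                return array;
            }
            return Arrays.copyOfRange(array, offset, offset + length);
        }
        // Direct (or read-only) buffers expose no array: copy through a duplicate
        byte[] bytes = new byte[length];
        buffer.duplicate().get(bytes);
        return bytes;
    }

    // Decodes the remaining bytes as UTF-8 without moving the buffer's position.
    // Malformed input becomes an IllegalArgumentException (a choice made for this sketch).
    static String utf8(ByteBuffer buffer)
    {
        try
        {
            // decode(ByteBuffer) resets the decoder, decodes all input, and flushes it
            return UTF8_DECODER.get().decode(buffer.duplicate()).toString();
        }
        catch (CharacterCodingException e)
        {
            throw new IllegalArgumentException("Invalid UTF-8 input", e);
        }
    }
}
```

The zero-copy path hands back the buffer's own backing array, so callers must treat the result as read-only.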
##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ * <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ * <li>Automatic detection and tracking of CDC-enabled tables</li>
+ * <li>Schema change event notification to registered listeners</li>
+ * <li>Table metadata caching and synchronization with CDC bridges</li>
+ * <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ * <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema changes by comparing
+ * full schema snapshots and updates CDC table metadata when changes are detected</li>
+ * <li><strong>Schema Monitor (49s interval):</strong> Validates that CDC-enabled tables
+ * are properly configured in the Cassandra Schema.instance singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ * <li><strong>CDC Table Discovery:</strong> Automatically identifies and tracks tables with
+ * CDC enabled from the cluster schema</li>
+ * <li><strong>Schema Change Detection:</strong> Compares schema snapshots to detect modifications
+ * and trigger appropriate updates to CDC subsystems</li>
+ * <li><strong>Bridge Integration:</strong> Synchronizes schema information with Cassandra and
+ * CDC bridges for consistent metadata handling</li>
+ * <li><strong>Event Notification:</strong> Provides a listener mechanism for components that
+ * need to react to schema changes</li>
+ * <li><strong>Validation and Monitoring:</strong> Continuously validates CDC table configurations
+ * and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness with system load:
+ * <ul>
+ * <li>60-second refresh interval for schema change detection</li>
+ * <li>49-second monitor interval (chosen to avoid harmonics with the 60s refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC consumers always have
+ * up-to-date schema information, enabling proper data serialization, deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+ // 49sec least-common multiple with 60sec is 49min so offers best monitor frequency without clashing with 60sec
+ private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+ private final AtomicReference<String> currSchemaText = new AtomicReference<>("");
+ private final AtomicReference<Set<CqlTable>> cdcTables = new AtomicReference<>(Collections.emptySet());
+ private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new ConcurrentHashMap<>();
+ private final CdcDatabaseAccessor databaseAccessor;
+ private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new CopyOnWriteArrayList<>();
+ private final SidecarConfiguration sidecarConfiguration;
+ private final InstanceMetadataFetcher instanceFetcher;
+ private final SecondBoundConfiguration tableSchemaRefreshTime;
+ private final CassandraBridgeFactory cassandraBridgeFactory;
+
+ public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+ CdcDatabaseAccessor databaseAccessor,
+ SidecarConfiguration sidecarConfiguration,
+ CassandraBridgeFactory cassandraBridgeFactory)
+ {
+
+ this.instanceFetcher = instanceFetcher;
+ this.databaseAccessor = databaseAccessor;
+ this.sidecarConfiguration = sidecarConfiguration;
+ this.tableSchemaRefreshTime = sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+ this.cassandraBridgeFactory = cassandraBridgeFactory;
+ }
+
+ public void addSchemaChangeListener(Runnable listener)
+ {
+ schemaChangeListeners.add(listener);
+ }
+
+ public void refresh()
+ {
+ NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance -> instance.delegate().nodeSettings());
+ CassandraBridge cassandraBridge = cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+ CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+ try
+ {
+ LOGGER.info("Checking for schema changes...");
+ String fullSchemaText = databaseAccessor.fullSchema();
+ if (!fullSchemaText.equals(currSchemaText.get()))
+ {
+ LOGGER.info("Schema change detected, refreshing CDC tables");
+ currSchemaText.set(fullSchemaText);
+ Set<CqlTable> updatedCdcTables = buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, cassandraBridgeFactory);
+ LOGGER.info("Cdc enabled tables tables='{}'",
+ updatedCdcTables.stream()
+ .map(m -> String.format("%s.%s", m.keyspace(), m.table()))
+ .collect(Collectors.joining(",")));
+ cdcTables.set(updatedCdcTables);
+ cdcBridge.updateCdcSchema(updatedCdcTables, databaseAccessor.partitioner(),
+ ((keyspace, table) -> tableIdCache.get(TableIdentifier.of(keyspace, table))));
+ schemaChangeListeners.forEach(Runnable::run);
+ }
+ }
+ catch (IllegalStateException exception)
+ {
+ LOGGER.warn("There was a problem refreshing the schema. Database Accessor may not be ready", exception);
+ throw exception;
+ }
+ catch (Throwable t)
+ {
+ LOGGER.error("Unexpected error while refreshing the schema", t);
+ throw t;
+ }
+ }
+
+ public DurationSpec delay()
Review Comment:
```suggestion
@Override
public DurationSpec delay()
```
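The field comment in `CassandraClusterSchema` states that the least common multiple of 49 s and 60 s is 49 min. A quick check of that arithmetic, assuming both tasks start at the same instant and fire at exact multiples of their periods (ignoring scheduling jitter); `IntervalAlignment` is a hypothetical helper, not part of the PR:

```java
class IntervalAlignment
{
    // Greatest common divisor via Euclid's algorithm
    static long gcd(long a, long b)
    {
        while (b != 0)
        {
            long remainder = a % b;
            a = b;
            b = remainder;
        }
        return a;
    }

    // Least common multiple: how often (in seconds) two periodic tasks started together coincide
    static long lcm(long a, long b)
    {
        return a / gcd(a, b) * b;
    }

    public static void main(String[] args)
    {
        long monitorSeconds = 49; // 7 * 7
        long refreshSeconds = 60; // 2^2 * 3 * 5, no factor shared with 49
        long coincideSeconds = lcm(monitorSeconds, refreshSeconds);
        System.out.println(coincideSeconds + " s = " + coincideSeconds / 60 + " min"); // prints "2940 s = 49 min"
    }
}
```

For comparison, `lcm(45, 60)` is 180, so a 45-second monitor would line up with the 60-second refresh every three minutes.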
##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+ public void refresh()
+ {
+ NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance -> instance.delegate().nodeSettings());
+ CassandraBridge cassandraBridge = cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+ CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+ try
+ {
+ LOGGER.info("Checking for schema changes...");
+ String fullSchemaText = databaseAccessor.fullSchema();
+ if (!fullSchemaText.equals(currSchemaText.get()))
+ {
+ LOGGER.info("Schema change detected, refreshing CDC tables");
+ currSchemaText.set(fullSchemaText);
+ Set<CqlTable> updatedCdcTables = buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, cassandraBridgeFactory);
+ LOGGER.info("Cdc enabled tables tables='{}'",
+ updatedCdcTables.stream()
+ .map(m -> String.format("%s.%s", m.keyspace(), m.table()))
+ .collect(Collectors.joining(",")));
+ cdcTables.set(updatedCdcTables);
+ cdcBridge.updateCdcSchema(updatedCdcTables, databaseAccessor.partitioner(),
Review Comment:
I don't think this `databaseAccessor.partitioner()` call is required at all. We already have the partitioner as a string in `nodeSettings.partitioner()`, with the caveat that we need to check whether the string contains a `.` and, if it does, apply the same logic we have inside `databaseAccessor.partitioner()`.
```suggestion
cdcBridge.updateCdcSchema(updatedCdcTables,
Partitioner.valueOf(nodeSettings.partitioner()),
```
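The caveat about a `.` in the partitioner string can be handled by stripping any package prefix before looking up the enum constant. A minimal sketch, assuming `nodeSettings.partitioner()` returns either a simple name or a fully qualified class name, and that the logic inside `databaseAccessor.partitioner()` (not shown here) amounts to the same thing; `PartitionerSketch` is a hypothetical stand-in for the analytics `Partitioner` enum:

```java
// Hypothetical stand-in for the analytics Partitioner enum; only the constant names matter here
enum PartitionerSketch
{
    Murmur3Partitioner,
    RandomPartitioner
}

class PartitionerParsing
{
    // Maps "Murmur3Partitioner" or "org.apache.cassandra.dht.Murmur3Partitioner" to the same constant
    static PartitionerSketch fromNodeSetting(String partitioner)
    {
        int lastDot = partitioner.lastIndexOf('.');
        // Strip the package prefix when the setting is a fully qualified class name
        String simpleName = lastDot >= 0 ? partitioner.substring(lastDot + 1) : partitioner;
        // Enum.valueOf throws IllegalArgumentException for partitioners the enum does not model
        return PartitionerSketch.valueOf(simpleName);
    }

    public static void main(String[] args)
    {
        System.out.println(fromNodeSetting("org.apache.cassandra.dht.Murmur3Partitioner")); // Murmur3Partitioner
        System.out.println(fromNodeSetting("RandomPartitioner"));                           // RandomPartitioner
    }
}
```

Without the prefix stripping, `Partitioner.valueOf(...)` on a dotted name would throw, which is why the caveat matters for the suggested change.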
##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+ @VisibleForTesting
+ static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor cdcDatabaseAccessor,
+ ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+ @NotNull InstanceMetadataFetcher instanceFetcher,
+ @NotNull CassandraBridgeFactory cassandraBridgeFactory)
+ {
+ return buildCdcTables(cdcDatabaseAccessor.fullSchema(),
+ cdcDatabaseAccessor.partitioner(),
+ tableIdCache,
+ cdcDatabaseAccessor::getTableId,
+ instanceFetcher,
+ cassandraBridgeFactory);
+ }
+
+ private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema,
+ @NotNull CdcDatabaseAccessor cdcDatabaseAccessor,
+ @NotNull ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+ @NotNull InstanceMetadataFetcher instanceFetcher,
+ @NotNull CassandraBridgeFactory cassandraBridgeFactory)
+ {
+ return buildCdcTables(fullSchema,
+ cdcDatabaseAccessor.partitioner(),
+ tableIdCache,
+ cdcDatabaseAccessor::getTableId,
+ instanceFetcher,
+ cassandraBridgeFactory);
+ }
+
+ private static Set<CqlTable> buildCdcTables(@NotNull final String fullSchema,
+ @NotNull final Partitioner partitioner,
+ @NotNull final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+ @NotNull final Function<TableIdentifier, UUID> tableIdLoaderFunction,
+ @NotNull final InstanceMetadataFetcher instanceFetcher,
+ @NotNull final CassandraBridgeFactory cassandraBridgeFactory)
+ {
+ Map<TableIdentifier, String> createStmts = CdcUtil.extractCdcTables(fullSchema);
+ Map<String, Set<String>> udtsPerKeyspace = createStmts.keySet()
+ .stream()
+ .map(TableIdentifier::keyspace)
+ .distinct() // remove duplicated keyspace strings
+ .collect(Collectors.toMap(Function.identity(),
+ keyspace -> CqlUtils.extractUdts(fullSchema, keyspace)));
+
+ Map<TableIdentifier, UUID> tableIds = createStmts.keySet()
+ .stream()
+ .collect(Collectors.toMap(Function.identity(),
+ id -> tableIdCache.computeIfAbsent(id, tableIdLoaderFunction)));
+
+ NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance -> instance.delegate().nodeSettings());
Review Comment:
We already have `nodeSettings` at the call site of this method, so let's just use that.
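Fetching the node settings once in `refresh()` and handing them to the helpers avoids a second `callOnFirstAvailableInstance` round trip, so everything built during one refresh uses the same settings. A minimal sketch of that flow, with hypothetical stand-ins (`NodeSettingsSketch`, `SchemaRefresherSketch`, and plain `Supplier`s in place of the instance fetcher and database accessor) rather than the PR's types:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for NodeSettings; only the two fields used here
class NodeSettingsSketch
{
    final String releaseVersion;
    final String partitioner;

    NodeSettingsSketch(String releaseVersion, String partitioner)
    {
        this.releaseVersion = releaseVersion;
        this.partitioner = partitioner;
    }
}

// Hypothetical, simplified version of the refresh flow
class SchemaRefresherSketch
{
    private final Supplier<NodeSettingsSketch> nodeSettingsFetcher; // stands in for callOnFirstAvailableInstance
    private final Supplier<String> schemaFetcher;                   // stands in for databaseAccessor.fullSchema()
    private final AtomicReference<String> currentSchema = new AtomicReference<>("");
    private final List<Runnable> listeners = new CopyOnWriteArrayList<>();
    private volatile String lastBuildSummary = "";

    SchemaRefresherSketch(Supplier<NodeSettingsSketch> nodeSettingsFetcher, Supplier<String> schemaFetcher)
    {
        this.nodeSettingsFetcher = nodeSettingsFetcher;
        this.schemaFetcher = schemaFetcher;
    }

    void addListener(Runnable listener)
    {
        listeners.add(listener);
    }

    void refresh()
    {
        // Fetch the node settings exactly once per refresh
        NodeSettingsSketch nodeSettings = nodeSettingsFetcher.get();
        String schema = schemaFetcher.get();
        if (schema.equals(currentSchema.get()))
        {
            return; // schema unchanged: nothing to rebuild, no listeners to notify
        }
        currentSchema.set(schema);
        // Pass the settings down instead of letting the helper fetch them again
        lastBuildSummary = buildTables(schema, nodeSettings);
        listeners.forEach(Runnable::run);
    }

    // Placeholder for buildCdcTables: it receives nodeSettings from its caller
    static String buildTables(String schema, NodeSettingsSketch nodeSettings)
    {
        return String.format("version=%s partitioner=%s schemaLength=%d",
                             nodeSettings.releaseVersion, nodeSettings.partitioner, schema.length());
    }

    String lastBuildSummary()
    {
        return lastBuildSummary;
    }
}
```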
##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Central schema management component for Cassandra cluster schema monitoring and CDC table tracking.
+ * <p>
+ * This class provides comprehensive schema management functionality for Cassandra Sidecar, specifically
+ * focused on CDC (Change Data Capture) operations. It maintains real-time awareness of schema changes
+ * in the Cassandra cluster and manages CDC-enabled table metadata, enabling:
+ * <ul>
+ * <li>Continuous monitoring of Cassandra cluster schema changes</li>
+ * <li>Automatic detection and tracking of CDC-enabled tables</li>
+ * <li>Schema change event notification to registered listeners</li>
+ * <li>Table metadata caching and synchronization with CDC bridges</li>
+ * <li>Periodic validation of CDC table configurations</li>
+ * </ul>
+ * <p>
+ * The class operates with two main periodic tasks:
+ * <ul>
+ * <li><strong>Schema Refresh (60s interval):</strong> Monitors for schema changes by comparing
+ * full schema snapshots and updates CDC table metadata when changes are detected</li>
+ * <li><strong>Schema Monitor (49s interval):</strong> Validates that CDC-enabled tables
+ * are properly configured in the Cassandra Schema.instance singleton</li>
+ * </ul>
+ * <p>
+ * Key functionalities include:
+ * <ul>
+ * <li><strong>CDC Table Discovery:</strong> Automatically identifies and tracks tables with
+ * CDC enabled from the cluster schema</li>
+ * <li><strong>Schema Change Detection:</strong> Compares schema snapshots to detect modifications
+ * and trigger appropriate updates to CDC subsystems</li>
+ * <li><strong>Bridge Integration:</strong> Synchronizes schema information with Cassandra and
+ * CDC bridges for consistent metadata handling</li>
+ * <li><strong>Event Notification:</strong> Provides a listener mechanism for components that
+ * need to react to schema changes</li>
+ * <li><strong>Validation and Monitoring:</strong> Continuously validates CDC table configurations
+ * and reports inconsistencies through metrics</li>
+ * </ul>
+ * <p>
+ * The schema refresh intervals are carefully chosen to balance responsiveness with system load:
+ * <ul>
+ * <li>60-second refresh interval for schema change detection</li>
+ * <li>49-second monitor interval (chosen to avoid harmonics with the 60s refresh cycle)</li>
+ * </ul>
+ * <p>
+ * This component is essential for CDC operations as it ensures that CDC consumers always have
+ * up-to-date schema information, enabling proper data serialization, deserialization, and
+ * processing across schema evolution events.
+ * <p>
+ * This class is thread-safe and designed as a singleton for dependency injection into CDC
+ * and other schema-dependent components.
+ *
+ * @see org.apache.cassandra.sidecar.db.CdcDatabaseAccessor
+ * @see org.apache.cassandra.bridge.CdcBridge
+ * @see org.apache.cassandra.spark.data.CqlTable
+ */
+@Singleton
+public class CassandraClusterSchema implements PeriodicTask
+{
+ // 49sec least-common multiple with 60sec is 49min so offers best monitor frequency without clashing with 60sec
+ private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterSchema.class);
+
+ private final AtomicReference<String> currSchemaText = new AtomicReference<>("");
+ private final AtomicReference<Set<CqlTable>> cdcTables = new AtomicReference<>(Collections.emptySet());
+ private final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache = new ConcurrentHashMap<>();
+ private final CdcDatabaseAccessor databaseAccessor;
+ private final CopyOnWriteArrayList<Runnable> schemaChangeListeners = new CopyOnWriteArrayList<>();
+ private final SidecarConfiguration sidecarConfiguration;
+ private final InstanceMetadataFetcher instanceFetcher;
+ private final SecondBoundConfiguration tableSchemaRefreshTime;
+ private final CassandraBridgeFactory cassandraBridgeFactory;
+
+ public CassandraClusterSchema(InstanceMetadataFetcher instanceFetcher,
+ CdcDatabaseAccessor databaseAccessor,
+ SidecarConfiguration sidecarConfiguration,
+ CassandraBridgeFactory cassandraBridgeFactory)
+ {
+
+ this.instanceFetcher = instanceFetcher;
+ this.databaseAccessor = databaseAccessor;
+ this.sidecarConfiguration = sidecarConfiguration;
+ this.tableSchemaRefreshTime = sidecarConfiguration.serviceConfiguration().cdcConfiguration().tableSchemaRefreshTime();
+ this.cassandraBridgeFactory = cassandraBridgeFactory;
+ }
+
+ public void addSchemaChangeListener(Runnable listener)
+ {
+ schemaChangeListeners.add(listener);
+ }
+
+ public void refresh()
+ {
+ NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings());
+ CassandraBridge cassandraBridge = cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+ CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+ try
+ {
+ LOGGER.info("Checking for schema changes...");
+ String fullSchemaText = databaseAccessor.fullSchema();
+ if (!fullSchemaText.equals(currSchemaText.get()))
+ {
+ LOGGER.info("Schema change detected, refreshing CDC tables");
+ currSchemaText.set(fullSchemaText);
+ Set<CqlTable> updatedCdcTables = buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, cassandraBridgeFactory);
+ LOGGER.info("Cdc enabled tables tables='{}'",
+ updatedCdcTables.stream()
+ .map(m -> String.format("%s.%s", m.keyspace(), m.table()))
+ .collect(Collectors.joining(",")));
+ cdcTables.set(updatedCdcTables);
+ cdcBridge.updateCdcSchema(updatedCdcTables, databaseAccessor.partitioner(),
Review Comment:
And of course `Partitioner` is in Spark, so we wouldn't be able to change it here. So we'll need to do the check before calling `valueOf`.
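Here is a minimal sketch of doing the check before `valueOf`. It assumes the partitioner arrives as a class-name string, either simple or fully qualified. `PartitionerStub` is a hypothetical local enum that stands in for the analytics `Partitioner`, and its two constants are an assumption made for the example. The sketch does not decide how the caller should handle an unsupported name, such as failing fast with a clear message or skipping CDC.

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical stand-in for org.apache.cassandra.spark.data.partitioner.Partitioner,
// which lives in cassandra-analytics and cannot be changed from Sidecar.
enum PartitionerStub
{
    RandomPartitioner,
    Murmur3Partitioner
}

final class PartitionerNames
{
    private PartitionerNames()
    {
    }

    // Returns the matching constant, or empty when the name is not supported.
    // Accepts a simple name ("Murmur3Partitioner") or a fully qualified class
    // name ("org.apache.cassandra.dht.Murmur3Partitioner").
    static Optional<PartitionerStub> parse(String partitionerName)
    {
        if (partitionerName == null || partitionerName.isEmpty())
        {
            return Optional.empty();
        }
        // lastIndexOf returns -1 when there is no '.', so this keeps the whole string
        String simpleName = partitionerName.substring(partitionerName.lastIndexOf('.') + 1);
        // Check membership first so that valueOf can never throw IllegalArgumentException
        boolean supported = Arrays.stream(PartitionerStub.values())
                                  .anyMatch(p -> p.name().equals(simpleName));
        return supported ? Optional.of(PartitionerStub.valueOf(simpleName)) : Optional.empty();
    }
}
```

Returning an `Optional` leaves the decision about unsupported partitioners to the call site. The enum itself stays untouched, which matters because the enum cannot be changed from Sidecar.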
##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchema.java:
##########
@@ -0,0 +1,273 @@
+ public void refresh()
+ {
+ NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings());
+ CassandraBridge cassandraBridge = cassandraBridgeFactory.get(nodeSettings.releaseVersion());
+ CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge);
+
+ try
+ {
+ LOGGER.info("Checking for schema changes...");
+ String fullSchemaText = databaseAccessor.fullSchema();
+ if (!fullSchemaText.equals(currSchemaText.get()))
+ {
+ LOGGER.info("Schema change detected, refreshing CDC tables");
+ currSchemaText.set(fullSchemaText);
+ Set<CqlTable> updatedCdcTables = buildCdcTables(fullSchemaText, databaseAccessor, tableIdCache, instanceFetcher, cassandraBridgeFactory);
+ LOGGER.info("Cdc enabled tables tables='{}'",
+ updatedCdcTables.stream()
+ .map(m -> String.format("%s.%s", m.keyspace(), m.table()))
+ .collect(Collectors.joining(",")));
+ cdcTables.set(updatedCdcTables);
+ cdcBridge.updateCdcSchema(updatedCdcTables, databaseAccessor.partitioner(),
+ ((keyspace, table) -> tableIdCache.get(TableIdentifier.of(keyspace, table))));
+ schemaChangeListeners.forEach(Runnable::run);
+ }
+ }
+ catch (IllegalStateException exception)
+ {
+ LOGGER.warn("There was a problem refreshing the schema. Database Accessor may not be ready", exception);
+ throw exception;
+ }
+ catch (Throwable t)
+ {
+ LOGGER.error("Unexpected error while refreshing the schema", t);
+ throw t;
+ }
+ }
+
+ public DurationSpec delay()
+ {
+ return tableSchemaRefreshTime;
+ }
+
+ @Override
+ public void execute(Promise<Void> promise)
+ {
+ try
+ {
+ refresh();
+ promise.tryComplete();
+ }
+ catch (Throwable t)
+ {
+ promise.fail(t);
+ }
+ }
+
+ @Override
+ public ScheduleDecision scheduleDecision()
+ {
+ boolean shouldSkip = !sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled()
+ || !sidecarConfiguration.serviceConfiguration().cdcConfiguration().isEnabled();
+ return shouldSkip ? ScheduleDecision.SKIP : ScheduleDecision.EXECUTE;
+ }
+
+ @VisibleForTesting
+ static Set<CqlTable> buildCdcTables(CdcDatabaseAccessor cdcDatabaseAccessor,
+ ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+ @NotNull InstanceMetadataFetcher instanceFetcher,
+ @NotNull CassandraBridgeFactory cassandraBridgeFactory)
+ {
+ return buildCdcTables(cdcDatabaseAccessor.fullSchema(),
+ cdcDatabaseAccessor.partitioner(),
+ tableIdCache,
+ cdcDatabaseAccessor::getTableId,
+ instanceFetcher,
+ cassandraBridgeFactory);
+ }
+
+ private static Set<CqlTable> buildCdcTables(@NotNull String fullSchema,
+ @NotNull CdcDatabaseAccessor cdcDatabaseAccessor,
+ @NotNull ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+ @NotNull InstanceMetadataFetcher instanceFetcher,
+ @NotNull CassandraBridgeFactory cassandraBridgeFactory)
+ {
+ return buildCdcTables(fullSchema,
+ cdcDatabaseAccessor.partitioner(),
+ tableIdCache,
+ cdcDatabaseAccessor::getTableId,
+ instanceFetcher,
+ cassandraBridgeFactory);
+ }
+
+ private static Set<CqlTable> buildCdcTables(@NotNull final String fullSchema,
+ @NotNull final Partitioner partitioner,
+ @NotNull final ConcurrentHashMap<TableIdentifier, UUID> tableIdCache,
+ @NotNull final Function<TableIdentifier, UUID> tableIdLoaderFunction,
+ @NotNull final InstanceMetadataFetcher instanceFetcher,
+ @NotNull final CassandraBridgeFactory cassandraBridgeFactory)
+ {
+ Map<TableIdentifier, String> createStmts = CdcUtil.extractCdcTables(fullSchema);
+ Map<String, Set<String>> udtsPerKeyspace = createStmts.keySet()
+ .stream()
+ .map(TableIdentifier::keyspace)
+ .distinct() // remove duplicated keyspace strings
+ .collect(Collectors.toMap(Function.identity(),
+ keyspace -> CqlUtils.extractUdts(fullSchema, keyspace)));
+
+ Map<TableIdentifier, UUID> tableIds = createStmts.keySet()
+ .stream()
+ .collect(Collectors.toMap(Function.identity(),
+ id -> tableIdCache.computeIfAbsent(id, tableIdLoaderFunction)));
+
+ NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings());
Review Comment:
As a matter of fact, we already have the bridge; we should just pass the bridge.
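Here is a minimal sketch of passing the bridge. It assumes `buildCdcTables` only needs `NodeSettings` in order to obtain a bridge. `SchemaBridgeStub` and its `buildTable` method are hypothetical stand-ins for the small part of `CassandraBridge` that the helper would use, and plain strings replace `CqlTable`.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for the slice of CassandraBridge that buildCdcTables needs:
// turning one CREATE TABLE statement into a table description.
interface SchemaBridgeStub
{
    String buildTable(String createStatement);
}

final class PassBridgeSketch
{
    private PassBridgeSketch()
    {
    }

    // The caller (refresh()) already holds a bridge, so it passes it in directly.
    // The helper no longer needs the InstanceMetadataFetcher or the bridge factory,
    // and tests can hand it a fake bridge.
    static Set<String> buildCdcTables(Set<String> createStatements, SchemaBridgeStub bridge)
    {
        Set<String> tables = new LinkedHashSet<>();
        for (String statement : createStatements)
        {
            tables.add(bridge.buildTable(statement));
        }
        return tables;
    }
}
```

With the bridge as a parameter, the helper becomes a function of its inputs alone, so a test can pass a lambda such as `s -> "table:" + s` as the bridge without any running instance.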