yifan-c commented on code in PR #294:
URL: https://github.com/apache/cassandra-sidecar/pull/294#discussion_r2674287663


##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.kafka.KafkaOptions;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.cdc.schemastore.SchemaStorePublisherFactory;
+import org.apache.cassandra.cdc.schemastore.TableSchemaPublisher;
+import org.apache.cassandra.sidecar.db.TableHistoryDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.tasks.CassandraClusterSchemaMonitor;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_START;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
+
+/**
+ * Schemas cache to be used by CDC event serialization. It contains a map of 
table schemas
+ * using TableIdentifier as key.
+ */
+@Singleton
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    private final CassandraClusterSchemaMonitor cassandraClusterSchemaMonitor;
+    private final SidecarSchema sidecarSchema;
+    private final TableHistoryDatabaseAccessor tableHistoryDatabaseAccessor;
+    private final Vertx vertx;
+    private final CdcConfigImpl cdcConfig;
+    @Nullable private volatile TableSchemaPublisher publisher;
+    private final CqlToAvroSchemaConverter cqlToAvroSchemaConverter;
+    private final SidecarCdcStats sidecarCdcStats;
+
+    private static final String METADATA_NAME_KEY = "name";
+    private static final String METADATA_NAMESPACE_KEY = "namespace";
+
+    @Inject
+    CachingSchemaStore(Vertx vertx,
+                       CassandraClusterSchemaMonitor 
cassandraClusterSchemaMonitor,
+                       TableHistoryDatabaseAccessor 
tableHistoryDatabaseAccessor,
+                       CdcConfigImpl cdcConfig,
+                       SidecarCdcStats sidecarCdcStats,
+                       SidecarSchema sidecarSchema,
+                       CqlToAvroSchemaConverter cqlToAvroSchemaConverter)
+    {
+        super();
+        this.cassandraClusterSchemaMonitor = cassandraClusterSchemaMonitor;
+        this.tableHistoryDatabaseAccessor = tableHistoryDatabaseAccessor;
+        this.sidecarSchema = sidecarSchema;
+        this.cqlToAvroSchemaConverter = cqlToAvroSchemaConverter;
+        
this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables()));
+        AvroSchemas.registerLogicalTypes();
+        
cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged);
+        this.vertx = vertx;
+        this.cdcConfig = cdcConfig;
+        this.sidecarCdcStats = sidecarCdcStats;
+
+        configureSidecarServerEventListeners();
+    }
+
+    private void loadPublisher()
+    {
+        KafkaOptions kafkaOptions = () -> cdcConfig.kafkaConfigs();
+        this.publisher = 
SchemaStorePublisherFactory.DEFAULT.buildPublisher(kafkaOptions);
+    }
+
+    private void configureSidecarServerEventListeners()
+    {
+        EventBus eventBus = vertx.eventBus();
+
+        eventBus.localConsumer(ON_SERVER_START.address(), startMessage -> {
+            eventBus.localConsumer(ON_SIDECAR_SCHEMA_INITIALIZED.address(), 
message -> {
+                LOGGER.debug("Sidecar Schema initialized message={}", message);
+                Set<CqlTable> refreshedCdcTables = 
cassandraClusterSchemaMonitor.getCdcTables();
+                for (CqlTable cqlTable : refreshedCdcTables)
+                {
+                    TableIdentifier tableIdentifier = 
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table());
+                    avroSchemasCache.compute(tableIdentifier, (k, v) ->
+                    {
+                        
tableHistoryDatabaseAccessor.insertTableSchemaHistory(cqlTable.keyspace(), 
cqlTable.table(), cqlTable.createStatement());
+                        return v;
+                    });
+                }
+                loadPublisher();
+                publishSchemas();
+            });
+        });
+    }
+
+    private void publishSchemas()
+    {
+        Set<CqlTable> refreshedCdcTables = 
cassandraClusterSchemaMonitor.getCdcTables();
+        for (CqlTable cqlTable : refreshedCdcTables)
+        {
+            TableIdentifier tableIdentifier = 
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table());
+            avroSchemasCache.compute(tableIdentifier, (k, v) ->
+            {
+                if (null != publisher)
+                {
+                    Schema schema = cqlToAvroSchemaConverter.convert(cqlTable);
+                    TableSchemaPublisher.SchemaPublishMetadata metadata = new 
TableSchemaPublisher.SchemaPublishMetadata();
+                    metadata.put(METADATA_NAME_KEY, cqlTable.table());
+                    metadata.put(METADATA_NAMESPACE_KEY, cqlTable.keyspace());
+                    publisher.publishSchema(schema.toString(false), metadata);
+                    sidecarCdcStats.capturePublishedSchema();
+                }
+                return new SchemaCacheEntry(cqlTable, 
cqlToAvroSchemaConverter);
+            });
+        }
+    }
+
+    @VisibleForTesting
+    void onSchemaChanged()
+    {
+        Set<CqlTable> refreshedCdcTables = 
cassandraClusterSchemaMonitor.getCdcTables();
+        for (CqlTable cqlTable : refreshedCdcTables)
+        {
+            TableIdentifier tableIdentifier = 
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table());
+            avroSchemasCache.compute(tableIdentifier, (k, v) ->
+            {
+                if (v == null || 
!v.tableSchema().equals(cqlTable.createStatement()))
+                {
+                    if (sidecarSchema.isInitialized())
+                    {
+                        
tableHistoryDatabaseAccessor.insertTableSchemaHistory(cqlTable.keyspace(), 
cqlTable.table(), cqlTable.createStatement());
+                    }
+                    LOGGER.info("Re-generating Avro Schema after schema change 
keyspace={} table={}", tableIdentifier.keyspace(), tableIdentifier.table());
+                    return new SchemaCacheEntry(cqlTable, 
cqlToAvroSchemaConverter);
+                }
+                return v;
+            });
+            loadPublisher();
+            publishSchemas();

Review Comment:
   Those 2 methods are called in _every_ iteration. Looks like they are 
included in the for-loop by mistake. 



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.api.TokenRangeSupplier;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdc;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcStats;
+import org.apache.cassandra.cdc.sidecar.SidecarStatePersister;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.coordination.RangeManager;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.utils.AsyncExecutor;
+import org.jetbrains.annotations.NotNull;
+
+
+/**
+ * Manages the lifecycle and coordination of CDC (Change Data Capture) 
consumers for processing
+ * Cassandra change events across distributed token ranges.
+ *
+ * <p>This class is responsible for:
+ * <ul>
+ *   <li>Building and configuring {@link SidecarCdc} consumers based on owned 
token ranges</li>
+ *   <li>Deduplicating consumers by instance ID and token range to prevent 
duplicate processing</li>
+ *   <li>Managing consumer lifecycle (start/stop operations)</li>
+ *   <li>Integrating with various providers for cluster configuration, schema, 
and instance metadata</li>
+ *   <li>Coordinating with the range manager to determine token ownership</li>
+ * </ul>
+ *
+ * <p>The CDC consumers created by this manager process change events from 
Cassandra commit logs
+ * and forward them to configured event consumers (such as Kafka producers) 
while maintaining
+ * state persistence and proper token range distribution across the cluster.
+ *
+ * @see SidecarCdc
+ * @see EventConsumer
+ * @see RangeManager
+ * @see CdcConfig
+ */
+public class CdcManager
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcManager.class);
+    private final CdcConfig conf;
+    private final RangeManager rangeManager;
+    private final InstanceMetadataFetcher instanceFetcher;
+    private final EventConsumer eventConsumer;
+    private final SchemaSupplier schemaSupplier;
+    private final ClusterConfigProvider clusterConfigProvider;
+    private final CdcSidecarInstancesProvider sidecarInstancesProvider;
+    private final SecretsProvider secretsProvider;
+    private final SidecarCdcClient.ClientConfig clientConfig;
+    private final ICdcStats cdcStats;
+    private List<SidecarCdc> consumers = new ArrayList<>();
+    private final TaskExecutorPool taskExecutorPool;
+    private final CdcDatabaseAccessor cdcDatabaseAccessor;
+
+
+    public CdcManager(EventConsumer eventConsumer,
+                      SchemaSupplier schemaSupplier,
+                      CdcConfig conf,
+                      RangeManager rangeManager,
+                      InstanceMetadataFetcher instanceFetcher,
+                      ClusterConfigProvider clusterConfigProvider,
+                      CdcSidecarInstancesProvider sidecarInstancesProvider,
+                      SecretsProvider secretsProvider,
+                      SidecarCdcClient.ClientConfig clientConfig,
+                      ICdcStats cdcStats,
+                      TaskExecutorPool taskExecutorPool,
+                      CdcDatabaseAccessor cdcDatabaseAccessor)
+    {
+        this.eventConsumer = eventConsumer;
+        this.schemaSupplier = schemaSupplier;
+        this.conf = conf;
+        this.rangeManager = rangeManager;
+        this.instanceFetcher = instanceFetcher;
+        this.clusterConfigProvider = clusterConfigProvider;
+        this.sidecarInstancesProvider = sidecarInstancesProvider;
+        this.secretsProvider = secretsProvider;
+        this.clientConfig = clientConfig;
+        this.cdcStats = cdcStats;
+        this.taskExecutorPool = taskExecutorPool;
+        this.cdcDatabaseAccessor = cdcDatabaseAccessor;
+    }
+
+    List<SidecarCdc> buildCdcConsumers()
+    {
+        Map<String, Set<TokenRange>> ownedRanges = 
rangeManager.ownedTokenRanges();
+        if (ownedRanges == null || ownedRanges.isEmpty())
+        {
+            throw new IllegalStateException("No owned token ranges right now, 
cql session may still be initializing.");
+        }
+
+        // NEW: Deduplicate by (instanceId, tokenRange) to prevent duplicate 
consumers
+        Map<String, SidecarCdc> uniqueConsumers = new 
HashMap<>(ownedRanges.values().stream().mapToInt(Set::size).sum());
+
+        ownedRanges.entrySet().stream()
+                   .flatMap(entry ->
+                            entry.getValue().stream().map(range -> {
+                                Integer instanceId = 
getInstanceId(entry.getKey());
+
+                                // Create unique key: 
"instanceId:rangeStart:rangeEnd"
+                                String uniqueKey = String.format("%d:%s:%s",
+                                                                 instanceId,
+                                                                 
range.startAsBigInt(),
+                                                                 
range.endAsBigInt());
+
+                                // Only create consumer if not already created 
for this (instance, range)
+                                return 
uniqueConsumers.computeIfAbsent(uniqueKey, k -> {
+                                    try
+                                    {
+                                        return 
loadOrBuildCdcConsumer(instanceId,
+                                                                      
clusterConfigProvider,
+                                                                      
eventConsumer,
+                                                                      
schemaSupplier,
+                                                                      () -> 
org.apache.cassandra.bridge.TokenRange.openClosed(range.startAsBigInt(), 
range.endAsBigInt()),
+                                                                      
sidecarInstancesProvider,
+                                                                      
secretsProvider,
+                                                                      
clientConfig,
+                                                                      conf,
+                                                                      cdcStats,
+                                                                      
taskExecutorPool);
+                                    }
+                                    catch (IOException e)
+                                    {
+                                        throw new RuntimeException(e);
+                                    }
+                                });
+                            }))
+                   .collect(Collectors.toList());;

Review Comment:
   double `;`



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.kafka.KafkaOptions;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.cdc.schemastore.SchemaStorePublisherFactory;
+import org.apache.cassandra.cdc.schemastore.TableSchemaPublisher;
+import org.apache.cassandra.sidecar.db.TableHistoryDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.tasks.CassandraClusterSchemaMonitor;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_START;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
+
+/**
+ * Schemas cache to be used by CDC event serialization. It contains a map of 
table schemas
+ * using TableIdentifier as key.
+ */
+@Singleton
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    private final CassandraClusterSchemaMonitor cassandraClusterSchemaMonitor;
+    private final SidecarSchema sidecarSchema;
+    private final TableHistoryDatabaseAccessor tableHistoryDatabaseAccessor;
+    private final Vertx vertx;
+    private final CdcConfigImpl cdcConfig;
+    @Nullable private volatile TableSchemaPublisher publisher;
+    private final CqlToAvroSchemaConverter cqlToAvroSchemaConverter;
+    private final SidecarCdcStats sidecarCdcStats;
+
+    private static final String METADATA_NAME_KEY = "name";
+    private static final String METADATA_NAMESPACE_KEY = "namespace";
+
+    @Inject
+    CachingSchemaStore(Vertx vertx,
+                       CassandraClusterSchemaMonitor 
cassandraClusterSchemaMonitor,
+                       TableHistoryDatabaseAccessor 
tableHistoryDatabaseAccessor,
+                       CdcConfigImpl cdcConfig,
+                       SidecarCdcStats sidecarCdcStats,
+                       SidecarSchema sidecarSchema,
+                       CqlToAvroSchemaConverter cqlToAvroSchemaConverter)
+    {
+        super();

Review Comment:
   `super()` is redundant, because it is just implementing the interface 
`SchemaStore`



##########
server/src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java:
##########
@@ -113,6 +113,10 @@ public enum SidecarServerEvents
      * has lost the cluster-wide lease.
      */
     ON_SIDECAR_GLOBAL_LEASE_LOST,
+
+    ON_CDC_CONFIGURATION_CHANGED,
+    ON_CDC_CACHE_WARMED_UP,
+    ON_CDC_CONFIG_MAPPINGS_CHANGED,

Review Comment:
   Add javadoc



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.kafka.KafkaOptions;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.cdc.schemastore.SchemaStorePublisherFactory;
+import org.apache.cassandra.cdc.schemastore.TableSchemaPublisher;
+import org.apache.cassandra.sidecar.db.TableHistoryDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.tasks.CassandraClusterSchemaMonitor;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_START;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
+
+/**
+ * Schemas cache to be used by CDC event serialization. It contains a map of 
table schemas
+ * using TableIdentifier as key.
+ */
+@Singleton
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    private final CassandraClusterSchemaMonitor cassandraClusterSchemaMonitor;
+    private final SidecarSchema sidecarSchema;
+    private final TableHistoryDatabaseAccessor tableHistoryDatabaseAccessor;
+    private final Vertx vertx;
+    private final CdcConfigImpl cdcConfig;
+    @Nullable private volatile TableSchemaPublisher publisher;
+    private final CqlToAvroSchemaConverter cqlToAvroSchemaConverter;
+    private final SidecarCdcStats sidecarCdcStats;
+
+    private static final String METADATA_NAME_KEY = "name";
+    private static final String METADATA_NAMESPACE_KEY = "namespace";
+
+    @Inject
+    CachingSchemaStore(Vertx vertx,
+                       CassandraClusterSchemaMonitor 
cassandraClusterSchemaMonitor,
+                       TableHistoryDatabaseAccessor 
tableHistoryDatabaseAccessor,
+                       CdcConfigImpl cdcConfig,
+                       SidecarCdcStats sidecarCdcStats,
+                       SidecarSchema sidecarSchema,
+                       CqlToAvroSchemaConverter cqlToAvroSchemaConverter)
+    {
+        super();
+        this.cassandraClusterSchemaMonitor = cassandraClusterSchemaMonitor;
+        this.tableHistoryDatabaseAccessor = tableHistoryDatabaseAccessor;
+        this.sidecarSchema = sidecarSchema;
+        this.cqlToAvroSchemaConverter = cqlToAvroSchemaConverter;
+        
this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables()));
+        AvroSchemas.registerLogicalTypes();
+        
cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged);
+        this.vertx = vertx;
+        this.cdcConfig = cdcConfig;
+        this.sidecarCdcStats = sidecarCdcStats;
+
+        configureSidecarServerEventListeners();
+    }
+
+    private void loadPublisher()
+    {
+        KafkaOptions kafkaOptions = () -> cdcConfig.kafkaConfigs();
+        this.publisher = 
SchemaStorePublisherFactory.DEFAULT.buildPublisher(kafkaOptions);

Review Comment:
   `publisher` is a `Closeable` object. We should close the previous object 
when assigning a new one. 



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.kafka.KafkaOptions;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.cdc.schemastore.SchemaStorePublisherFactory;
+import org.apache.cassandra.cdc.schemastore.TableSchemaPublisher;
+import org.apache.cassandra.sidecar.db.TableHistoryDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.tasks.CassandraClusterSchemaMonitor;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_START;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
+
+/**
+ * Schemas cache to be used by CDC event serialization. It contains a map of 
table schemas
+ * using TableIdentifier as key.
+ */
+@Singleton
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    private final CassandraClusterSchemaMonitor cassandraClusterSchemaMonitor;
+    private final SidecarSchema sidecarSchema;
+    private final TableHistoryDatabaseAccessor tableHistoryDatabaseAccessor;
+    private final Vertx vertx;
+    private final CdcConfigImpl cdcConfig;
+    @Nullable private volatile TableSchemaPublisher publisher;
+    private final CqlToAvroSchemaConverter cqlToAvroSchemaConverter;
+    private final SidecarCdcStats sidecarCdcStats;
+
+    private static final String METADATA_NAME_KEY = "name";
+    private static final String METADATA_NAMESPACE_KEY = "namespace";
+
+    @Inject
+    CachingSchemaStore(Vertx vertx,
+                       CassandraClusterSchemaMonitor 
cassandraClusterSchemaMonitor,
+                       TableHistoryDatabaseAccessor 
tableHistoryDatabaseAccessor,
+                       CdcConfigImpl cdcConfig,
+                       SidecarCdcStats sidecarCdcStats,
+                       SidecarSchema sidecarSchema,
+                       CqlToAvroSchemaConverter cqlToAvroSchemaConverter)
+    {
+        super();
+        this.cassandraClusterSchemaMonitor = cassandraClusterSchemaMonitor;
+        this.tableHistoryDatabaseAccessor = tableHistoryDatabaseAccessor;
+        this.sidecarSchema = sidecarSchema;
+        this.cqlToAvroSchemaConverter = cqlToAvroSchemaConverter;
+        
this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables()));
+        AvroSchemas.registerLogicalTypes();
+        
cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged);
+        this.vertx = vertx;
+        this.cdcConfig = cdcConfig;
+        this.sidecarCdcStats = sidecarCdcStats;
+
+        configureSidecarServerEventListeners();

Review Comment:
   When CDC is disabled, we should skip those all together. It does not make 
sense to register listeners, etc. 
   
   Ideally, we should not mount the `CdcModule` entirely, if the feature is not 
enabled. It is outside of the scope of this PR. 



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import org.apache.cassandra.cdc.CdcLogMode;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.kafka.KafkaPublisher;
+import org.apache.cassandra.cdc.kafka.TopicSupplier;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdc;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.secrets.SslConfig;
+import org.apache.cassandra.secrets.SslConfigSecretsProvider;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+import org.apache.cassandra.sidecar.coordination.RangeManager;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CONFIGURATION_CHANGED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
+
+/**
+ * Class that handles CDC life cycle
+ */
+@Singleton
+public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcPublisher.class);
+    private static final long INITIALIZATION_LOOP_DELAY_MILLIS = 1000;
+
+    // SSL Configuration Keys
+    private static final String SSL_ENABLED_KEY = "enabled";
+    private static final String SSL_PREFER_OPENSSL_KEY = "preferOpenSSL";
+    private static final String SSL_CLIENT_AUTH_KEY = "clientAuth";
+    private static final String SSL_CIPHER_SUITES_KEY = "cipherSuites";
+    private static final String SSL_SECURE_TRANSPORT_PROTOCOLS_KEY = 
"secureTransportProtocols";
+    private static final String SSL_HANDSHAKE_TIMEOUT_KEY = "handshakeTimeout";
+    private static final String SSL_KEYSTORE_PATH_KEY = "keystorePath";
+    private static final String SSL_KEYSTORE_PASSWORD_KEY = "keystorePassword";
+    private static final String SSL_KEYSTORE_TYPE_KEY = "keystoreType";
+    private static final String SSL_TRUSTSTORE_PATH_KEY = "truststorePath";
+    private static final String SSL_TRUSTSTORE_PASSWORD_KEY = 
"truststorePassword";
+    private static final String SSL_TRUSTSTORE_TYPE_KEY = "truststoreType";
+    private final TaskExecutorPool executorPools;
+    private final CdcConfig conf;
+    private volatile boolean isRunning = false;
+    private volatile boolean isInitialized = false;
+    private volatile boolean cdcCacheWarmedUp = false;
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final VirtualTablesDatabaseAccessor virtualTables;
+    private final SidecarCdcStats sidecarCdcStats;
+    private final SchemaSupplier schemaSupplier;
+    private final CdcSidecarInstancesProvider sidecarInstancesProvider;
+    private final InstanceMetadataFetcher instanceMetadataFetcher;
+    private final ClusterConfigProvider clusterConfigProvider;
+    private final SidecarCdcClient.ClientConfig clientConfig;
+    private final ICdcStats cdcStats;
+    private final SidecarConfiguration sidecarConfiguration;
+    private CdcManager cdcManager;
+    private final Serializer<CdcEvent> avroSerializer;
+    private final Provider<RangeManager> rangeManagerProvider;
+
+    @Inject
+    public CdcPublisher(Vertx vertx,
+                        SidecarConfiguration sidecarConfiguration,
+                        ExecutorPools executorPools,
+                        ClusterConfigProvider clusterConfigProvider,
+                        SchemaSupplier schemaSupplier,
+                        CdcSidecarInstancesProvider sidecarInstancesProvider,
+                        SidecarCdcClient.ClientConfig clientConfig,
+                        InstanceMetadataFetcher instanceMetadataFetcher,
+                        CdcConfig conf,
+                        CdcDatabaseAccessor databaseAccessor,
+                        ICdcStats cdcStats,
+                        VirtualTablesDatabaseAccessor virtualTables,
+                        SidecarCdcStats sidecarCdcStats,
+                        Serializer<CdcEvent> avroSerializer,
+                        Provider<RangeManager> rangeManagerProvider)
+    {
+        this.sidecarCdcStats = sidecarCdcStats;
+        this.executorPools = executorPools.internal();
+        this.conf = conf;
+        this.databaseAccessor = databaseAccessor;
+        this.virtualTables = virtualTables;
+
+        this.schemaSupplier = schemaSupplier;
+        this.sidecarInstancesProvider = sidecarInstancesProvider;
+        this.instanceMetadataFetcher = instanceMetadataFetcher;
+        this.clusterConfigProvider = clusterConfigProvider;
+        this.clientConfig = clientConfig;
+        this.cdcStats = cdcStats;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.avroSerializer = avroSerializer;
+        this.rangeManagerProvider = rangeManagerProvider;
+        
vertx.eventBus().localConsumer(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address(),
 this);
+        
vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address(),
 this);
+        
vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address(),
 this);
+        vertx.eventBus().localConsumer(ON_SERVER_STOP.address(), this);
+        vertx.eventBus().localConsumer(ON_CDC_CACHE_WARMED_UP.address(), this);
+        vertx.eventBus().localConsumer(ON_CDC_CONFIGURATION_CHANGED.address(), 
new ConfigChangedHandler());

Review Comment:
   Skip those when CDC is disabled. 



##########
server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java:
##########
@@ -269,4 +315,127 @@ public SidecarCdcStats sidecarCdcStats()
         {
         };
     }
+
+    @Provides
+    @Singleton
+    public Serializer<CdcEvent> getSerializer(CachingSchemaStore schemaStore,
+                                              InstanceMetadataFetcher 
instanceMetadataFetcher,
+                                              CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+        return new CdcAvroSerializer(schemaStore, instanceMetadataFetcher, 
cassandraBridgeFactory);
+    }
+
+    @Provides
+    @Singleton
+    public SchemaSupplier schemaSupplier(InstanceMetadataFetcher 
instanceMetadataFetcher,
+                                         CassandraBridgeFactory 
cassandraBridgeFactory,
+                                         CdcDatabaseAccessor 
cdcDatabaseAccessor)
+    {
+        return new CdcSchemaSupplier(instanceMetadataFetcher, 
cassandraBridgeFactory, cdcDatabaseAccessor);
+    }
+
+    @Provides
+    @Singleton
+    public ClusterConfigProvider clusterConfigProvider(InstanceMetadataFetcher 
instanceMetadataFetcher)
+    {
+        return new SidecarClusterConfigProvider(instanceMetadataFetcher);
+    }
+
+    @Provides
+    @Singleton
+    public ICdcStats cdcStats()
+    {
+        return new CdcStats()
+        {
+        };
+    }
+
+    @Provides
+    @Singleton
+    public TokenRingProvider tokenRingProvider(InstancesMetadata 
instancesMetadata, InstanceMetadataFetcher instanceMetadataFetcher, 
ServiceConfiguration configuration)
+    {
+        return new CassandraClientTokenRingProvider(instancesMetadata, 
instanceMetadataFetcher, configuration.dnsResolver());
+    }
+
+    @Provides
+    @Singleton
+    public SidecarCdcClient.ClientConfig clientConfig(SidecarConfiguration 
sidecarConfiguration)
+    {
+        SidecarClientConfiguration sidecarClientConfiguration = 
sidecarConfiguration.sidecarClientConfiguration();
+        return 
SidecarCdcClient.ClientConfig.create(sidecarConfiguration.serviceConfiguration().port(),
+                                                    
sidecarClientConfiguration.maxRetries(),
+                                                    
sidecarClientConfiguration.retryDelay().toIntMillis());
+    }
+
+    @Provides
+    @Singleton
+    public TableSchema virtualTablesDatabaseAccessor(ServiceConfiguration 
configuration)
+    {
+        return new CdcStatesSchema(configuration);
+    }
+
+    @ProvidesIntoMap
+    @KeyClassMapKey(PeriodicTaskMapKeys.CdcPublisherTaskKey.class)
+    PeriodicTask cdcPublisherTask(Vertx vertx,
+                                  SidecarConfiguration sidecarConfiguration,
+                                  ExecutorPools executorPools,
+                                  ClusterConfigProvider clusterConfigProvider,
+                                  SchemaSupplier schemaSupplier,
+                                  CdcSidecarInstancesProvider 
sidecarInstancesProvider,
+                                  SidecarCdcClient.ClientConfig clientConfig,
+                                  InstanceMetadataFetcher 
instanceMetadataFetcher,
+                                  CdcConfig conf,
+                                  CdcDatabaseAccessor databaseAccessor,
+                                  TokenRingProvider tokenRingProvider,
+                                  ICdcStats cdcStats,
+                                  VirtualTablesDatabaseAccessor virtualTables,
+                                  SidecarCdcStats sidecarCdcStats,
+                                  Serializer<CdcEvent> avroSerializer)
+    {
+        return new CdcPublisher(vertx,
+                                sidecarConfiguration,
+                                executorPools,
+                                clusterConfigProvider,
+                                schemaSupplier,
+                                sidecarInstancesProvider,
+                                clientConfig,
+                                instanceMetadataFetcher,
+                                conf,
+                                databaseAccessor,
+                                cdcStats,
+                                virtualTables,
+                                sidecarCdcStats,
+                                avroSerializer,
+                                () -> new ContentionFreeRangeManager(vertx, 
tokenRingProvider));
+    }
+
+    @Singleton
+    @ProvidesIntoMap
+    @KeyClassMapKey(PeriodicTaskMapKeys.CdcConfigRefresherNotifierKey.class)
+    PeriodicTask cdcConfigRefresherNotifier(Vertx vertx,
+                                            SidecarConfiguration 
sidecarConfiguration,
+                                    KafkaConfigAccessor kafkaConfigAccessor,
+                                    CdcConfigAccessor cdcConfigAccessor)

Review Comment:
   indentation



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import org.apache.cassandra.cdc.CdcLogMode;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.kafka.KafkaPublisher;
+import org.apache.cassandra.cdc.kafka.TopicSupplier;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdc;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.secrets.SslConfig;
+import org.apache.cassandra.secrets.SslConfigSecretsProvider;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+import org.apache.cassandra.sidecar.coordination.RangeManager;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CONFIGURATION_CHANGED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
+
+/**
+ * Class that handles CDC life cycle
+ */
+@Singleton
+public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcPublisher.class);
+    private static final long INITIALIZATION_LOOP_DELAY_MILLIS = 1000;
+
+    // SSL Configuration Keys
+    private static final String SSL_ENABLED_KEY = "enabled";
+    private static final String SSL_PREFER_OPENSSL_KEY = "preferOpenSSL";
+    private static final String SSL_CLIENT_AUTH_KEY = "clientAuth";
+    private static final String SSL_CIPHER_SUITES_KEY = "cipherSuites";
+    private static final String SSL_SECURE_TRANSPORT_PROTOCOLS_KEY = 
"secureTransportProtocols";
+    private static final String SSL_HANDSHAKE_TIMEOUT_KEY = "handshakeTimeout";
+    private static final String SSL_KEYSTORE_PATH_KEY = "keystorePath";
+    private static final String SSL_KEYSTORE_PASSWORD_KEY = "keystorePassword";
+    private static final String SSL_KEYSTORE_TYPE_KEY = "keystoreType";
+    private static final String SSL_TRUSTSTORE_PATH_KEY = "truststorePath";
+    private static final String SSL_TRUSTSTORE_PASSWORD_KEY = 
"truststorePassword";
+    private static final String SSL_TRUSTSTORE_TYPE_KEY = "truststoreType";
+    private final TaskExecutorPool executorPools;
+    private final CdcConfig conf;
+    private volatile boolean isRunning = false;
+    private volatile boolean isInitialized = false;
+    private volatile boolean cdcCacheWarmedUp = false;
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final VirtualTablesDatabaseAccessor virtualTables;
+    private final SidecarCdcStats sidecarCdcStats;
+    private final SchemaSupplier schemaSupplier;
+    private final CdcSidecarInstancesProvider sidecarInstancesProvider;
+    private final InstanceMetadataFetcher instanceMetadataFetcher;
+    private final ClusterConfigProvider clusterConfigProvider;
+    private final SidecarCdcClient.ClientConfig clientConfig;
+    private final ICdcStats cdcStats;
+    private final SidecarConfiguration sidecarConfiguration;
+    private CdcManager cdcManager;
+    private final Serializer<CdcEvent> avroSerializer;
+    private final Provider<RangeManager> rangeManagerProvider;
+
+    @Inject
+    public CdcPublisher(Vertx vertx,
+                        SidecarConfiguration sidecarConfiguration,
+                        ExecutorPools executorPools,
+                        ClusterConfigProvider clusterConfigProvider,
+                        SchemaSupplier schemaSupplier,
+                        CdcSidecarInstancesProvider sidecarInstancesProvider,
+                        SidecarCdcClient.ClientConfig clientConfig,
+                        InstanceMetadataFetcher instanceMetadataFetcher,
+                        CdcConfig conf,
+                        CdcDatabaseAccessor databaseAccessor,
+                        ICdcStats cdcStats,
+                        VirtualTablesDatabaseAccessor virtualTables,
+                        SidecarCdcStats sidecarCdcStats,
+                        Serializer<CdcEvent> avroSerializer,
+                        Provider<RangeManager> rangeManagerProvider)
+    {
+        this.sidecarCdcStats = sidecarCdcStats;
+        this.executorPools = executorPools.internal();
+        this.conf = conf;
+        this.databaseAccessor = databaseAccessor;
+        this.virtualTables = virtualTables;
+
+        this.schemaSupplier = schemaSupplier;
+        this.sidecarInstancesProvider = sidecarInstancesProvider;
+        this.instanceMetadataFetcher = instanceMetadataFetcher;
+        this.clusterConfigProvider = clusterConfigProvider;
+        this.clientConfig = clientConfig;
+        this.cdcStats = cdcStats;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.avroSerializer = avroSerializer;
+        this.rangeManagerProvider = rangeManagerProvider;
+        
vertx.eventBus().localConsumer(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address(),
 this);
+        
vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address(),
 this);
+        
vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address(),
 this);
+        vertx.eventBus().localConsumer(ON_SERVER_STOP.address(), this);
+        vertx.eventBus().localConsumer(ON_CDC_CACHE_WARMED_UP.address(), this);
+        vertx.eventBus().localConsumer(ON_CDC_CONFIGURATION_CHANGED.address(), 
new ConfigChangedHandler());
+    }
+
+    public SecretsProvider secretsProvider()
+    {
+        SslConfiguration sslConfiguration = 
sidecarConfiguration.sidecarClientConfiguration().sslConfiguration();
+
+        if (sslConfiguration == null || !sslConfiguration.enabled())
+        {
+            return null;
+        }
+
+        Map<String, String> sslConfigMap = new HashMap<>();
+
+        sslConfigMap.put(SSL_ENABLED_KEY, sslConfiguration.enabled() + "");
+        sslConfigMap.put(SSL_PREFER_OPENSSL_KEY, 
sslConfiguration.preferOpenSSL() + "");
+        sslConfigMap.put(SSL_CLIENT_AUTH_KEY, sslConfiguration.clientAuth());
+        sslConfigMap.put(SSL_CIPHER_SUITES_KEY, String.join(",", 
sslConfiguration.cipherSuites()));
+        sslConfigMap.put(SSL_SECURE_TRANSPORT_PROTOCOLS_KEY, String.join(",", 
sslConfiguration.secureTransportProtocols()));
+        sslConfigMap.put(SSL_HANDSHAKE_TIMEOUT_KEY, 
sslConfiguration.handshakeTimeout().toString());
+
+        if (sslConfiguration.isKeystoreConfigured())
+        {
+            KeyStoreConfiguration keystore = sslConfiguration.keystore();
+            sslConfigMap.put(SSL_KEYSTORE_PATH_KEY, keystore.path());
+            sslConfigMap.put(SSL_KEYSTORE_PASSWORD_KEY, keystore.password());
+            sslConfigMap.put(SSL_KEYSTORE_TYPE_KEY, keystore.type());
+        }
+
+        if (sslConfiguration.isTrustStoreConfigured())
+        {
+            KeyStoreConfiguration truststore = sslConfiguration.truststore();
+            sslConfigMap.put(SSL_TRUSTSTORE_PATH_KEY, truststore.path());
+            sslConfigMap.put(SSL_TRUSTSTORE_PASSWORD_KEY, 
truststore.password());
+            sslConfigMap.put(SSL_TRUSTSTORE_TYPE_KEY, truststore.type());
+        }
+
+        SslConfig sslConfig = SslConfig.create(sslConfigMap);
+        return new SslConfigSecretsProvider(sslConfig);
+    }
+
+    public EventConsumer eventConsumer(CdcConfig conf,
+                                       Serializer<CdcEvent> avroSerializer)
+    {
+        KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(conf.kafkaConfigs());
+        KafkaPublisher kafkaPublisher = new 
KafkaPublisher(TopicSupplier.staticTopicSupplier(conf.kafkaTopic()),

Review Comment:
   Both `producer` and `kafkaPublisher` are `Closeable`. I do not see them get 
closed anywhere. We are create a new eventConsumer on every publisher restart, 
meaning leaks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to