frankgh commented on code in PR #251:
URL: https://github.com/apache/cassandra-sidecar/pull/251#discussion_r2335034731


##########
server/src/test/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessorTests.java:
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.inject.Provider;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.apache.cassandra.sidecar.db.CdcDatabaseAccessor.await;
+import static org.apache.cassandra.sidecar.utils.TokenSplitUtil.overlaps;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link CdcDatabaseAccessor}
+ */
+public class CdcDatabaseAccessorTests
+{
+    @Test
+    public void testDataStoreBasic()
+    {
+        MockCdcStateV2 datastore = new MockCdcStateV2();
+        String jobId = UUID.randomUUID().toString();
+        ByteBuffer buf1 = ByteBuffer.wrap(new byte[]{ 'a', 'b', 'c' });
+        ByteBuffer buf2 = ByteBuffer.wrap(new byte[]{ 'e', 'f', 'g' });
+        assertTrue(datastore.isEmpty());
+        assertTrue(datastore.select(jobId, 0).isEmpty());
+        datastore.insert(jobId, 0, BigInteger.ZERO, BigInteger.TEN, buf1);
+        assertFalse(datastore.isEmpty());
+        assertByteBufferEquals(buf1, datastore.selectBuffers(jobId, 
0).stream().findFirst().orElseThrow());
+
+        assertFalse(datastore.select(jobId, 0).isEmpty());
+        assertFalse(datastore.select(jobId, 0).isEmpty());
+        assertTrue(datastore.select(jobId, 1).isEmpty());
+        assertTrue(datastore.select(jobId, 1).isEmpty());
+        datastore.insert(jobId, 0, BigInteger.ZERO, BigInteger.valueOf(100), 
buf2);
+        assertEquals(2, datastore.select(jobId, 0).size());
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = { 3, 4, 8, 16, 32, 64, 128, 256, 512, 1024 })
+    public void testDataStore(int numNodes)
+    {
+        Partitioner partitioner = Partitioner.Murmur3Partitioner;
+        MockCdcStateV2 datastore = new MockCdcStateV2();
+        String jobId = UUID.randomUUID().toString();
+        List<BigInteger> tokens = TokenSplitUtil.splitTokens(numNodes, 
partitioner);
+        TokenSplitUtil tokenSplitUtil = new TokenSplitUtil(numNodes);
+        assertTrue(datastore.isEmpty());
+
+        // write state and verify we can read back
+        ByteBuffer[] buffers = new ByteBuffer[tokens.size()];
+        for (int i = 0; i < tokens.size(); i++)
+        {
+            BigInteger lower = tokens.get(i);
+            BigInteger upper = i == tokens.size() - 1 ? partitioner.maxToken() 
: tokens.get(i + 1);
+            TokenRange range = TokenRange.openClosed(lower, upper);
+            ByteBuffer buf = randomBytes(i);
+            buffers[i] = buf;
+            int[] splits = tokenSplitUtil.findOverlappingSplitIds(partitioner, 
range);
+
+            Arrays.stream(splits).forEach(split -> 
assertTrue(datastore.select(jobId, split).isEmpty()));
+            Arrays.stream(splits).forEach(split -> datastore.insert(jobId, 
split, lower, upper, buf));
+        }
+        assertEquals(numNodes, datastore.store.size());
+
+        for (int i = 0; i < tokens.size(); i++)
+        {
+            BigInteger lower = tokens.get(i);
+            BigInteger upper = i == tokens.size() - 1 ? partitioner.maxToken() 
: tokens.get(i + 1);
+            TokenRange range = TokenRange.openClosed(lower, upper);
+            int[] splits = tokenSplitUtil.findOverlappingSplitIds(partitioner, 
range);
+            ByteBuffer expected = buffers[i];
+            Arrays.stream(splits).forEach(split -> 
assertByteBufferEquals(expected, datastore.selectBuffers(jobId, 
split).stream().findFirst().orElseThrow()));
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = { 4, 8, 32, 128, 1024 })
+    public void testShrink(int numNodes)
+    {
+        Partitioner partitioner = Partitioner.Murmur3Partitioner;
+        MockCdcStateV2 datastore = new MockCdcStateV2();
+        String jobId = UUID.randomUUID().toString();
+        CdcStatesSchema mockCdcStatesSchema = mock(CdcStatesSchema.class, 
RETURNS_DEEP_STUBS);
+        TableHistorySchema mockTableHistorySchema = 
mock(TableHistorySchema.class, RETURNS_DEEP_STUBS);
+        List<BigInteger> tokensBeforeShrink = 
TokenSplitUtil.splitTokens(numNodes, partitioner);
+        List<BigInteger> tokensAfterShrink = 
TokenSplitUtil.splitTokens(numNodes / 2, partitioner);
+        TokenSplitUtil tokenSplitUtil = new TokenSplitUtil(numNodes);
+
+        Provider<TokenSplitUtil> mockTokenSplitUtilProvider = 
mock(Provider.class);
+        when(mockTokenSplitUtilProvider.get()).thenReturn(tokenSplitUtil);
+        CassandraBridgeFactory mockCassandraBridgeFactory = 
mock(CassandraBridgeFactory.class);
+
+        CdcDatabaseAccessor db = new 
CdcDatabaseAccessor(getMockInstanceMetaDataFetcher(), mockCdcStatesSchema,
+                                                         
mockTableHistorySchema, getMockCQLSessionProvider(datastore, 
mockCdcStatesSchema),
+                                                         
mockTokenSplitUtilProvider, mockCassandraBridgeFactory);
+
+        ByteBuffer[] buffers = new ByteBuffer[tokensBeforeShrink.size()];
+        for (int i = 0; i < tokensBeforeShrink.size(); i++)
+        {
+            BigInteger lower = tokensBeforeShrink.get(i);
+            BigInteger upper = i == tokensBeforeShrink.size() - 1 ? 
partitioner.maxToken() : tokensBeforeShrink.get(i + 1);
+            TokenRange range = TokenRange.openClosed(lower, upper);
+            buffers[i] = randomBytes(i);
+            int[] splits = tokenSplitUtil.findOverlappingSplitIds(partitioner, 
range);
+
+            Arrays.stream(splits).forEach(split -> 
assertTrue(datastore.select(jobId, split).isEmpty()));
+            await(db.storeStateAsync(jobId, range, buffers[i], 
System.currentTimeMillis()).stream());
+            List<byte[]> arrays = db.loadStateForRange(jobId, 
range).collect(Collectors.toList());
+            assertEquals(1, arrays.size());
+            assertByteBufferEquals(buffers[i], arrays.get(0));
+        }
+        assertEquals(numNodes, datastore.store.size());
+
+        for (int i = 0; i < tokensAfterShrink.size(); i++)
+        {
+            BigInteger lower = tokensAfterShrink.get(i);
+            BigInteger upper = i == tokensAfterShrink.size() - 1 ? 
partitioner.maxToken() : tokensAfterShrink.get(i + 1);
+            TokenRange range = TokenRange.openClosed(lower, upper);
+            List<byte[]> arrays = db.loadStateForRange(jobId, 
range).collect(Collectors.toList());
+            assertEquals(2, arrays.size());
+            assertByteBufferEquals(buffers[i * 2], arrays.get(0));
+            assertByteBufferEquals(buffers[(i * 2) + 1], arrays.get(1));
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = { 4, 8, 32, 128, 1024 })
+    public void testExpand(int numNodes)
+    {
+        Partitioner partitioner = Partitioner.Murmur3Partitioner;
+        MockCdcStateV2 datastore = new MockCdcStateV2();
+        String jobId = UUID.randomUUID().toString();
+        CdcStatesSchema mockCdcStatesSchema = mock(CdcStatesSchema.class, 
RETURNS_DEEP_STUBS);
+        TableHistorySchema mockTableHistorySchema = 
mock(TableHistorySchema.class, RETURNS_DEEP_STUBS);
+        List<BigInteger> tokensBeforeExpansion = 
TokenSplitUtil.splitTokens(numNodes, partitioner);
+        List<BigInteger> tokensAfterExpansion = 
TokenSplitUtil.splitTokens(numNodes * 2, partitioner);
+        TokenSplitUtil tokenSplitUtil = new TokenSplitUtil(numNodes);
+
+        Provider<TokenSplitUtil> mockTokenSplitUtilProvider = 
mock(Provider.class);

Review Comment:
   NIT:
   ```suggestion
           Provider<TokenSplitUtil> tokenSplitUtilProvider = () -> 
tokenSplitUtil;
   ```



##########
server/src/test/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessorTests.java:
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.inject.Provider;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;

Review Comment:
   Let's migrate to `org.assertj.core.api.Assertions.assertThat` to be 
consistent with the rest of the repo.



##########
server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java:
##########
@@ -175,4 +192,119 @@ public static boolean matchIndexExtension(String fileName)
     {
         return fileName.endsWith(IDX_FILE_EXTENSION);
     }
+
+    /**
+     * @param schemaStr full cluster schema text.
+     * @return map of keyspace/table identifier to table create statements.
+     */
+    public static Map<TableIdentifier, String> extractCdcTables(@NotNull final 
String schemaStr)

Review Comment:
   Maybe add a unit test for this function?



##########
server/src/test/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessorTests.java:
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.inject.Provider;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.apache.cassandra.sidecar.db.CdcDatabaseAccessor.await;
+import static org.apache.cassandra.sidecar.utils.TokenSplitUtil.overlaps;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link CdcDatabaseAccessor}
+ */
+public class CdcDatabaseAccessorTests
+{
+    @Test
+    public void testDataStoreBasic()
+    {
+        MockCdcStateV2 datastore = new MockCdcStateV2();
+        String jobId = UUID.randomUUID().toString();
+        ByteBuffer buf1 = ByteBuffer.wrap(new byte[]{ 'a', 'b', 'c' });
+        ByteBuffer buf2 = ByteBuffer.wrap(new byte[]{ 'e', 'f', 'g' });
+        assertTrue(datastore.isEmpty());
+        assertTrue(datastore.select(jobId, 0).isEmpty());
+        datastore.insert(jobId, 0, BigInteger.ZERO, BigInteger.TEN, buf1);
+        assertFalse(datastore.isEmpty());
+        assertByteBufferEquals(buf1, datastore.selectBuffers(jobId, 
0).stream().findFirst().orElseThrow());
+
+        assertFalse(datastore.select(jobId, 0).isEmpty());
+        assertFalse(datastore.select(jobId, 0).isEmpty());
+        assertTrue(datastore.select(jobId, 1).isEmpty());
+        assertTrue(datastore.select(jobId, 1).isEmpty());
+        datastore.insert(jobId, 0, BigInteger.ZERO, BigInteger.valueOf(100), 
buf2);
+        assertEquals(2, datastore.select(jobId, 0).size());
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = { 3, 4, 8, 16, 32, 64, 128, 256, 512, 1024 })
+    public void testDataStore(int numNodes)
+    {
+        Partitioner partitioner = Partitioner.Murmur3Partitioner;
+        MockCdcStateV2 datastore = new MockCdcStateV2();
+        String jobId = UUID.randomUUID().toString();
+        List<BigInteger> tokens = TokenSplitUtil.splitTokens(numNodes, 
partitioner);
+        TokenSplitUtil tokenSplitUtil = new TokenSplitUtil(numNodes);
+        assertTrue(datastore.isEmpty());
+
+        // write state and verify we can read back
+        ByteBuffer[] buffers = new ByteBuffer[tokens.size()];
+        for (int i = 0; i < tokens.size(); i++)
+        {
+            BigInteger lower = tokens.get(i);
+            BigInteger upper = i == tokens.size() - 1 ? partitioner.maxToken() 
: tokens.get(i + 1);
+            TokenRange range = TokenRange.openClosed(lower, upper);
+            ByteBuffer buf = randomBytes(i);
+            buffers[i] = buf;
+            int[] splits = tokenSplitUtil.findOverlappingSplitIds(partitioner, 
range);
+
+            Arrays.stream(splits).forEach(split -> 
assertTrue(datastore.select(jobId, split).isEmpty()));
+            Arrays.stream(splits).forEach(split -> datastore.insert(jobId, 
split, lower, upper, buf));
+        }
+        assertEquals(numNodes, datastore.store.size());
+
+        for (int i = 0; i < tokens.size(); i++)
+        {
+            BigInteger lower = tokens.get(i);
+            BigInteger upper = i == tokens.size() - 1 ? partitioner.maxToken() 
: tokens.get(i + 1);
+            TokenRange range = TokenRange.openClosed(lower, upper);
+            int[] splits = tokenSplitUtil.findOverlappingSplitIds(partitioner, 
range);
+            ByteBuffer expected = buffers[i];
+            Arrays.stream(splits).forEach(split -> 
assertByteBufferEquals(expected, datastore.selectBuffers(jobId, 
split).stream().findFirst().orElseThrow()));
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = { 4, 8, 32, 128, 1024 })
+    public void testShrink(int numNodes)
+    {
+        Partitioner partitioner = Partitioner.Murmur3Partitioner;
+        MockCdcStateV2 datastore = new MockCdcStateV2();
+        String jobId = UUID.randomUUID().toString();
+        CdcStatesSchema mockCdcStatesSchema = mock(CdcStatesSchema.class, 
RETURNS_DEEP_STUBS);
+        TableHistorySchema mockTableHistorySchema = 
mock(TableHistorySchema.class, RETURNS_DEEP_STUBS);
+        List<BigInteger> tokensBeforeShrink = 
TokenSplitUtil.splitTokens(numNodes, partitioner);
+        List<BigInteger> tokensAfterShrink = 
TokenSplitUtil.splitTokens(numNodes / 2, partitioner);
+        TokenSplitUtil tokenSplitUtil = new TokenSplitUtil(numNodes);
+
+        Provider<TokenSplitUtil> mockTokenSplitUtilProvider = 
mock(Provider.class);
+        when(mockTokenSplitUtilProvider.get()).thenReturn(tokenSplitUtil);

Review Comment:
   NIT: 
   ```suggestion
           Provider<TokenSplitUtil> tokenSplitUtilProvider = () -> 
tokenSplitUtil;
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableHistorySchema.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import 
org.apache.cassandra.sidecar.coordination.ExecuteOnClusterLeaseholderOnly;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema definition and management for tracking table schema evolution 
history.
+ * This class extends {@link TableSchema} to provide specialized schema 
management for storing
+ * historical versions of table schemas in Cassandra Sidecar.
+ */
+@Singleton
+public class TableHistorySchema extends TableSchema implements 
ExecuteOnClusterLeaseholderOnly
+{
+    private static final String TABLE_SCHEMA_HISTORY = "table_schema_history";
+
+    private final SchemaKeyspaceConfiguration keyspaceConfig;
+
+    // prepared statements
+    private PreparedStatement insertTableSchema;
+    private PreparedStatement selectVersionTableSchema;
+
+    @Inject
+    public TableHistorySchema(ServiceConfiguration configuration)
+    {
+        this.keyspaceConfig = configuration.schemaKeyspaceConfiguration();
+    }
+
+    @Override
+    protected void prepareStatements(@NotNull Session session)
+    {
+        insertTableSchema = prepare(insertTableSchema, session, 
CqlLiterals.insertTableSchema(keyspaceConfig));
+        selectVersionTableSchema = prepare(selectVersionTableSchema, session, 
CqlLiterals.selectVersionTableSchema(keyspaceConfig));
+    }
+
+    @Override
+    protected String keyspaceName()
+    {
+        return keyspaceConfig.keyspace();
+    }
+
+    @Override
+    protected String tableName()
+    {
+        return TABLE_SCHEMA_HISTORY;
+    }
+
+    @Override
+    protected boolean exists(@NotNull Metadata metadata)
+    {
+        KeyspaceMetadata ksMetadata = 
metadata.getKeyspace(keyspaceConfig.keyspace());
+        if (ksMetadata == null)
+        {
+            return false;
+        }
+
+        return ksMetadata.getTable(TABLE_SCHEMA_HISTORY) != null;
+    }
+
+    @Override
+    protected String createSchemaStatement()
+    {
+        return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" +
+                             "  keyspace_name text," +
+                             "  table_name text," +
+                             "  version uuid," +
+                             "  created_at timeuuid," +
+                             "  table_schema text," +
+                             "  PRIMARY KEY ((ks, tb), version)" +
+                             ")",
+                             keyspaceConfig.keyspace(), TABLE_SCHEMA_HISTORY);
+    }
+
+    public PreparedStatement insertTableSchema()
+    {
+        return insertTableSchema;
+    }
+
+    public PreparedStatement selectVersionTableSchema()
+    {
+        return selectVersionTableSchema;
+    }
+
+    private static class CqlLiterals
+    {
+        static String insertTableSchema(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("INSERT INTO %s.%s (ks, tb, version, created_at, 
table_schema) " +

Review Comment:
   After the change above, we should use the new names for the keyspace and 
table columns:
   ```suggestion
               return withTable("INSERT INTO %s.%s (keyspace_name, table_name, 
version, created_at, table_schema) " +
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/utils/TokenSplitUtil.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BoundType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Host;
+import com.google.inject.Singleton;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.sidecar.cdc.CdcConfig;
+import org.apache.cassandra.sidecar.db.ServiceConfig;
+import org.apache.cassandra.sidecar.db.TokenSplitConfigAccessor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+
+/**
+ * Util class that divides token range into fixed number of splits to help 
distribute load. 
+ * The number of splits should be proportional to the cluster size, so that 
small clusters 
+ * store state in a small number of partitions, and large clusters store state 
across a 
+ * larger number of partitions.
+ */
+@Singleton
+public class TokenSplitUtil
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenSplitUtil.class);
+
+    public static final Map<Partitioner, BigInteger> PARTITIONER_FULL_RANGE = 
Map.of(
+    Partitioner.Murmur3Partitioner, BigInteger.valueOf(2).pow(64),
+    Partitioner.RandomPartitioner, BigInteger.valueOf(2).pow(127)
+    );
+
+    public static final String NUM_SPLITS_KEY = "curr_num_splits";
+    // we can in the future add a current/prior split count so we can migrate 
from one to another after a cluster topology event
+//    public static final String PRIOR_NUM_SPLITS_KEY = "prior_num_splits";
+
+    public final int numSplits;
+    public final List<BigInteger> murmur3Tokens;
+    public final List<BigInteger> randomTokens;
+
+    @VisibleForTesting
+    public TokenSplitUtil(int numSplits)
+    {
+        this.numSplits = numSplits;
+        this.murmur3Tokens = splitTokens(numSplits, 
Partitioner.Murmur3Partitioner);
+        this.randomTokens = splitTokens(numSplits, 
Partitioner.RandomPartitioner);
+        LOGGER.info("Initialized token split util numSplits={}", numSplits);
+    }
+
+    public TokenSplitUtil(TokenSplitConfigAccessor tokenSplitConfigAccessor, 
CdcConfig cdcConfig, InstanceMetadataFetcher fetcher)
+    {
+        this(getOrInsert(tokenSplitConfigAccessor, cdcConfig, fetcher));
+    }
+
+    protected static int getOrInsert(TokenSplitConfigAccessor 
tokenSplitConfigAccessor, CdcConfig cdcConfig, InstanceMetadataFetcher fetcher)
+    {
+        Map<String, String> config = 
tokenSplitConfigAccessor.getConfig().getConfigs();
+        if (config != null && config.containsKey(NUM_SPLITS_KEY))
+        {
+            return Integer.parseInt(config.get(NUM_SPLITS_KEY));
+        }
+
+
+        int maxDcSize = fetcher.callOnFirstAvailableInstance(instanceMetadata 
-> instanceMetadata.delegate().metadata())

Review Comment:
   If the sidecar cannot connect to CQL, this will prevent the service from 
starting up. We should not fail here. Also, we should have a more descriptive 
error message when we do throw. In general, we should never throw while 
building the DAG for Guice.



##########
server/src/test/java/org/apache/cassandra/sidecar/utils/ByteBufUtilsTest.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteStreams;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Test for {@link ByteBufUtils}
+ */
+@SuppressWarnings("UnstableApiUsage")

Review Comment:
   This `@SuppressWarnings` annotation looks like it might not be needed?



##########
server/src/test/java/org/apache/cassandra/sidecar/utils/ByteBufUtilsTest.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteStreams;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Test for {@link ByteBufUtils}
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class ByteBufUtilsTest
+{
+    @Test
+    public void testSkipBytesFully() throws IOException
+    {
+        testSkipBytesFully("abc".getBytes(StandardCharsets.UTF_8));
+        testSkipBytesFully("abcdefghijklm".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testReadRemainingBytes() throws IOException
+    {
+        testReadRemainingBytes("");
+        testReadRemainingBytes("abc");
+        testReadRemainingBytes("abcdefghijklm");
+    }
+
+    @Test
+    public void testGetArray()
+    {
+        testGetArray("");
+        testGetArray("abc");
+        testGetArray("abcdefghijklm");
+    }
+
+    @Test
+    public void testHexString()
+    {
+        assertEquals("00000000000001F4", ByteBufUtils.toHexString((ByteBuffer) 
ByteBuffer.allocate(8).putLong(500L).flip()));
+        assertEquals("616263", ByteBufUtils.toHexString(ByteBuffer.wrap(new 
byte[]{ 'a', 'b', 'c' })));
+        assertEquals("000000000588C164", ByteBufUtils.toHexString((ByteBuffer) 
ByteBuffer.allocate(8).putLong(92848484L).asReadOnlyBuffer().flip()));
+        assertEquals("null", ByteBufUtils.toHexString(null));
+
+        assertEquals("616263", ByteBufUtils.toHexString(new byte[]{ 'a', 'b', 
'c' }, 0, 3));
+        assertEquals("63", ByteBufUtils.toHexString(new byte[]{ 'a', 'b', 'c' 
}, 2, 1));
+    }
+
+    private static void testGetArray(final String str)
+    {
+        assertEquals(str, new 
String(ByteBufUtils.getArray(ByteBuffer.wrap(str.getBytes())), 
StandardCharsets.UTF_8));
+    }
+
+    private static void testReadRemainingBytes(final String str) throws 
IOException
+    {
+        assertEquals(str, new String(ByteBufUtils.readRemainingBytes(new 
ByteArrayInputStream(str.getBytes()), str.length()), StandardCharsets.UTF_8));
+    }
+
+    private static void testSkipBytesFully(final byte[] ar) throws IOException
+    {
+        final int len = ar.length;
+        final ByteArrayDataInput in = ByteStreams.newDataInput(ar, 0);

Review Comment:
   NIT:
   ```suggestion
        int len = ar.length;
        ByteArrayDataInput in = ByteStreams.newDataInput(ar, 0);
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableHistorySchema.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import 
org.apache.cassandra.sidecar.coordination.ExecuteOnClusterLeaseholderOnly;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema definition and management for tracking table schema evolution 
history.
+ * This class extends {@link TableSchema} to provide specialized schema 
management for storing
+ * historical versions of table schemas in Cassandra Sidecar.
+ */
+@Singleton
+public class TableHistorySchema extends TableSchema implements 
ExecuteOnClusterLeaseholderOnly
+{
+    private static final String TABLE_SCHEMA_HISTORY = "table_schema_history";
+
+    private final SchemaKeyspaceConfiguration keyspaceConfig;
+
+    // prepared statements
+    private PreparedStatement insertTableSchema;
+    private PreparedStatement selectVersionTableSchema;
+
+    @Inject
+    public TableHistorySchema(ServiceConfiguration configuration)
+    {
+        this.keyspaceConfig = configuration.schemaKeyspaceConfiguration();
+    }
+
+    @Override
+    protected void prepareStatements(@NotNull Session session)
+    {
+        insertTableSchema = prepare(insertTableSchema, session, 
CqlLiterals.insertTableSchema(keyspaceConfig));
+        selectVersionTableSchema = prepare(selectVersionTableSchema, session, 
CqlLiterals.selectVersionTableSchema(keyspaceConfig));
+    }
+
+    @Override
+    protected String keyspaceName()
+    {
+        return keyspaceConfig.keyspace();
+    }
+
+    @Override
+    protected String tableName()
+    {
+        return TABLE_SCHEMA_HISTORY;
+    }
+
+    @Override
+    protected boolean exists(@NotNull Metadata metadata)
+    {
+        KeyspaceMetadata ksMetadata = 
metadata.getKeyspace(keyspaceConfig.keyspace());
+        if (ksMetadata == null)
+        {
+            return false;
+        }
+
+        return ksMetadata.getTable(TABLE_SCHEMA_HISTORY) != null;
+    }
+
+    @Override
+    protected String createSchemaStatement()
+    {
+        return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" +
+                             "  keyspace_name text," +
+                             "  table_name text," +
+                             "  version uuid," +
+                             "  created_at timeuuid," +
+                             "  table_schema text," +
+                             "  PRIMARY KEY ((ks, tb), version)" +
+                             ")",
+                             keyspaceConfig.keyspace(), TABLE_SCHEMA_HISTORY);
+    }
+
+    public PreparedStatement insertTableSchema()
+    {
+        return insertTableSchema;
+    }
+
+    public PreparedStatement selectVersionTableSchema()
+    {
+        return selectVersionTableSchema;
+    }
+
+    private static class CqlLiterals
+    {
+        static String insertTableSchema(SchemaKeyspaceConfiguration config)
+        {
+            return withTable("INSERT INTO %s.%s (ks, tb, version, created_at, 
table_schema) " +
+                             "VALUES (?, ?, ?, NOW(), ?)", config);
+        }
+
+        static String selectVersionTableSchema(SchemaKeyspaceConfiguration 
config)
+        {
+            return withTable("SELECT table_schema FROM %s.%s WHERE ks = ? AND 
tb = ? AND version = ?", config);

Review Comment:
   ```suggestion
               return withTable("SELECT table_schema FROM %s.%s WHERE 
keyspace_name = ? AND table_name = ? AND version = ?", config);
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.sidecar.cdc.SidecarCdcStats;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+import org.apache.cassandra.sidecar.utils.ByteBufUtils;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.TokenSplitUtil;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Database accessor for CDC (Change Data Capture) state management operations.
+ * <p>
+ * This class provides specialized database access functionality for managing 
CDC state persistence
+ * and retrieval in Cassandra Sidecar. It extends {@link DatabaseAccessor} to 
provide CDC-specific
+ * operations including:
+ * <ul>
+ *   <li>Storing CDC state data across token range splits for distributed 
processing</li>
+ *   <li>Loading and merging CDC state from overlapping token ranges</li>
+ *   <li>Managing table schema history for CDC-enabled tables</li>
+ *   <li>Providing partitioner and metadata access for CDC operations</li>
+ * </ul>
+ * <p>
+ * The accessor handles token range splitting to ensure CDC state is properly 
distributed
+ * across multiple database partitions for scalability. It uses {@link 
TokenSplitUtil}
+ * to determine overlapping splits for both storage and retrieval operations.
+ * <p>
+ * Key features:
+ * <ul>
+ *   <li><strong>Token-aware storage:</strong> Automatically distributes CDC 
state across
+ *       appropriate token range splits</li>
+ *   <li><strong>State merging:</strong> Combines overlapping CDC state 
objects into
+ *       canonical views during retrieval</li>
+ *   <li><strong>Schema management:</strong> Tracks table schema versions for 
CDC operations</li>
+ *   <li><strong>Async operations:</strong> Provides asynchronous database 
operations for
+ *       better performance</li>
+ * </ul>
+ * <p>
+ * This class is thread-safe and designed as a singleton for injection into 
CDC components
+ * that require database access functionality.
+ *
+ * @see DatabaseAccessor
+ * @see CdcStatesSchema
+ * @see TokenSplitUtil
+ */
+@SuppressWarnings("resource")
+@Singleton
+public class CdcDatabaseAccessor extends DatabaseAccessor<CdcStatesSchema>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcDatabaseAccessor.class);
+    private final TableHistorySchema tableHistorySchema;
+    private final Provider<TokenSplitUtil> tokenSplitUtilProvider;
+    private volatile TokenSplitUtil tokenSplitUtil = null;
+    private volatile InstanceMetadataFetcher instanceMetadataFetcher;
+    private final CassandraBridgeFactory cassandraBridgeFactory;
+
+    @Inject
+    public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher,
+                               CdcStatesSchema cdcStatesSchema,
+                               TableHistorySchema tableHistorySchema,
+                               CQLSessionProvider sessionProvider,
+                               Provider<TokenSplitUtil> tokenSplitUtilProvider,
+                               CassandraBridgeFactory cassandraBridgeFactory)
+    {
+        super(cdcStatesSchema, sessionProvider);
+        this.tableHistorySchema = tableHistorySchema;
+        this.tokenSplitUtilProvider = tokenSplitUtilProvider;
+        this.instanceMetadataFetcher = instanceMetadataFetcher;
+        this.cassandraBridgeFactory = cassandraBridgeFactory;
+    }
+
+    @VisibleForTesting
+    public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher,

Review Comment:
   this is unused. Let's remove it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to