frankgh commented on code in PR #251: URL: https://github.com/apache/cassandra-sidecar/pull/251#discussion_r2335034731
########## server/src/test/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessorTests.java: ########## @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.db; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.google.inject.Provider; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.bridge.TokenRange; +import 
org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema; +import org.apache.cassandra.sidecar.db.schema.TableHistorySchema; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.TokenSplitUtil; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.jetbrains.annotations.NotNull; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.apache.cassandra.sidecar.db.CdcDatabaseAccessor.await; +import static org.apache.cassandra.sidecar.utils.TokenSplitUtil.overlaps; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link CdcDatabaseAccessor} + */ +public class CdcDatabaseAccessorTests +{ + @Test + public void testDataStoreBasic() + { + MockCdcStateV2 datastore = new MockCdcStateV2(); + String jobId = UUID.randomUUID().toString(); + ByteBuffer buf1 = ByteBuffer.wrap(new byte[]{ 'a', 'b', 'c' }); + ByteBuffer buf2 = ByteBuffer.wrap(new byte[]{ 'e', 'f', 'g' }); + assertTrue(datastore.isEmpty()); + assertTrue(datastore.select(jobId, 0).isEmpty()); + datastore.insert(jobId, 0, BigInteger.ZERO, BigInteger.TEN, buf1); + assertFalse(datastore.isEmpty()); + assertByteBufferEquals(buf1, datastore.selectBuffers(jobId, 0).stream().findFirst().orElseThrow()); + + assertFalse(datastore.select(jobId, 0).isEmpty()); + assertFalse(datastore.select(jobId, 0).isEmpty()); + assertTrue(datastore.select(jobId, 1).isEmpty()); + assertTrue(datastore.select(jobId, 
1).isEmpty()); + datastore.insert(jobId, 0, BigInteger.ZERO, BigInteger.valueOf(100), buf2); + assertEquals(2, datastore.select(jobId, 0).size()); + } + + @ParameterizedTest + @ValueSource(ints = { 3, 4, 8, 16, 32, 64, 128, 256, 512, 1024 }) + public void testDataStore(int numNodes) + { + Partitioner partitioner = Partitioner.Murmur3Partitioner; + MockCdcStateV2 datastore = new MockCdcStateV2(); + String jobId = UUID.randomUUID().toString(); + List<BigInteger> tokens = TokenSplitUtil.splitTokens(numNodes, partitioner); + TokenSplitUtil tokenSplitUtil = new TokenSplitUtil(numNodes); + assertTrue(datastore.isEmpty()); + + // write state and verify we can read back + ByteBuffer[] buffers = new ByteBuffer[tokens.size()]; + for (int i = 0; i < tokens.size(); i++) + { + BigInteger lower = tokens.get(i); + BigInteger upper = i == tokens.size() - 1 ? partitioner.maxToken() : tokens.get(i + 1); + TokenRange range = TokenRange.openClosed(lower, upper); + ByteBuffer buf = randomBytes(i); + buffers[i] = buf; + int[] splits = tokenSplitUtil.findOverlappingSplitIds(partitioner, range); + + Arrays.stream(splits).forEach(split -> assertTrue(datastore.select(jobId, split).isEmpty())); + Arrays.stream(splits).forEach(split -> datastore.insert(jobId, split, lower, upper, buf)); + } + assertEquals(numNodes, datastore.store.size()); + + for (int i = 0; i < tokens.size(); i++) + { + BigInteger lower = tokens.get(i); + BigInteger upper = i == tokens.size() - 1 ? 
partitioner.maxToken() : tokens.get(i + 1); + TokenRange range = TokenRange.openClosed(lower, upper); + int[] splits = tokenSplitUtil.findOverlappingSplitIds(partitioner, range); + ByteBuffer expected = buffers[i]; + Arrays.stream(splits).forEach(split -> assertByteBufferEquals(expected, datastore.selectBuffers(jobId, split).stream().findFirst().orElseThrow())); + } + } + + @ParameterizedTest + @ValueSource(ints = { 4, 8, 32, 128, 1024 }) + public void testShrink(int numNodes) + { + Partitioner partitioner = Partitioner.Murmur3Partitioner; + MockCdcStateV2 datastore = new MockCdcStateV2(); + String jobId = UUID.randomUUID().toString(); + CdcStatesSchema mockCdcStatesSchema = mock(CdcStatesSchema.class, RETURNS_DEEP_STUBS); + TableHistorySchema mockTableHistorySchema = mock(TableHistorySchema.class, RETURNS_DEEP_STUBS); + List<BigInteger> tokensBeforeShrink = TokenSplitUtil.splitTokens(numNodes, partitioner); + List<BigInteger> tokensAfterShrink = TokenSplitUtil.splitTokens(numNodes / 2, partitioner); + TokenSplitUtil tokenSplitUtil = new TokenSplitUtil(numNodes); + + Provider<TokenSplitUtil> mockTokenSplitUtilProvider = mock(Provider.class); + when(mockTokenSplitUtilProvider.get()).thenReturn(tokenSplitUtil); + CassandraBridgeFactory mockCassandraBridgeFactory = mock(CassandraBridgeFactory.class); + + CdcDatabaseAccessor db = new CdcDatabaseAccessor(getMockInstanceMetaDataFetcher(), mockCdcStatesSchema, + mockTableHistorySchema, getMockCQLSessionProvider(datastore, mockCdcStatesSchema), + mockTokenSplitUtilProvider, mockCassandraBridgeFactory); + + ByteBuffer[] buffers = new ByteBuffer[tokensBeforeShrink.size()]; + for (int i = 0; i < tokensBeforeShrink.size(); i++) + { + BigInteger lower = tokensBeforeShrink.get(i); + BigInteger upper = i == tokensBeforeShrink.size() - 1 ? 
partitioner.maxToken() : tokensBeforeShrink.get(i + 1); + TokenRange range = TokenRange.openClosed(lower, upper); + buffers[i] = randomBytes(i); + int[] splits = tokenSplitUtil.findOverlappingSplitIds(partitioner, range); + + Arrays.stream(splits).forEach(split -> assertTrue(datastore.select(jobId, split).isEmpty())); + await(db.storeStateAsync(jobId, range, buffers[i], System.currentTimeMillis()).stream()); + List<byte[]> arrays = db.loadStateForRange(jobId, range).collect(Collectors.toList()); + assertEquals(1, arrays.size()); + assertByteBufferEquals(buffers[i], arrays.get(0)); + } + assertEquals(numNodes, datastore.store.size()); + + for (int i = 0; i < tokensAfterShrink.size(); i++) + { + BigInteger lower = tokensAfterShrink.get(i); + BigInteger upper = i == tokensAfterShrink.size() - 1 ? partitioner.maxToken() : tokensAfterShrink.get(i + 1); + TokenRange range = TokenRange.openClosed(lower, upper); + List<byte[]> arrays = db.loadStateForRange(jobId, range).collect(Collectors.toList()); + assertEquals(2, arrays.size()); + assertByteBufferEquals(buffers[i * 2], arrays.get(0)); + assertByteBufferEquals(buffers[(i * 2) + 1], arrays.get(1)); + } + } + + @ParameterizedTest + @ValueSource(ints = { 4, 8, 32, 128, 1024 }) + public void testExpand(int numNodes) + { + Partitioner partitioner = Partitioner.Murmur3Partitioner; + MockCdcStateV2 datastore = new MockCdcStateV2(); + String jobId = UUID.randomUUID().toString(); + CdcStatesSchema mockCdcStatesSchema = mock(CdcStatesSchema.class, RETURNS_DEEP_STUBS); + TableHistorySchema mockTableHistorySchema = mock(TableHistorySchema.class, RETURNS_DEEP_STUBS); + List<BigInteger> tokensBeforeExpansion = TokenSplitUtil.splitTokens(numNodes, partitioner); + List<BigInteger> tokensAfterExpansion = TokenSplitUtil.splitTokens(numNodes * 2, partitioner); + TokenSplitUtil tokenSplitUtil = new TokenSplitUtil(numNodes); + + Provider<TokenSplitUtil> mockTokenSplitUtilProvider = mock(Provider.class); Review Comment: NIT: ```suggestion 
Provider<TokenSplitUtil> tokenSplitUtilProvider = () -> tokenSplitUtil; ``` ########## server/src/test/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessorTests.java: ########## @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.db; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.google.inject.Provider; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.bridge.TokenRange; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema; +import org.apache.cassandra.sidecar.db.schema.TableHistorySchema; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.TokenSplitUtil; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.jetbrains.annotations.NotNull; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; Review Comment: let's migrate to `org.assertj.core.api.Assertions.assertThat` to be consistent with the repo ########## server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java: ########## @@ -175,4 +192,119 @@ public static boolean matchIndexExtension(String fileName) { 
return fileName.endsWith(IDX_FILE_EXTENSION); } + + /** + * @param schemaStr full cluster schema text. + * @return map of keyspace/table identifier to table create statements. + */ + public static Map<TableIdentifier, String> extractCdcTables(@NotNull final String schemaStr) Review Comment: maybe add a unit test for this function? ########## server/src/test/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessorTests.java: ########## @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.db; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.google.inject.Provider; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.bridge.TokenRange; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema; +import org.apache.cassandra.sidecar.db.schema.TableHistorySchema; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.TokenSplitUtil; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.jetbrains.annotations.NotNull; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.apache.cassandra.sidecar.db.CdcDatabaseAccessor.await; +import static org.apache.cassandra.sidecar.utils.TokenSplitUtil.overlaps; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static 
org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link CdcDatabaseAccessor} + */ +public class CdcDatabaseAccessorTests +{ + @Test + public void testDataStoreBasic() + { + MockCdcStateV2 datastore = new MockCdcStateV2(); + String jobId = UUID.randomUUID().toString(); + ByteBuffer buf1 = ByteBuffer.wrap(new byte[]{ 'a', 'b', 'c' }); + ByteBuffer buf2 = ByteBuffer.wrap(new byte[]{ 'e', 'f', 'g' }); + assertTrue(datastore.isEmpty()); + assertTrue(datastore.select(jobId, 0).isEmpty()); + datastore.insert(jobId, 0, BigInteger.ZERO, BigInteger.TEN, buf1); + assertFalse(datastore.isEmpty()); + assertByteBufferEquals(buf1, datastore.selectBuffers(jobId, 0).stream().findFirst().orElseThrow()); + + assertFalse(datastore.select(jobId, 0).isEmpty()); + assertFalse(datastore.select(jobId, 0).isEmpty()); + assertTrue(datastore.select(jobId, 1).isEmpty()); + assertTrue(datastore.select(jobId, 1).isEmpty()); + datastore.insert(jobId, 0, BigInteger.ZERO, BigInteger.valueOf(100), buf2); + assertEquals(2, datastore.select(jobId, 0).size()); + } + + @ParameterizedTest + @ValueSource(ints = { 3, 4, 8, 16, 32, 64, 128, 256, 512, 1024 }) + public void testDataStore(int numNodes) + { + Partitioner partitioner = Partitioner.Murmur3Partitioner; + MockCdcStateV2 datastore = new MockCdcStateV2(); + String jobId = UUID.randomUUID().toString(); + List<BigInteger> tokens = TokenSplitUtil.splitTokens(numNodes, partitioner); + TokenSplitUtil tokenSplitUtil = new TokenSplitUtil(numNodes); + assertTrue(datastore.isEmpty()); + + // write state and verify we can read back + ByteBuffer[] buffers = new ByteBuffer[tokens.size()]; + for (int i = 0; i < tokens.size(); i++) + { + BigInteger lower = tokens.get(i); + BigInteger upper = i == tokens.size() - 1 ? 
partitioner.maxToken() : tokens.get(i + 1); + TokenRange range = TokenRange.openClosed(lower, upper); + ByteBuffer buf = randomBytes(i); + buffers[i] = buf; + int[] splits = tokenSplitUtil.findOverlappingSplitIds(partitioner, range); + + Arrays.stream(splits).forEach(split -> assertTrue(datastore.select(jobId, split).isEmpty())); + Arrays.stream(splits).forEach(split -> datastore.insert(jobId, split, lower, upper, buf)); + } + assertEquals(numNodes, datastore.store.size()); + + for (int i = 0; i < tokens.size(); i++) + { + BigInteger lower = tokens.get(i); + BigInteger upper = i == tokens.size() - 1 ? partitioner.maxToken() : tokens.get(i + 1); + TokenRange range = TokenRange.openClosed(lower, upper); + int[] splits = tokenSplitUtil.findOverlappingSplitIds(partitioner, range); + ByteBuffer expected = buffers[i]; + Arrays.stream(splits).forEach(split -> assertByteBufferEquals(expected, datastore.selectBuffers(jobId, split).stream().findFirst().orElseThrow())); + } + } + + @ParameterizedTest + @ValueSource(ints = { 4, 8, 32, 128, 1024 }) + public void testShrink(int numNodes) + { + Partitioner partitioner = Partitioner.Murmur3Partitioner; + MockCdcStateV2 datastore = new MockCdcStateV2(); + String jobId = UUID.randomUUID().toString(); + CdcStatesSchema mockCdcStatesSchema = mock(CdcStatesSchema.class, RETURNS_DEEP_STUBS); + TableHistorySchema mockTableHistorySchema = mock(TableHistorySchema.class, RETURNS_DEEP_STUBS); + List<BigInteger> tokensBeforeShrink = TokenSplitUtil.splitTokens(numNodes, partitioner); + List<BigInteger> tokensAfterShrink = TokenSplitUtil.splitTokens(numNodes / 2, partitioner); + TokenSplitUtil tokenSplitUtil = new TokenSplitUtil(numNodes); + + Provider<TokenSplitUtil> mockTokenSplitUtilProvider = mock(Provider.class); + when(mockTokenSplitUtilProvider.get()).thenReturn(tokenSplitUtil); Review Comment: NIT: ```suggestion Provider<TokenSplitUtil> tokenSplitUtilProvider = () -> tokenSplitUtil; ``` ########## 
server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableHistorySchema.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.sidecar.db.schema; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.coordination.ExecuteOnClusterLeaseholderOnly; +import org.jetbrains.annotations.NotNull; + +/** + * Schema definition and management for tracking table schema evolution history. + * This class extends {@link TableSchema} to provide specialized schema management for storing + * historical versions of table schemas in Cassandra Sidecar. 
+ */ +@Singleton +public class TableHistorySchema extends TableSchema implements ExecuteOnClusterLeaseholderOnly +{ + private static final String TABLE_SCHEMA_HISTORY = "table_schema_history"; + + private final SchemaKeyspaceConfiguration keyspaceConfig; + + // prepared statements + private PreparedStatement insertTableSchema; + private PreparedStatement selectVersionTableSchema; + + @Inject + public TableHistorySchema(ServiceConfiguration configuration) + { + this.keyspaceConfig = configuration.schemaKeyspaceConfiguration(); + } + + @Override + protected void prepareStatements(@NotNull Session session) + { + insertTableSchema = prepare(insertTableSchema, session, CqlLiterals.insertTableSchema(keyspaceConfig)); + selectVersionTableSchema = prepare(selectVersionTableSchema, session, CqlLiterals.selectVersionTableSchema(keyspaceConfig)); + } + + @Override + protected String keyspaceName() + { + return keyspaceConfig.keyspace(); + } + + @Override + protected String tableName() + { + return TABLE_SCHEMA_HISTORY; + } + + @Override + protected boolean exists(@NotNull Metadata metadata) + { + KeyspaceMetadata ksMetadata = metadata.getKeyspace(keyspaceConfig.keyspace()); + if (ksMetadata == null) + { + return false; + } + + return ksMetadata.getTable(TABLE_SCHEMA_HISTORY) != null; + } + + @Override + protected String createSchemaStatement() + { + return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" + + " keyspace_name text," + + " table_name text," + + " version uuid," + + " created_at timeuuid," + + " table_schema text," + + " PRIMARY KEY ((ks, tb), version)" + + ")", + keyspaceConfig.keyspace(), TABLE_SCHEMA_HISTORY); + } + + public PreparedStatement insertTableSchema() + { + return insertTableSchema; + } + + public PreparedStatement selectVersionTableSchema() + { + return selectVersionTableSchema; + } + + private static class CqlLiterals + { + static String insertTableSchema(SchemaKeyspaceConfiguration config) + { + return withTable("INSERT INTO %s.%s (ks, tb, 
version, created_at, table_schema) " + Review Comment: After the column rename above, this statement should use the new column names (`keyspace_name`, `table_name`) instead of `ks` and `tb`. The `PRIMARY KEY ((ks, tb), version)` clause in `createSchemaStatement` needs the same update, i.e. `PRIMARY KEY ((keyspace_name, table_name), version)`; otherwise it references columns that no longer exist and the `CREATE TABLE` statement will be rejected. ```suggestion return withTable("INSERT INTO %s.%s (keyspace_name, table_name, version, created_at, table_schema) " + ``` ########## server/src/main/java/org/apache/cassandra/sidecar/utils/TokenSplitUtil.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.cassandra.sidecar.utils; + +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.BoundType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.Host; +import com.google.inject.Singleton; +import org.apache.cassandra.bridge.TokenRange; +import org.apache.cassandra.sidecar.cdc.CdcConfig; +import org.apache.cassandra.sidecar.db.ServiceConfig; +import org.apache.cassandra.sidecar.db.TokenSplitConfigAccessor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; + +/** + * Util class that divides token range into fixed number of splits to help distribute load. + * The number of splits should be proportional to the cluster size, so that small clusters + * store state in a small number of partitions, and large clusters store state across a + * larger number of partitions. 
+ */ +@Singleton +public class TokenSplitUtil +{ + private static final Logger LOGGER = LoggerFactory.getLogger(TokenSplitUtil.class); + + public static final Map<Partitioner, BigInteger> PARTITIONER_FULL_RANGE = Map.of( + Partitioner.Murmur3Partitioner, BigInteger.valueOf(2).pow(64), + Partitioner.RandomPartitioner, BigInteger.valueOf(2).pow(127) + ); + + public static final String NUM_SPLITS_KEY = "curr_num_splits"; + // we can in the future add a current/prior split count so we can migrate from one to another after a cluster topology event +// public static final String PRIOR_NUM_SPLITS_KEY = "prior_num_splits"; + + public final int numSplits; + public final List<BigInteger> murmur3Tokens; + public final List<BigInteger> randomTokens; + + @VisibleForTesting + public TokenSplitUtil(int numSplits) + { + this.numSplits = numSplits; + this.murmur3Tokens = splitTokens(numSplits, Partitioner.Murmur3Partitioner); + this.randomTokens = splitTokens(numSplits, Partitioner.RandomPartitioner); + LOGGER.info("Initialized token split util numSplits={}", numSplits); + } + + public TokenSplitUtil(TokenSplitConfigAccessor tokenSplitConfigAccessor, CdcConfig cdcConfig, InstanceMetadataFetcher fetcher) + { + this(getOrInsert(tokenSplitConfigAccessor, cdcConfig, fetcher)); + } + + protected static int getOrInsert(TokenSplitConfigAccessor tokenSplitConfigAccessor, CdcConfig cdcConfig, InstanceMetadataFetcher fetcher) + { + Map<String, String> config = tokenSplitConfigAccessor.getConfig().getConfigs(); + if (config != null && config.containsKey(NUM_SPLITS_KEY)) + { + return Integer.parseInt(config.get(NUM_SPLITS_KEY)); + } + + + int maxDcSize = fetcher.callOnFirstAvailableInstance(instanceMetadata -> instanceMetadata.delegate().metadata()) Review Comment: If Sidecar cannot connect to CQL, this call will prevent the service from starting, so we should not fail here. Also, wherever we do throw, the exception should carry a more descriptive error message.
More generally, we should never throw while Guice is building its dependency graph (DAG). ########## server/src/test/java/org/apache/cassandra/sidecar/utils/ByteBufUtilsTest.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.utils; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteStreams; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Test for {@link ByteBufUtils} + */ +@SuppressWarnings("UnstableApiUsage") Review Comment: This `@SuppressWarnings("UnstableApiUsage")` looks like it might not be needed; can it be removed? ########## server/src/test/java/org/apache/cassandra/sidecar/utils/ByteBufUtilsTest.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.utils; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteStreams; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Test for {@link ByteBufUtils} + */ +@SuppressWarnings("UnstableApiUsage") +public class ByteBufUtilsTest +{ + @Test + public void testSkipBytesFully() throws IOException + { + testSkipBytesFully("abc".getBytes(StandardCharsets.UTF_8)); + testSkipBytesFully("abcdefghijklm".getBytes(StandardCharsets.UTF_8)); + } + + @Test + public void testReadRemainingBytes() throws IOException + { + testReadRemainingBytes(""); + testReadRemainingBytes("abc"); + testReadRemainingBytes("abcdefghijklm"); + } + + @Test + public void testGetArray() + { + testGetArray(""); + testGetArray("abc"); + testGetArray("abcdefghijklm"); + } + + @Test + public void testHexString() + { + assertEquals("00000000000001F4", ByteBufUtils.toHexString((ByteBuffer) ByteBuffer.allocate(8).putLong(500L).flip())); + assertEquals("616263", ByteBufUtils.toHexString(ByteBuffer.wrap(new byte[]{ 'a', 'b', 'c' }))); + assertEquals("000000000588C164", 
ByteBufUtils.toHexString((ByteBuffer) ByteBuffer.allocate(8).putLong(92848484L).asReadOnlyBuffer().flip())); + assertEquals("null", ByteBufUtils.toHexString(null)); + + assertEquals("616263", ByteBufUtils.toHexString(new byte[]{ 'a', 'b', 'c' }, 0, 3)); + assertEquals("63", ByteBufUtils.toHexString(new byte[]{ 'a', 'b', 'c' }, 2, 1)); + } + + private static void testGetArray(final String str) + { + assertEquals(str, new String(ByteBufUtils.getArray(ByteBuffer.wrap(str.getBytes())), StandardCharsets.UTF_8)); + } + + private static void testReadRemainingBytes(final String str) throws IOException + { + assertEquals(str, new String(ByteBufUtils.readRemainingBytes(new ByteArrayInputStream(str.getBytes()), str.length()), StandardCharsets.UTF_8)); + } + + private static void testSkipBytesFully(final byte[] ar) throws IOException + { + final int len = ar.length; + final ByteArrayDataInput in = ByteStreams.newDataInput(ar, 0); Review Comment: NIT: ```suggestion int len = ar.length; ByteArrayDataInput in = ByteStreams.newDataInput(ar, 0); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/schema/TableHistorySchema.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.sidecar.db.schema; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.coordination.ExecuteOnClusterLeaseholderOnly; +import org.jetbrains.annotations.NotNull; + +/** + * Schema definition and management for tracking table schema evolution history. + * This class extends {@link TableSchema} to provide specialized schema management for storing + * historical versions of table schemas in Cassandra Sidecar. + */ +@Singleton +public class TableHistorySchema extends TableSchema implements ExecuteOnClusterLeaseholderOnly +{ + private static final String TABLE_SCHEMA_HISTORY = "table_schema_history"; + + private final SchemaKeyspaceConfiguration keyspaceConfig; + + // prepared statements + private PreparedStatement insertTableSchema; + private PreparedStatement selectVersionTableSchema; + + @Inject + public TableHistorySchema(ServiceConfiguration configuration) + { + this.keyspaceConfig = configuration.schemaKeyspaceConfiguration(); + } + + @Override + protected void prepareStatements(@NotNull Session session) + { + insertTableSchema = prepare(insertTableSchema, session, CqlLiterals.insertTableSchema(keyspaceConfig)); + selectVersionTableSchema = prepare(selectVersionTableSchema, session, CqlLiterals.selectVersionTableSchema(keyspaceConfig)); + } + + @Override + protected String keyspaceName() + { + return keyspaceConfig.keyspace(); + } + + @Override + protected String tableName() + { + return TABLE_SCHEMA_HISTORY; + } + + @Override + protected boolean exists(@NotNull Metadata metadata) + { + KeyspaceMetadata ksMetadata = 
metadata.getKeyspace(keyspaceConfig.keyspace()); + if (ksMetadata == null) + { + return false; + } + + return ksMetadata.getTable(TABLE_SCHEMA_HISTORY) != null; + } + + @Override + protected String createSchemaStatement() + { + return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" + + " keyspace_name text," + + " table_name text," + + " version uuid," + + " created_at timeuuid," + + " table_schema text," + + " PRIMARY KEY ((ks, tb), version)" + + ")", + keyspaceConfig.keyspace(), TABLE_SCHEMA_HISTORY); + } + + public PreparedStatement insertTableSchema() + { + return insertTableSchema; + } + + public PreparedStatement selectVersionTableSchema() + { + return selectVersionTableSchema; + } + + private static class CqlLiterals + { + static String insertTableSchema(SchemaKeyspaceConfiguration config) + { + return withTable("INSERT INTO %s.%s (ks, tb, version, created_at, table_schema) " + + "VALUES (?, ?, ?, NOW(), ?)", config); + } + + static String selectVersionTableSchema(SchemaKeyspaceConfiguration config) + { + return withTable("SELECT table_schema FROM %s.%s WHERE ks = ? AND tb = ? AND version = ?", config); Review Comment: ```suggestion return withTable("SELECT table_schema FROM %s.%s WHERE keyspace_name = ? AND table_name = ? AND version = ?", config); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java: ########## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.sidecar.db; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.ProvisionException; +import com.google.inject.Singleton; +import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.bridge.TokenRange; +import org.apache.cassandra.cdc.CdcKryoRegister; +import org.apache.cassandra.cdc.state.CdcState; +import org.apache.cassandra.sidecar.cdc.SidecarCdcStats; +import org.apache.cassandra.sidecar.common.response.NodeSettings; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.CdcStatesSchema; +import org.apache.cassandra.sidecar.db.schema.TableHistorySchema; +import org.apache.cassandra.sidecar.utils.ByteBufUtils; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.TokenSplitUtil; +import org.apache.cassandra.spark.data.partitioner.Partitioner; 
+import org.apache.cassandra.spark.utils.TableIdentifier; +import org.apache.cassandra.spark.utils.ThrowableUtils; +import org.jetbrains.annotations.NotNull; + +/** + * Database accessor for CDC (Change Data Capture) state management operations. + * <p> + * This class provides specialized database access functionality for managing CDC state persistence + * and retrieval in Cassandra Sidecar. It extends {@link DatabaseAccessor} to provide CDC-specific + * operations including: + * <ul> + * <li>Storing CDC state data across token range splits for distributed processing</li> + * <li>Loading and merging CDC state from overlapping token ranges</li> + * <li>Managing table schema history for CDC-enabled tables</li> + * <li>Providing partitioner and metadata access for CDC operations</li> + * </ul> + * <p> + * The accessor handles token range splitting to ensure CDC state is properly distributed + * across multiple database partitions for scalability. It uses {@link TokenSplitUtil} + * to determine overlapping splits for both storage and retrieval operations. + * <p> + * Key features: + * <ul> + * <li><strong>Token-aware storage:</strong> Automatically distributes CDC state across + * appropriate token range splits</li> + * <li><strong>State merging:</strong> Combines overlapping CDC state objects into + * canonical views during retrieval</li> + * <li><strong>Schema management:</strong> Tracks table schema versions for CDC operations</li> + * <li><strong>Async operations:</strong> Provides asynchronous database operations for + * better performance</li> + * </ul> + * <p> + * This class is thread-safe and designed as a singleton for injection into CDC components + * that require database access functionality. 
+ * + * @see DatabaseAccessor + * @see CdcStatesSchema + * @see TokenSplitUtil + */ +@SuppressWarnings("resource") +@Singleton +public class CdcDatabaseAccessor extends DatabaseAccessor<CdcStatesSchema> +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CdcDatabaseAccessor.class); + private final TableHistorySchema tableHistorySchema; + private final Provider<TokenSplitUtil> tokenSplitUtilProvider; + private volatile TokenSplitUtil tokenSplitUtil = null; + private volatile InstanceMetadataFetcher instanceMetadataFetcher; + private final CassandraBridgeFactory cassandraBridgeFactory; + + @Inject + public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher, + CdcStatesSchema cdcStatesSchema, + TableHistorySchema tableHistorySchema, + CQLSessionProvider sessionProvider, + Provider<TokenSplitUtil> tokenSplitUtilProvider, + CassandraBridgeFactory cassandraBridgeFactory) + { + super(cdcStatesSchema, sessionProvider); + this.tableHistorySchema = tableHistorySchema; + this.tokenSplitUtilProvider = tokenSplitUtilProvider; + this.instanceMetadataFetcher = instanceMetadataFetcher; + this.cassandraBridgeFactory = cassandraBridgeFactory; + } + + @VisibleForTesting + public CdcDatabaseAccessor(InstanceMetadataFetcher instanceMetadataFetcher, Review Comment: this is unused. Let's remove it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

