This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git
The following commit(s) were added to refs/heads/master by this push: new 2267933 Support auto discover user tables for comparison 2267933 is described below commit 2267933c3ac009808271e1831cf56258c99cf2d3 Author: Yifan Cai <yifan_...@apple.com> AuthorDate: Thu Jul 16 20:40:40 2020 -0700 Support auto discover user tables for comparison Patch by Yifan Cai; reviewed by Sam Tunnicliffe for CASSANDRA-15953 closes #10 --- README.md | 7 ++ .../apache/cassandra/diff/api/DiffAPIServer.java | 3 +- .../apache/cassandra/diff/JobConfiguration.java | 35 ++++++++- .../cassandra/diff/YamlJobConfiguration.java | 16 ++-- .../cassandra/diff/YamlJobConfigurationTest.java | 23 +++++- ...est_load_config_all_keyspaces_filtered_out.yaml | 57 +++++++++++++++ .../test_load_config_no_keyspace_tables.yaml | 45 ++++++++++++ spark-job/localconfig-auto-discover.yaml | 45 ++++++++++++ .../java/org/apache/cassandra/diff/DiffJob.java | 38 +++++++--- .../java/org/apache/cassandra/diff/Schema.java | 76 +++++++++++++++++++ .../diff/AbstractMockJobConfiguration.java | 79 ++++++++++++++++++++ .../java/org/apache/cassandra/diff/SchemaTest.java | 85 ++++++++++++++++++++++ 12 files changed, 489 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index c17ef59..7793392 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,11 @@ ## Configuration See `spark-job/localconfig.yaml` for an example config. +See `spark-job/localconfig-multi-keyspaces.yaml` for an example config that compares tables under multiple keyspaces. + +See `spark-job/localconfig-auto-discover.yaml` for an example config that auto discovers all user tables to compare. +The auto discover mode excludes all system keyspaces and any keyspaces defined at `disallowed_keyspaces` in the yaml file. + ## Custom cluster providers To make it easy to run in any environment the cluster providers are pluggable - there are two interfaces to implement. 
First, the `ClusterProvider` interface is used to create a connection to the clusters, and it is configured using @@ -50,6 +55,8 @@ $ docker exec cas-tgt cassandra-stress write n=1k -schema keyspace="keyspace2" $ spark-submit --verbose --files ./spark-job/localconfig.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig.yaml # If rows are created in "keyspace2", you can run pick up the localconfig-multi-keyspaces.yaml to compare data across multiple keyspaces! See the command below. # $ spark-submit --verbose --files ./spark-job/localconfig-multi-keyspaces.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig-multi-keyspaces.yaml +# To use the auto discover mode, you can run the job with localconfig-auto-discover.yaml, which has the keyspace_tables field removed. +# $ spark-submit --verbose --files ./spark-job/localconfig-auto-discover.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig-auto-discover.yaml # ... 
logs INFO DiffJob:124 - FINISHED: {standard1=Matched Partitions - 1000, Mismatched Partitions - 0, Partition Errors - 0, Partitions Only In Source - 0, Partitions Only In Target - 0, Skipped Partitions - 0, Matched Rows - 1000, Matched Values - 6000, Mismatched Values - 0 } ## start api-server: diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java b/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java index 6f14128..533fa22 100644 --- a/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java +++ b/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java @@ -19,6 +19,7 @@ package org.apache.cassandra.diff.api; +import java.io.FileInputStream; import java.io.IOException; import com.google.common.collect.Lists; @@ -35,7 +36,7 @@ public class DiffAPIServer { String filename = args[0]; JAXRSServerFactoryBean factoryBean = new JAXRSServerFactoryBean(); - DiffJobsResource diffResource = new DiffJobsResource(YamlJobConfiguration.load(filename)); + DiffJobsResource diffResource = new DiffJobsResource(YamlJobConfiguration.load(new FileInputStream(filename))); factoryBean.setResourceProviders(Lists.newArrayList(new SingletonResourceProvider(diffResource), new SingletonResourceProvider(new HealthResource()))); factoryBean.setAddress("http://localhost:8089/"); diff --git a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java index 2d6cb51..cf12ea3 100644 --- a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java +++ b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java @@ -20,13 +20,46 @@ package org.apache.cassandra.diff; import java.io.Serializable; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; +import javax.annotation.Nullable; public 
interface JobConfiguration extends Serializable { - List<KeyspaceTablePair> keyspaceTables(); + + default boolean shouldAutoDiscoverTables() { + List<KeyspaceTablePair> list = keyspaceTables(); + return null == list || list.isEmpty(); + } + + /** + * @return qualified tables defined in the configuration. Return null if not defined. + */ + @Nullable List<KeyspaceTablePair> keyspaceTables(); + + /** + * @return a list of keyspace names that are disallowed for comparison. Return null if not defined. + */ + @Nullable List<String> disallowedKeyspaces(); + + /** + * @return filtered qualified tables based on the keyspaceTables and disallowedKeypsaces defined. + * Return null if keyspaceTables is not defined. + */ + @Nullable default List<KeyspaceTablePair> filteredKeyspaceTables() { + List<String> disallowedKeyspaces = disallowedKeyspaces(); + List<KeyspaceTablePair> list = keyspaceTables(); + if (disallowedKeyspaces != null && !disallowedKeyspaces.isEmpty() && list != null && !list.isEmpty()) { + Set<String> filter = new HashSet<>(disallowedKeyspaces); + return list.stream().filter(t -> !filter.contains(t.keyspace)).collect(Collectors.toList()); + } else { + return list; + } + } int splits(); diff --git a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java index 3666e48..c49da20 100644 --- a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java +++ b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java @@ -19,8 +19,7 @@ package org.apache.cassandra.diff; -import java.io.FileInputStream; -import java.io.FileNotFoundException; +import java.io.InputStream; import java.math.BigInteger; import java.util.HashSet; import java.util.List; @@ -35,6 +34,7 @@ import org.yaml.snakeyaml.constructor.CustomClassLoaderConstructor; public class YamlJobConfiguration implements JobConfiguration { public int splits = 10000; public List<KeyspaceTablePair> 
keyspace_tables; + public List<String> disallowed_keyspaces; public int buckets = 100; public int rate_limit = 10000; public String job_id = null; @@ -48,20 +48,20 @@ public class YamlJobConfiguration implements JobConfiguration { public String specific_tokens = null; public String disallowed_tokens = null; - public static YamlJobConfiguration load(String file) { + public static YamlJobConfiguration load(InputStream inputStream) { Yaml yaml = new Yaml(new CustomClassLoaderConstructor(YamlJobConfiguration.class, Thread.currentThread().getContextClassLoader())); - try { - return yaml.loadAs(new FileInputStream(file), YamlJobConfiguration.class); - } catch (FileNotFoundException e) { - throw new RuntimeException(e); - } + return yaml.loadAs(inputStream, YamlJobConfiguration.class); } public List<KeyspaceTablePair> keyspaceTables() { return keyspace_tables; } + public List<String> disallowedKeyspaces() { + return disallowed_keyspaces; + } + public int splits() { return splits; } diff --git a/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java b/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java index 813f38a..39c43ee 100644 --- a/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java +++ b/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java @@ -6,11 +6,32 @@ import org.junit.Test; public class YamlJobConfigurationTest { @Test public void testLoadYaml() { - JobConfiguration jobConfiguration = YamlJobConfiguration.load("src/test/resources/testconfig.yaml"); + JobConfiguration jobConfiguration = load("testconfig.yaml"); Assert.assertEquals(3, jobConfiguration.keyspaceTables().size()); jobConfiguration.keyspaceTables().forEach(kt -> { Assert.assertTrue("Keyspace segment is not loaded correctly", kt.keyspace.contains("ks")); Assert.assertTrue("Table segment is not loaded correctly", kt.table.contains("tb")); }); } + + @Test + public void testLoadYamlWithKeyspaceTablesAbsent() { 
+ JobConfiguration jobConfiguration = load("test_load_config_no_keyspace_tables.yaml"); + Assert.assertNull(jobConfiguration.keyspaceTables()); + Assert.assertNull(jobConfiguration.disallowedKeyspaces()); + Assert.assertNull(jobConfiguration.filteredKeyspaceTables()); + Assert.assertTrue(jobConfiguration.shouldAutoDiscoverTables()); + } + + @Test + public void testLoadYamlFilterOutDisallowedKeyspaces() { + JobConfiguration jobConfiguration = load("test_load_config_all_keyspaces_filtered_out.yaml"); + Assert.assertNotNull(jobConfiguration.filteredKeyspaceTables()); + Assert.assertTrue("All tables should be filtered out", jobConfiguration.filteredKeyspaceTables().isEmpty()); + Assert.assertFalse("It should not be in the discover mode", jobConfiguration.shouldAutoDiscoverTables()); + } + + private JobConfiguration load(String filename) { + return YamlJobConfiguration.load(getClass().getClassLoader().getResourceAsStream(filename)); + } } diff --git a/common/src/test/resources/test_load_config_all_keyspaces_filtered_out.yaml b/common/src/test/resources/test_load_config_all_keyspaces_filtered_out.yaml new file mode 100644 index 0000000..1dc5556 --- /dev/null +++ b/common/src/test/resources/test_load_config_all_keyspaces_filtered_out.yaml @@ -0,0 +1,57 @@ +# List of keyspace.tables to diff +keyspace_tables: + - ks1.tb1 + - ks1.tb2 + - ks2.tb3 + +# It makes no sense to filter out keyspaces defined in keyspace_tables +# Only have it for the testing purpose. +disallowed_keyspaces: + - ks1 + - ks2 + +# This is how many parts we split the full token range in. 
+# Each of these splits is then compared between the clusters +splits: 10000 + +# Number of buckets - splits / buckets should be under 100k to avoid wide partitions when storing the metadata +buckets: 100 + +# global rate limit - this is how many q/s you think the target clusters can handle +rate_limit: 10000 + +# optional job id - if restarting a job, set the correct job_id here to avoid re-diffing old splits +# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef + +# Fetch size to use for the query fetching the tokens in the cluster +token_scan_fetch_size: 1000 +# Fetch size to use for the queries fetching the rows of each partition +partition_read_fetch_size: 1000 + +read_timeout_millis: 10000 +reverse_read_probability: 0.5 +consistency_level: ALL +metadata_options: + keyspace: cassandradiff + replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}" + ttl: 31536000 + should_init: true +cluster_config: + source: + impl: "org.apache.cassandra.diff.ContactPointsClusterProvider" + name: "local_test_1" + contact_points: "127.0.0.1" + port: "9042" + dc: "datacenter1" + target: + impl: "org.apache.cassandra.diff.ContactPointsClusterProvider" + name: "local_test_2" + contact_points: "127.0.0.1" + port: "9043" + dc: "datacenter1" + metadata: + impl: "org.apache.cassandra.diff.ContactPointsClusterProvider" + name: "local_test" + contact_points: "127.0.0.1" + port: "9042" + dc: "datacenter1" diff --git a/common/src/test/resources/test_load_config_no_keyspace_tables.yaml b/common/src/test/resources/test_load_config_no_keyspace_tables.yaml new file mode 100644 index 0000000..00ac31f --- /dev/null +++ b/common/src/test/resources/test_load_config_no_keyspace_tables.yaml @@ -0,0 +1,45 @@ +# This is how many parts we split the full token range in. 
+# Each of these splits is then compared between the clusters +splits: 10000 + +# Number of buckets - splits / buckets should be under 100k to avoid wide partitions when storing the metadata +buckets: 100 + +# global rate limit - this is how many q/s you think the target clusters can handle +rate_limit: 10000 + +# optional job id - if restarting a job, set the correct job_id here to avoid re-diffing old splits +# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef + +# Fetch size to use for the query fetching the tokens in the cluster +token_scan_fetch_size: 1000 +# Fetch size to use for the queries fetching the rows of each partition +partition_read_fetch_size: 1000 + +read_timeout_millis: 10000 +reverse_read_probability: 0.5 +consistency_level: ALL +metadata_options: + keyspace: cassandradiff + replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}" + ttl: 31536000 + should_init: true +cluster_config: + source: + impl: "org.apache.cassandra.diff.ContactPointsClusterProvider" + name: "local_test_1" + contact_points: "127.0.0.1" + port: "9042" + dc: "datacenter1" + target: + impl: "org.apache.cassandra.diff.ContactPointsClusterProvider" + name: "local_test_2" + contact_points: "127.0.0.1" + port: "9043" + dc: "datacenter1" + metadata: + impl: "org.apache.cassandra.diff.ContactPointsClusterProvider" + name: "local_test" + contact_points: "127.0.0.1" + port: "9042" + dc: "datacenter1" diff --git a/spark-job/localconfig-auto-discover.yaml b/spark-job/localconfig-auto-discover.yaml new file mode 100644 index 0000000..00ac31f --- /dev/null +++ b/spark-job/localconfig-auto-discover.yaml @@ -0,0 +1,45 @@ +# This is how many parts we split the full token range in. 
+# Each of these splits is then compared between the clusters +splits: 10000 + +# Number of buckets - splits / buckets should be under 100k to avoid wide partitions when storing the metadata +buckets: 100 + +# global rate limit - this is how many q/s you think the target clusters can handle +rate_limit: 10000 + +# optional job id - if restarting a job, set the correct job_id here to avoid re-diffing old splits +# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef + +# Fetch size to use for the query fetching the tokens in the cluster +token_scan_fetch_size: 1000 +# Fetch size to use for the queries fetching the rows of each partition +partition_read_fetch_size: 1000 + +read_timeout_millis: 10000 +reverse_read_probability: 0.5 +consistency_level: ALL +metadata_options: + keyspace: cassandradiff + replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}" + ttl: 31536000 + should_init: true +cluster_config: + source: + impl: "org.apache.cassandra.diff.ContactPointsClusterProvider" + name: "local_test_1" + contact_points: "127.0.0.1" + port: "9042" + dc: "datacenter1" + target: + impl: "org.apache.cassandra.diff.ContactPointsClusterProvider" + name: "local_test_2" + contact_points: "127.0.0.1" + port: "9043" + dc: "datacenter1" + metadata: + impl: "org.apache.cassandra.diff.ContactPointsClusterProvider" + name: "local_test" + contact_points: "127.0.0.1" + port: "9042" + dc: "datacenter1" diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java index 4a1faa1..ff697d2 100644 --- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java +++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java @@ -19,6 +19,8 @@ package org.apache.cassandra.diff; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; @@ -32,6 +34,7 @@ import java.util.stream.Collectors; import 
com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,14 +57,14 @@ import org.apache.spark.sql.SparkSession; public class DiffJob { private static final Logger logger = LoggerFactory.getLogger(DiffJob.class); - public static void main(String ... args) { + public static void main(String ... args) throws FileNotFoundException { if (args.length == 0) { System.exit(-1); } SparkSession spark = SparkSession.builder().appName("cassandra-diff").getOrCreate(); JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); String configFile = SparkFiles.get(args[0]); - YamlJobConfiguration configuration = YamlJobConfiguration.load(configFile); + YamlJobConfiguration configuration = YamlJobConfiguration.load(new FileInputStream(configFile)); DiffJob diffJob = new DiffJob(); diffJob.run(configuration, sc); spark.stop(); @@ -87,15 +90,32 @@ public class DiffJob { ClusterProvider targetProvider = ClusterProvider.getProvider(configuration.clusterConfig("target"), "target"); String sourcePartitioner; String targetPartitioner; + List<KeyspaceTablePair> tablesToCompare = configuration.filteredKeyspaceTables(); try (Cluster sourceCluster = sourceProvider.getCluster(); Cluster targetCluster = targetProvider.getCluster()) { sourcePartitioner = sourceCluster.getMetadata().getPartitioner(); targetPartitioner = targetCluster.getMetadata().getPartitioner(); + + if (!sourcePartitioner.equals(targetPartitioner)) { + throw new IllegalStateException(String.format("Cluster partitioners do not match; Source: %s, Target: %s,", + sourcePartitioner, targetPartitioner)); + } + + if (configuration.shouldAutoDiscoverTables()) { + Schema sourceSchema = new Schema(sourceCluster.getMetadata(), configuration); + Schema targetSchema = new Schema(targetCluster.getMetadata(), configuration); + Schema commonSchema = sourceSchema.intersect(targetSchema); + 
if (commonSchema.size() != sourceSchema.size()) { + Pair<Set<KeyspaceTablePair>, Set<KeyspaceTablePair>> difference = Schema.difference(sourceSchema, targetSchema); + logger.warn("Found tables that only exist in either source or target cluster. Ignoring those tables for comparision. " + + "Distinct tables in source cluster: {}. " + + "Distinct tables in target cluster: {}", + difference.getLeft(), difference.getRight()); + } + tablesToCompare = commonSchema.toQualifiedTableList(); + } } - if (!sourcePartitioner.equals(targetPartitioner)) { - throw new IllegalStateException(String.format("Cluster partitioners do not match; Source: %s, Target: %s,", - sourcePartitioner, targetPartitioner)); - } + TokenHelper tokenHelper = TokenHelper.forPartitioner(sourcePartitioner); logger.info("Configuring job metadata store"); @@ -111,7 +131,7 @@ public class DiffJob { // Job params, which once a job is created cannot be modified in subsequent re-runs logger.info("Creating or retrieving job parameters"); job = new JobMetadataDb.JobLifeCycle(metadataSession, metadataOptions.keyspace); - Params params = getJobParams(job, configuration); + Params params = getJobParams(job, configuration, tablesToCompare); logger.info("Job Params: {}", params); if (null == params) throw new RuntimeException("Unable to initialize job params"); @@ -178,12 +198,12 @@ public class DiffJob { } } - private static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf) { + private static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf, List<KeyspaceTablePair> keyspaceTables) { if (conf.jobId().isPresent()) { return job.getJobParams(conf.jobId().get()); } else { return new Params(UUID.randomUUID(), - conf.keyspaceTables(), + keyspaceTables, conf.buckets(), conf.splits()); } diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Schema.java b/spark-job/src/main/java/org/apache/cassandra/diff/Schema.java new file mode 100644 index 0000000..ad9c593 --- 
/dev/null +++ b/spark-job/src/main/java/org/apache/cassandra/diff/Schema.java @@ -0,0 +1,76 @@ +package org.apache.cassandra.diff; + + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +import org.apache.commons.lang3.tuple.Pair; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.TableMetadata; + +public class Schema { + // Filter out all system keyspaces. + private static final Set<String> DEFAULT_FILTER = ImmutableSet.of( + "system", "system_schema", "system_traces", "system_auth", "system_distributed", "system_virtual_schema", "system_views" + ); + + private final Set<KeyspaceTablePair> qualifiedTables; + + public Schema(Metadata metadata, JobConfiguration configuration) { + Set<String> keyspaceFilter = getKeyspaceFilter(configuration); + qualifiedTables = new HashSet<>(); + for (KeyspaceMetadata keyspaceMetadata : metadata.getKeyspaces()) { + if (keyspaceFilter.contains(keyspaceMetadata.getName())) + continue; + + for (TableMetadata tableMetadata : keyspaceMetadata.getTables()) { + qualifiedTables.add(KeyspaceTablePair.from(tableMetadata)); + } + } + } + + public Schema(Set<KeyspaceTablePair> qualifiedTables) { + this.qualifiedTables = qualifiedTables; + } + + public Schema intersect(Schema other) { + if (this != other) { + Set<KeyspaceTablePair> intersection = Sets.intersection(this.qualifiedTables, other.qualifiedTables); + return new Schema(intersection); + } + return this; + } + + public List<KeyspaceTablePair> toQualifiedTableList() { + return new ArrayList<>(qualifiedTables); + } + + public int size() { + return qualifiedTables.size(); + } + + @VisibleForTesting + public static Set<String> getKeyspaceFilter(JobConfiguration configuration) { + List<String> disallowedKeyspaces = 
configuration.disallowedKeyspaces(); +        if (null == disallowedKeyspaces) { +            return DEFAULT_FILTER; +        } else { +            return Sets.union(DEFAULT_FILTER, ImmutableSet.copyOf(disallowedKeyspaces)); +        } +    } + +    // Returns the tables that are distinct to each schema. +    // The left contains the distinct tables from schema `first` +    // The right contains the distinct tables from schema `second` +    public static Pair<Set<KeyspaceTablePair>, Set<KeyspaceTablePair>> difference(Schema first, Schema second) { +        return Pair.of(Sets.difference(first.qualifiedTables, second.qualifiedTables), Sets.difference(second.qualifiedTables, first.qualifiedTables)); +    } +} diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/AbstractMockJobConfiguration.java b/spark-job/src/test/java/org/apache/cassandra/diff/AbstractMockJobConfiguration.java new file mode 100644 index 0000000..5f9d22a --- /dev/null +++ b/spark-job/src/test/java/org/apache/cassandra/diff/AbstractMockJobConfiguration.java @@ -0,0 +1,79 @@ +package org.apache.cassandra.diff; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import javax.annotation.Nullable; + +// Overrides methods on demand +public abstract class AbstractMockJobConfiguration implements JobConfiguration { +    private final UnsupportedOperationException uoe = new UnsupportedOperationException("Not implemented"); + +    @Nullable +    @Override +    public List<KeyspaceTablePair> keyspaceTables() { +        throw uoe; +    } + +    @Nullable +    @Override +    public List<String> disallowedKeyspaces() { +        throw uoe; +    } + +    @Override +    public int splits() { +        throw uoe; +    } + +    @Override +    public int buckets() { +        throw uoe; +    } + +    @Override +    public int rateLimit() { +        throw uoe; +    } + +    @Override +    public Optional<UUID> jobId() { +        throw uoe; +    } + +    @Override +    public int tokenScanFetchSize() { +        throw uoe; +    } + +    @Override +    public int partitionReadFetchSize() { +        throw uoe; +    } + +    @Override +    public int readTimeoutMillis() {
throw uoe; + } + + @Override + public double reverseReadProbability() { + throw uoe; + } + + @Override + public String consistencyLevel() { + throw uoe; + } + + @Override + public MetadataKeyspaceOptions metadataOptions() { + throw uoe; + } + + @Override + public Map<String, String> clusterConfig(String identifier) { + throw uoe; + } +} diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/SchemaTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/SchemaTest.java new file mode 100644 index 0000000..17dc67c --- /dev/null +++ b/spark-job/src/test/java/org/apache/cassandra/diff/SchemaTest.java @@ -0,0 +1,85 @@ +package org.apache.cassandra.diff; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.annotation.Nullable; + +import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Assert; +import org.junit.Test; + +public class SchemaTest { + private static final KeyspaceTablePair kt1 = new KeyspaceTablePair("ks", "tbl1"); + private static final KeyspaceTablePair kt2 = new KeyspaceTablePair("ks", "tbl2"); + private static final KeyspaceTablePair kt3 = new KeyspaceTablePair("ks", "tbl3"); + + private class MockConfig extends AbstractMockJobConfiguration { + private List<String> disallowedKeyspaces; + + MockConfig(List<String> disallowedKeyspaces) { + this.disallowedKeyspaces = disallowedKeyspaces; + } + + @Nullable + @Override + public List<String> disallowedKeyspaces() { + return disallowedKeyspaces; + } + } + + @Test + public void testIntersectSameInstance() { + Schema schemaA = new Schema(new HashSet<>()); + Schema intersection = schemaA.intersect(schemaA); + Assert.assertSame(schemaA, intersection); + } + + @Test + public void testIntersectIsCommutative() { + Schema schemaA = new Schema(ImmutableSet.of(kt1, kt2)); + Schema schemaB = new Schema(ImmutableSet.of(kt1, kt3)); + Schema intersection1 = schemaA.intersect(schemaB); + Schema 
intersection2 = schemaB.intersect(schemaA); + Assert.assertEquals(1, intersection1.size()); + Assert.assertEquals(intersection1.toQualifiedTableList(), intersection2.toQualifiedTableList()); + } + + @Test + public void testToQualifiedTableList() { + Schema schema = new Schema(ImmutableSet.of(kt1, kt2, kt3)); + Assert.assertEquals(3, schema.size()); + Assert.assertEquals(ImmutableSet.of(kt1, kt2, kt3), ImmutableSet.copyOf(schema.toQualifiedTableList())); + } + + @Test + public void testGetKeyspaceFilterWithDefault() { + MockConfig config = new MockConfig(null); + Set<String> filter = Schema.getKeyspaceFilter(config); + Assert.assertFalse("Default fileter should not be empty", filter.isEmpty()); + } + + @Test + public void testGetKeyspaceFilterWithAdditions() { + List<String> disallowed = Arrays.asList("ks1, ks2"); + MockConfig configWithDefault = new MockConfig(null); + MockConfig configWithAddition = new MockConfig(disallowed); + Set<String> defaultFilter = Schema.getKeyspaceFilter(configWithDefault); + Set<String> filter = Schema.getKeyspaceFilter(configWithAddition); + Assert.assertFalse("Filter should not be not empty", filter.isEmpty()); + Assert.assertEquals("Filter with additions should be larger than the default", disallowed.size(), filter.size() - defaultFilter.size()); + disallowed.forEach(ks -> Assert.assertTrue("Filter should contain the additional disallowed keyspace.", filter.contains(ks))); + } + + @Test + public void testSchemaDifference() { + Schema first = new Schema(ImmutableSet.of(kt1, kt2)); + Schema second = new Schema(ImmutableSet.of(kt2, kt3)); + Pair<Set<KeyspaceTablePair>, Set<KeyspaceTablePair>> difference = Schema.difference(first, second); + Assert.assertTrue("Should contain the distinct table in first schema", difference.getLeft().contains(kt1)); + Assert.assertTrue("Should contain the distinct table in second schema", difference.getRight().contains(kt3)); + } +} --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org