This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git


The following commit(s) were added to refs/heads/master by this push:
     new 2267933  Support auto discover user tables for comparison
2267933 is described below

commit 2267933c3ac009808271e1831cf56258c99cf2d3
Author: Yifan Cai <yifan_...@apple.com>
AuthorDate: Thu Jul 16 20:40:40 2020 -0700

    Support auto discover user tables for comparison
    
    Patch by Yifan Cai; reviewed by Sam Tunnicliffe for CASSANDRA-15953
    
    closes #10
---
 README.md                                          |  7 ++
 .../apache/cassandra/diff/api/DiffAPIServer.java   |  3 +-
 .../apache/cassandra/diff/JobConfiguration.java    | 35 ++++++++-
 .../cassandra/diff/YamlJobConfiguration.java       | 16 ++--
 .../cassandra/diff/YamlJobConfigurationTest.java   | 23 +++++-
 ...est_load_config_all_keyspaces_filtered_out.yaml | 57 +++++++++++++++
 .../test_load_config_no_keyspace_tables.yaml       | 45 ++++++++++++
 spark-job/localconfig-auto-discover.yaml           | 45 ++++++++++++
 .../java/org/apache/cassandra/diff/DiffJob.java    | 38 +++++++---
 .../java/org/apache/cassandra/diff/Schema.java     | 76 +++++++++++++++++++
 .../diff/AbstractMockJobConfiguration.java         | 79 ++++++++++++++++++++
 .../java/org/apache/cassandra/diff/SchemaTest.java | 85 ++++++++++++++++++++++
 12 files changed, 489 insertions(+), 20 deletions(-)

diff --git a/README.md b/README.md
index c17ef59..7793392 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,11 @@
 ## Configuration
 See `spark-job/localconfig.yaml` for an example config.
 
+See `spark-job/localconfig-multi-keyspaces.yaml` for an example config that 
compares tables under multiple keyspaces.
+
+See `spark-job/localconfig-auto-discover.yaml` for an example config that auto 
discovers all user tables to compare. 
+The auto discover mode excludes all system keyspaces and any keyspaces defined 
at `disallowed_keyspaces` in the yaml file.
+
 ## Custom cluster providers
 To make it easy to run in any environment the cluster providers are pluggable 
- there are two interfaces to implement.
 First, the `ClusterProvider` interface is used to create a connection to the 
clusters, and it is configured using
@@ -50,6 +55,8 @@ $ docker exec cas-tgt cassandra-stress write n=1k -schema 
keyspace="keyspace2"
 $ spark-submit --verbose --files ./spark-job/localconfig.yaml --class 
org.apache.cassandra.diff.DiffJob 
spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig.yaml
 # If rows are created in "keyspace2", you can run pick up the 
localconfig-multi-keyspaces.yaml to compare data across multiple keyspaces! See 
the command below.
 # $ spark-submit --verbose --files 
./spark-job/localconfig-multi-keyspaces.yaml --class 
org.apache.cassandra.diff.DiffJob 
spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar 
localconfig-multi-keyspaces.yaml
+# To use the auto discover mode, you can run the job with 
localconfig-auto-discover.yaml, which has the keyspace_tables field removed. 
+# $ spark-submit --verbose --files ./spark-job/localconfig-auto-discover.yaml 
--class org.apache.cassandra.diff.DiffJob 
spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar 
localconfig-auto-discover.yaml
 # ... logs
 INFO  DiffJob:124 - FINISHED: {standard1=Matched Partitions - 1000, Mismatched 
Partitions - 0, Partition Errors - 0, Partitions Only In Source - 0, Partitions 
Only In Target - 0, Skipped Partitions - 0, Matched Rows - 1000, Matched Values 
- 6000, Mismatched Values - 0 }
 ## start api-server:
diff --git 
a/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java 
b/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java
index 6f14128..533fa22 100644
--- a/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java
@@ -19,6 +19,7 @@
 
 package org.apache.cassandra.diff.api;
 
+import java.io.FileInputStream;
 import java.io.IOException;
 
 import com.google.common.collect.Lists;
@@ -35,7 +36,7 @@ public class DiffAPIServer {
         String filename = args[0];
         JAXRSServerFactoryBean factoryBean = new JAXRSServerFactoryBean();
 
-        DiffJobsResource diffResource = new 
DiffJobsResource(YamlJobConfiguration.load(filename));
+        DiffJobsResource diffResource = new 
DiffJobsResource(YamlJobConfiguration.load(new FileInputStream(filename)));
         factoryBean.setResourceProviders(Lists.newArrayList(new 
SingletonResourceProvider(diffResource),
                                                             new 
SingletonResourceProvider(new HealthResource())));
         factoryBean.setAddress("http://localhost:8089/";);
diff --git 
a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java 
b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
index 2d6cb51..cf12ea3 100644
--- a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
+++ b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
@@ -20,13 +20,46 @@
 package org.apache.cassandra.diff;
 
 import java.io.Serializable;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 public interface JobConfiguration extends Serializable {
-    List<KeyspaceTablePair> keyspaceTables();
+
+    default boolean shouldAutoDiscoverTables() {
+        List<KeyspaceTablePair> list = keyspaceTables();
+        return null == list || list.isEmpty();
+    }
+
+    /**
+     * @return qualified tables defined in the configuration. Return null if 
not defined.
+     */
+    @Nullable List<KeyspaceTablePair> keyspaceTables();
+
+    /**
+     * @return a list of keyspace names that are disallowed for comparison. 
Return null if not defined.
+     */
+    @Nullable List<String> disallowedKeyspaces();
+
+    /**
+     * @return filtered qualified tables based on the keyspaceTables and 
disallowedKeyspaces defined.
+     *         Return null if keyspaceTables is not defined.
+     */
+    @Nullable default List<KeyspaceTablePair> filteredKeyspaceTables() {
+        List<String> disallowedKeyspaces = disallowedKeyspaces();
+        List<KeyspaceTablePair> list = keyspaceTables();
+        if (disallowedKeyspaces != null && !disallowedKeyspaces.isEmpty() && 
list != null && !list.isEmpty()) {
+            Set<String> filter = new HashSet<>(disallowedKeyspaces);
+            return list.stream().filter(t -> 
!filter.contains(t.keyspace)).collect(Collectors.toList());
+        } else {
+            return list;
+        }
+    }
 
     int splits();
 
diff --git 
a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java 
b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
index 3666e48..c49da20 100644
--- a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
+++ b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
@@ -19,8 +19,7 @@
 
 package org.apache.cassandra.diff;
 
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
+import java.io.InputStream;
 import java.math.BigInteger;
 import java.util.HashSet;
 import java.util.List;
@@ -35,6 +34,7 @@ import 
org.yaml.snakeyaml.constructor.CustomClassLoaderConstructor;
 public class YamlJobConfiguration implements JobConfiguration {
     public int splits = 10000;
     public List<KeyspaceTablePair> keyspace_tables;
+    public List<String> disallowed_keyspaces;
     public int buckets = 100;
     public int rate_limit = 10000;
     public String job_id = null;
@@ -48,20 +48,20 @@ public class YamlJobConfiguration implements 
JobConfiguration {
     public String specific_tokens = null;
     public String disallowed_tokens = null;
 
-    public static YamlJobConfiguration load(String file) {
+    public static YamlJobConfiguration load(InputStream inputStream) {
         Yaml yaml = new Yaml(new 
CustomClassLoaderConstructor(YamlJobConfiguration.class,
                                                               
Thread.currentThread().getContextClassLoader()));
-        try {
-            return yaml.loadAs(new FileInputStream(file), 
YamlJobConfiguration.class);
-        } catch (FileNotFoundException e) {
-            throw new RuntimeException(e);
-        }
+        return yaml.loadAs(inputStream, YamlJobConfiguration.class);
     }
 
     public List<KeyspaceTablePair> keyspaceTables() {
         return keyspace_tables;
     }
 
+    public List<String> disallowedKeyspaces() {
+        return disallowed_keyspaces;
+    }
+
     public int splits() {
         return splits;
     }
diff --git 
a/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java 
b/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java
index 813f38a..39c43ee 100644
--- 
a/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java
+++ 
b/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java
@@ -6,11 +6,32 @@ import org.junit.Test;
 public class YamlJobConfigurationTest {
     @Test
     public void testLoadYaml() {
-        JobConfiguration jobConfiguration = 
YamlJobConfiguration.load("src/test/resources/testconfig.yaml");
+        JobConfiguration jobConfiguration = load("testconfig.yaml");
         Assert.assertEquals(3, jobConfiguration.keyspaceTables().size());
         jobConfiguration.keyspaceTables().forEach(kt -> {
             Assert.assertTrue("Keyspace segment is not loaded correctly", 
kt.keyspace.contains("ks"));
             Assert.assertTrue("Table segment is not loaded correctly", 
kt.table.contains("tb"));
         });
     }
+
+    @Test
+    public void testLoadYamlWithKeyspaceTablesAbsent() {
+        JobConfiguration jobConfiguration = 
load("test_load_config_no_keyspace_tables.yaml");
+        Assert.assertNull(jobConfiguration.keyspaceTables());
+        Assert.assertNull(jobConfiguration.disallowedKeyspaces());
+        Assert.assertNull(jobConfiguration.filteredKeyspaceTables());
+        Assert.assertTrue(jobConfiguration.shouldAutoDiscoverTables());
+    }
+
+    @Test
+    public void testLoadYamlFilterOutDisallowedKeyspaces() {
+        JobConfiguration jobConfiguration = 
load("test_load_config_all_keyspaces_filtered_out.yaml");
+        Assert.assertNotNull(jobConfiguration.filteredKeyspaceTables());
+        Assert.assertTrue("All tables should be filtered out", 
jobConfiguration.filteredKeyspaceTables().isEmpty());
+        Assert.assertFalse("It should not be in the discover mode", 
jobConfiguration.shouldAutoDiscoverTables());
+    }
+
+    private JobConfiguration load(String filename) {
+        return 
YamlJobConfiguration.load(getClass().getClassLoader().getResourceAsStream(filename));
+    }
 }
diff --git 
a/common/src/test/resources/test_load_config_all_keyspaces_filtered_out.yaml 
b/common/src/test/resources/test_load_config_all_keyspaces_filtered_out.yaml
new file mode 100644
index 0000000..1dc5556
--- /dev/null
+++ b/common/src/test/resources/test_load_config_all_keyspaces_filtered_out.yaml
@@ -0,0 +1,57 @@
+# List of keyspace.tables to diff
+keyspace_tables:
+  - ks1.tb1
+  - ks1.tb2
+  - ks2.tb3
+
+# It makes no sense to filter out keyspaces defined in keyspace_tables
+# Only have it for the testing purpose.
+disallowed_keyspaces:
+  - ks1
+  - ks2
+
+# This is how many parts we split the full token range in.
+# Each of these splits is then compared between the clusters
+splits: 10000
+
+# Number of buckets - splits / buckets should be under 100k to avoid wide 
partitions when storing the metadata
+buckets: 100
+
+# global rate limit - this is how many q/s you think the target clusters can 
handle
+rate_limit: 10000
+
+# optional job id - if restarting a job, set the correct job_id here to avoid 
re-diffing old splits
+# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef
+
+# Fetch size to use for the query fetching the tokens in the cluster
+token_scan_fetch_size: 1000
+# Fetch size to use for the queries fetching the rows of each partition
+partition_read_fetch_size: 1000
+
+read_timeout_millis: 10000
+reverse_read_probability: 0.5
+consistency_level: ALL
+metadata_options:
+  keyspace: cassandradiff
+  replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}"
+  ttl: 31536000
+  should_init: true
+cluster_config:
+  source:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test_1"
+    contact_points: "127.0.0.1"
+    port: "9042"
+    dc: "datacenter1"
+  target:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test_2"
+    contact_points: "127.0.0.1"
+    port: "9043"
+    dc: "datacenter1"
+  metadata:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test"
+    contact_points: "127.0.0.1"
+    port: "9042"
+    dc: "datacenter1"
diff --git a/common/src/test/resources/test_load_config_no_keyspace_tables.yaml 
b/common/src/test/resources/test_load_config_no_keyspace_tables.yaml
new file mode 100644
index 0000000..00ac31f
--- /dev/null
+++ b/common/src/test/resources/test_load_config_no_keyspace_tables.yaml
@@ -0,0 +1,45 @@
+# This is how many parts we split the full token range in.
+# Each of these splits is then compared between the clusters
+splits: 10000
+
+# Number of buckets - splits / buckets should be under 100k to avoid wide 
partitions when storing the metadata
+buckets: 100
+
+# global rate limit - this is how many q/s you think the target clusters can 
handle
+rate_limit: 10000
+
+# optional job id - if restarting a job, set the correct job_id here to avoid 
re-diffing old splits
+# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef
+
+# Fetch size to use for the query fetching the tokens in the cluster
+token_scan_fetch_size: 1000
+# Fetch size to use for the queries fetching the rows of each partition
+partition_read_fetch_size: 1000
+
+read_timeout_millis: 10000
+reverse_read_probability: 0.5
+consistency_level: ALL
+metadata_options:
+  keyspace: cassandradiff
+  replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}"
+  ttl: 31536000
+  should_init: true
+cluster_config:
+  source:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test_1"
+    contact_points: "127.0.0.1"
+    port: "9042"
+    dc: "datacenter1"
+  target:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test_2"
+    contact_points: "127.0.0.1"
+    port: "9043"
+    dc: "datacenter1"
+  metadata:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test"
+    contact_points: "127.0.0.1"
+    port: "9042"
+    dc: "datacenter1"
diff --git a/spark-job/localconfig-auto-discover.yaml 
b/spark-job/localconfig-auto-discover.yaml
new file mode 100644
index 0000000..00ac31f
--- /dev/null
+++ b/spark-job/localconfig-auto-discover.yaml
@@ -0,0 +1,45 @@
+# This is how many parts we split the full token range in.
+# Each of these splits is then compared between the clusters
+splits: 10000
+
+# Number of buckets - splits / buckets should be under 100k to avoid wide 
partitions when storing the metadata
+buckets: 100
+
+# global rate limit - this is how many q/s you think the target clusters can 
handle
+rate_limit: 10000
+
+# optional job id - if restarting a job, set the correct job_id here to avoid 
re-diffing old splits
+# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef
+
+# Fetch size to use for the query fetching the tokens in the cluster
+token_scan_fetch_size: 1000
+# Fetch size to use for the queries fetching the rows of each partition
+partition_read_fetch_size: 1000
+
+read_timeout_millis: 10000
+reverse_read_probability: 0.5
+consistency_level: ALL
+metadata_options:
+  keyspace: cassandradiff
+  replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}"
+  ttl: 31536000
+  should_init: true
+cluster_config:
+  source:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test_1"
+    contact_points: "127.0.0.1"
+    port: "9042"
+    dc: "datacenter1"
+  target:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test_2"
+    contact_points: "127.0.0.1"
+    port: "9043"
+    dc: "datacenter1"
+  metadata:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test"
+    contact_points: "127.0.0.1"
+    port: "9042"
+    dc: "datacenter1"
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java 
b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
index 4a1faa1..ff697d2 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -19,6 +19,8 @@
 
 package org.apache.cassandra.diff;
 
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -32,6 +34,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,14 +57,14 @@ import org.apache.spark.sql.SparkSession;
 public class DiffJob {
     private static final Logger logger = 
LoggerFactory.getLogger(DiffJob.class);
 
-    public static void main(String ... args) {
+    public static void main(String ... args) throws FileNotFoundException {
         if (args.length == 0) {
             System.exit(-1);
         }
         SparkSession spark = 
SparkSession.builder().appName("cassandra-diff").getOrCreate();
         JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
         String configFile = SparkFiles.get(args[0]);
-        YamlJobConfiguration configuration = 
YamlJobConfiguration.load(configFile);
+        YamlJobConfiguration configuration = YamlJobConfiguration.load(new 
FileInputStream(configFile));
         DiffJob diffJob = new DiffJob();
         diffJob.run(configuration, sc);
         spark.stop();
@@ -87,15 +90,32 @@ public class DiffJob {
         ClusterProvider targetProvider = 
ClusterProvider.getProvider(configuration.clusterConfig("target"), "target");
         String sourcePartitioner;
         String targetPartitioner;
+        List<KeyspaceTablePair> tablesToCompare = 
configuration.filteredKeyspaceTables();
         try (Cluster sourceCluster = sourceProvider.getCluster();
              Cluster targetCluster = targetProvider.getCluster()) {
             sourcePartitioner = sourceCluster.getMetadata().getPartitioner();
             targetPartitioner = targetCluster.getMetadata().getPartitioner();
+
+            if (!sourcePartitioner.equals(targetPartitioner)) {
+                throw new IllegalStateException(String.format("Cluster 
partitioners do not match; Source: %s, Target: %s,",
+                                                              
sourcePartitioner, targetPartitioner));
+            }
+
+            if (configuration.shouldAutoDiscoverTables()) {
+                Schema sourceSchema = new Schema(sourceCluster.getMetadata(), 
configuration);
+                Schema targetSchema = new Schema(targetCluster.getMetadata(), 
configuration);
+                Schema commonSchema = sourceSchema.intersect(targetSchema);
+                if (commonSchema.size() != sourceSchema.size()) {
+                    Pair<Set<KeyspaceTablePair>, Set<KeyspaceTablePair>> 
difference = Schema.difference(sourceSchema, targetSchema);
+                    logger.warn("Found tables that only exist in either source 
or target cluster. Ignoring those tables for comparison. " +
+                                "Distinct tables in source cluster: {}. " +
+                                "Distinct tables in target cluster: {}",
+                                difference.getLeft(), difference.getRight());
+                }
+                tablesToCompare = commonSchema.toQualifiedTableList();
+            }
         }
-        if (!sourcePartitioner.equals(targetPartitioner)) {
-            throw new IllegalStateException(String.format("Cluster 
partitioners do not match; Source: %s, Target: %s,",
-                                                          sourcePartitioner, 
targetPartitioner));
-        }
+
         TokenHelper tokenHelper = 
TokenHelper.forPartitioner(sourcePartitioner);
 
         logger.info("Configuring job metadata store");
@@ -111,7 +131,7 @@ public class DiffJob {
             // Job params, which once a job is created cannot be modified in 
subsequent re-runs
             logger.info("Creating or retrieving job parameters");
             job = new JobMetadataDb.JobLifeCycle(metadataSession, 
metadataOptions.keyspace);
-            Params params = getJobParams(job, configuration);
+            Params params = getJobParams(job, configuration, tablesToCompare);
             logger.info("Job Params: {}", params);
             if (null == params)
                 throw new RuntimeException("Unable to initialize job params");
@@ -178,12 +198,12 @@ public class DiffJob {
         }
     }
 
-    private static Params getJobParams(JobMetadataDb.JobLifeCycle job, 
JobConfiguration conf) {
+    private static Params getJobParams(JobMetadataDb.JobLifeCycle job, 
JobConfiguration conf, List<KeyspaceTablePair> keyspaceTables) {
         if (conf.jobId().isPresent()) {
             return job.getJobParams(conf.jobId().get());
         } else {
             return new Params(UUID.randomUUID(),
-                              conf.keyspaceTables(),
+                              keyspaceTables,
                               conf.buckets(),
                               conf.splits());
         }
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Schema.java 
b/spark-job/src/main/java/org/apache/cassandra/diff/Schema.java
new file mode 100644
index 0000000..ad9c593
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/Schema.java
@@ -0,0 +1,76 @@
+package org.apache.cassandra.diff;
+
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.TableMetadata;
+
+public class Schema {
+    // Filter out all system keyspaces.
+    private static final Set<String> DEFAULT_FILTER = ImmutableSet.of(
+        "system", "system_schema", "system_traces", "system_auth", 
"system_distributed", "system_virtual_schema", "system_views"
+    );
+
+    private final Set<KeyspaceTablePair> qualifiedTables;
+
+    public Schema(Metadata metadata, JobConfiguration configuration) {
+        Set<String> keyspaceFilter = getKeyspaceFilter(configuration);
+        qualifiedTables = new HashSet<>();
+        for (KeyspaceMetadata keyspaceMetadata : metadata.getKeyspaces()) {
+            if (keyspaceFilter.contains(keyspaceMetadata.getName()))
+                continue;
+
+            for (TableMetadata tableMetadata : keyspaceMetadata.getTables()) {
+                qualifiedTables.add(KeyspaceTablePair.from(tableMetadata));
+            }
+        }
+    }
+
+    public Schema(Set<KeyspaceTablePair> qualifiedTables) {
+        this.qualifiedTables = qualifiedTables;
+    }
+
+    public Schema intersect(Schema other) {
+        if (this != other) {
+            Set<KeyspaceTablePair> intersection = 
Sets.intersection(this.qualifiedTables, other.qualifiedTables);
+            return new Schema(intersection);
+        }
+        return this;
+    }
+
+    public List<KeyspaceTablePair> toQualifiedTableList() {
+        return new ArrayList<>(qualifiedTables);
+    }
+
+    public int size() {
+        return qualifiedTables.size();
+    }
+
+    @VisibleForTesting
+    public static Set<String> getKeyspaceFilter(JobConfiguration 
configuration) {
+        List<String> disallowedKeyspaces = configuration.disallowedKeyspaces();
+        if (null == disallowedKeyspaces) {
+            return DEFAULT_FILTER;
+        } else {
+            return Sets.union(DEFAULT_FILTER, 
ImmutableSet.copyOf(disallowedKeyspaces));
+        }
+    }
+
+    // Returns the distinct tables from the corresponding schema.
+    // The left contains the distinct tables from schema `first`
+    // The right contains the distinct tables from schema `second`
+    public static Pair<Set<KeyspaceTablePair>, Set<KeyspaceTablePair>> 
difference(Schema first, Schema second) {
+        return Pair.of(Sets.difference(first.qualifiedTables, 
second.qualifiedTables), Sets.difference(second.qualifiedTables, 
first.qualifiedTables));
+    }
+}
diff --git 
a/spark-job/src/test/java/org/apache/cassandra/diff/AbstractMockJobConfiguration.java
 
b/spark-job/src/test/java/org/apache/cassandra/diff/AbstractMockJobConfiguration.java
new file mode 100644
index 0000000..5f9d22a
--- /dev/null
+++ 
b/spark-job/src/test/java/org/apache/cassandra/diff/AbstractMockJobConfiguration.java
@@ -0,0 +1,79 @@
+package org.apache.cassandra.diff;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+// Overrides methods on demand
+public abstract class AbstractMockJobConfiguration implements JobConfiguration 
{
+    private final UnsupportedOperationException uoe = new 
UnsupportedOperationException("Not implemented");
+
+    @Nullable
+    @Override
+    public List<KeyspaceTablePair> keyspaceTables() {
+        throw uoe;
+    }
+
+    @Nullable
+    @Override
+    public List<String> disallowedKeyspaces() {
+        throw uoe;
+    }
+
+    @Override
+    public int splits() {
+        throw uoe;
+    }
+
+    @Override
+    public int buckets() {
+        throw uoe;
+    }
+
+    @Override
+    public int rateLimit() {
+        throw uoe;
+    }
+
+    @Override
+    public Optional<UUID> jobId() {
+        throw uoe;
+    }
+
+    @Override
+    public int tokenScanFetchSize() {
+        throw uoe;
+    }
+
+    @Override
+    public int partitionReadFetchSize() {
+        throw uoe;
+    }
+
+    @Override
+    public int readTimeoutMillis() {
+        throw uoe;
+    }
+
+    @Override
+    public double reverseReadProbability() {
+        throw uoe;
+    }
+
+    @Override
+    public String consistencyLevel() {
+        throw uoe;
+    }
+
+    @Override
+    public MetadataKeyspaceOptions metadataOptions() {
+        throw uoe;
+    }
+
+    @Override
+    public Map<String, String> clusterConfig(String identifier) {
+        throw uoe;
+    }
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/SchemaTest.java 
b/spark-job/src/test/java/org/apache/cassandra/diff/SchemaTest.java
new file mode 100644
index 0000000..17dc67c
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/SchemaTest.java
@@ -0,0 +1,85 @@
+package org.apache.cassandra.diff;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SchemaTest {
+    private static final KeyspaceTablePair kt1 = new KeyspaceTablePair("ks", 
"tbl1");
+    private static final KeyspaceTablePair kt2 = new KeyspaceTablePair("ks", 
"tbl2");
+    private static final KeyspaceTablePair kt3 = new KeyspaceTablePair("ks", 
"tbl3");
+
+    private class MockConfig extends AbstractMockJobConfiguration {
+        private List<String> disallowedKeyspaces;
+
+        MockConfig(List<String> disallowedKeyspaces) {
+            this.disallowedKeyspaces = disallowedKeyspaces;
+        }
+
+        @Nullable
+        @Override
+        public List<String> disallowedKeyspaces() {
+            return disallowedKeyspaces;
+        }
+    }
+
+    @Test
+    public void testIntersectSameInstance() {
+        Schema schemaA = new Schema(new HashSet<>());
+        Schema intersection = schemaA.intersect(schemaA);
+        Assert.assertSame(schemaA, intersection);
+    }
+
+    @Test
+    public void testIntersectIsCommutative() {
+        Schema schemaA = new Schema(ImmutableSet.of(kt1, kt2));
+        Schema schemaB = new Schema(ImmutableSet.of(kt1, kt3));
+        Schema intersection1 = schemaA.intersect(schemaB);
+        Schema intersection2 = schemaB.intersect(schemaA);
+        Assert.assertEquals(1, intersection1.size());
+        Assert.assertEquals(intersection1.toQualifiedTableList(), 
intersection2.toQualifiedTableList());
+    }
+
+    @Test
+    public void testToQualifiedTableList() {
+        Schema schema = new Schema(ImmutableSet.of(kt1, kt2, kt3));
+        Assert.assertEquals(3, schema.size());
+        Assert.assertEquals(ImmutableSet.of(kt1, kt2, kt3), 
ImmutableSet.copyOf(schema.toQualifiedTableList()));
+    }
+
+    @Test
+    public void testGetKeyspaceFilterWithDefault() {
+        MockConfig config = new MockConfig(null);
+        Set<String> filter = Schema.getKeyspaceFilter(config);
+        Assert.assertFalse("Default filter should not be empty", 
filter.isEmpty());
+    }
+
+    @Test
+    public void testGetKeyspaceFilterWithAdditions() {
+        List<String> disallowed = Arrays.asList("ks1, ks2");
+        MockConfig configWithDefault = new MockConfig(null);
+        MockConfig configWithAddition = new MockConfig(disallowed);
+        Set<String> defaultFilter = 
Schema.getKeyspaceFilter(configWithDefault);
+        Set<String> filter = Schema.getKeyspaceFilter(configWithAddition);
+        Assert.assertFalse("Filter should not be empty", filter.isEmpty());
+        Assert.assertEquals("Filter with additions should be larger than the 
default", disallowed.size(), filter.size() - defaultFilter.size());
+        disallowed.forEach(ks -> Assert.assertTrue("Filter should contain the 
additional disallowed keyspace.", filter.contains(ks)));
+    }
+
+    @Test
+    public void testSchemaDifference() {
+        Schema first = new Schema(ImmutableSet.of(kt1, kt2));
+        Schema second = new Schema(ImmutableSet.of(kt2, kt3));
+        Pair<Set<KeyspaceTablePair>, Set<KeyspaceTablePair>> difference = 
Schema.difference(first, second);
+        Assert.assertTrue("Should contain the distinct table in first schema", 
difference.getLeft().contains(kt1));
+        Assert.assertTrue("Should contain the distinct table in second 
schema", difference.getRight().contains(kt3));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to