This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris-tools.git


The following commit(s) were added to refs/heads/main by this push:
     new bba9a30  Add filter for catalog name to Polaris synchronizer/migrator (#10)
bba9a30 is described below

commit bba9a30d60e28ac71e9429baf83afac8de5a337d
Author: Mansehaj Singh <[email protected]>
AuthorDate: Fri Apr 25 10:42:47 2025 -0700

    Add filter for catalog name to Polaris synchronizer/migrator (#10)
    
    * Add catalog name filter planner
    
    * Added builder for SynchronizationPlan
    
    * Change formatting
    
    * Fixed builder
    
    * Changed builder contract
---
 .../polaris/planning/CatalogNameFilterPlanner.java | 82 ++++++++++++++++++++++
 .../polaris/planning/SynchronizationPlanner.java   | 61 +++++++++++++++-
 .../sync/polaris/CatalogNameFilterPlannerTest.java | 75 ++++++++++++++++++++
 .../tools/sync/polaris/SyncPolarisCommand.java     | 18 +++--
 4 files changed, 231 insertions(+), 5 deletions(-)

diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/CatalogNameFilterPlanner.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/CatalogNameFilterPlanner.java
new file mode 100644
index 0000000..b50c823
--- /dev/null
+++ b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/CatalogNameFilterPlanner.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.tools.sync.polaris.planning;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.tools.sync.polaris.planning.plan.SynchronizationPlan;
+
+/**
+ * Planner that filters out catalogs from the source and target on the basis of a RegEx before
+ * they are passed to an encapsulated planner.
+ *
+ * <p>A catalog passes the filter only if its <em>entire</em> name matches the pattern, as with
+ * {@link java.util.regex.Matcher#matches()}, so the anchors {@code ^} and {@code $} are optional.
+ * Catalogs that fail the filter are withheld from the delegate and are instead marked with
+ * {@code skipEntityAndSkipChildren} in the plan the delegate returns.
+ */
+public class CatalogNameFilterPlanner extends DelegatedPlanner implements SynchronizationPlanner {
+
+    /** Pattern a catalog name must match in full for the catalog to reach the delegate. */
+    private final Pattern catalogNameFilterPattern;
+
+    /**
+     * Creates a planner that only passes catalogs whose names fully match {@code regex} on to
+     * {@code delegate}.
+     *
+     * @param regex the pattern a catalog name must fully match
+     * @param delegate the planner that receives the catalogs that pass the filter
+     * @throws java.util.regex.PatternSyntaxException if {@code regex} is not a valid pattern
+     */
+    public CatalogNameFilterPlanner(String regex, SynchronizationPlanner delegate) {
+        super(delegate);
+        // Compile eagerly so an invalid pattern is rejected before any planning starts, and so the
+        // pattern is not recompiled for every catalog name (as String#matches would do).
+        this.catalogNameFilterPattern = Pattern.compile(regex);
+    }
+
+    /**
+     * Delegates planning for the catalogs whose names match the filter, then marks each
+     * non-matching catalog with {@code skipEntityAndSkipChildren} in the delegate's plan.
+     *
+     * <p>A non-matching target catalog is only marked when no non-matching source catalog has the
+     * same name, so a catalog present on both sides is marked once, via its source instance.
+     */
+    @Override
+    public SynchronizationPlan<Catalog> planCatalogSync(
+            List<Catalog> catalogsOnSource, List<Catalog> catalogsOnTarget) {
+        List<Catalog> filteredSourceCatalogs = new ArrayList<>();
+
+        // Source catalogs that fail the filter, keyed by name so that a same-named target catalog
+        // is not marked a second time. Insertion-ordered so skips follow the source listing order.
+        Map<String, Catalog> skippedSourceCatalogsByName = new LinkedHashMap<>();
+
+        for (Catalog catalog : catalogsOnSource) {
+            if (matchesFilter(catalog)) {
+                filteredSourceCatalogs.add(catalog);
+            } else {
+                skippedSourceCatalogsByName.put(catalog.getName(), catalog);
+            }
+        }
+
+        List<Catalog> filteredTargetCatalogs = new ArrayList<>();
+        List<Catalog> skippedTargetCatalogs = new ArrayList<>();
+
+        for (Catalog catalog : catalogsOnTarget) {
+            if (matchesFilter(catalog)) {
+                filteredTargetCatalogs.add(catalog);
+            } else if (!skippedSourceCatalogsByName.containsKey(catalog.getName())) {
+                // filtered out and not on the source; a same-named source catalog would already be
+                // in skippedSourceCatalogsByName and is marked in place of this one
+                skippedTargetCatalogs.add(catalog);
+            }
+        }
+
+        SynchronizationPlan<Catalog> delegatedPlan =
+                delegate.planCatalogSync(filteredSourceCatalogs, filteredTargetCatalogs);
+
+        skippedSourceCatalogsByName.values().forEach(delegatedPlan::skipEntityAndSkipChildren);
+        skippedTargetCatalogs.forEach(delegatedPlan::skipEntityAndSkipChildren);
+
+        return delegatedPlan;
+    }
+
+    /** Returns whether the catalog's entire name matches the configured pattern. */
+    private boolean matchesFilter(Catalog catalog) {
+        return catalogNameFilterPattern.matcher(catalog.getName()).matches();
+    }
+}
diff --git a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/SynchronizationPlanner.java b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/SynchronizationPlanner.java
index 93cddab..842cdcf 100644
--- a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/SynchronizationPlanner.java
+++ b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/SynchronizationPlanner.java
@@ -18,6 +18,8 @@
  */
 package org.apache.polaris.tools.sync.polaris.planning;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import org.apache.iceberg.catalog.Namespace;
@@ -35,6 +37,84 @@ import org.apache.polaris.tools.sync.polaris.planning.plan.SynchronizationPlan;
  */
 public interface SynchronizationPlanner {
 
+  /**
+   * Fluent builder that composes a chain of {@link SynchronizationPlanner}s around an innermost
+   * planner. Wrappers are applied in registration order: the first registered wrapper wraps the
+   * innermost planner directly and the last registered wrapper becomes the outermost planner
+   * returned by {@link #build()}.
+   */
+  class SynchronizationPlannerBuilder {
+
+    /** Decorates a {@link SynchronizationPlanner} with another planner. */
+    @FunctionalInterface
+    public interface PlannerWrapper {
+
+      /**
+       * Wrap a provided {@link SynchronizationPlanner} by another {@link SynchronizationPlanner}.
+       *
+       * @param planner the planner to wrap
+       * @return a wrapped planner
+       */
+      SynchronizationPlanner wrap(SynchronizationPlanner planner);
+    }
+
+    private final SynchronizationPlanner innermost;
+
+    private final List<PlannerWrapper> plannerWrappers = new ArrayList<>();
+
+    private SynchronizationPlannerBuilder(SynchronizationPlanner innermost) {
+      this.innermost = Objects.requireNonNull(innermost, "innermost");
+    }
+
+    /**
+     * Wrap the current chain of planners.
+     *
+     * @param outer the planner to wrap by
+     * @return this builder, for chaining
+     */
+    public SynchronizationPlannerBuilder wrapBy(PlannerWrapper outer) {
+      plannerWrappers.add(Objects.requireNonNull(outer, "outer"));
+      return this;
+    }
+
+    /**
+     * Wrap the current chain of planners if the condition is true.
+     *
+     * @param condition if true, will wrap the current chain of planners by the provided outer
+     *     planner; if false, the builder is left unchanged
+     * @param outer the planner to wrap by
+     * @return this builder, for chaining
+     */
+    public SynchronizationPlannerBuilder conditionallyWrapBy(
+        boolean condition, PlannerWrapper outer) {
+      return condition ? wrapBy(outer) : this;
+    }
+
+    /**
+     * Build the chained set of planners.
+     *
+     * @return the outermost planner of the chain, or the innermost planner if no wrappers were
+     *     registered
+     */
+    public SynchronizationPlanner build() {
+      SynchronizationPlanner current = innermost;
+      for (PlannerWrapper plannerWrapper : plannerWrappers) {
+        current = plannerWrapper.wrap(current);
+      }
+      return current;
+    }
+  }
+
+  /**
+   * Start building a chain of planners around the given innermost planner.
+   *
+   * @param innermost the planner at the core of the chain
+   * @return a new builder
+   */
+  static SynchronizationPlannerBuilder builder(SynchronizationPlanner innermost) {
+    return new SynchronizationPlannerBuilder(innermost);
+  }
+
   SynchronizationPlan<Principal> planPrincipalSync(
           List<Principal> principalsOnSource, List<Principal> principalsOnTarget);
 
diff --git a/polaris-synchronizer/api/src/test/java/org/apache/polaris/tools/sync/polaris/CatalogNameFilterPlannerTest.java b/polaris-synchronizer/api/src/test/java/org/apache/polaris/tools/sync/polaris/CatalogNameFilterPlannerTest.java
new file mode 100644
index 0000000..a2f2f78
--- /dev/null
+++ b/polaris-synchronizer/api/src/test/java/org/apache/polaris/tools/sync/polaris/CatalogNameFilterPlannerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.tools.sync.polaris;
+
+import java.util.List;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.tools.sync.polaris.planning.CatalogNameFilterPlanner;
+import org.apache.polaris.tools.sync.polaris.planning.NoOpSyncPlanner;
+import org.apache.polaris.tools.sync.polaris.planning.SynchronizationPlanner;
+import org.apache.polaris.tools.sync.polaris.planning.plan.SynchronizationPlan;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link CatalogNameFilterPlanner}. */
+public class CatalogNameFilterPlannerTest {
+
+    private static final Catalog CATALOG_1 = new Catalog().name("catalog-1");
+
+    private static final Catalog CATALOG_2 = new Catalog().name("catalog-2");
+
+    @Test
+    public void skipsSourceCatalogThatDoesNotMatchFilter() {
+        SynchronizationPlanner planner =
+                new CatalogNameFilterPlanner("^catalog-1$", new NoOpSyncPlanner());
+
+        SynchronizationPlan<Catalog> plan =
+                planner.planCatalogSync(List.of(CATALOG_1, CATALOG_2), List.of());
+
+        Assertions.assertTrue(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_2));
+        Assertions.assertFalse(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_1));
+    }
+
+    @Test
+    public void onlyMarksSourceCatalogForFilteringWhenCatalogIsOnSourceAndTarget() {
+        SynchronizationPlanner planner = new CatalogNameFilterPlanner(
+                "^something that doesn't match the catalog name$", new NoOpSyncPlanner());
+
+        SynchronizationPlan<Catalog> plan =
+                planner.planCatalogSync(List.of(CATALOG_1), List.of(CATALOG_1));
+
+        Assertions.assertTrue(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_1));
+
+        // ensure only the source catalog is marked, not the same-named target catalog as well
+        Assertions.assertEquals(1, plan.entitiesToSkipAndSkipChildren().size());
+    }
+
+    @Test
+    public void marksTargetCatalogWhenSourceCatalogDoesNotExist() {
+        SynchronizationPlanner planner = new CatalogNameFilterPlanner(
+                "^something that doesn't match either catalog name$", new NoOpSyncPlanner());
+
+        SynchronizationPlan<Catalog> plan =
+                planner.planCatalogSync(List.of(CATALOG_1), List.of(CATALOG_2));
+
+        Assertions.assertTrue(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_1));
+        Assertions.assertTrue(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_2));
+        Assertions.assertEquals(2, plan.entitiesToSkipAndSkipChildren().size());
+    }
+
+    @Test
+    public void doesNotMarkTargetCatalogThatMatchesFilter() {
+        SynchronizationPlanner planner =
+                new CatalogNameFilterPlanner("^catalog-1$", new NoOpSyncPlanner());
+
+        SynchronizationPlan<Catalog> plan =
+                planner.planCatalogSync(List.of(CATALOG_1), List.of(CATALOG_1, CATALOG_2));
+
+        Assertions.assertTrue(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_2));
+        Assertions.assertFalse(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_1));
+    }
+
+    @Test
+    public void requiresRegexToMatchEntireCatalogName() {
+        // "catalog" only matches a prefix of "catalog-1", which is not enough to pass the filter
+        SynchronizationPlanner planner =
+                new CatalogNameFilterPlanner("catalog", new NoOpSyncPlanner());
+
+        SynchronizationPlan<Catalog> plan =
+                planner.planCatalogSync(List.of(CATALOG_1), List.of());
+
+        Assertions.assertTrue(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_1));
+    }
+}
diff --git a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java
index b3f4cc9..31388eb 100644
--- a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java
+++ b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import org.apache.polaris.tools.sync.polaris.catalog.ETagManager;
 import org.apache.polaris.tools.sync.polaris.planning.AccessControlAwarePlanner;
+import org.apache.polaris.tools.sync.polaris.planning.CatalogNameFilterPlanner;
 import org.apache.polaris.tools.sync.polaris.planning.ModificationAwarePlanner;
 import org.apache.polaris.tools.sync.polaris.planning.SourceParitySynchronizationPlanner;
 import org.apache.polaris.tools.sync.polaris.planning.SynchronizationPlanner;
@@ -90,11 +91,23 @@ public class SyncPolarisCommand implements Callable<Integer> {
   )
   private boolean haltOnFailure;
 
+  @CommandLine.Option(
+          names = {"--catalog-name-regex"},
+          description = "If specified, only catalogs with names that match the provided RegEx will be staged for " +
+                  "synchronization. This applies to catalogs on both the source and target."
+  )
+  private String catalogNameRegex;
+
   @Override
   public Integer call() throws Exception {
-    SynchronizationPlanner sourceParityPlanner = new SourceParitySynchronizationPlanner();
-    SynchronizationPlanner modificationAwareSourceParityPlanner = new ModificationAwarePlanner(sourceParityPlanner);
-    SynchronizationPlanner accessControlAwarePlanner = new AccessControlAwarePlanner(modificationAwareSourceParityPlanner);
+    // Wrappers are applied in the order listed, so AccessControlAwarePlanner is outermost and the
+    // catalog name filter (added only when --catalog-name-regex is given) narrows the catalogs
+    // before ModificationAwarePlanner sees them.
+    SynchronizationPlanner planner = SynchronizationPlanner.builder(new SourceParitySynchronizationPlanner())
+            .wrapBy(ModificationAwarePlanner::new)
+            .conditionallyWrapBy(catalogNameRegex != null, p -> new CatalogNameFilterPlanner(catalogNameRegex, p))
+            .wrapBy(AccessControlAwarePlanner::new)
+            .build();
 
     // auto generate omnipotent principals with write access on the target, read only access on source
     sourceProperties.put(PolarisApiService.ICEBERG_WRITE_ACCESS_PROPERTY, Boolean.toString(false));
@@ -111,7 +124,7 @@ public class SyncPolarisCommand implements Callable<Integer> {
               new PolarisSynchronizer(
                       consoleLog,
                       haltOnFailure,
-                      accessControlAwarePlanner,
+                      planner,
                       source,
                       target,
                       etagManager);

Reply via email to