This is an automated email from the ASF dual-hosted git repository.
emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris-tools.git
The following commit(s) were added to refs/heads/main by this push:
new bba9a30 Add filter for catalog name to Polaris synchronizer/migrator
(#10)
bba9a30 is described below
commit bba9a30d60e28ac71e9429baf83afac8de5a337d
Author: Mansehaj Singh <[email protected]>
AuthorDate: Fri Apr 25 10:42:47 2025 -0700
Add filter for catalog name to Polaris synchronizer/migrator (#10)
* Add catalog name filter planner
* Added builder for SynchronizationPlan
* Change formatting
* Fixed builder
* Changed builder contract
---
.../polaris/planning/CatalogNameFilterPlanner.java | 82 ++++++++++++++++++++++
.../polaris/planning/SynchronizationPlanner.java | 61 +++++++++++++++-
.../sync/polaris/CatalogNameFilterPlannerTest.java | 75 ++++++++++++++++++++
.../tools/sync/polaris/SyncPolarisCommand.java | 18 +++--
4 files changed, 231 insertions(+), 5 deletions(-)
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/CatalogNameFilterPlanner.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/CatalogNameFilterPlanner.java
new file mode 100644
index 0000000..b50c823
--- /dev/null
+++
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/CatalogNameFilterPlanner.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.tools.sync.polaris.planning;
+
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.tools.sync.polaris.planning.plan.SynchronizationPlan;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Planner that filters out catalogs from the source and target on the basis of a RegEx before
+ * they are passed to an encapsulated planner.
+ *
+ * <p>A catalog is kept only when the expression matches its entire name, the same semantics as
+ * {@link String#matches(String)}. Catalogs that are filtered out are registered on the plan
+ * returned by the encapsulated planner through {@code skipEntityAndSkipChildren}.
+ */
+public class CatalogNameFilterPlanner extends DelegatedPlanner implements SynchronizationPlanner {
+
+  /** Compiled once at construction so the expression is not re-parsed for every catalog name. */
+  private final Pattern catalogNameFilterPattern;
+
+  /**
+   * Creates a planner that only lets catalogs with matching names reach the delegate.
+   *
+   * @param regex regular expression that a catalog name must match in full to be kept
+   * @param delegate planner that receives the catalogs that passed the filter
+   * @throws java.util.regex.PatternSyntaxException if {@code regex} is not a valid expression
+   * @throws NullPointerException if {@code regex} is null
+   */
+  public CatalogNameFilterPlanner(String regex, SynchronizationPlanner delegate) {
+    super(delegate);
+    // Compiling here surfaces a malformed expression immediately instead of on the first plan.
+    this.catalogNameFilterPattern = Pattern.compile(regex);
+  }
+
+  /**
+   * Splits both catalog lists with the name filter, plans the matching catalogs with the
+   * delegate, and then registers every filtered-out catalog on the delegate's plan.
+   *
+   * @param catalogsOnSource catalogs found on the source
+   * @param catalogsOnTarget catalogs found on the target
+   * @return the delegate's plan, with the filtered-out catalogs added via
+   *     {@code skipEntityAndSkipChildren}
+   */
+  @Override
+  public SynchronizationPlan<Catalog> planCatalogSync(
+      List<Catalog> catalogsOnSource, List<Catalog> catalogsOnTarget) {
+    List<Catalog> filteredSourceCatalogs = new ArrayList<>();
+
+    // store the skipped source catalogs by name so that a target catalog with the same name is
+    // not registered as skipped a second time
+    Map<String, Catalog> skippedSourceCatalogsByName = new HashMap<>();
+
+    for (Catalog catalog : catalogsOnSource) {
+      if (matchesFilter(catalog)) {
+        filteredSourceCatalogs.add(catalog);
+      } else {
+        skippedSourceCatalogsByName.put(catalog.getName(), catalog);
+      }
+    }
+
+    List<Catalog> filteredTargetCatalogs = new ArrayList<>();
+
+    List<Catalog> skippedTargetCatalogs = new ArrayList<>();
+
+    for (Catalog catalog : catalogsOnTarget) {
+      if (matchesFilter(catalog)) {
+        filteredTargetCatalogs.add(catalog);
+      } else if (!skippedSourceCatalogsByName.containsKey(catalog.getName())) {
+        // a catalog already skipped on the source is not registered again; only catalogs that
+        // were filtered out solely from the target are recorded here
+        skippedTargetCatalogs.add(catalog);
+      }
+    }
+
+    SynchronizationPlan<Catalog> delegatedPlan =
+        delegate.planCatalogSync(filteredSourceCatalogs, filteredTargetCatalogs);
+
+    skippedSourceCatalogsByName.values().forEach(delegatedPlan::skipEntityAndSkipChildren);
+    skippedTargetCatalogs.forEach(delegatedPlan::skipEntityAndSkipChildren);
+
+    return delegatedPlan;
+  }
+
+  /** Returns true when the whole catalog name matches the configured expression. */
+  private boolean matchesFilter(Catalog catalog) {
+    return catalogNameFilterPattern.matcher(catalog.getName()).matches();
+  }
+}
diff --git
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/SynchronizationPlanner.java
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/SynchronizationPlanner.java
index 93cddab..842cdcf 100644
---
a/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/SynchronizationPlanner.java
+++
b/polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/SynchronizationPlanner.java
@@ -18,6 +18,7 @@
*/
package org.apache.polaris.tools.sync.polaris.planning;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.catalog.Namespace;
@@ -35,6 +36,64 @@ import
org.apache.polaris.tools.sync.polaris.planning.plan.SynchronizationPlan;
*/
public interface SynchronizationPlanner {
+  /**
+   * Builder that assembles a chain of planners around an innermost planner. Wrappers are applied
+   * in the order they were added, so the wrapper added last ends up as the outermost planner.
+   */
+  class SynchronizationPlannerBuilder {
+
+    /** Factory that decorates one {@link SynchronizationPlanner} with another. */
+    @FunctionalInterface
+    public interface PlannerWrapper {
+
+      /**
+       * Wrap a provided {@link SynchronizationPlanner} by another {@link SynchronizationPlanner}.
+       *
+       * @param planner the planner to wrap
+       * @return a wrapped planner
+       */
+      SynchronizationPlanner wrap(SynchronizationPlanner planner);
+    }
+
+    private final SynchronizationPlanner core;
+
+    private final List<PlannerWrapper> layers = new ArrayList<>();
+
+    private SynchronizationPlannerBuilder(SourceParitySynchronizationPlanner innermost) {
+      this.core = innermost;
+    }
+
+    /**
+     * Add a wrapper around the chain of planners collected so far.
+     *
+     * @param outer the wrapper to apply on top of the current chain
+     * @return this builder
+     */
+    public SynchronizationPlannerBuilder wrapBy(PlannerWrapper outer) {
+      layers.add(outer);
+      return this;
+    }
+
+    /**
+     * Add a wrapper around the chain of planners collected so far, but only if the condition
+     * holds; otherwise the chain is left untouched.
+     *
+     * @param condition if true, the provided wrapper is added to the chain
+     * @param outer the wrapper to apply on top of the current chain
+     * @return this builder
+     */
+    public SynchronizationPlannerBuilder conditionallyWrapBy(
+        boolean condition, PlannerWrapper outer) {
+      return condition ? wrapBy(outer) : this;
+    }
+
+    /**
+     * Build the chained set of planners, starting from the innermost planner and applying each
+     * wrapper in the order it was added.
+     *
+     * @return the outermost planner of the chain
+     */
+    public SynchronizationPlanner build() {
+      SynchronizationPlanner chain = core;
+      for (PlannerWrapper layer : layers) {
+        chain = layer.wrap(chain);
+      }
+      return chain;
+    }
+  }
+
+  /**
+   * Start building a chain of planners around the provided innermost planner.
+   *
+   * @param innermost the planner that sits at the core of the chain
+   * @return a builder to which wrapping planners can be added
+   */
+  static SynchronizationPlannerBuilder builder(SourceParitySynchronizationPlanner innermost) {
+    SynchronizationPlannerBuilder chainBuilder = new SynchronizationPlannerBuilder(innermost);
+    return chainBuilder;
+  }
+
SynchronizationPlan<Principal> planPrincipalSync(
List<Principal> principalsOnSource, List<Principal>
principalsOnTarget);
@@ -77,4 +136,4 @@ public interface SynchronizationPlanner {
Namespace namespace,
Set<TableIdentifier> tablesOnSource,
Set<TableIdentifier> tablesOnTarget);
-}
+}
\ No newline at end of file
diff --git
a/polaris-synchronizer/api/src/test/java/org/apache/polaris/tools/sync/polaris/CatalogNameFilterPlannerTest.java
b/polaris-synchronizer/api/src/test/java/org/apache/polaris/tools/sync/polaris/CatalogNameFilterPlannerTest.java
new file mode 100644
index 0000000..a2f2f78
--- /dev/null
+++
b/polaris-synchronizer/api/src/test/java/org/apache/polaris/tools/sync/polaris/CatalogNameFilterPlannerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.tools.sync.polaris;
+
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.tools.sync.polaris.planning.CatalogNameFilterPlanner;
+import org.apache.polaris.tools.sync.polaris.planning.NoOpSyncPlanner;
+import org.apache.polaris.tools.sync.polaris.planning.SynchronizationPlanner;
+import org.apache.polaris.tools.sync.polaris.planning.plan.SynchronizationPlan;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/** Tests for {@link CatalogNameFilterPlanner} wrapped around a {@link NoOpSyncPlanner}. */
+public class CatalogNameFilterPlannerTest {
+
+  private static final Catalog CATALOG_1 = new Catalog().name("catalog-1");
+
+  private static final Catalog CATALOG_2 = new Catalog().name("catalog-2");
+
+  /** Plans a catalog sync through a name filter that wraps a no-op planner. */
+  private static SynchronizationPlan<Catalog> planWithFilter(
+      String regex, List<Catalog> onSource, List<Catalog> onTarget) {
+    SynchronizationPlanner filteringPlanner =
+        new CatalogNameFilterPlanner(regex, new NoOpSyncPlanner());
+    return filteringPlanner.planCatalogSync(onSource, onTarget);
+  }
+
+  @Test
+  public void testFiltersOutCatalog() {
+    SynchronizationPlan<Catalog> result =
+        planWithFilter("^catalog-1$", List.of(CATALOG_1, CATALOG_2), List.of());
+
+    // the non-matching catalog is skipped, the matching one is not
+    Assertions.assertTrue(result.entitiesToSkipAndSkipChildren().contains(CATALOG_2));
+    Assertions.assertFalse(result.entitiesToSkipAndSkipChildren().contains(CATALOG_1));
+  }
+
+  @Test
+  public void onlyMarksSourceCatalogForFilteringWhenCatalogIsOnSourceAndTarget() {
+    SynchronizationPlan<Catalog> result =
+        planWithFilter(
+            "^something that doesn't match the catalog name$",
+            List.of(CATALOG_1),
+            List.of(CATALOG_1));
+
+    Assertions.assertTrue(result.entitiesToSkipAndSkipChildren().contains(CATALOG_1));
+
+    // the catalog is recorded once for the source and not a second time for the target
+    Assertions.assertEquals(1, result.entitiesToSkipAndSkipChildren().size());
+  }
+
+  @Test
+  public void marksTargetCatalogWhenSourceCatalogDoesNotExist() {
+    SynchronizationPlan<Catalog> result =
+        planWithFilter(
+            "^something that doesn't match either catalog name$",
+            List.of(CATALOG_1),
+            List.of(CATALOG_2));
+
+    // both catalogs are filtered out and have different names, so both are recorded
+    Assertions.assertTrue(result.entitiesToSkipAndSkipChildren().contains(CATALOG_1));
+    Assertions.assertTrue(result.entitiesToSkipAndSkipChildren().contains(CATALOG_2));
+  }
+}
diff --git
a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java
b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java
index b3f4cc9..31388eb 100644
---
a/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java
+++
b/polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.polaris.tools.sync.polaris.catalog.ETagManager;
import
org.apache.polaris.tools.sync.polaris.planning.AccessControlAwarePlanner;
+import org.apache.polaris.tools.sync.polaris.planning.CatalogNameFilterPlanner;
import org.apache.polaris.tools.sync.polaris.planning.ModificationAwarePlanner;
import
org.apache.polaris.tools.sync.polaris.planning.SourceParitySynchronizationPlanner;
import org.apache.polaris.tools.sync.polaris.planning.SynchronizationPlanner;
@@ -90,11 +91,20 @@ public class SyncPolarisCommand implements
Callable<Integer> {
)
private boolean haltOnFailure;
+ @CommandLine.Option(
+ names = {"--catalog-name-regex"},
+ description = "If specified, only catalogs with names that match the
provided RegEx will be staged for " +
+ "synchronization. This applies to catalogs on both the
source and target."
+ )
+ private String catalogNameRegex;
+
@Override
public Integer call() throws Exception {
- SynchronizationPlanner sourceParityPlanner = new
SourceParitySynchronizationPlanner();
- SynchronizationPlanner modificationAwareSourceParityPlanner = new
ModificationAwarePlanner(sourceParityPlanner);
- SynchronizationPlanner accessControlAwarePlanner = new
AccessControlAwarePlanner(modificationAwareSourceParityPlanner);
+ SynchronizationPlanner planner = SynchronizationPlanner.builder(new
SourceParitySynchronizationPlanner())
+ .wrapBy(ModificationAwarePlanner::new)
+ .conditionallyWrapBy(catalogNameRegex != null, p -> new
CatalogNameFilterPlanner(catalogNameRegex, p))
+ .wrapBy(AccessControlAwarePlanner::new)
+ .build();
// auto generate omnipotent principals with write access on the target,
read only access on source
sourceProperties.put(PolarisApiService.ICEBERG_WRITE_ACCESS_PROPERTY,
Boolean.toString(false));
@@ -111,7 +121,7 @@ public class SyncPolarisCommand implements
Callable<Integer> {
new PolarisSynchronizer(
consoleLog,
haltOnFailure,
- accessControlAwarePlanner,
+ planner,
source,
target,
etagManager);