kbendick commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r944967219


##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object 
maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another 
catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the 
migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables 
required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are 
chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to 
be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used for migrate 
tables (If set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> 
sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", 
maxConcurrentMigrates);
+    }

Review Comment:
   You'll probably want to add the option to allow the user to pass in an 
executor service, like is done in several other actions etc.
   
   If this is just a temporary utility -- or I should say isn't intended to be 
used from Flink ever (which was the motivator for allowing passing in executor 
services) -- then it might not be an issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to