ajantha-bhat commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r945087226
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
setConf.invoke(conf);
}
+
+ /**
+ * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+ *
+ * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+ * would be dropped from the source catalog.
+ *
+ * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+ * migrated. If not specified, all the tables would be migrated.
+ * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+ * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+ * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+ * thread pool is used)
+ * @return Collection of table identifiers for successfully migrated tables
+ */
+ public static Collection<TableIdentifier> migrateTables(
+ List<TableIdentifier> tableIdentifiers,
+ Catalog sourceCatalog,
+ Catalog targetCatalog,
+ int maxConcurrentMigrates) {
+ validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+ List<TableIdentifier> identifiers;
+ if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+ // fetch all the table identifiers from all the namespaces.
+ List<Namespace> namespaces =
+ (sourceCatalog instanceof SupportsNamespaces)
+ ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+ : ImmutableList.of(Namespace.empty());
+ identifiers =
+ namespaces.stream()
+ .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+ .collect(Collectors.toList());
+ } else {
+ identifiers = tableIdentifiers;
+ }
+
+ ExecutorService executorService = null;
+ if (maxConcurrentMigrates > 0) {
+ executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+ }
Review Comment:
Like the other `CatalogUtil` APIs, this migrate API is intended to work without
depending on any engine.
If we need an engine-specific implementation, we can add a Spark action or
Flink action later on.
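
To illustrate the engine-independent shape of the code above, here is a minimal sketch that uses only JDK classes. It is not the PR's implementation and not Iceberg API: `MigrateSketch`, `migrate`, and `migrateOne` are hypothetical names, and each catalog is modeled as a plain thread-safe map from table name to metadata location. A value of 0 for `maxConcurrent` runs sequentially, mirroring the "no thread pool" case in the Javadoc.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: catalogs are stand-in maps (table name -> metadata location).
// Both maps must be thread-safe (e.g. ConcurrentHashMap) when maxConcurrent > 0.
public class MigrateSketch {

  public static Collection<String> migrate(
      List<String> tables,
      Map<String, String> source,
      Map<String, String> target,
      int maxConcurrent) {
    Collection<String> migrated = new ConcurrentLinkedQueue<>();

    if (maxConcurrent <= 0) {
      // No thread pool: migrate on the calling thread, one table at a time.
      for (String table : tables) {
        migrateOne(table, source, target, migrated);
      }
      return migrated;
    }

    // Plain JDK executor; no engine (Spark, Flink) is involved.
    ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (String table : tables) {
        futures.add(pool.submit(() -> migrateOne(table, source, target, migrated)));
      }
      for (Future<?> future : futures) {
        future.get(); // wait for completion and surface any failure
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while migrating tables", e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Table migration failed", e.getCause());
    } finally {
      pool.shutdown();
    }
    return migrated;
  }

  private static void migrateOne(
      String table,
      Map<String, String> source,
      Map<String, String> target,
      Collection<String> migrated) {
    String location = source.get(table);
    if (location == null) {
      return; // table is not in the source catalog; skip it
    }
    // Register in the target first; drop from the source only if that succeeded.
    if (target.putIfAbsent(table, location) == null) {
      source.remove(table);
      migrated.add(table);
    }
  }
}
```

For example, calling `MigrateSketch.migrate(List.of("db.t1", "db.t2"), source, target, 2)` with two `ConcurrentHashMap` instances moves both entries to `target`. An engine-specific action could later wrap the same per-table step with its own scheduling.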
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]