tomtongue commented on code in PR #8931:
URL: https://github.com/apache/iceberg/pull/8931#discussion_r1375256629
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java:
##########
@@ -108,6 +109,23 @@ public MigrateTableSparkAction backupTableName(String tableName) {
    return this;
  }
+ @Override
+ public MigrateTableSparkAction destCatalogName(String catalogName) {
+ CatalogManager catalogManager = spark().sessionState().catalogManager();
+
+ CatalogPlugin catalogPlugin;
+ if (catalogManager.isCatalogRegistered(catalogName)) {
+ catalogPlugin = catalogManager.catalog(catalogName);
+ } else {
+ LOG.warn(
+        "{} doesn't exist in SparkSession. " + "Fallback to current SparkSession catalog.",
+ catalogName);
+ catalogPlugin = catalogManager.currentCatalog();
+ }
+ this.destCatalog = checkDestinationCatalog(catalogPlugin);
Review Comment:
Thanks for the review, @singhpk234.
Yes, that was a problem, because in `migrate` the source catalog can only be `SparkSessionCatalog`. Let me elaborate on this in addition to what I investigated in https://github.com/apache/iceberg/issues/7317#issuecomment-1782418606 (if I'm wrong, please correct me).
When running `migrate`, [`checkSourceCatalog` in `BaseTableCreationSparkAction`](https://github.com/apache/iceberg/blob/apache-iceberg-1.4.1/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java#L74) is called during [`MigrateTableSparkAction` initialization](https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java#L65). `checkSourceCatalog` comes from the `MigrateTableSparkAction` implementation, and the [method](https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java#L198) checks whether the sourceCatalog is a `SparkSessionCatalog`:
```java
@Override
protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
    // currently the import code relies on being able to look up the table in the session catalog
Preconditions.checkArgument(
catalog instanceof SparkSessionCatalog,
        "Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
catalog.name(),
catalog.getClass().getName());
return (TableCatalog) catalog;
}
```
So, the sourceCatalog must be the `SparkSessionCatalog`. And when `SparkSessionCatalog` is configured with `GlueCatalogImpl` as its implementation, the `renameTable` operation in `migrate` is called via `SparkSessionCatalog`; `GlueCatalogImpl` is not used for the `renameTable` operation, and the `migrate` query fails.
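To illustrate the shape of the problem (hypothetical types, not the real Spark/Iceberg API): a session catalog that delegates Iceberg tables to the configured `catalog-impl` but falls through to the built-in session catalog for everything else will never hand the source table's rename to the Iceberg implementation, because at that point the source table is not yet an Iceberg table. A minimal sketch, assuming that delegate-by-default behavior:

```java
import java.util.function.Predicate;

// Hypothetical sketch of a SparkSessionCatalog-like delegate-by-default catalog.
class SessionCatalogSketch {
  private final Predicate<String> isIcebergTable;

  SessionCatalogSketch(Predicate<String> isIcebergTable) {
    this.isIcebergTable = isIcebergTable;
  }

  String renameTable(String from, String to) {
    if (isIcebergTable.test(from)) {
      // Iceberg tables go to the configured catalog-impl (e.g. GlueCatalog),
      // which can emulate a rename itself.
      return "iceberg-impl renamed " + from + " -> " + to;
    }
    // The source table in `migrate` is NOT yet an Iceberg table, so the call
    // falls through to the built-in session catalog; with a Glue metastore
    // behind it, Hive throws "Table rename is not supported".
    throw new UnsupportedOperationException("Table rename is not supported");
  }
}

public class DelegationSketch {
  public static void main(String[] args) {
    SessionCatalogSketch catalog = new SessionCatalogSketch(name -> name.endsWith("_iceberg"));
    System.out.println(catalog.renameTable("db.t_iceberg", "db.t2_iceberg"));
    try {
      catalog.renameTable("db.parquet_table", "db.parquet_table_BACKUP_");
    } catch (UnsupportedOperationException e) {
      System.out.println("migrate fails here: " + e.getMessage());
    }
  }
}
```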
For a custom catalog, specifically the Glue Data Catalog, as you know, Iceberg carries its own `renameTable` implementation because the Glue Data Catalog client doesn't support table rename. I think if it were possible to specify the destCatalog and use the destCatalog implementation, it could remove this restriction on the source-catalog-side implementation, such as for the Glue Data Catalog.
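To make the rename-emulation point concrete: a catalog whose backing store has no native rename (as with Glue) has to implement `renameTable` as register-under-new-name-then-drop-old. A minimal, hypothetical in-memory sketch of that pattern (not the real `GlueCatalog` code):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a catalog whose backing store,
// like the Glue Data Catalog, has no native rename operation.
class NoRenameCatalog {
  // table name -> metadata location
  private final Map<String, String> tables = new HashMap<>();

  void createTable(String name, String metadataLocation) {
    if (tables.containsKey(name)) {
      throw new IllegalStateException("Table already exists: " + name);
    }
    tables.put(name, metadataLocation);
  }

  void dropTable(String name) {
    tables.remove(name);
  }

  String metadataLocation(String name) {
    return tables.get(name);
  }

  // Emulated rename: register the same metadata under the new identifier,
  // then drop the old entry. This is the general pattern a catalog must use
  // when its backing service rejects renames.
  void renameTable(String from, String to) {
    String metadata = tables.get(from);
    if (metadata == null) {
      throw new IllegalArgumentException("No such table: " + from);
    }
    createTable(to, metadata);
    dropTable(from);
  }
}

public class RenameEmulationSketch {
  public static void main(String[] args) {
    NoRenameCatalog catalog = new NoRenameCatalog();
    catalog.createTable("db.table", "s3://bucket/db/table/metadata.json");
    // `migrate` needs exactly this kind of rename to back up the source table
    catalog.renameTable("db.table", "db.table_BACKUP_");
    System.out.println(catalog.metadataLocation("db.table_BACKUP_"));
  }
}
```

This is only a sketch of the pattern; the point is that the rename logic lives in the catalog implementation, which is why it matters which implementation actually receives the `renameTable` call.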
For reference, I tested the following patterns, but all patterns failed:
### Pattern 1 - Set SparkCatalog (`glue_catalog`) to the sourceTable
SparkSession config:
```scala
val spark: SparkSession = SparkSession.builder
  .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config("spark.sql.catalog.glue_catalog.warehouse", warehouse)
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()
```
Migrate query:
```scala
spark.sql(
s"""
CALL glue_catalog.system.migrate (
table => 'glue_catalog.db.table'
)
""")
```
Error (partial stacktrace):
```
Exception in thread "main" java.lang.IllegalArgumentException: Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found glue_catalog of class org.apache.iceberg.spark.SparkCatalog as the source catalog.
  at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:445)
  at org.apache.iceberg.spark.actions.MigrateTableSparkAction.checkSourceCatalog(MigrateTableSparkAction.java:200)
  at org.apache.iceberg.spark.actions.BaseTableCreationSparkAction.<init>(BaseTableCreationSparkAction.java:74)
  at org.apache.iceberg.spark.actions.MigrateTableSparkAction.<init>(MigrateTableSparkAction.java:65)
  at org.apache.iceberg.spark.actions.SparkActions.migrateTable(SparkActions.java:67)
  at org.apache.iceberg.spark.procedures.MigrateTableProcedure.call(MigrateTableProcedure.java:98)
  at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
  ...
```
### Pattern 1' - Set SparkCatalog (`glue_catalog`) to the sourceTable and specify SparkCatalog for the procedure call
This gives the same result as "Pattern 1", because the catalog name in the procedure call doesn't affect the source/destination catalog.
SparkSession config: omitted. The same configuration was used as in "Pattern 1".
Migrate query:
```scala
spark.sql(
s"""
CALL spark_catalog.system.migrate (
table => 'glue_catalog.db.table'
  )
  """)
```
Error: omitted. The same error was obtained as in "Pattern 1".
### Pattern 2 - Set `SparkSessionCatalog` with GlueCatalogImpl to the sourceTable
SparkSession config:
```scala
val spark: SparkSession = SparkSession.builder
  .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config("spark.sql.catalog.glue_catalog.warehouse", warehouse)
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()
```
Migrate query:
```scala
spark.sql(
s"""
CALL glue_catalog.system.migrate (
table => 'db.table'
)
""")
/* Or,
CALL glue_catalog.system.migrate (
table => 'spark_catalog.db.table'
)
*/
```
Error (partial stacktrace):
```
Exception in thread "main" org.apache.spark.sql.AnalysisException: java.lang.UnsupportedOperationException: Table rename is not supported
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:133)
  at org.apache.spark.sql.hive.HiveExternalCatalog.renameTable(HiveExternalCatalog.scala:566)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.renameTable(ExternalCatalogWithListener.scala:110)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.renameTable(SessionCatalog.scala:803)
  at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.renameTable(V2SessionCatalog.scala:216)
  at org.apache.iceberg.spark.SparkSessionCatalog.renameTable(SparkSessionCatalog.java:301)
  at org.apache.iceberg.spark.actions.MigrateTableSparkAction.renameAndBackupSourceTable(MigrateTableSparkAction.java:212)
  at org.apache.iceberg.spark.actions.MigrateTableSparkAction.doExecute(MigrateTableSparkAction.java:123)
  at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:59)
  at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:51)
  at org.apache.iceberg.spark.actions.BaseSparkAction.withJobGroupInfo(BaseSparkAction.java:132)
  at org.apache.iceberg.spark.actions.MigrateTableSparkAction.execute(MigrateTableSparkAction.java:115)
  at org.apache.iceberg.spark.procedures.MigrateTableProcedure.call(MigrateTableProcedure.java:108)
  at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
  ...
Caused by: java.lang.UnsupportedOperationException: Table rename is not supported
  at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.alterTable(GlueMetastoreClientDelegate.java:539)
  at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.alter_table_with_environmentContext(AWSCatalogMetastoreClient.java:417)
  at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:648)
  at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:630)
  ...
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.sql.hive.client.Shim_v2_1.alterTable(HiveShim.scala:1624)
  ...
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$renameTable$1(HiveExternalCatalog.scala:588)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:104)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.