This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 0400f5dead Spark: Explicitly disallow migrating bucketed tables (#15429)
0400f5dead is described below
commit 0400f5dead809a386637cefa35bde24d6ef2fd72
Author: Rui Li <[email protected]>
AuthorDate: Wed Mar 18 10:45:46 2026 +0800
Spark: Explicitly disallow migrating bucketed tables (#15429)
---
.../spark/extensions/TestMigrateTableProcedure.java | 17 +++++++++++++++++
.../spark/actions/BaseTableCreationSparkAction.java | 4 ++++
.../spark/extensions/TestMigrateTableProcedure.java | 17 +++++++++++++++++
.../spark/actions/BaseTableCreationSparkAction.java | 4 ++++
.../spark/extensions/TestMigrateTableProcedure.java | 17 +++++++++++++++++
.../spark/actions/BaseTableCreationSparkAction.java | 4 ++++
.../spark/extensions/TestMigrateTableProcedure.java | 17 +++++++++++++++++
.../spark/actions/BaseTableCreationSparkAction.java | 4 ++++
8 files changed, 84 insertions(+)
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index a9e309ddea..d26e4f35bc 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -288,4 +288,21 @@ public class TestMigrateTableProcedure extends ExtensionsTestBase {
ImmutableList.of(row("a", 1L), row("b", 2L)),
sql("SELECT * FROM %s ORDER BY id", tableName));
}
+
+ @TestTemplate
+ public void testMigrateBucketedTable() throws IOException {
+ assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+ String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet "
+ + "CLUSTERED BY (id) INTO 4 BUCKETS LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+ assertThatThrownBy(() -> sql("CALL %s.system.migrate('%s')", catalogName, tableName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot create an Iceberg table from a bucketed source table: "
+ + "4 buckets, bucket columns: [id]");
+ }
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
index 0ca73bef4d..37e1ec4ce7 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
@@ -134,6 +134,10 @@ abstract class BaseTableCreationSparkAction<ThisT> extends BaseSparkAction<ThisT
Preconditions.checkArgument(
!sourceCatalogTable.storage().locationUri().isEmpty(),
"Cannot create an Iceberg table from a source without an explicit location");
+ Preconditions.checkArgument(
+ sourceCatalogTable.bucketSpec().isEmpty(),
+ "Cannot create an Iceberg table from a bucketed source table: %s",
+ (Object) sourceCatalogTable.bucketSpec().getOrElse(() -> null));
}
protected StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index c189311a20..8c22aaeaba 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -286,4 +286,21 @@ public class TestMigrateTableProcedure extends ExtensionsTestBase {
ImmutableList.of(row("a", 1L), row("b", 2L)),
sql("SELECT * FROM %s ORDER BY id", tableName));
}
+
+ @TestTemplate
+ public void testMigrateBucketedTable() throws IOException {
+ assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+ String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet "
+ + "CLUSTERED BY (id) INTO 4 BUCKETS LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+ assertThatThrownBy(() -> sql("CALL %s.system.migrate('%s')", catalogName, tableName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot create an Iceberg table from a bucketed source table: "
+ + "4 buckets, bucket columns: [id]");
+ }
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
index 0ca73bef4d..37e1ec4ce7 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
@@ -134,6 +134,10 @@ abstract class BaseTableCreationSparkAction<ThisT> extends BaseSparkAction<ThisT
Preconditions.checkArgument(
!sourceCatalogTable.storage().locationUri().isEmpty(),
"Cannot create an Iceberg table from a source without an explicit location");
+ Preconditions.checkArgument(
+ sourceCatalogTable.bucketSpec().isEmpty(),
+ "Cannot create an Iceberg table from a bucketed source table: %s",
+ (Object) sourceCatalogTable.bucketSpec().getOrElse(() -> null));
}
protected StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {
diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 9246671c00..ec2b9564da 100644
--- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -288,4 +288,21 @@ public class TestMigrateTableProcedure extends ExtensionsTestBase {
ImmutableList.of(row("a", 1L), row("b", 2L)),
sql("SELECT * FROM %s ORDER BY id", tableName));
}
+
+ @TestTemplate
+ public void testMigrateBucketedTable() throws IOException {
+ assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+ String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet "
+ + "CLUSTERED BY (id) INTO 4 BUCKETS LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+ assertThatThrownBy(() -> sql("CALL %s.system.migrate('%s')", catalogName, tableName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot create an Iceberg table from a bucketed source table: "
+ + "4 buckets, bucket columns: [id]");
+ }
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
index 0ca73bef4d..37e1ec4ce7 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
@@ -134,6 +134,10 @@ abstract class BaseTableCreationSparkAction<ThisT> extends BaseSparkAction<ThisT
Preconditions.checkArgument(
!sourceCatalogTable.storage().locationUri().isEmpty(),
"Cannot create an Iceberg table from a source without an explicit location");
+ Preconditions.checkArgument(
+ sourceCatalogTable.bucketSpec().isEmpty(),
+ "Cannot create an Iceberg table from a bucketed source table: %s",
+ (Object) sourceCatalogTable.bucketSpec().getOrElse(() -> null));
}
protected StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 9246671c00..60645e1c35 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -288,4 +288,21 @@ public class TestMigrateTableProcedure extends ExtensionsTestBase {
ImmutableList.of(row("a", 1L), row("b", 2L)),
sql("SELECT * FROM %s ORDER BY id", tableName));
}
+
+ @TestTemplate
+ public void testMigrateBucketedTable() throws IOException {
+ assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+ String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet "
+ + "CLUSTERED BY (id) INTO 4 BUCKETS LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+ assertThatThrownBy(() -> scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot create an Iceberg table from a bucketed source table: "
+ + "4 buckets, bucket columns: [id]");
+ }
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
index 0ca73bef4d..37e1ec4ce7 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
@@ -134,6 +134,10 @@ abstract class BaseTableCreationSparkAction<ThisT> extends BaseSparkAction<ThisT
Preconditions.checkArgument(
!sourceCatalogTable.storage().locationUri().isEmpty(),
"Cannot create an Iceberg table from a source without an explicit location");
+ Preconditions.checkArgument(
+ sourceCatalogTable.bucketSpec().isEmpty(),
+ "Cannot create an Iceberg table from a bucketed source table: %s",
+ (Object) sourceCatalogTable.bucketSpec().getOrElse(() -> null));
}
protected StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) {