This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 10a54d3ea4 [spark] Throw exception when to sort compact data evolution table (#7119)
10a54d3ea4 is described below
commit 10a54d3ea48c09ea18276712100a239bddd9b997
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jan 26 17:49:00 2026 +0800
[spark] Throw exception when to sort compact data evolution table (#7119)
---
.../java/org/apache/paimon/schema/SchemaValidation.java | 3 +++
.../java/org/apache/paimon/flink/action/CompactAction.java | 1 -
.../org/apache/paimon/flink/action/SortCompactAction.java | 5 ++++-
.../apache/paimon/flink/action/CompactActionITCase.java | 2 +-
.../apache/paimon/spark/procedure/CompactProcedure.java | 3 +++
.../org/apache/paimon/spark/sql/RowTrackingTestBase.scala | 14 ++++++++++++++
6 files changed, 25 insertions(+), 3 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 3817cdef08..fd07bcef17 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -599,6 +599,9 @@ public class SchemaValidation {
checkArgument(
!options.deletionVectorsEnabled(),
"Data evolution config must disabled with deletion-vectors.enabled");
+ checkArgument(
+ !options.clusteringIncrementalEnabled(),
+ "Data evolution config must disabled with clustering.incremental");
}
Pair<RowType, RowType> normalAndBlobType =
BlobType.splitBlob(schema.logicalRowType());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index df7fd0227e..3be60281c0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -144,7 +144,6 @@ public class CompactAction extends TableActionBase {
if (fileStoreTable.coreOptions().bucket() ==
BucketMode.POSTPONE_BUCKET) {
buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
} else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
-
if (fileStoreTable.coreOptions().dataEvolutionEnabled()) {
buildForDataEvolutionTableCompact(env, fileStoreTable,
isStreaming);
} else if
(fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index 69d07d238b..4df0bf1b37 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -56,7 +56,6 @@ public class SortCompactAction extends CompactAction {
Map<String, String> catalogConfig,
Map<String, String> tableConf) {
super(database, tableName, catalogConfig, tableConf);
-
table =
table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
}
@@ -77,6 +76,10 @@ public class SortCompactAction extends CompactAction {
}
FileStoreTable fileStoreTable = (FileStoreTable) table;
+ if (fileStoreTable.coreOptions().dataEvolutionEnabled()) {
+ throw new UnsupportedOperationException("Data Evolution table cannot be sorted!");
+ }
+
if (fileStoreTable.bucketMode() != BucketMode.BUCKET_UNAWARE
&& fileStoreTable.bucketMode() != BucketMode.HASH_DYNAMIC) {
throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet.");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 25af8ca231..cb7cf87f60 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -849,7 +849,7 @@ public class CompactActionITCase extends CompactActionITCaseBase {
FileStoreTable table =
prepareTable(
- Arrays.asList("k"),
+ Collections.singletonList("k"),
Collections.emptyList(),
Collections.emptyList(),
tableOptions);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 7785735d04..316e65f3d1 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -608,6 +608,9 @@ public class CompactProcedure extends BaseProcedure {
List<String> sortColumns,
DataSourceV2Relation relation,
@Nullable PartitionPredicate partitionPredicate) {
+ if (table.coreOptions().dataEvolutionEnabled()) {
+ throw new UnsupportedOperationException("Data Evolution table cannot be sorted!");
+ }
SnapshotReader snapshotReader = table.newSnapshotReader();
if (partitionPredicate != null) {
snapshotReader.withPartitionFilter(partitionPredicate);
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index e227604759..83dedc96dd 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -651,6 +651,20 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
}
}
+ test("Data Evolution: compact sort throw exception") {
+ withTable("s", "t") {
+ sql(
+ s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+ sql("INSERT INTO t VALUES (1, 1, 1)")
+ assert(
+ intercept[Exception](
+ sql("CALL sys.compact(table => 't', order_strategy => 'order', order_by => 'id')")
+ .collect()).getMessage
+ .contains("Data Evolution table cannot be sorted!"))
+
+ }
+ }
+
test("Data Evolution: test global indexed column update action -- throw error") {
withTable("T") {
spark.sql("""