This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f25a8b5046 [hive] Sync unset table props with hms (#5616)
f25a8b5046 is described below
commit f25a8b5046f20b91e6e5fd59e71f73d65318e521
Author: Zouxxyy <[email protected]>
AuthorDate: Mon May 19 10:04:17 2025 +0800
[hive] Sync unset table props with hms (#5616)
---
.../java/org/apache/paimon/hive/HiveCatalog.java | 18 +++++++++++----
.../apache/paimon/spark/PaimonHiveTestBase.scala | 12 +++++++++-
.../spark/sql/DDLWithHiveCatalogTestBase.scala | 27 ++++++++++++++++------
3 files changed, 44 insertions(+), 13 deletions(-)
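Before this change, unsetting a table property removed the option from the Paimon schema but left a stale key behind in the HMS table parameters; the patch below collects the keys of all SchemaChange.RemoveOption changes and deletes them from HMS as well. As a minimal sketch of the resulting catalog-level behavior (the database, table, and option key are illustrative, and `catalog` is assumed to be backed by a HiveCatalog):

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.schema.SchemaChange;

    public class UnsetPropsSketch {
        static void unsetAndSync(Catalog catalog) throws Exception {
            Identifier id = Identifier.create("paimon_db", "t");
            // Setting an option writes the key through to the HMS table parameters.
            catalog.alterTable(id, SchemaChange.setOption("write-buffer-spillable", "true"), false);
            // Removing it now also deletes the key from HMS instead of leaving it stale.
            catalog.alterTable(id, SchemaChange.removeOption("write-buffer-spillable"), false);
        }
    }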
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 8362c2a94d..bbcd645f20 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -1174,17 +1174,24 @@ public class HiveCatalog extends AbstractCatalog {
if (!DEFAULT_MAIN_BRANCH.equals(identifier.getBranchNameOrDefault())) {
return;
}
+
+ Set<String> removedOptions =
+ changes.stream()
+ .filter(c -> c instanceof SchemaChange.RemoveOption)
+ .map(c -> ((SchemaChange.RemoveOption) c).key())
+ .collect(Collectors.toSet());
try {
- alterTableToHms(table, identifier, schema);
+ alterTableToHms(table, identifier, schema, removedOptions);
} catch (Exception te) {
schemaManager.deleteSchema(schema.id());
throw new RuntimeException(te);
}
}
- private void alterTableToHms(Table table, Identifier identifier, TableSchema newSchema)
+ private void alterTableToHms(
+ Table table, Identifier identifier, TableSchema newSchema, Set<String> removedOptions)
throws TException, InterruptedException {
- updateHmsTablePars(table, newSchema);
+ updateHmsTablePars(table, newSchema, removedOptions);
Path location = getTableLocation(identifier, table);
// file format is null, because only data tables support alter table.
updateHmsTable(table, identifier, newSchema, null, location);
@@ -1269,7 +1276,7 @@ public class HiveCatalog extends AbstractCatalog {
identifier.getFullName());
if (!newTable.getSd().getCols().equals(table.getSd().getCols())
|| !newTable.getParameters().equals(table.getParameters())) {
- alterTableToHms(table, identifier, tableSchema);
+ alterTableToHms(table, identifier, tableSchema, Collections.emptySet());
}
} catch (TableNotExistException e) {
// hive table does not exist.
@@ -1541,7 +1548,7 @@ public class HiveCatalog extends AbstractCatalog {
locationHelper.specifyTableLocation(table, location.toString());
}
- private void updateHmsTablePars(Table table, TableSchema schema) {
+ private void updateHmsTablePars(Table table, TableSchema schema, Set<String> removedOptions) {
if (syncAllProperties()) {
table.getParameters().putAll(schema.options());
table.getParameters().putAll(convertToPropertiesTableKey(schema));
@@ -1549,6 +1556,7 @@ public class HiveCatalog extends AbstractCatalog {
table.getParameters()
.putAll(convertToPropertiesPrefixKey(schema.options(), HIVE_PREFIX));
}
+ removedOptions.forEach(table.getParameters()::remove);
}
private Map<String, String> convertToPropertiesTableKey(TableSchema tableSchema) {
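The net effect of updateHmsTablePars is easiest to see on a plain map standing in for table.getParameters(): the schema's current options are re-applied first, then the unset keys are dropped. A standalone sketch under that reading (names here are illustrative, not Paimon API):

    import java.util.Map;
    import java.util.Set;

    public class ParamsSyncSketch {
        // Re-apply the schema's current options, then drop every key
        // that an UNSET removed.
        static void sync(Map<String, String> hmsParams,
                         Map<String, String> schemaOptions,
                         Set<String> removedOptions) {
            hmsParams.putAll(schemaOptions);
            removedOptions.forEach(hmsParams::remove);
        }
    }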
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
index 2d482a98da..ee423a0a59 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
@@ -18,10 +18,12 @@
package org.apache.paimon.spark
-import org.apache.paimon.hive.TestHiveMetastore
+import org.apache.paimon.catalog.{Catalog, DelegateCatalog}
+import org.apache.paimon.hive.{HiveCatalog, TestHiveMetastore}
import org.apache.paimon.table.FileStoreTable
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.metastore.IMetaStoreClient
import org.apache.spark.SparkConf
import org.apache.spark.sql.paimon.Utils
@@ -85,6 +87,14 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
override def loadTable(tableName: String): FileStoreTable = {
loadTable(hiveDbName, tableName)
}
+
+ def getHmsClient(paimonCatalog: Catalog): IMetaStoreClient = {
+ DelegateCatalog.rootCatalog(paimonCatalog) match {
+ case hiveCatalog: HiveCatalog => hiveCatalog.getHmsClient
+ case other =>
+ throw new IllegalArgumentException(s"Unsupported catalog type:
${other.getClass.getName}")
+ }
+ }
}
object PaimonHiveTestBase {
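The new test helper leans on DelegateCatalog.rootCatalog to unwrap any delegating layers in a single call, replacing the manual while loop removed in the next file. Roughly the same lookup in Java, as a sketch mirroring the Scala helper above:

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.DelegateCatalog;
    import org.apache.paimon.hive.HiveCatalog;

    public class HmsClientLookup {
        // Unwrap delegating catalog layers, then hand back the HMS client.
        static IMetaStoreClient getHmsClient(Catalog catalog) {
            Catalog root = DelegateCatalog.rootCatalog(catalog);
            if (root instanceof HiveCatalog) {
                return ((HiveCatalog) root).getHmsClient();
            }
            throw new IllegalArgumentException(
                    "Unsupported catalog type: " + root.getClass().getName());
        }
    }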
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index 0c3db9a20d..87b33e59b9 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -18,9 +18,7 @@
package org.apache.paimon.spark.sql
-import org.apache.paimon.catalog.DelegateCatalog
import org.apache.paimon.fs.Path
-import org.apache.paimon.hive.HiveCatalog
import org.apache.paimon.spark.PaimonHiveTestBase
import org.apache.paimon.table.FileStoreTable
@@ -376,11 +374,7 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
spark.sql(s"show partitions $tblName"),
Seq(Row("pt=1"), Row("pt=2"), Row("pt=3")))
// check partitions in HMS
- var catalog = paimonCatalog
- while (catalog.isInstanceOf[DelegateCatalog]) {
- catalog = catalog.asInstanceOf[DelegateCatalog].wrapped()
- }
- val hmsClient = catalog.asInstanceOf[HiveCatalog].getHmsClient
+ val hmsClient = getHmsClient(paimonCatalog)
assert(hmsClient.listPartitions(dbName, tblName, 100).size() == 3)
// check partitions in filesystem
if (dataFilePathDir.isEmpty) {
@@ -669,6 +663,25 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
}
}
+ test("Paimon DDL with hive catalog: sync unset table props to HMS") {
+ spark.sql(s"USE $paimonHiveCatalogName")
+ withDatabase("paimon_db") {
+ spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon_db")
+ spark.sql(s"USE paimon_db")
+ withTable("t") {
+ spark.sql("CREATE TABLE t (id INT) USING paimon")
+
+ val hmsClient = getHmsClient(paimonCatalog)
+ spark.sql("ALTER TABLE t SET TBLPROPERTIES ('write-buffer-spillable' =
'true')")
+ assert(
+ hmsClient.getTable("paimon_db",
"t").getParameters.containsKey("write-buffer-spillable"))
+ spark.sql("ALTER TABLE t UNSET TBLPROPERTIES
('write-buffer-spillable')")
+ assert(
+ !hmsClient.getTable("paimon_db",
"t").getParameters.containsKey("write-buffer-spillable"))
+ }
+ }
+ }
+
def getDatabaseProp(dbName: String, propertyName: String): String = {
spark
.sql(s"DESC DATABASE EXTENDED $dbName")