This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 7a1a6837e0 [HUDI-5088]Fix bug:Failed to synchronize the hive metadata of the Flink table (#7056) 7a1a6837e0 is described below commit 7a1a6837e0c7be2cb401fbe6be8bbbb72064feae Author: chao chen <59957056+waywt...@users.noreply.github.com> AuthorDate: Mon Nov 7 10:13:57 2022 +0800 [HUDI-5088]Fix bug:Failed to synchronize the hive metadata of the Flink table (#7056) * sync `_hoodie_operation` meta field if changelog mode is enabled. --- .../main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java | 8 ++++++-- .../java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java index a057c02f2c..4383b42e9f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; @@ -178,9 +179,12 @@ public class HiveSchemaUtils { /** * Create Hive field schemas from Flink table schema including the hoodie metadata fields. */ - public static List<FieldSchema> toHiveFieldSchema(TableSchema schema) { + public static List<FieldSchema> toHiveFieldSchema(TableSchema schema, boolean withOperationField) { List<FieldSchema> columns = new ArrayList<>(); - for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) { + Collection<String> metaFields = withOperationField + ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence + : HoodieRecord.HOODIE_META_COLUMNS; + for (String metaField : metaFields) { columns.add(new FieldSchema(metaField, "string", null)); } columns.addAll(createHiveColumns(schema)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java index d6e70f16ea..85f82d53d9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java @@ -553,7 +553,8 @@ public class HoodieHiveCatalog extends AbstractCatalog { // because since Hive 3.x, there is validation when altering table, // when the metadata fields are synced through the hive sync tool, // a compatability issue would be reported. - List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema()); + boolean withOperationField = Boolean.parseBoolean(table.getOptions().getOrDefault(FlinkOptions.CHANGELOG_ENABLED.key(), "false")); + List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField); // Table columns and partition keys CatalogTable catalogTable = (CatalogTable) table;