This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a1a6837e0 [HUDI-5088] Fix bug: Failed to synchronize the Hive metadata of the Flink table (#7056)
7a1a6837e0 is described below

commit 7a1a6837e0c7be2cb401fbe6be8bbbb72064feae
Author: chao chen <59957056+waywt...@users.noreply.github.com>
AuthorDate: Mon Nov 7 10:13:57 2022 +0800

    [HUDI-5088] Fix bug: Failed to synchronize the Hive metadata of the Flink table (#7056)
    
    * sync the `_hoodie_operation` meta field if changelog mode is enabled.
---
 .../main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java  | 8 ++++++--
 .../java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java     | 3 ++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index a057c02f2c..4383b42e9f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -178,9 +179,12 @@ public class HiveSchemaUtils {
   /**
    * Create Hive field schemas from Flink table schema including the hoodie metadata fields.
    */
-  public static List<FieldSchema> toHiveFieldSchema(TableSchema schema) {
+  public static List<FieldSchema> toHiveFieldSchema(TableSchema schema, boolean withOperationField) {
     List<FieldSchema> columns = new ArrayList<>();
-    for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) {
+    Collection<String> metaFields = withOperationField
+        ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence
+        : HoodieRecord.HOODIE_META_COLUMNS;
+    for (String metaField : metaFields) {
       columns.add(new FieldSchema(metaField, "string", null));
     }
     columns.addAll(createHiveColumns(schema));
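
For context, here is a minimal, self-contained sketch of the two collections the patched method chooses between. The real constants live in org.apache.hudi.common.model.HoodieRecord; the column names below are Hudi's standard meta columns, but the class and the concrete collection types are assumptions for illustration only:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.LinkedHashSet;
    import java.util.List;

    public class MetaColumnsSketch {
      // The five standard Hudi meta columns, in their usual write order.
      static final List<String> HOODIE_META_COLUMNS = Arrays.asList(
          "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
          "_hoodie_partition_path", "_hoodie_file_name");

      // Changelog mode appends "_hoodie_operation", which records the row kind
      // (insert / update-before / update-after / delete) of each change. A
      // LinkedHashSet keeps insertion order here; the patch's inline comment
      // warns that an order-free Set could break the column sequence.
      static final Collection<String> HOODIE_META_COLUMNS_WITH_OPERATION =
          new LinkedHashSet<>(Arrays.asList(
              "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
              "_hoodie_partition_path", "_hoodie_file_name", "_hoodie_operation"));

      // Mirrors the selection logic added to toHiveFieldSchema above.
      static Collection<String> metaFields(boolean withOperationField) {
        return withOperationField ? HOODIE_META_COLUMNS_WITH_OPERATION : HOODIE_META_COLUMNS;
      }
    }
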
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index d6e70f16ea..85f82d53d9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -553,7 +553,8 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     // because since Hive 3.x, there is validation when altering table,
     // when the metadata fields are synced through the hive sync tool,
     // a compatibility issue would be reported.
-    List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema());
+    boolean withOperationField = Boolean.parseBoolean(table.getOptions().getOrDefault(FlinkOptions.CHANGELOG_ENABLED.key(), "false"));
+    List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField);
 
     // Table columns and partition keys
     CatalogTable catalogTable = (CatalogTable) table;

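To see why the "false" default in the catalog change matters, here is a short sketch of the flag derivation. It assumes the option key "changelog.enabled" is what FlinkOptions.CHANGELOG_ENABLED.key() resolves to, and a plain map stands in for the catalog table's options:

    import java.util.HashMap;
    import java.util.Map;

    public class ChangelogFlagSketch {
      public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();

        // Tables created without changelog mode carry no such option, so
        // getOrDefault(..., "false") keeps them on the original five-column schema.
        boolean plain = Boolean.parseBoolean(
            options.getOrDefault("changelog.enabled", "false"));
        System.out.println(plain); // false -> sync HOODIE_META_COLUMNS only

        options.put("changelog.enabled", "true");
        boolean withOperationField = Boolean.parseBoolean(
            options.getOrDefault("changelog.enabled", "false"));
        System.out.println(withOperationField); // true -> also sync _hoodie_operation
      }
    }

Defaulting to "false" means existing tables, which never set the flag, keep their current Hive schema; only tables that explicitly enable changelog mode get the extra `_hoodie_operation` column synced.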