xushiyan commented on code in PR #8775:
URL: https://github.com/apache/hudi/pull/8775#discussion_r1201544493


##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -280,83 +282,87 @@ protected void syncHoodieTable(String tableName, boolean 
useRealtimeInputFormat,
       partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, 
droppedPartitions);
     }
 
-    boolean meetSyncConditions = schemaChanged || partitionsChanged;
+    boolean meetSyncConditions = schemaChanged || propertiesChanged || 
partitionsChanged;
     if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
       syncClient.updateLastCommitTimeSynced(tableName);
     }
     LOG.info("Sync complete for " + tableName);
   }
 
-  /**
-   * Get the latest schema from the last commit and check if its in sync with 
the hive table schema. If not, evolves the
-   * table schema.
-   *
-   * @param tableExists does table exist
-   * @param schema      extracted schema
-   */
-  private boolean syncSchema(String tableName, boolean tableExists, boolean 
useRealTimeInputFormat,
-      boolean readAsOptimized, MessageType schema) {
-    // Append spark table properties & serde properties
+  private Map<String, String> getTableProperties(MessageType schema) {
     Map<String, String> tableProperties = 
ConfigUtils.toMap(config.getString(HIVE_TABLE_PROPERTIES));
-    Map<String, String> serdeProperties = 
ConfigUtils.toMap(config.getString(HIVE_TABLE_SERDE_PROPERTIES));
     if (config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) {
       Map<String, String> sparkTableProperties = 
SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS),
           config.getStringOrDefault(META_SYNC_SPARK_VERSION), 
config.getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema);
-      Map<String, String> sparkSerdeProperties = 
SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, 
config.getString(META_SYNC_BASE_PATH));
       tableProperties.putAll(sparkTableProperties);
+    }
+    return tableProperties;
+  }
+
+  private Map<String, String> getSerdeProperties(boolean readAsOptimized) {
+    Map<String, String> serdeProperties = 
ConfigUtils.toMap(config.getString(HIVE_TABLE_SERDE_PROPERTIES));
+    if (config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) {
+      Map<String, String> sparkSerdeProperties = 
SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, 
config.getString(META_SYNC_BASE_PATH));
       serdeProperties.putAll(sparkSerdeProperties);
     }
-    boolean schemaChanged = false;
-    // Check and sync schema
-    if (!tableExists) {
-      LOG.info("Hive table {} is not found. Creating it with schema {}.", 
tableName, schema);
-      HoodieFileFormat baseFileFormat = 
HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
-      String inputFormatClassName = 
HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, 
useRealTimeInputFormat);
-
-      if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && 
config.getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT)) {
-        // Parquet input format had an InputFormat class visible under the old 
naming scheme.
-        inputFormatClassName = useRealTimeInputFormat
-            ? 
com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
-            : com.uber.hoodie.hadoop.HoodieInputFormat.class.getName();
-      }
+    return serdeProperties;
+  }
 
-      String outputFormatClassName = 
HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
-      String serDeFormatClassName = 
HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
+  private void syncFirstTime(String tableName, boolean useRealTimeInputFormat, 
boolean readAsOptimized, MessageType schema) {
+    LOG.info("Sync table {} for the first time.", tableName);
+    HoodieFileFormat baseFileFormat = 
HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
+    String inputFormatClassName = getInputFormatClassName(baseFileFormat, 
useRealTimeInputFormat, 
config.getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT));
+    String outputFormatClassName = getOutputFormatClassName(baseFileFormat);
+    String serDeFormatClassName = getSerDeClassName(baseFileFormat);
+    Map<String, String> serdeProperties = getSerdeProperties(readAsOptimized);
+    Map<String, String> tableProperties = getTableProperties(schema);
+
+    // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
+    // 
https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
+    // /ql/exec/DDLTask.java#L3488
+    syncClient.createTable(tableName, schema, inputFormatClassName,
+        outputFormatClassName, serDeFormatClassName, serdeProperties, 
tableProperties);
+  }
 
-      // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
-      // 
https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
-      // /ql/exec/DDLTask.java#L3488
-      syncClient.createTable(tableName, schema, inputFormatClassName,
-          outputFormatClassName, serDeFormatClassName, serdeProperties, 
tableProperties);
-      schemaChanged = true;
+  private boolean syncSchema(String tableName, MessageType schema) {
+    boolean schemaChanged = false;
+
+    // Check if the table schema has evolved
+    Map<String, String> tableSchema = syncClient.getMetastoreSchema(tableName);
+    SchemaDifference schemaDiff = getSchemaDifference(schema, tableSchema,
+        config.getSplitStrings(META_SYNC_PARTITION_FIELDS),
+        config.getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE));
+    if (schemaDiff.isEmpty()) {
+      LOG.info("No Schema difference for {}\nMessageType: {}", tableName, 
schema);
     } else {
-      // Check if the table schema has evolved
-      Map<String, String> tableSchema = 
syncClient.getMetastoreSchema(tableName);
-      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, 
tableSchema, config.getSplitStrings(META_SYNC_PARTITION_FIELDS),
-          config.getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE));
-      if (!schemaDiff.isEmpty()) {
-        LOG.info("Schema difference found for {}. Updated schema: {}", 
tableName, schema);
-        syncClient.updateTableSchema(tableName, schema);
-        // Sync the table properties if the schema has changed
-        if (config.getString(HIVE_TABLE_PROPERTIES) != null || 
config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) {
-          syncClient.updateTableProperties(tableName, tableProperties);
-          syncClient.updateSerdeProperties(tableName, serdeProperties);
-          LOG.info("Sync table properties for " + tableName + ", table 
properties is: " + tableProperties);
-        }
-        schemaChanged = true;
-      } else {
-        LOG.info("No Schema difference for {}\nMessageType: {}", tableName, 
schema);
-      }
+      LOG.info("Schema difference found for {}. Updated schema: {}", 
tableName, schema);
+      syncClient.updateTableSchema(tableName, schema);
+      schemaChanged = true;
     }
 
     if (config.getBoolean(HIVE_SYNC_COMMENT)) {
       List<FieldSchema> fromMetastore = 
syncClient.getMetastoreFieldSchemas(tableName);
       List<FieldSchema> fromStorage = syncClient.getStorageFieldSchemas();
-      syncClient.updateTableComments(tableName, fromMetastore, fromStorage);
+      boolean commentsChanged = syncClient.updateTableComments(tableName, 
fromMetastore, fromStorage);
+      schemaChanged = schemaChanged || commentsChanged;
     }
     return schemaChanged;
   }
 
+  private boolean syncProperties(String tableName, boolean 
useRealTimeInputFormat, boolean readAsOptimized, MessageType schema) {
+    boolean propertiesChanged = false;
+
+    Map<String, String> serdeProperties = getSerdeProperties(readAsOptimized);
+    boolean serdePropertiesUpdated = 
syncClient.updateSerdeProperties(tableName, serdeProperties, 
useRealTimeInputFormat);
+    propertiesChanged = propertiesChanged || serdePropertiesUpdated;

Review Comment:
   Having the OR makes the code more self-contained, without relying on this being the first check — e.g., the checks below have to have it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to