xushiyan commented on code in PR #8775: URL: https://github.com/apache/hudi/pull/8775#discussion_r1201544493
########## hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java: ########## @@ -280,83 +282,87 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions); } - boolean meetSyncConditions = schemaChanged || partitionsChanged; + boolean meetSyncConditions = schemaChanged || propertiesChanged || partitionsChanged; if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) { syncClient.updateLastCommitTimeSynced(tableName); } LOG.info("Sync complete for " + tableName); } - /** - * Get the latest schema from the last commit and check if its in sync with the hive table schema. If not, evolves the - * table schema. - * - * @param tableExists does table exist - * @param schema extracted schema - */ - private boolean syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, - boolean readAsOptimized, MessageType schema) { - // Append spark table properties & serde properties + private Map<String, String> getTableProperties(MessageType schema) { Map<String, String> tableProperties = ConfigUtils.toMap(config.getString(HIVE_TABLE_PROPERTIES)); - Map<String, String> serdeProperties = ConfigUtils.toMap(config.getString(HIVE_TABLE_SERDE_PROPERTIES)); if (config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) { Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS), config.getStringOrDefault(META_SYNC_SPARK_VERSION), config.getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema); - Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, config.getString(META_SYNC_BASE_PATH)); tableProperties.putAll(sparkTableProperties); + } + return tableProperties; + } + + private Map<String, String> getSerdeProperties(boolean readAsOptimized) { + Map<String, String> 
serdeProperties = ConfigUtils.toMap(config.getString(HIVE_TABLE_SERDE_PROPERTIES)); + if (config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) { + Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, config.getString(META_SYNC_BASE_PATH)); serdeProperties.putAll(sparkSerdeProperties); } - boolean schemaChanged = false; - // Check and sync schema - if (!tableExists) { - LOG.info("Hive table {} is not found. Creating it with schema {}.", tableName, schema); - HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase()); - String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat); - - if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && config.getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT)) { - // Parquet input format had an InputFormat class visible under the old naming scheme. - inputFormatClassName = useRealTimeInputFormat - ? 
com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName() - : com.uber.hoodie.hadoop.HoodieInputFormat.class.getName(); - } + return serdeProperties; + } - String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat); - String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat); + private void syncFirstTime(String tableName, boolean useRealTimeInputFormat, boolean readAsOptimized, MessageType schema) { + LOG.info("Sync table {} for the first time.", tableName); + HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase()); + String inputFormatClassName = getInputFormatClassName(baseFileFormat, useRealTimeInputFormat, config.getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT)); + String outputFormatClassName = getOutputFormatClassName(baseFileFormat); + String serDeFormatClassName = getSerDeClassName(baseFileFormat); + Map<String, String> serdeProperties = getSerdeProperties(readAsOptimized); + Map<String, String> tableProperties = getTableProperties(schema); + + // Custom serde will not work with ALTER TABLE REPLACE COLUMNS + // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive + // /ql/exec/DDLTask.java#L3488 + syncClient.createTable(tableName, schema, inputFormatClassName, + outputFormatClassName, serDeFormatClassName, serdeProperties, tableProperties); + } - // Custom serde will not work with ALTER TABLE REPLACE COLUMNS - // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive - // /ql/exec/DDLTask.java#L3488 - syncClient.createTable(tableName, schema, inputFormatClassName, - outputFormatClassName, serDeFormatClassName, serdeProperties, tableProperties); - schemaChanged = true; + private boolean syncSchema(String tableName, MessageType schema) { + boolean schemaChanged = false; + + // Check if the table schema has evolved + Map<String, String> 
tableSchema = syncClient.getMetastoreSchema(tableName); + SchemaDifference schemaDiff = getSchemaDifference(schema, tableSchema, + config.getSplitStrings(META_SYNC_PARTITION_FIELDS), + config.getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE)); + if (schemaDiff.isEmpty()) { + LOG.info("No Schema difference for {}\nMessageType: {}", tableName, schema); } else { - // Check if the table schema has evolved - Map<String, String> tableSchema = syncClient.getMetastoreSchema(tableName); - SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, config.getSplitStrings(META_SYNC_PARTITION_FIELDS), - config.getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE)); - if (!schemaDiff.isEmpty()) { - LOG.info("Schema difference found for {}. Updated schema: {}", tableName, schema); - syncClient.updateTableSchema(tableName, schema); - // Sync the table properties if the schema has changed - if (config.getString(HIVE_TABLE_PROPERTIES) != null || config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) { - syncClient.updateTableProperties(tableName, tableProperties); - syncClient.updateSerdeProperties(tableName, serdeProperties); - LOG.info("Sync table properties for " + tableName + ", table properties is: " + tableProperties); - } - schemaChanged = true; - } else { - LOG.info("No Schema difference for {}\nMessageType: {}", tableName, schema); - } + LOG.info("Schema difference found for {}. 
Updated schema: {}", tableName, schema); + syncClient.updateTableSchema(tableName, schema); + schemaChanged = true; } if (config.getBoolean(HIVE_SYNC_COMMENT)) { List<FieldSchema> fromMetastore = syncClient.getMetastoreFieldSchemas(tableName); List<FieldSchema> fromStorage = syncClient.getStorageFieldSchemas(); - syncClient.updateTableComments(tableName, fromMetastore, fromStorage); + boolean commentsChanged = syncClient.updateTableComments(tableName, fromMetastore, fromStorage); + schemaChanged = schemaChanged || commentsChanged; } return schemaChanged; } + private boolean syncProperties(String tableName, boolean useRealTimeInputFormat, boolean readAsOptimized, MessageType schema) { + boolean propertiesChanged = false; + + Map<String, String> serdeProperties = getSerdeProperties(readAsOptimized); + boolean serdePropertiesUpdated = syncClient.updateSerdeProperties(tableName, serdeProperties, useRealTimeInputFormat); + propertiesChanged = propertiesChanged || serdePropertiesUpdated; Review Comment: Having the OR will make the code more self-contained, without relying on this being the first check; e.g., the checks below have to have it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org