[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465503124 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java ## @@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) { return config == null ? null : String.format("%s.%s.%s", tableName, action, metric); } - public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) { + public void updateDeltaStreamerMetrics(long durationInNs) { if (config.isMetricsOn()) { Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs)); - Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs)); +} + } + + public void updateDeltaStreamerMetaSyncMetrics(String syncClassName, long syncNs) { Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465495815 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java ## @@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) { return config == null ? null : String.format("%s.%s.%s", tableName, action, metric); } - public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) { + public void updateDeltaStreamerMetrics(long durationInNs) { if (config.isMetricsOn()) { Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs)); - Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs)); +} + } + + public void updateDeltaStreamerMetaSyncMetrics(String syncClassName, long syncNs) { Review comment: OK, I will do it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465458394 ## File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala ## @@ -258,11 +258,14 @@ object DataSourceWriteOptions { */ val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = "hoodie.datasource.write.streaming.ignore.failed.batch" val DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL = "true" + val SYNC_CLIENT_TOOL_CLASS = "hoodie.sync.client.tool.class" + val DEFAULT_SYNC_CLIENT_TOOL_CLASS = "org.apache.hudi.hive.HiveSyncTool" Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465450971 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java ## @@ -475,12 +480,38 @@ private String startCommit() { throw lastException; } - /** - * Sync to Hive. - */ - public void syncHiveIfNeeded() { + private void syncMeta(HoodieDeltaStreamerMetrics metrics) { +String syncClientToolClass = cfg.syncClientToolClass; +// for backward compatibility if (cfg.enableHiveSync) { - syncHive(); + cfg.enableMetaSync = true; + syncClientToolClass = String.format("%s,%s", cfg.syncClientToolClass, "org.apache.hudi.hive.HiveSyncTool"); Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465450351 ## File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java ## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sync.common; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public abstract class AbstractSyncHoodieClient { + private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class); + protected final HoodieTableMetaClient metaClient; + protected HoodieTimeline activeTimeline; + protected final HoodieTableType tableType; + protected final FileSystem fs; + private String basePath; + private boolean assumeDatePartitioning; + + public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) { +this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); +this.tableType = metaClient.getTableType(); +this.basePath = basePath; +this.assumeDatePartitioning = assumeDatePartitioning; +this.fs = fs; +this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + } + + public abstract void createTable(String tableName, MessageType storageSchema, + String inputFormatClass, String outputFormatClass, String serdeClass); + + public abstract boolean doesTableExist(String tableName); + + public abstract Option 
getLastCommitTimeSynced(String tableName); + + public abstract void updateLastCommitTimeSynced(String tableName); + + public abstract void addPartitionsToTable(String tableName, List partitionsToAdd); + + public abstract void updatePartitionsToTable(String tableName, List changedPartitions); + + public abstract Map getTableSchema(String tableName); + + public HoodieTimeline getActiveTimeline() { +return activeTimeline; + } + + public HoodieTableType getTableType() { +return tableType; + } + + public String getBasePath() { +return metaClient.getBasePath(); + } + + public FileSystem getFs() { +return fs; + } + + public void closeQuietly(ResultSet resultSet, Statement stmt) { +try { + if (stmt != null) { +stmt.close(); + } +} catch (SQLException e) { + LOG.error("Could not close the statement opened ", e); +} + +try { + if (resultSet != null) { +resultSet.close(); + } +} catch (SQLException e) { + LOG.error("Could not close the resultset opened ", e); Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465449363 ## File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java ## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sync.common; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public abstract class AbstractSyncHoodieClient { + private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class); + protected final HoodieTableMetaClient metaClient; + protected HoodieTimeline activeTimeline; + protected final HoodieTableType tableType; + protected final FileSystem fs; + private String basePath; + private boolean assumeDatePartitioning; + + public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) { +this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); +this.tableType = metaClient.getTableType(); +this.basePath = basePath; +this.assumeDatePartitioning = assumeDatePartitioning; +this.fs = fs; +this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + } + + public abstract void createTable(String tableName, MessageType storageSchema, + String inputFormatClass, String outputFormatClass, String serdeClass); + + public abstract boolean doesTableExist(String tableName); + + public abstract Option 
getLastCommitTimeSynced(String tableName); + + public abstract void updateLastCommitTimeSynced(String tableName); + + public abstract void addPartitionsToTable(String tableName, List partitionsToAdd); + + public abstract void updatePartitionsToTable(String tableName, List changedPartitions); + + public abstract Map getTableSchema(String tableName); + + public HoodieTimeline getActiveTimeline() { +return activeTimeline; + } + + public HoodieTableType getTableType() { +return tableType; + } + + public String getBasePath() { +return metaClient.getBasePath(); + } + + public FileSystem getFs() { +return fs; + } + + public void closeQuietly(ResultSet resultSet, Statement stmt) { +try { + if (stmt != null) { +stmt.close(); + } +} catch (SQLException e) { + LOG.error("Could not close the statement opened ", e); Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465449064 ## File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.dla; + +import com.beust.jcommander.JCommander; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.dla.util.Utils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.InvalidTableException; +import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; +import org.apache.hudi.hive.SchemaDifference; +import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient; +import org.apache.hudi.sync.common.AbstractSyncTool; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.schema.MessageType; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +/** + * Tool to sync a hoodie table with a dla table. 
Either use it as a api + * DLASyncTool.syncHoodieTable(DLASyncConfig) or as a command line java -cp hoodie-hive.jar DLASyncTool [args] + * + * This utility will get the schema from the latest commit and will sync dla table schema Also this will sync the + * partitions incrementally (all the partitions modified since the last commit) + */ +@SuppressWarnings("WeakerAccess") +public class DLASyncTool extends AbstractSyncTool { + + private static final Logger LOG = LogManager.getLogger(DLASyncTool.class); + public static final String SUFFIX_SNAPSHOT_TABLE = "_rt"; + public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro"; + + private final DLASyncConfig cfg; + private final HoodieDLAClient hoodieDLAClient; + private final String snapshotTableName; + private final Option roTableTableName; + + public DLASyncTool(Properties properties, FileSystem fs) { +super(properties, fs); +this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs); +this.cfg = Utils.propertiesToConfig(properties); +switch (hoodieDLAClient.getTableType()) { + case COPY_ON_WRITE: +this.snapshotTableName = cfg.tableName; +this.roTableTableName = Option.empty(); +break; + case MERGE_ON_READ: +this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE; +this.roTableTableName = cfg.skipROSuffix ? 
Option.of(cfg.tableName) : +Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE); +break; + default: +LOG.error("Unknown table type " + hoodieDLAClient.getTableType()); +throw new InvalidTableException(hoodieDLAClient.getBasePath()); +} + } + + @Override + public void syncHoodieTable() { +try { + switch (hoodieDLAClient.getTableType()) { +case COPY_ON_WRITE: + syncHoodieTable(snapshotTableName, false); + break; +case MERGE_ON_READ: + // sync a RO table for MOR + syncHoodieTable(roTableTableName.get(), false); + // sync a RT table for MOR + syncHoodieTable(snapshotTableName, true); + break; +default: + LOG.error("Unknown table type " + hoodieDLAClient.getTableType()); + throw new InvalidTableException(hoodieDLAClient.getBasePath()); + } +} catch (RuntimeException re) { + LOG.error("Got runtime exception when dla syncing", re); +} finally { + hoodieDLAClient.close(); +} + } + + private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) { +LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath() ++ " of type " + hoodieDLAClient.getTableType()); +// Check if the
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465449225 ## File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java ## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.dla; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; +import org.apache.hudi.hive.PartitionValueExtractor; +import org.apache.hudi.hive.SchemaDifference; +import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class HoodieDLAClient extends AbstractSyncHoodieClient { + private static final Logger LOG = LogManager.getLogger(HoodieDLAClient.class); + private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync"; + // Make sure we have the dla JDBC driver in classpath + private static final String DRIVER_NAME = "com.mysql.jdbc.Driver"; + private static final String DLA_ESCAPE_CHARACTER = ""; + private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES"; + + static { +try { + Class.forName(DRIVER_NAME); +} catch (ClassNotFoundException e) { + throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. 
", e); +} + } + + private Connection connection; + private DLASyncConfig dlaConfig; + private PartitionValueExtractor partitionValueExtractor; + + public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) { +super(syncConfig.basePath, syncConfig.assumeDatePartitioning, fs); +this.dlaConfig = syncConfig; +try { + this.partitionValueExtractor = + (PartitionValueExtractor) Class.forName(dlaConfig.partitionValueExtractorClass).newInstance(); +} catch (Exception e) { + throw new HoodieException( + "Failed to initialize PartitionValueExtractor class " + dlaConfig.partitionValueExtractorClass, e); +} +createDLAConnection(); + } + + private void createDLAConnection() { +if (connection == null) { + try { +Class.forName(DRIVER_NAME); + } catch (ClassNotFoundException e) { +LOG.error("Unable to load DLA driver class", e); +return; + } + try { +this.connection = DriverManager.getConnection(dlaConfig.jdbcUrl, dlaConfig.dlaUser, dlaConfig.dlaPass); +LOG.info("Successfully established DLA connection to " + dlaConfig.jdbcUrl); + } catch (SQLException e) { +throw new HoodieException("Cannot create dla connection ", e); + } +} + } + + @Override + public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) { +try { + String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(), inputFormatClass, outputFormatClass, serdeClass); + LOG.info("Creating table with " + createSQLQuery); + updateDLASQL(createSQLQuery); +} catch (IOException e) { + throw new HoodieException("Failed to create table " + tableName, e); +} + } + + public Map getTableSchema(String tableName) { +if (!doesTableExist(tableName)) { + throw new IllegalArgumentException( + "Failed to get schema for table " + tableName + " does not exist"); +} +Map
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465448424 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala ## @@ -261,6 +268,44 @@ private[hudi] object HoodieSparkSqlWriter { hiveSyncConfig } + private def metaSync(parameters: Map[String, String], + basePath: Path, + hadoopConf: Configuration): Boolean = { +val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) +var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) +var syncClientToolClass = parameters(SYNC_CLIENT_TOOL_CLASS) +// for backward compatibility +if (hiveSyncEnabled) { + metaSyncEnabled = true + syncClientToolClass = String.format("%s,%s", syncClientToolClass, "org.apache.hudi.hive.HiveSyncTool") Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465434113 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java ## @@ -267,9 +267,16 @@ public Operation convert(String value) throws ParameterException { description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert") public Boolean filterDupes = false; +//will abandon in the future version, recommended use --enable-sync @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive") public Boolean enableHiveSync = false; +@Parameter(names = {"--enable-sync"}, description = "Enable syncing meta") +public Boolean enableMetaSync = false; + +@Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools") +public String syncClientToolClass = "org.apache.hudi.hive.HiveSyncTool"; Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465433316 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala ## @@ -261,6 +268,44 @@ private[hudi] object HoodieSparkSqlWriter { hiveSyncConfig } + private def metaSync(parameters: Map[String, String], + basePath: Path, + hadoopConf: Configuration): Boolean = { +val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) +var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r465133092 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java ## @@ -267,9 +267,16 @@ public Operation convert(String value) throws ParameterException { description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert") public Boolean filterDupes = false; +//will abandon in the future version, recommended use --enable-sync Review comment: Agreed, I will do it. ## File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala ## @@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter { hiveSyncConfig } + private def metaSync(parameters: Map[String, String], + basePath: Path, + hadoopConf: Configuration): Boolean = { +val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) +var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) +var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get +// for backward compatibility +if (hiveSyncEnabled) { + metaSyncEnabled = true + syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS Review comment: Yes, when the user sets both hiveSyncEnabled and --sync-tool-classes, syncing to both Hive and the tools listed in --sync-tool-classes makes sense. I will fix it. ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java ## @@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) { return config == null ? 
null : String.format("%s.%s.%s", tableName, action, metric); } - public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) { + public void updateDeltaStreamerMetrics(long durationInNs, long syncNs, boolean hiveSync) { if (config.isMetricsOn()) { Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs)); - Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs)); + if (hiveSync) { +Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(syncNs)); + } else { +Metrics.registerGauge(getMetricsName("deltastreamer", "metaSyncDuration"), getDurationInMs(syncNs)); Review comment: I have done it; each sync tool class now has its own metric, named after the sync class. ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java ## @@ -442,7 +449,8 @@ private void refreshTimeline() throws IOException { long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0; // Send DeltaStreamer Metrics -metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs); +metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs, true); +metrics.updateDeltaStreamerMetrics(overallTimeMs, metaSyncTimeMs, false); Review comment: OK, I have done this in syncMeta. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r455531592 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala ## @@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter { hiveSyncConfig } + private def metaSync(parameters: Map[String, String], + basePath: Path, + hadoopConf: Configuration): Boolean = { +val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) +var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) +var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get +// for backward compatibility +if (hiveSyncEnabled) { + metaSyncEnabled = true + syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS +} +var metaSyncSuccess = true +if (metaSyncEnabled) { + val impls = syncClientToolClass.split(",") + impls.foreach(impl => { +val syncSuccess = impl.trim match { + case DEFAULT_SYNC_CLIENT_TOOL_CLASS => { +log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")") +val fs = FSUtils.getFs(basePath.toString, hadoopConf) +syncHive(basePath, fs, parameters) + } + case _ => { +val fs = FSUtils.getFs(basePath.toString, hadoopConf) +val properties = new Properties(); +properties.putAll(parameters) +properties.put("basePath", basePath.toString) +val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool] +syncHoodie.syncHoodieTable() Review comment: Because the HoodieHiveClient constructor is different, like this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r455011023 ## File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.dla; + +import com.beust.jcommander.JCommander; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.dla.util.Utils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.InvalidTableException; +import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; +import org.apache.hudi.hive.SchemaDifference; +import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient; +import org.apache.hudi.sync.common.AbstractSyncTool; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.schema.MessageType; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +/** + * Tool to sync a hoodie table with a dla table. 
Either use it as a api + * DLASyncTool.syncHoodieTable(DLASyncConfig) or as a command line java -cp hoodie-hive.jar DLASyncTool [args] + * + * This utility will get the schema from the latest commit and will sync dla table schema Also this will sync the + * partitions incrementally (all the partitions modified since the last commit) + */ +@SuppressWarnings("WeakerAccess") +public class DLASyncTool extends AbstractSyncTool { + + private static final Logger LOG = LogManager.getLogger(DLASyncTool.class); + public static final String SUFFIX_SNAPSHOT_TABLE = "_rt"; + public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro"; + + private final DLASyncConfig cfg; + private final HoodieDLAClient hoodieDLAClient; + private final String snapshotTableName; + private final Option roTableTableName; + + public DLASyncTool(Properties properties, FileSystem fs) { +super(properties, fs); +this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs); +this.cfg = Utils.propertiesToConfig(properties); +switch (hoodieDLAClient.getTableType()) { + case COPY_ON_WRITE: +this.snapshotTableName = cfg.tableName; +this.roTableTableName = Option.empty(); +break; + case MERGE_ON_READ: +this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE; +this.roTableTableName = cfg.skipROSuffix ? 
Option.of(cfg.tableName) : +Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE); +break; + default: +LOG.error("Unknown table type " + hoodieDLAClient.getTableType()); +throw new InvalidTableException(hoodieDLAClient.getBasePath()); +} + } + + @Override + public void syncHoodieTable() { +try { + switch (hoodieDLAClient.getTableType()) { +case COPY_ON_WRITE: + syncHoodieTable(snapshotTableName, false); + break; +case MERGE_ON_READ: + // sync a RO table for MOR + syncHoodieTable(roTableTableName.get(), false); + // sync a RT table for MOR + syncHoodieTable(snapshotTableName, true); + break; +default: + LOG.error("Unknown table type " + hoodieDLAClient.getTableType()); + throw new InvalidTableException(hoodieDLAClient.getBasePath()); + } +} catch (RuntimeException re) { + LOG.error("Got runtime exception when dla syncing", re); +} finally { + hoodieDLAClient.close(); +} + } + + private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) { +LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath() ++ " of type " + hoodieDLAClient.getTableType()); +// Check if the
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r454974889 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala ## @@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter { hiveSyncConfig } + private def metaSync(parameters: Map[String, String], + basePath: Path, + hadoopConf: Configuration): Boolean = { +val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) +var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) +var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get +// for backward compatibility +if (hiveSyncEnabled) { + metaSyncEnabled = true + syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS Review comment: Yes, this preserves backward compatibility. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r454853182 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java ## @@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException { @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive") public Boolean enableHiveSync = false; +@Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools") Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r453190766 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java ## @@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException { @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive") public Boolean enableHiveSync = false; +@Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools") Review comment: OK, I can add a new --enable-sync as the default choice, and also keep supporting --enable-hive-sync as a duplicate parameter. What do you think? @garyli1019 @leesf This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r453190458 ## File path: hudi-sync/hudi-hive-sync/pom.xml ## @@ -43,6 +45,11 @@ hudi-hadoop-mr ${project.version} + + org.apache.hudi + hudi-sync-common Review comment: Thanks, being consistent with hudi-client would be better. ## File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java ## @@ -272,6 +268,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat /** * Get the table schema. */ + // overwrite Review comment: done ## File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java ## @@ -20,12 +20,14 @@ import org.apache.parquet.schema.MessageType; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.StringJoiner; +import java.util.ArrayList; Review comment: done ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java ## @@ -63,10 +67,11 @@ import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Objects; +import java.util.Properties; Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r453190335 ## File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sync.common; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.stream.Collectors; + +public abstract class AbstractSyncHoodieClient { + private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class); + protected final HoodieTableMetaClient metaClient; + protected HoodieTimeline activeTimeline; + protected final HoodieTableType tableType; + protected final FileSystem fs; + private String basePath; + private boolean assumeDatePartitioning; + + public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) { +this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); +this.tableType = metaClient.getTableType(); +this.basePath = basePath; +this.assumeDatePartitioning = assumeDatePartitioning; +this.fs = fs; +this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + } + + public abstract void createTable(String tableName, MessageType storageSchema, + String 
inputFormatClass, String outputFormatClass, String serdeClass); + + public abstract boolean doesTableExist(String tableName); + + public abstract Option getLastCommitTimeSynced(String tableName); + + public abstract void updateLastCommitTimeSynced(String tableName); + + public abstract void addPartitionsToTable(String tableName, List partitionsToAdd); + + public abstract void updatePartitionsToTable(String tableName, List changedPartitions); + + public HoodieTimeline getActiveTimeline() { +return activeTimeline; + } + + public HoodieTableType getTableType() { +return tableType; + } + + public String getBasePath() { +return metaClient.getBasePath(); + } + + public FileSystem getFs() { +return fs; + } + + public void closeQuietly(ResultSet resultSet, Statement stmt) { +try { + if (stmt != null) { +stmt.close(); + } +} catch (SQLException e) { + LOG.error("Could not close the statement opened ", e); +} + +try { + if (resultSet != null) { +resultSet.close(); + } +} catch (SQLException e) { + LOG.error("Could not close the resultset opened ", e); +} + } + + /** + * Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if + * present, else fallback to reading from any file written in the latest commit. We will assume that the schema has + * not changed within a single atomic write. + * + * @return Parquet schema for this table + */ + public MessageType getDataSchema() { +try { + return new TableSchemaResolver(metaClient).getTableParquetSchema(); +} catch (Exception e) { + throw new HoodieSyncException("Failed to read data schema", e); +} + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") +
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r452166076 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java ## @@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException { @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive") public Boolean enableHiveSync = false; +@Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools") Review comment: This is worth discussing. For compatibility, we just added --hoodie-sync-client-tool-class. But I think changing --enable-hive-sync to --enable-sync is reasonable, or we could add a new parameter --enable-sync and keep the --enable-hive-sync parameter for compatibility with existing users. cc @vinothchandar @leesf This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r452160822 ## File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java ## @@ -20,12 +20,14 @@ import org.apache.parquet.schema.MessageType; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.StringJoiner; +import java.util.ArrayList; Review comment: okay ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java ## @@ -63,10 +67,11 @@ import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Objects; +import java.util.Properties; Review comment: thanks This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r452160698 ## File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java ## @@ -272,6 +268,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat /** * Get the table schema. */ + // overwrite Review comment: Agree with you. We can put the abstract method getTableSchema in the base syncClient. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r452157405 ## File path: hudi-sync/hudi-hive-sync/pom.xml ## @@ -43,6 +45,11 @@ hudi-hadoop-mr ${project.version} + + org.apache.hudi + hudi-sync-common Review comment: Thanks, I think putting the hudi-sync-common base class into hudi-common makes sense, but placing hudi-sync-hive and hudi-sync-dla directly under the hudi/ directory would give the hudi code too many modules. I think another choice is to put the hudi-sync-common base class into hudi-common, and keep hudi-sync-hive, hudi-sync-dla, etc. under hudi-sync. What is your suggestion, @vinothchandar @leesf? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync
lw309637554 commented on a change in pull request #1810: URL: https://github.com/apache/hudi/pull/1810#discussion_r452153918 ## File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sync.common; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.stream.Collectors; + +public abstract class AbstractSyncHoodieClient { + private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class); + protected final HoodieTableMetaClient metaClient; + protected HoodieTimeline activeTimeline; + protected final HoodieTableType tableType; + protected final FileSystem fs; + private String basePath; + private boolean assumeDatePartitioning; + + public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) { +this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); +this.tableType = metaClient.getTableType(); +this.basePath = basePath; +this.assumeDatePartitioning = assumeDatePartitioning; +this.fs = fs; +this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + } + + public abstract void createTable(String tableName, MessageType storageSchema, + String 
inputFormatClass, String outputFormatClass, String serdeClass); + + public abstract boolean doesTableExist(String tableName); + + public abstract Option getLastCommitTimeSynced(String tableName); + + public abstract void updateLastCommitTimeSynced(String tableName); + + public abstract void addPartitionsToTable(String tableName, List partitionsToAdd); + + public abstract void updatePartitionsToTable(String tableName, List changedPartitions); + + public HoodieTimeline getActiveTimeline() { +return activeTimeline; + } + + public HoodieTableType getTableType() { +return tableType; + } + + public String getBasePath() { +return metaClient.getBasePath(); + } + + public FileSystem getFs() { +return fs; + } + + public void closeQuietly(ResultSet resultSet, Statement stmt) { +try { + if (stmt != null) { +stmt.close(); + } +} catch (SQLException e) { + LOG.error("Could not close the statement opened ", e); +} + +try { + if (resultSet != null) { +resultSet.close(); + } +} catch (SQLException e) { + LOG.error("Could not close the resultset opened ", e); +} + } + + /** + * Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if + * present, else fallback to reading from any file written in the latest commit. We will assume that the schema has + * not changed within a single atomic write. + * + * @return Parquet schema for this table + */ + public MessageType getDataSchema() { +try { + return new TableSchemaResolver(metaClient).getTableParquetSchema(); +} catch (Exception e) { + throw new HoodieSyncException("Failed to read data schema", e); +} + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") +