[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-05 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465503124



##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
   }
 
-  public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+  public void updateDeltaStreamerMetrics(long durationInNs) {
     if (config.isMetricsOn()) {
       Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
-      Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+    }
+  }
+
+  public void updateDeltaStreamerMetaSyncMetrics(String syncClassName, long syncNs) {

Review comment:
   done
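
   For readers following along, a standalone sketch of the naming scheme this method ends up with: each sync tool class gets its own duration gauge, keyed by its class name. The real code registers gauges through Hudi's Metrics class, which a plain map stands in for here; the class and field names below are illustrative, not the actual HoodieDeltaStreamerMetrics implementation.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hedged, self-contained sketch of per-tool sync metrics.
public class MetaSyncMetricsSketch {
  final Map<String, Long> gauges = new HashMap<>();

  String getMetricsName(String tableName, String action, String metric) {
    return String.format("%s.%s.%s", tableName, action, metric);
  }

  public void updateMetaSyncMetrics(String tableName, String syncClassName, long syncNs) {
    // e.g. "my_table.deltastreamer.org.apache.hudi.hive.HiveSyncTool"
    gauges.put(getMetricsName(tableName, "deltastreamer", syncClassName),
        TimeUnit.NANOSECONDS.toMillis(syncNs));
  }

  public static void main(String[] args) {
    MetaSyncMetricsSketch metrics = new MetaSyncMetricsSketch();
    metrics.updateMetaSyncMetrics("my_table", "org.apache.hudi.hive.HiveSyncTool", 1_500_000_000L);
    System.out.println(metrics.gauges);  // {my_table.deltastreamer.org.apache.hudi.hive.HiveSyncTool=1500}
  }
}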









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-05 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465495815



##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
   }
 
-  public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+  public void updateDeltaStreamerMetrics(long durationInNs) {
     if (config.isMetricsOn()) {
       Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
-      Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+    }
+  }
+
+  public void updateDeltaStreamerMetaSyncMetrics(String syncClassName, long syncNs) {

Review comment:
   OK, I will do it.









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-04 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465458394



##
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##
@@ -258,11 +258,14 @@ object DataSourceWriteOptions {
     */
   val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = "hoodie.datasource.write.streaming.ignore.failed.batch"
   val DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL = "true"
+  val SYNC_CLIENT_TOOL_CLASS = "hoodie.sync.client.tool.class"
+  val DEFAULT_SYNC_CLIENT_TOOL_CLASS = "org.apache.hudi.hive.HiveSyncTool"

Review comment:
   done
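
   For illustration, a hedged example of wiring the new key into a Spark datasource write. The option key and its default come from the hunk above; the table name, record key field, and paths are made up.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SyncToolOptionExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("sync-tool-example").getOrCreate();
    Dataset<Row> df = spark.read().json("/tmp/source.json");  // hypothetical input
    df.write().format("hudi")
        .option("hoodie.table.name", "demo_table")
        .option("hoodie.datasource.write.recordkey.field", "uuid")
        // New key from this PR: comma-separated list of sync tool classes.
        .option("hoodie.sync.client.tool.class", "org.apache.hudi.hive.HiveSyncTool")
        .mode(SaveMode.Append)
        .save("/tmp/hudi/demo_table");
    spark.stop();
  }
}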









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-04 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465450971



##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -475,12 +480,38 @@ private String startCommit() {
     throw lastException;
   }
 
-  /**
-   * Sync to Hive.
-   */
-  public void syncHiveIfNeeded() {
+  private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
+    String syncClientToolClass = cfg.syncClientToolClass;
+    // for backward compatibility
     if (cfg.enableHiveSync) {
-      syncHive();
+      cfg.enableMetaSync = true;
+      syncClientToolClass = String.format("%s,%s", cfg.syncClientToolClass, "org.apache.hudi.hive.HiveSyncTool");

Review comment:
   done
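
   One subtlety in the backward-compatibility branch above: if cfg.syncClientToolClass already contains HiveSyncTool, the String.format call appends it a second time. A hedged, standalone sketch of de-duplicating the merged list (the class and method names are made up):

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class SyncToolListSketch {
  // Merge the configured tool classes with HiveSyncTool (for --enable-hive-sync)
  // through a set, so the same tool cannot run twice.
  static Set<String> resolveToolClasses(String configured, boolean enableHiveSync) {
    Set<String> classes = new LinkedHashSet<>();
    Arrays.stream(configured.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .forEach(classes::add);
    if (enableHiveSync) {
      classes.add("org.apache.hudi.hive.HiveSyncTool");
    }
    return classes;
  }

  public static void main(String[] args) {
    // Prints [org.apache.hudi.hive.HiveSyncTool] once, not twice.
    System.out.println(resolveToolClasses("org.apache.hudi.hive.HiveSyncTool", true));
  }
}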









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-04 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465450351



##
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public abstract Map<String, String> getTableSchema(String tableName);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);
+    }
+
+    try {
+      if (resultSet != null) {
+        resultSet.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the resultset opened ", e);

Review comment:
   done









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-04 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465449363



##
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public abstract Map<String, String> getTableSchema(String tableName);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);

Review comment:
   done









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-04 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465449064



##
File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.dla.util.Utils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Tool to sync a hoodie table with a dla table. Either use it as a api
+ * DLASyncTool.syncHoodieTable(DLASyncConfig) or as a command line java -cp hoodie-hive.jar DLASyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync dla table schema Also this will sync the
+ * partitions incrementally (all the partitions modified since the last commit)
+ */
+@SuppressWarnings("WeakerAccess")
+public class DLASyncTool extends AbstractSyncTool {
+
+  private static final Logger LOG = LogManager.getLogger(DLASyncTool.class);
+  public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
+  public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
+
+  private final DLASyncConfig cfg;
+  private final HoodieDLAClient hoodieDLAClient;
+  private final String snapshotTableName;
+  private final Option<String> roTableTableName;
+
+  public DLASyncTool(Properties properties, FileSystem fs) {
+    super(properties, fs);
+    this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs);
+    this.cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieDLAClient.getTableType()) {
+      case COPY_ON_WRITE:
+        this.snapshotTableName = cfg.tableName;
+        this.roTableTableName = Option.empty();
+        break;
+      case MERGE_ON_READ:
+        this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+        this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+            Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+        break;
+      default:
+        LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+        throw new InvalidTableException(hoodieDLAClient.getBasePath());
+    }
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      switch (hoodieDLAClient.getTableType()) {
+        case COPY_ON_WRITE:
+          syncHoodieTable(snapshotTableName, false);
+          break;
+        case MERGE_ON_READ:
+          // sync a RO table for MOR
+          syncHoodieTable(roTableTableName.get(), false);
+          // sync a RT table for MOR
+          syncHoodieTable(snapshotTableName, true);
+          break;
+        default:
+          LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+          throw new InvalidTableException(hoodieDLAClient.getBasePath());
+      }
+    } catch (RuntimeException re) {
+      LOG.error("Got runtime exception when dla syncing", re);
+    } finally {
+      hoodieDLAClient.close();
+    }
+  }
+
+  private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
+    LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+        + " of type " + hoodieDLAClient.getTableType());
+    // Check if the

[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-04 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465449225



##
File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.hive.PartitionValueExtractor;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HoodieDLAClient extends AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(HoodieDLAClient.class);
+  private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync";
+  // Make sure we have the dla JDBC driver in classpath
+  private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
+  private static final String DLA_ESCAPE_CHARACTER = "";
+  private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES";
+
+  static {
+    try {
+      Class.forName(DRIVER_NAME);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
+    }
+  }
+
+  private Connection connection;
+  private DLASyncConfig dlaConfig;
+  private PartitionValueExtractor partitionValueExtractor;
+
+  public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, fs);
+    this.dlaConfig = syncConfig;
+    try {
+      this.partitionValueExtractor =
+          (PartitionValueExtractor) Class.forName(dlaConfig.partitionValueExtractorClass).newInstance();
+    } catch (Exception e) {
+      throw new HoodieException(
+          "Failed to initialize PartitionValueExtractor class " + dlaConfig.partitionValueExtractorClass, e);
+    }
+    createDLAConnection();
+  }
+
+  private void createDLAConnection() {
+    if (connection == null) {
+      try {
+        Class.forName(DRIVER_NAME);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Unable to load DLA driver class", e);
+        return;
+      }
+      try {
+        this.connection = DriverManager.getConnection(dlaConfig.jdbcUrl, dlaConfig.dlaUser, dlaConfig.dlaPass);
+        LOG.info("Successfully established DLA connection to  " + dlaConfig.jdbcUrl);
+      } catch (SQLException e) {
+        throw new HoodieException("Cannot create dla connection ", e);
+      }
+    }
+  }
+
+  @Override
+  public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
+    try {
+      String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(), inputFormatClass, outputFormatClass, serdeClass);
+      LOG.info("Creating table with " + createSQLQuery);
+      updateDLASQL(createSQLQuery);
+    } catch (IOException e) {
+      throw new HoodieException("Failed to create table " + tableName, e);
+    }
+  }
+
+  public Map<String, String> getTableSchema(String tableName) {
+    if (!doesTableExist(tableName)) {
+      throw new IllegalArgumentException(
+          "Failed to get schema for table " + tableName + " does not exist");
+    }
+    Map

[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-04 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465448424



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##
@@ -261,6 +268,44 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters(SYNC_CLIENT_TOOL_CLASS)
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = String.format("%s,%s", syncClientToolClass, "org.apache.hudi.hive.HiveSyncTool")

Review comment:
   done









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-04 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465434113



##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##
@@ -267,9 +267,16 @@ public Operation convert(String value) throws ParameterException {
         description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
     public Boolean filterDupes = false;
 
+    // will abandon in the future version, recommended use --enable-sync
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
+    public Boolean enableMetaSync = false;
+
+    @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
+    public String syncClientToolClass = "org.apache.hudi.hive.HiveSyncTool";

Review comment:
   done









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-04 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465433316



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##
@@ -261,6 +268,44 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)

Review comment:
   done









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-08-04 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465133092



##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##
@@ -267,9 +267,16 @@ public Operation convert(String value) throws ParameterException {
         description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
     public Boolean filterDupes = false;
 
+    // will abandon in the future version, recommended use --enable-sync

Review comment:
   Agree with you, and I will do it.

##
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##
@@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS

Review comment:
   Yes, when the user sets hiveSyncEnabled and --sync-tool-classes, syncing both Hive and the --sync-tool-classes tools makes sense. I will fix it.

##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
   }
 
-  public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+  public void updateDeltaStreamerMetrics(long durationInNs, long syncNs, boolean hiveSync) {
     if (config.isMetricsOn()) {
       Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
-      Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+      if (hiveSync) {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(syncNs));
+      } else {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "metaSyncDuration"), getDurationInMs(syncNs));

Review comment:
   I have done it; each sync tool class now has its own metrics, named after the sync class.

##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -442,7 +449,8 @@ private void refreshTimeline() throws IOException {
     long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0;
 
     // Send DeltaStreamer Metrics
-    metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
+    metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs, true);
+    metrics.updateDeltaStreamerMetrics(overallTimeMs, metaSyncTimeMs, false);

Review comment:
   OK, I have done this in syncMeta.









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-16 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r455531592



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##
@@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS
+    }
+    var metaSyncSuccess = true
+    if (metaSyncEnabled) {
+      val impls = syncClientToolClass.split(",")
+      impls.foreach(impl => {
+        val syncSuccess = impl.trim match {
+          case DEFAULT_SYNC_CLIENT_TOOL_CLASS => {
+            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+            val fs = FSUtils.getFs(basePath.toString, hadoopConf)
+            syncHive(basePath, fs, parameters)
+          }
+          case _ => {
+            val fs = FSUtils.getFs(basePath.toString, hadoopConf)
+            val properties = new Properties();
+            properties.putAll(parameters)
+            properties.put("basePath", basePath.toString)
+            val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool]
+            syncHoodie.syncHoodieTable()

Review comment:
   Because the HoodieHiveClient constructor is different, as shown here.
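
   For context, a hedged sketch of the reflection path used for the non-Hive tools in the hunk above: any tool that exposes a (Properties, FileSystem) constructor can be loaded by class name, which is exactly what HoodieHiveClient's different constructor prevents. The class and method names below are illustrative.

import org.apache.hadoop.fs.FileSystem;

import java.lang.reflect.Constructor;
import java.util.Properties;

public class SyncToolLoaderSketch {
  // Instantiate a sync tool by class name; the caller casts the result to
  // AbstractSyncTool and invokes syncHoodieTable(), as the Scala code above does.
  public static Object loadSyncTool(String className, Properties props, FileSystem fs) throws Exception {
    Class<?> clazz = Class.forName(className);
    Constructor<?> ctor = clazz.getConstructor(Properties.class, FileSystem.class);
    return ctor.newInstance(props, fs);
  }
}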









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-15 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r455011023



##
File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.dla.util.Utils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Tool to sync a hoodie table with a dla table. Either use it as a api
+ * DLASyncTool.syncHoodieTable(DLASyncConfig) or as a command line java -cp hoodie-hive.jar DLASyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync dla table schema Also this will sync the
+ * partitions incrementally (all the partitions modified since the last commit)
+ */
+@SuppressWarnings("WeakerAccess")
+public class DLASyncTool extends AbstractSyncTool {
+
+  private static final Logger LOG = LogManager.getLogger(DLASyncTool.class);
+  public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
+  public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
+
+  private final DLASyncConfig cfg;
+  private final HoodieDLAClient hoodieDLAClient;
+  private final String snapshotTableName;
+  private final Option<String> roTableTableName;
+
+  public DLASyncTool(Properties properties, FileSystem fs) {
+    super(properties, fs);
+    this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs);
+    this.cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieDLAClient.getTableType()) {
+      case COPY_ON_WRITE:
+        this.snapshotTableName = cfg.tableName;
+        this.roTableTableName = Option.empty();
+        break;
+      case MERGE_ON_READ:
+        this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+        this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+            Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+        break;
+      default:
+        LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+        throw new InvalidTableException(hoodieDLAClient.getBasePath());
+    }
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      switch (hoodieDLAClient.getTableType()) {
+        case COPY_ON_WRITE:
+          syncHoodieTable(snapshotTableName, false);
+          break;
+        case MERGE_ON_READ:
+          // sync a RO table for MOR
+          syncHoodieTable(roTableTableName.get(), false);
+          // sync a RT table for MOR
+          syncHoodieTable(snapshotTableName, true);
+          break;
+        default:
+          LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+          throw new InvalidTableException(hoodieDLAClient.getBasePath());
+      }
+    } catch (RuntimeException re) {
+      LOG.error("Got runtime exception when dla syncing", re);
+    } finally {
+      hoodieDLAClient.close();
+    }
+  }
+
+  private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
+    LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+        + " of type " + hoodieDLAClient.getTableType());
+    // Check if the

[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-15 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r454974889



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##
@@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS

Review comment:
   Yes, this keeps backward compatibility.









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-15 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r454853182



##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")

Review comment:
   done









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-11 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r453190766



##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")

Review comment:
   OK, I can add a new --enable-sync as the default choice, and also keep supporting --enable-hive-sync as a duplicate parameter. What do you think? @garyli1019 @leesf









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-11 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r453190458



##
File path: hudi-sync/hudi-hive-sync/pom.xml
##
@@ -43,6 +45,11 @@
       <artifactId>hudi-hadoop-mr</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-sync-common</artifactId>

Review comment:
   Thanks, being consistent with hudi-client will be better.

##
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
##
@@ -272,6 +268,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat
   /**
    * Get the table schema.
    */
+  // overwrite

Review comment:
   done

##
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java
##
@@ -20,12 +20,14 @@
 
 import org.apache.parquet.schema.MessageType;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
+import java.util.ArrayList;

Review comment:
   done

##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -63,10 +67,11 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Objects;
+import java.util.Properties;

Review comment:
   done









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-11 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r453190335



##
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);
+    }
+
+    try {
+      if (resultSet != null) {
+        resultSet.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the resultset opened ", e);
+    }
+  }
+
+  /**
+   * Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if
+   * present, else fallback to reading from any file written in the latest commit. We will assume that the schema has
+   * not changed within a single atomic write.
+   *
+   * @return Parquet schema for this table
+   */
+  public MessageType getDataSchema() {
+    try {
+      return new TableSchemaResolver(metaClient).getTableParquetSchema();
+    } catch (Exception e) {
+      throw new HoodieSyncException("Failed to read data schema", e);
+    }
+  }
+
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+

[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-09 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452166076



##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")

Review comment:
   This place is worth discussing. For compatibility, we could just add --hoodie-sync-client-tool-class. But I think changing --enable-hive-sync to --enable-sync is reasonable, or we could add a new parameter --enable-sync and keep --enable-hive-sync for old users, for compatibility. cc @vinothchandar @leesf
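
   A hedged sketch of the coexistence proposed here: keep --enable-hive-sync working while --enable-sync becomes the preferred flag. The class and wiring below are illustrative, not the actual HoodieDeltaStreamer config.

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

public class SyncFlags {
  @Parameter(names = {"--enable-hive-sync"}, description = "Deprecated; use --enable-sync")
  public Boolean enableHiveSync = false;

  @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
  public Boolean enableMetaSync = false;

  public static void main(String[] args) {
    SyncFlags flags = new SyncFlags();
    JCommander.newBuilder().addObject(flags).build().parse(args);
    // The old flag implies the new one, so existing scripts keep working.
    boolean syncEnabled = flags.enableMetaSync || flags.enableHiveSync;
    System.out.println("meta sync enabled: " + syncEnabled);
  }
}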









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-09 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452160822



##
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java
##
@@ -20,12 +20,14 @@
 
 import org.apache.parquet.schema.MessageType;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
+import java.util.ArrayList;

Review comment:
   okay

##
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -63,10 +67,11 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Objects;
+import java.util.Properties;

Review comment:
   thanks









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-09 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452160698



##
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
##
@@ -272,6 +268,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat
   /**
    * Get the table schema.
    */
+  // overwrite

Review comment:
   Agree with you. We can put the abstract method getTableSchema in the base sync client.
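
   A minimal sketch of the agreed refactor, under illustrative names (the real base class is AbstractSyncHoodieClient): declare getTableSchema on the base sync client and let each store-specific client override it.

import java.util.HashMap;
import java.util.Map;

abstract class SyncClientBase {
  // Every sync implementation must expose the table schema it sees.
  public abstract Map<String, String> getTableSchema(String tableName);
}

class InMemorySyncClient extends SyncClientBase {
  @Override
  public Map<String, String> getTableSchema(String tableName) {
    Map<String, String> schema = new HashMap<>();
    schema.put("uuid", "string");  // column name -> type, as Hive-style strings
    return schema;
  }
}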









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-09 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452157405



##
File path: hudi-sync/hudi-hive-sync/pom.xml
##
@@ -43,6 +45,11 @@
       <artifactId>hudi-hadoop-mr</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-sync-common</artifactId>

Review comment:
   Thanks. I think putting the hudi-sync-common base classes into hudi-common makes sense, but placing hudi-sync-hive, hudi-sync-dla, etc. directly under the hudi/ directory would give the codebase too many modules.
   Another option is to put the hudi-sync-common base classes into hudi-common, and keep hudi-sync-hive, hudi-sync-dla, etc. under hudi-sync.
   What is your suggestion, @vinothchandar @leesf?
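
   For reference, the module layout this thread converges on, as visible in the file paths above:

hudi-sync/
  hudi-sync-common/   (AbstractSyncTool, AbstractSyncHoodieClient)
  hudi-hive-sync/     (HiveSyncTool, HoodieHiveClient)
  hudi-dla-sync/      (DLASyncTool, HoodieDLAClient)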









[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

2020-07-09 Thread GitBox


lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452153918



##
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);
+    }
+
+    try {
+      if (resultSet != null) {
+        resultSet.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the resultset opened ", e);
+    }
+  }
+
+  /**
+   * Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if
+   * present, else fallback to reading from any file written in the latest commit. We will assume that the schema has
+   * not changed within a single atomic write.
+   *
+   * @return Parquet schema for this table
+   */
+  public MessageType getDataSchema() {
+    try {
+      return new TableSchemaResolver(metaClient).getTableParquetSchema();
+    } catch (Exception e) {
+      throw new HoodieSyncException("Failed to read data schema", e);
+    }
+  }
+
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+