codope commented on code in PR #5854:
URL: https://github.com/apache/hudi/pull/5854#discussion_r910986567


##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java:
##########
@@ -133,6 +135,14 @@ public <T> String getString(ConfigProperty<T> 
configProperty) {
     return rawValue.map(Object::toString).orElse(null);
   }
 
+  public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty) {
+    return getSplitStrings(configProperty, ",");
+  }
+
+  public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty, 
String delimiter) {
+    return StringUtils.split(getString(configProperty), delimiter);

Review Comment:
   Should we filter out empty strings?
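
   Something along these lines, as a rough sketch (assuming `StringUtils.split` does not already drop empty tokens, and that pulling `java.util.stream.Collectors` into this class is acceptable):

   ```java
   public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty, String delimiter) {
     // trim each token and drop blanks so trailing/duplicate delimiters don't yield "" entries
     return StringUtils.split(getString(configProperty), delimiter).stream()
         .map(String::trim)
         .filter(s -> !s.isEmpty())
         .collect(Collectors.toList());
   }
   ```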



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java:
##########
@@ -106,14 +119,19 @@ private void syncCoWTable(HoodieBigQuerySyncClient 
bqSyncClient) {
     LOG.info("Sync table complete for " + snapshotViewName);
   }
 
+  @Override
+  public void close() {

Review Comment:
   Since it's a subclass of an auto-closeable class, why do we even need to override here?



##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncTool.java:
##########
@@ -18,53 +18,44 @@
 
 package org.apache.hudi.aws.sync;
 
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 
 import com.beust.jcommander.JCommander;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.util.Properties;
 
 /**
  * Currently Experimental. Utility class that implements syncing a Hudi Table 
with the
  * AWS Glue Data Catalog 
(https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
  * to enable querying via Glue ETLs, Athena etc.
- *
+ * <p>
  * Extends HiveSyncTool since most logic is similar to Hive syncing,
  * expect using a different client {@link AWSGlueCatalogSyncClient} that 
implements
  * the necessary functionality using Glue APIs.
  *
  * @Experimental
  */
-public class AwsGlueCatalogSyncTool extends HiveSyncTool {
-
-  public AwsGlueCatalogSyncTool(TypedProperties props, Configuration conf, 
FileSystem fs) {
-    super(props, new HiveConf(conf, HiveConf.class), fs);
-  }
+public class AWSGlueCatalogSyncTool extends HiveSyncTool {
 
-  public AwsGlueCatalogSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf 
hiveConf, FileSystem fs) {
-    super(hiveSyncConfig, hiveConf, fs);
+  public AWSGlueCatalogSyncTool(Properties props, Configuration hadoopConf) {
+    super(props, hadoopConf);

Review Comment:
   +1 on getting rid of HiveConf from subclasses.



##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.sync.common.model.Partition;
+import org.apache.hudi.sync.common.model.PartitionEvent;
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA;
+
+public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, 
AutoCloseable {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSyncClient.class);
+
+  protected final HoodieSyncConfig config;
+  protected final PartitionValueExtractor partitionValueExtractor;
+  protected final HoodieTableMetaClient metaClient;
+
+  public HoodieSyncClient(HoodieSyncConfig config) {
+    this.config = config;
+    this.partitionValueExtractor = 
ReflectionUtils.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    this.metaClient = HoodieTableMetaClient.builder()
+        .setConf(config.getHadoopConf())
+        .setBasePath(config.getString(META_SYNC_BASE_PATH))
+        .setLoadActiveTimelineOnLoad(true)
+        .build();
+  }
+
+  public HoodieTimeline getActiveTimeline() {
+    return 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public HoodieTableType getTableType() {
+    return metaClient.getTableType();
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public boolean isBootstrap() {
+    return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
+  }
+
+  public boolean isDropPartition() {
+    try {
+      Option<HoodieCommitMetadata> hoodieCommitMetadata = 
HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient);
+
+      if (hoodieCommitMetadata.isPresent()
+          && 
WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType()))
 {
+        return true;
+      }
+    } catch (Exception e) {
+      throw new HoodieSyncException("Failed to get commit metadata", e);
+    }
+    return false;
+  }
+
+  public List<String> getPartitionsWrittenToSince(Option<String> 
lastCommitTimeSynced) {
+    if (!lastCommitTimeSynced.isPresent()) {
+      LOG.info("Last commit time synced is not known, listing all partitions 
in "
+          + config.getString(META_SYNC_BASE_PATH)
+          + ",FS :" + config.getHadoopFileSystem());
+      HoodieLocalEngineContext engineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
+      return FSUtils.getAllPartitionPaths(engineContext,
+          config.getString(META_SYNC_BASE_PATH),
+          config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA),
+          config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
+    } else {
+      LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", 
Getting commits since then");
+      return 
TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
+          .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
+    }
+  }
+
+  /**
+   * Iterate over the storage partitions and find if there are any new 
partitions that need to be added or updated.
+   * Generate a list of PartitionEvent based on the changes required.
+   */
+  public List<PartitionEvent> getPartitionEvents(List<Partition> 
tablePartitions, List<String> partitionStoragePartitions, boolean 
isDropPartition) {
+    Map<String, String> paths = new HashMap<>();
+    for (Partition tablePartition : tablePartitions) {
+      List<String> hivePartitionValues = tablePartition.getValues();
+      String fullTablePartitionPath =
+          Path.getPathWithoutSchemeAndAuthority(new 
Path(tablePartition.getStorageLocation())).toUri().getPath();
+      paths.put(String.join(", ", hivePartitionValues), 
fullTablePartitionPath);
+    }
+
+    List<PartitionEvent> events = new ArrayList<>();
+    for (String storagePartition : partitionStoragePartitions) {
+      Path storagePartitionPath = 
FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), 
storagePartition);
+      String fullStoragePartitionPath = 
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+      // Check if the partition values or if hdfs path is the same
+      List<String> storagePartitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
+
+      if (isDropPartition) {
+        events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
+      } else {
+        if (!storagePartitionValues.isEmpty()) {
+          String storageValue = String.join(", ", storagePartitionValues);
+          if (!paths.containsKey(storageValue)) {
+            events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+          } else if 
(!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+            
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+          }
+        }

Review Comment:
   Duplicate code in `HoodieAdbJdbcClient`. Let's extract this block to a method and reuse it in the subclass as well.
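
   For example, roughly (a sketch only; the helper name is just illustrative, and the hive-style-partitioning handling in the ADB client would stay in the subclass):

   ```java
   protected void addPartitionEvent(List<PartitionEvent> events, Map<String, String> paths,
                                    String storagePartition, List<String> storagePartitionValues,
                                    String fullStoragePartitionPath) {
     if (storagePartitionValues.isEmpty()) {
       return;
     }
     String storageValue = String.join(", ", storagePartitionValues);
     if (!paths.containsKey(storageValue)) {
       // partition exists on storage but not in the metastore -> add
       events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
     } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
       // partition exists in both but the path changed -> update
       events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
     }
   }
   ```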



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java:
##########
@@ -90,11 +98,11 @@ public void createTable(String tableName, MessageType 
storageSchema, String inpu
   @Override
   public void updateTableDefinition(String tableName, MessageType newSchema) {
     try {
-      String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, 
config.partitionFields, config.supportTimestamp);
+      String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, 
config.getSplitStrings(META_SYNC_PARTITION_FIELDS), 
config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE));
       // Cascade clause should not be present for non-partitioned tables
-      String cascadeClause = config.partitionFields.size() > 0 ? " cascade" : 
"";
+      String cascadeClause = 
config.getSplitStrings(HIVE_SUPPORT_TIMESTAMP_TYPE).size() > 0 ? " cascade" : 
"";

Review Comment:
   Let's make sure that `getSplitStrings` does not return empty strings.



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java:
##########
@@ -48,19 +50,19 @@ public void syncHoodieTable() {
   @Override
   protected void syncHoodieTable(String tableName, boolean 
useRealtimeInputFormat, boolean readAsOptimized) {
     super.syncHoodieTable(tableName, useRealtimeInputFormat, readAsOptimized);
-    if (((GlobalHiveSyncConfig) hiveSyncConfig).globallyReplicatedTimeStamp != 
null) {
-      hoodieHiveClient.updateLastReplicatedTimeStamp(tableName,
-          ((GlobalHiveSyncConfig) hiveSyncConfig).globallyReplicatedTimeStamp);
+    Option<String> timestamp = 
Option.ofNullable(config.getString(META_SYNC_GLOBAL_REPLICATE_TIMESTAMP));
+    if (timestamp.isPresent()) {

Review Comment:
   Should we log a warning if the replicate_timestamp is not set?
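
   For example (sketch, assuming a `LOG` is available in this class):

   ```java
   } else {
     LOG.warn(META_SYNC_GLOBAL_REPLICATE_TIMESTAMP.key()
         + " is not set; skipping last-replicated-timestamp update for table " + tableName);
   }
   ```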



##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/FieldSchema.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import java.util.Objects;
+
+public class FieldSchema {

Review Comment:
   I see this is used in meta sync operations like `getStorageFieldSchemas`, which is passed around in many places. Should this implement `Serializable`?
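
   E.g. (would also need `import java.io.Serializable;`):

   ```java
   public class FieldSchema implements Serializable {
     private static final long serialVersionUID = 1L;
     // ... existing fields and methods
   }
   ```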



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java:
##########
@@ -180,11 +181,11 @@ protected static void writeCommonPropsToFile(FileSystem 
dfs, String dfsBasePath)
     
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
dfsBasePath + "/target.avsc");
 
     // Hive Configs
-    props.setProperty(HiveSyncConfig.HIVE_URL.key(), 
"jdbc:hive2://127.0.0.1:9999/");
-    props.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb1");
-    props.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), "hive_trips");
-    props.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), 
"datestr");
-    props.setProperty(HiveSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+    props.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), 
"jdbc:hive2://127.0.0.1:9999/");

Review Comment:
   nit: static imports
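
   i.e. something like:

   ```java
   import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
   // ...
   props.setProperty(HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
   ```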



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java:
##########
@@ -133,6 +135,14 @@ public <T> String getString(ConfigProperty<T> 
configProperty) {
     return rawValue.map(Object::toString).orElse(null);
   }
 
+  public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty) {
+    return getSplitStrings(configProperty, ",");
+  }
+
+  public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty, 
String delimiter) {
+    return StringUtils.split(getString(configProperty), delimiter);

Review Comment:
   Also, why can't we reuse `StringUtils#split`?



##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java:
##########
@@ -165,24 +140,85 @@ public class HoodieSyncConfig extends HoodieConfig {
       .defaultValue("")
       .withDocumentation("The spark version used when syncing with a 
metastore.");
 
-  public HoodieSyncConfig(TypedProperties props) {
+  private Configuration hadoopConf;
+
+  public HoodieSyncConfig(Properties props) {
+    this(props, ConfigUtils.createHadoopConf(props));
+  }
+
+  public HoodieSyncConfig(Properties props, Configuration hadoopConf) {
     super(props);
-    setDefaults();
-
-    this.basePath = getStringOrDefault(META_SYNC_BASE_PATH);
-    this.databaseName = getStringOrDefault(META_SYNC_DATABASE_NAME);
-    this.tableName = getStringOrDefault(META_SYNC_TABLE_NAME);
-    this.baseFileFormat = getStringOrDefault(META_SYNC_BASE_FILE_FORMAT);
-    this.partitionFields = 
props.getStringList(META_SYNC_PARTITION_FIELDS.key(), ",", 
Collections.emptyList());
-    this.partitionValueExtractorClass = 
getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
-    this.assumeDatePartitioning = 
getBooleanOrDefault(META_SYNC_ASSUME_DATE_PARTITION);
-    this.decodePartition = 
getBooleanOrDefault(KeyGeneratorOptions.URL_ENCODE_PARTITIONING);
-    this.useFileListingFromMetadata = 
getBooleanOrDefault(META_SYNC_USE_FILE_LISTING_FROM_METADATA);
-    this.isConditionalSync = getBooleanOrDefault(META_SYNC_CONDITIONAL_SYNC);
-    this.sparkVersion = getStringOrDefault(META_SYNC_SPARK_VERSION);
+    this.hadoopConf = hadoopConf;
+  }
+
+  public void setHadoopConf(Configuration hadoopConf) {
+    this.hadoopConf = hadoopConf;
+  }
+
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  public FileSystem getHadoopFileSystem() {
+    return FSUtils.getFs(getString(META_SYNC_BASE_PATH), getHadoopConf());
   }
 
-  protected void setDefaults() {
-    this.setDefaultValue(META_SYNC_TABLE_NAME);
+  public String getAbsoluteBasePath() {
+    return getString(META_SYNC_BASE_PATH);
+  }
+
+  @Override
+  public String toString() {
+    return props.toString();
+  }
+
+  public static class HoodieSyncConfigParams {
+    @Parameter(names = {"--database"}, description = "name of the target 
database in meta store", required = true)
+    public String databaseName;
+    @Parameter(names = {"--table"}, description = "name of the target table in 
meta store", required = true)
+    public String tableName;
+    @Parameter(names = {"--base-path"}, description = "Base path of the hoodie 
table to sync", required = true)
+    public String basePath;
+    @Parameter(names = {"--base-file-format"}, description = "Format of the 
base files (PARQUET (or) HFILE)")
+    public String baseFileFormat;
+    @Parameter(names = "--partitioned-by", description = "Fields in the schema 
partitioned by")
+    public List<String> partitionFields;
+    @Parameter(names = "--partition-value-extractor", description = "Class 
which implements PartitionValueExtractor "
+        + "to extract the partition values from HDFS path")
+    public String partitionValueExtractorClass;
+    @Parameter(names = {"--assume-date-partitioning"}, description = "Assume 
standard yyyy/mm/dd partitioning, this"
+        + " exists to support backward compatibility. If you use hoodie 0.3.x, 
do not set this parameter")
+    public Boolean assumeDatePartitioning;
+    @Parameter(names = {"--decode-partition"}, description = "Decode the 
partition value if the partition has encoded during writing")
+    public Boolean decodePartition;
+    @Parameter(names = {"--use-file-listing-from-metadata"}, description = 
"Fetch file listing from Hudi's metadata")
+    public Boolean useFileListingFromMetadata;
+    @Parameter(names = {"--conditional-sync"}, description = "If true, only 
sync on conditions like schema change or partition change.")
+    public Boolean isConditionalSync;
+    @Parameter(names = {"--spark-version"}, description = "The spark version")
+    public String sparkVersion;
+
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public boolean help = false;
+
+    public boolean isHelp() {
+      return help;
+    }
+
+    public TypedProperties toProps() {
+      final TypedProperties props = new TypedProperties();
+      props.setPropertyIfNonNull(META_SYNC_BASE_PATH.key(), basePath);
+      props.setPropertyIfNonNull(META_SYNC_DATABASE_NAME.key(), databaseName);
+      props.setPropertyIfNonNull(META_SYNC_TABLE_NAME.key(), tableName);
+      props.setPropertyIfNonNull(META_SYNC_BASE_FILE_FORMAT.key(), 
baseFileFormat);
+      props.setPropertyIfNonNull(META_SYNC_PARTITION_FIELDS.key(), 
StringUtils.join(",", partitionFields));

Review Comment:
   Previously, this was set to an empty list when the config was not set by the user. Now it will be null, which could have side effects downstream. Why change the default here?



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java:
##########
@@ -40,26 +47,28 @@
  *
  * @Experimental
  */
-public class BigQuerySyncTool extends AbstractSyncTool {
+public class BigQuerySyncTool extends HoodieSyncTool {
 
   private static final Logger LOG = 
LogManager.getLogger(BigQuerySyncTool.class);
 
-  public final BigQuerySyncConfig cfg;
+  public final BigQuerySyncConfig config;
+  public final String tableName;
   public final String manifestTableName;
   public final String versionsTableName;
   public final String snapshotViewName;
 
-  public BigQuerySyncTool(TypedProperties properties, Configuration conf, 
FileSystem fs) {
-    super(properties, conf, fs);
-    cfg = BigQuerySyncConfig.fromProps(properties);
-    manifestTableName = cfg.tableName + "_manifest";
-    versionsTableName = cfg.tableName + "_versions";
-    snapshotViewName = cfg.tableName;
+  public BigQuerySyncTool(Properties props) {
+    super(props);
+    this.config = new BigQuerySyncConfig(props);
+    this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);

Review Comment:
   Is there a validation somewhere that this config is mandatory?
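
   If not, maybe something like this in the constructor (sketch; assuming `ValidationUtils`/`StringUtils` from hudi-common are acceptable here):

   ```java
   this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);
   ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(tableName),
       BIGQUERY_SYNC_TABLE_NAME.key() + " must be set");
   ```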



##########
hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java:
##########
@@ -152,89 +109,101 @@ public class AdbSyncConfig extends HoodieSyncConfig {
       .defaultValue(false)
       .withDocumentation("Whether drop table before creation");
 
-  public AdbSyncConfig() {
-    this(new TypedProperties());
+  public AdbSyncConfig(Properties props) {
+    super(props);
+  }
+
+  @Override
+  public String getAbsoluteBasePath() {
+    return generateAbsolutePathStr(new Path(getString(META_SYNC_BASE_PATH)));

Review Comment:
   Do we have a prior validation that META_SYNC_BASE_PATH is non-empty?



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java:
##########
@@ -69,7 +78,7 @@ public void syncHoodieTable() {
           throw new UnsupportedOperationException(bqSyncClient.getTableType() 
+ " table type is not supported yet.");
       }
     } catch (Exception e) {
-      throw new HoodieBigQuerySyncException("Got runtime exception when big 
query syncing " + cfg.tableName, e);
+      throw new HoodieBigQuerySyncException("Got runtime exception when big 
query syncing " + tableName, e);

Review Comment:
   nit: **bigquery** is a single word



##########
hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java:
##########
@@ -49,6 +49,24 @@ public TypedProperties(Properties defaults) {
     }
   }
 
+  public void setPropertyIfNonNull(String key, String value) {

Review Comment:
   We can define the method as `setPropertyIfNonNull(String key, Object value)` and convert the value to String if it is not null. That would eliminate the need for the other two methods.
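
   Something like:

   ```java
   public void setPropertyIfNonNull(String key, Object value) {
     if (value != null) {
       // all values end up stored as Strings anyway
       setProperty(key, value.toString());
     }
   }
   ```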



##########
hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java:
##########
@@ -49,6 +49,24 @@ public TypedProperties(Properties defaults) {
     }
   }
 
+  public void setPropertyIfNonNull(String key, String value) {

Review Comment:
   Perhaps we can define a single method, `public void setPropertyIfNonNull(String key, Object value)`, and avoid some duplicate code? In the end, all values are written as Strings.



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java:
##########
@@ -166,76 +164,23 @@ public void createSnapshotView(String viewName, String 
versionsTableName, String
   }
 
   @Override
-  public Map<String, String> getTableSchema(String tableName) {
+  public Map<String, String> getMetastoreSchema(String tableName) {
     // TODO: Implement automatic schema evolution when you add a new column.
     return Collections.emptyMap();
   }
 
-  @Override
-  public void addPartitionsToTable(final String tableName, final List<String> 
partitionsToAdd) {
-    // bigQuery discovers the new partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for 
addPartitionsToTable yet.");
-  }
-
   public boolean datasetExists() {
-    Dataset dataset = bigquery.getDataset(DatasetId.of(syncConfig.projectId, 
syncConfig.datasetName));
+    Dataset dataset = bigquery.getDataset(DatasetId.of(projectId, 
datasetName));
     return dataset != null;
   }
 
-  @Override
-  public boolean doesTableExist(final String tableName) {
-    return tableExists(tableName);
-  }
-
   @Override
   public boolean tableExists(String tableName) {
-    TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, 
tableName);
+    TableId tableId = TableId.of(projectId, datasetName, tableName);
     Table table = bigquery.getTable(tableId, BigQuery.TableOption.fields());
     return table != null && table.exists();
   }
 
-  @Override
-  public Option<String> getLastCommitTimeSynced(final String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("Not support 
getLastCommitTimeSynced yet.");
-  }
-
-  @Override
-  public void updateLastCommitTimeSynced(final String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("No support for 
updateLastCommitTimeSynced yet.");
-  }
-
-  @Override
-  public Option<String> getLastReplicatedTime(String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("Not support getLastReplicatedTime 
yet.");
-  }
-
-  @Override
-  public void updateLastReplicatedTimeStamp(String tableName, String 
timeStamp) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("No support for 
updateLastReplicatedTimeStamp yet.");
-  }
-
-  @Override
-  public void deleteLastReplicatedTimeStamp(String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("No support for 
deleteLastReplicatedTimeStamp yet.");
-  }
-
-  @Override
-  public void updatePartitionsToTable(final String tableName, final 
List<String> changedPartitions) {
-    // bigQuery updates the partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for 
updatePartitionsToTable yet.");
-  }
-
-  @Override
-  public void dropPartitions(String tableName, List<String> partitionsToDrop) {
-    // bigQuery discovers the new partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for dropPartitions 
yet.");
-  }
-
   @Override
   public void close() {
     // bigQuery has no connection close method, so do nothing.

Review Comment:
   I see. But we could probably null out the instance and free up the heap?
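
   For example (sketch; assumes the `bigquery` field is not declared final):

   ```java
   @Override
   public void close() {
     // the BigQuery client has no explicit close; drop the reference so it can be GC'd
     this.bigquery = null;
   }
   ```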



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -270,63 +264,6 @@ public static JavaRDD<HoodieRecord> 
dropDuplicates(JavaSparkContext jssc, JavaRD
     return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
   }
 
-  /**
-   * @deprecated Use {@link HiveSyncConfig} constructor directly and provide 
the props,
-   * and set {@link HoodieSyncConfig#META_SYNC_BASE_PATH} and {@link 
HoodieSyncConfig#META_SYNC_BASE_FILE_FORMAT} instead.
-   */
-  @Deprecated
-  public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, 
String basePath, String baseFileFormat) {

Review Comment:
   +1 on cleaning it up. It was a maintenance overhead.



##########
hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java:
##########
@@ -49,45 +52,52 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
       .noDefaultValue()
       .withDocumentation("Pluggable class to supply a DataHub REST emitter to 
connect to the DataHub instance. This overwrites other emitter configs.");
 
-  @Parameter(names = {"--identifier-class"}, description = "Pluggable class to 
help provide info to identify a DataHub Dataset.")
-  public String identifierClass;
+  public final HoodieDataHubDatasetIdentifier datasetIdentifier;
+
+  public DataHubSyncConfig(Properties props) {
+    super(props);
+    String identifierClass = 
getStringOrDefault(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS);
+    datasetIdentifier = (HoodieDataHubDatasetIdentifier) 
ReflectionUtils.loadClass(identifierClass, new Class<?>[] {Properties.class}, 
props);
+  }
 
-  @Parameter(names = {"--emitter-server"}, description = "Server URL of the 
DataHub instance.")
-  public String emitterServer;
+  public RestEmitter getRestEmitter() {
+    if (contains(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS)) {
+      return ((DataHubEmitterSupplier) 
ReflectionUtils.loadClass(getString(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS))).get();
+    } else if (contains(META_SYNC_DATAHUB_EMITTER_SERVER)) {
+      return RestEmitter.create(b -> 
b.server(getString(META_SYNC_DATAHUB_EMITTER_SERVER)).token(getStringOrDefault(META_SYNC_DATAHUB_EMITTER_TOKEN,
 null)));
+    } else {
+      return RestEmitter.createWithDefaults();
+    }
+  }
 
-  @Parameter(names = {"--emitter-token"}, description = "Auth token to connect 
to the DataHub instance.")
-  public String emitterToken;
+  public static class DataHubSyncConfigParams {
 
-  @Parameter(names = {"--emitter-supplier-class"}, description = "Pluggable 
class to supply a DataHub REST emitter to connect to the DataHub instance. This 
overwrites other emitter configs.")
-  public String emitterSupplierClass;
+    @ParametersDelegate()
+    public final HoodieSyncConfigParams hoodieSyncConfigParams = new 
HoodieSyncConfigParams();
 
-  @Parameter(names = {"--help", "-h"}, help = true)
-  public Boolean help = false;
+    @Parameter(names = {"--identifier-class"}, description = "Pluggable class 
to help provide info to identify a DataHub Dataset.")
+    public String identifierClass;
 
-  public final HoodieDataHubDatasetIdentifier datasetIdentifier;
+    @Parameter(names = {"--emitter-server"}, description = "Server URL of the 
DataHub instance.")
+    public String emitterServer;
 
-  public DataHubSyncConfig() {
-    this(new TypedProperties());
-  }
+    @Parameter(names = {"--emitter-token"}, description = "Auth token to 
connect to the DataHub instance.")
+    public String emitterToken;
 
-  public DataHubSyncConfig(TypedProperties props) {
-    super(props);
-    identifierClass = 
getStringOrDefault(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS);
-    emitterServer = getStringOrDefault(META_SYNC_DATAHUB_EMITTER_SERVER, null);
-    emitterToken = getStringOrDefault(META_SYNC_DATAHUB_EMITTER_TOKEN, null);
-    emitterSupplierClass = 
getStringOrDefault(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS, null);
+    @Parameter(names = {"--emitter-supplier-class"}, description = "Pluggable 
class to supply a DataHub REST emitter to connect to the DataHub instance. This 
overwrites other emitter configs.")
+    public String emitterSupplierClass;
 
-    datasetIdentifier = (HoodieDataHubDatasetIdentifier) ReflectionUtils
-        .loadClass(identifierClass, new Class<?>[] {TypedProperties.class}, 
props);
-  }
+    public boolean isHelp() {

Review Comment:
   Perhaps we can move this to the config superclass? It's needed everywhere.



##########
hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java:
##########
@@ -405,26 +423,69 @@ private String constructShowCreateDatabaseSql(String 
databaseName) {
 
   private String constructUpdateTblPropertiesSql(String tableName, String 
lastCommitSynced) {
     return String.format("alter table `%s`.`%s` set tblproperties('%s' = 
'%s')",
-        adbSyncConfig.databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, 
lastCommitSynced);
+        databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, 
lastCommitSynced);
   }
 
   private String constructAddColumnSql(String tableName, String columnName, 
String columnType) {
     return String.format("alter table `%s`.`%s` add columns(`%s` %s)",
-        adbSyncConfig.databaseName, tableName, columnName, columnType);
+        databaseName, tableName, columnName, columnType);
   }
 
   private String constructChangeColumnSql(String tableName, String columnName, 
String columnType) {
     return String.format("alter table `%s`.`%s` change `%s` `%s` %s",
-        adbSyncConfig.databaseName, tableName, columnName, columnName, 
columnType);
+        databaseName, tableName, columnName, columnName, columnType);
+  }
+
+  /**
+   * TODO align with {@link HoodieSyncClient#getPartitionEvents}
+   */
+  public List<PartitionEvent> getPartitionEvents(Map<List<String>, String> 
tablePartitions, List<String> partitionStoragePartitions) {
+    Map<String, String> paths = new HashMap<>();
+
+    for (Map.Entry<List<String>, String> entry : tablePartitions.entrySet()) {
+      List<String> partitionValues = entry.getKey();
+      String fullTablePartitionPath = entry.getValue();
+      paths.put(String.join(", ", partitionValues), fullTablePartitionPath);
+    }
+    List<PartitionEvent> events = new ArrayList<>();
+    for (String storagePartition : partitionStoragePartitions) {
+      Path storagePartitionPath = 
FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), 
storagePartition);
+      String fullStoragePartitionPath = 
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+      // Check if the partition values or if hdfs path is the same
+      List<String> storagePartitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
+      if (config.getBoolean(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING)) {
+        String partition = String.join("/", storagePartitionValues);
+        storagePartitionPath = 
FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), partition);
+        fullStoragePartitionPath = 
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+      }
+      if (!storagePartitionValues.isEmpty()) {
+        String storageValue = String.join(", ", storagePartitionValues);
+        if (!paths.containsKey(storageValue)) {
+          events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+        } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+          events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+        }
+      }
+    }
+    return events;
   }
 
-  private HiveSyncConfig getHiveSyncConfig() {
-    HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
-    hiveSyncConfig.partitionFields = adbSyncConfig.partitionFields;
-    hiveSyncConfig.databaseName = adbSyncConfig.databaseName;
-    Path basePath = new Path(adbSyncConfig.basePath);
-    hiveSyncConfig.basePath = generateAbsolutePathStr(basePath);
-    return hiveSyncConfig;
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {

Review Comment:
   We probably don't need this separate close method if we use try-with-resources wherever needed. Both `ResultSet` and `Statement` implement `AutoCloseable`.
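
   Roughly (sketch; `connection` and `sql` are placeholders for whatever this client already uses):

   ```java
   try (Statement stmt = connection.createStatement();
        ResultSet rs = stmt.executeQuery(sql)) {
     // consume rs here; both resources are closed automatically, even on exception
   }
   ```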



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java:
##########
@@ -19,113 +19,120 @@
 
 package org.apache.hudi.gcp.bigquery;
 
-import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParametersDelegate;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 
 /**
  * Configs needed to sync data into BigQuery.
  */
-public class BigQuerySyncConfig implements Serializable {
-
-  public static String BIGQUERY_SYNC_PROJECT_ID = 
"hoodie.gcp.bigquery.sync.project_id";
-  public static String BIGQUERY_SYNC_DATASET_NAME = 
"hoodie.gcp.bigquery.sync.dataset_name";
-  public static String BIGQUERY_SYNC_DATASET_LOCATION = 
"hoodie.gcp.bigquery.sync.dataset_location";
-  public static String BIGQUERY_SYNC_TABLE_NAME = 
"hoodie.gcp.bigquery.sync.table_name";
-  public static String BIGQUERY_SYNC_SOURCE_URI = 
"hoodie.gcp.bigquery.sync.source_uri";
-  public static String BIGQUERY_SYNC_SOURCE_URI_PREFIX = 
"hoodie.gcp.bigquery.sync.source_uri_prefix";
-  public static String BIGQUERY_SYNC_SYNC_BASE_PATH = 
"hoodie.gcp.bigquery.sync.base_path";
-  public static String BIGQUERY_SYNC_PARTITION_FIELDS = 
"hoodie.gcp.bigquery.sync.partition_fields";
-  public static String BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA = 
"hoodie.gcp.bigquery.sync.use_file_listing_from_metadata";
-  public static String BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING = 
"hoodie.gcp.bigquery.sync.assume_date_partitioning";
-
-  @Parameter(names = {"--project-id"}, description = "name of the target 
project in BigQuery", required = true)
-  public String projectId;
-  @Parameter(names = {"--dataset-name"}, description = "name of the target 
dataset in BigQuery", required = true)
-  public String datasetName;
-  @Parameter(names = {"--dataset-location"}, description = "location of the 
target dataset in BigQuery", required = true)
-  public String datasetLocation;
-  @Parameter(names = {"--table-name"}, description = "name of the target table 
in BigQuery", required = true)
-  public String tableName;
-  @Parameter(names = {"--source-uri"}, description = "name of the source uri 
gcs path of the table", required = true)
-  public String sourceUri;
-  @Parameter(names = {"--source-uri-prefix"}, description = "name of the 
source uri gcs path prefix of the table", required = true)
-  public String sourceUriPrefix;
-  @Parameter(names = {"--base-path"}, description = "Base path of the hoodie 
table to sync", required = true)
-  public String basePath;
-  @Parameter(names = {"--partitioned-by"}, description = "Comma-delimited 
partition fields. Default to non-partitioned.")
-  public List<String> partitionFields = new ArrayList<>();
-  @Parameter(names = {"--use-file-listing-from-metadata"}, description = 
"Fetch file listing from Hudi's metadata")
-  public Boolean useFileListingFromMetadata = false;
-  @Parameter(names = {"--assume-date-partitioning"}, description = "Assume 
standard yyyy/mm/dd partitioning, this"
-      + " exists to support backward compatibility. If you use hoodie 0.3.x, 
do not set this parameter")
-  public Boolean assumeDatePartitioning = false;
-  @Parameter(names = {"--help", "-h"}, help = true)
-  public Boolean help = false;
-
-  public static BigQuerySyncConfig copy(BigQuerySyncConfig cfg) {
-    BigQuerySyncConfig newConfig = new BigQuerySyncConfig();
-    newConfig.projectId = cfg.projectId;
-    newConfig.datasetName = cfg.datasetName;
-    newConfig.datasetLocation = cfg.datasetLocation;
-    newConfig.tableName = cfg.tableName;
-    newConfig.sourceUri = cfg.sourceUri;
-    newConfig.sourceUriPrefix = cfg.sourceUriPrefix;
-    newConfig.basePath = cfg.basePath;
-    newConfig.partitionFields = cfg.partitionFields;
-    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
-    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
-    newConfig.help = cfg.help;
-    return newConfig;
-  }
+public class BigQuerySyncConfig extends HoodieSyncConfig implements 
Serializable {
 
-  public TypedProperties toProps() {
-    TypedProperties properties = new TypedProperties();
-    properties.put(BIGQUERY_SYNC_PROJECT_ID, projectId);
-    properties.put(BIGQUERY_SYNC_DATASET_NAME, datasetName);
-    properties.put(BIGQUERY_SYNC_DATASET_LOCATION, datasetLocation);
-    properties.put(BIGQUERY_SYNC_TABLE_NAME, tableName);
-    properties.put(BIGQUERY_SYNC_SOURCE_URI, sourceUri);
-    properties.put(BIGQUERY_SYNC_SOURCE_URI_PREFIX, sourceUriPrefix);
-    properties.put(BIGQUERY_SYNC_SYNC_BASE_PATH, basePath);
-    properties.put(BIGQUERY_SYNC_PARTITION_FIELDS, String.join(",", 
partitionFields));
-    properties.put(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, 
useFileListingFromMetadata);
-    properties.put(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, 
assumeDatePartitioning);
-    return properties;
-  }
+  public static final ConfigProperty<String> BIGQUERY_SYNC_PROJECT_ID = 
ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.project_id")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_DATASET_NAME = 
ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.dataset_name")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_DATASET_LOCATION = 
ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.dataset_location")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_TABLE_NAME = 
ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.table_name")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_SOURCE_URI = 
ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.source_uri")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_SOURCE_URI_PREFIX = 
ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.source_uri_prefix")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_SYNC_BASE_PATH = 
ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.base_path")
+      .noDefaultValue()
+      .withDocumentation("");
 
-  public static BigQuerySyncConfig fromProps(TypedProperties props) {
-    BigQuerySyncConfig config = new BigQuerySyncConfig();
-    config.projectId = props.getString(BIGQUERY_SYNC_PROJECT_ID);
-    config.datasetName = props.getString(BIGQUERY_SYNC_DATASET_NAME);
-    config.datasetLocation = props.getString(BIGQUERY_SYNC_DATASET_LOCATION);
-    config.tableName = props.getString(BIGQUERY_SYNC_TABLE_NAME);
-    config.sourceUri = props.getString(BIGQUERY_SYNC_SOURCE_URI);
-    config.sourceUriPrefix = props.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX);
-    config.basePath = props.getString(BIGQUERY_SYNC_SYNC_BASE_PATH);
-    config.partitionFields = 
props.getStringList(BIGQUERY_SYNC_PARTITION_FIELDS, ",", 
Collections.emptyList());
-    config.useFileListingFromMetadata = 
props.getBoolean(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, false);
-    config.assumeDatePartitioning = 
props.getBoolean(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, false);
-    return config;
+  public static final ConfigProperty<String> BIGQUERY_SYNC_PARTITION_FIELDS = 
ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.partition_fields")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<Boolean> 
BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.use_file_listing_from_metadata")
+      .defaultValue(true)

Review Comment:
   Wasn't the default value false earlier?



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java:
##########
@@ -241,20 +248,20 @@ public void dropPartitionsToTable(String tableName, 
List<String> partitionsToDro
         if (HivePartitionUtil.partitionExists(client, tableName, 
dropPartition, partitionValueExtractor, syncConfig)) {
           String partitionClause =
               HivePartitionUtil.getPartitionClauseForDrop(dropPartition, 
partitionValueExtractor, syncConfig);
-          client.dropPartition(syncConfig.databaseName, tableName, 
partitionClause, false);
+          client.dropPartition(databaseName, tableName, partitionClause, 
false);
         }
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
       }
     } catch (TException e) {
-      LOG.error(syncConfig.databaseName + "." + tableName + " drop partition 
failed", e);
-      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + 
tableName + " drop partition failed", e);
+      LOG.error(databaseName + "." + tableName + " drop partition failed", e);
+      throw new HoodieHiveSyncException(databaseName + "." + tableName + " 
drop partition failed", e);

Review Comment:
   Should we also log the partitions in case of failure, both in this method and in dropPartitions?
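
   e.g. (sketch):

   ```java
   } catch (TException e) {
     // include the partitions being dropped to make the failure easier to debug
     LOG.error(databaseName + "." + tableName + " drop partition failed for partitions: " + partitionsToDrop, e);
     throw new HoodieHiveSyncException(databaseName + "." + tableName + " drop partition failed", e);
   }
   ```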



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java:
##########
@@ -19,113 +19,120 @@
 
 package org.apache.hudi.gcp.bigquery;
 
-import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParametersDelegate;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 
 /**
  * Configs needed to sync data into BigQuery.
  */
-public class BigQuerySyncConfig implements Serializable {
-
-  public static String BIGQUERY_SYNC_PROJECT_ID = 
"hoodie.gcp.bigquery.sync.project_id";
-  public static String BIGQUERY_SYNC_DATASET_NAME = 
"hoodie.gcp.bigquery.sync.dataset_name";
-  public static String BIGQUERY_SYNC_DATASET_LOCATION = 
"hoodie.gcp.bigquery.sync.dataset_location";
-  public static String BIGQUERY_SYNC_TABLE_NAME = 
"hoodie.gcp.bigquery.sync.table_name";
-  public static String BIGQUERY_SYNC_SOURCE_URI = 
"hoodie.gcp.bigquery.sync.source_uri";
-  public static String BIGQUERY_SYNC_SOURCE_URI_PREFIX = 
"hoodie.gcp.bigquery.sync.source_uri_prefix";
-  public static String BIGQUERY_SYNC_SYNC_BASE_PATH = 
"hoodie.gcp.bigquery.sync.base_path";
-  public static String BIGQUERY_SYNC_PARTITION_FIELDS = 
"hoodie.gcp.bigquery.sync.partition_fields";
-  public static String BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA = 
"hoodie.gcp.bigquery.sync.use_file_listing_from_metadata";
-  public static String BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING = 
"hoodie.gcp.bigquery.sync.assume_date_partitioning";
-
-  @Parameter(names = {"--project-id"}, description = "name of the target 
project in BigQuery", required = true)
-  public String projectId;
-  @Parameter(names = {"--dataset-name"}, description = "name of the target 
dataset in BigQuery", required = true)
-  public String datasetName;
-  @Parameter(names = {"--dataset-location"}, description = "location of the 
target dataset in BigQuery", required = true)
-  public String datasetLocation;
-  @Parameter(names = {"--table-name"}, description = "name of the target table 
in BigQuery", required = true)
-  public String tableName;
-  @Parameter(names = {"--source-uri"}, description = "name of the source uri 
gcs path of the table", required = true)
-  public String sourceUri;
-  @Parameter(names = {"--source-uri-prefix"}, description = "name of the 
source uri gcs path prefix of the table", required = true)
-  public String sourceUriPrefix;
-  @Parameter(names = {"--base-path"}, description = "Base path of the hoodie 
table to sync", required = true)
-  public String basePath;
-  @Parameter(names = {"--partitioned-by"}, description = "Comma-delimited 
partition fields. Default to non-partitioned.")
-  public List<String> partitionFields = new ArrayList<>();
-  @Parameter(names = {"--use-file-listing-from-metadata"}, description = 
"Fetch file listing from Hudi's metadata")
-  public Boolean useFileListingFromMetadata = false;
-  @Parameter(names = {"--assume-date-partitioning"}, description = "Assume 
standard yyyy/mm/dd partitioning, this"
-      + " exists to support backward compatibility. If you use hoodie 0.3.x, 
do not set this parameter")
-  public Boolean assumeDatePartitioning = false;
-  @Parameter(names = {"--help", "-h"}, help = true)
-  public Boolean help = false;
-
-  public static BigQuerySyncConfig copy(BigQuerySyncConfig cfg) {
-    BigQuerySyncConfig newConfig = new BigQuerySyncConfig();
-    newConfig.projectId = cfg.projectId;
-    newConfig.datasetName = cfg.datasetName;
-    newConfig.datasetLocation = cfg.datasetLocation;
-    newConfig.tableName = cfg.tableName;
-    newConfig.sourceUri = cfg.sourceUri;
-    newConfig.sourceUriPrefix = cfg.sourceUriPrefix;
-    newConfig.basePath = cfg.basePath;
-    newConfig.partitionFields = cfg.partitionFields;
-    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
-    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
-    newConfig.help = cfg.help;
-    return newConfig;
-  }
+public class BigQuerySyncConfig extends HoodieSyncConfig implements 
Serializable {
 
-  public TypedProperties toProps() {
-    TypedProperties properties = new TypedProperties();
-    properties.put(BIGQUERY_SYNC_PROJECT_ID, projectId);
-    properties.put(BIGQUERY_SYNC_DATASET_NAME, datasetName);
-    properties.put(BIGQUERY_SYNC_DATASET_LOCATION, datasetLocation);
-    properties.put(BIGQUERY_SYNC_TABLE_NAME, tableName);
-    properties.put(BIGQUERY_SYNC_SOURCE_URI, sourceUri);
-    properties.put(BIGQUERY_SYNC_SOURCE_URI_PREFIX, sourceUriPrefix);
-    properties.put(BIGQUERY_SYNC_SYNC_BASE_PATH, basePath);
-    properties.put(BIGQUERY_SYNC_PARTITION_FIELDS, String.join(",", 
partitionFields));
-    properties.put(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, 
useFileListingFromMetadata);
-    properties.put(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, 
assumeDatePartitioning);
-    return properties;
-  }
+  public static final ConfigProperty<String> BIGQUERY_SYNC_PROJECT_ID = 
ConfigProperty

Review Comment:
   +1 for using ConfigProperty.
   Should we add some doc as well, so that it gets automatically updated on the Hudi website configuration page?
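
   For example, picking up the old `@Parameter` descriptions:

   ```java
   public static final ConfigProperty<String> BIGQUERY_SYNC_PROJECT_ID = ConfigProperty
       .key("hoodie.gcp.bigquery.sync.project_id")
       .noDefaultValue()
       .withDocumentation("Name of the target project in BigQuery");
   ```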



##########
hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import org.apache.hudi.hive.testutils.TestCluster;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static 
org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
+import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
+import static 
org.apache.hudi.hive.replication.GlobalHiveSyncConfig.META_SYNC_GLOBAL_REPLICATE_TIMESTAMP;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_BASE_PATH;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_HIVE_SERVER_JDBC_URLS;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_HIVE_SITE_URI;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_BASE_PATH;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_HIVE_SERVER_JDBC_URLS;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_HIVE_SITE_URI;
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHiveSyncGlobalCommitTool {
+
+  @RegisterExtension

Review Comment:
   Sorry, I'm not very familiar with this annotation. Am I correct in assuming that simply by using this annotation, the BeforeAll and AfterAll steps of `TestCluster` will always be executed? Asking because you are not stopping the local/remote cluster anywhere in this test.



##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.sync.common.model.Partition;
+import org.apache.hudi.sync.common.model.PartitionEvent;
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA;
+
+public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, 
AutoCloseable {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSyncClient.class);
+
+  protected final HoodieSyncConfig config;
+  protected final PartitionValueExtractor partitionValueExtractor;
+  protected final HoodieTableMetaClient metaClient;
+
+  public HoodieSyncClient(HoodieSyncConfig config) {
+    this.config = config;
+    this.partitionValueExtractor = 
ReflectionUtils.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    this.metaClient = HoodieTableMetaClient.builder()
+        .setConf(config.getHadoopConf())
+        .setBasePath(config.getString(META_SYNC_BASE_PATH))
+        .setLoadActiveTimelineOnLoad(true)
+        .build();
+  }
+
+  public HoodieTimeline getActiveTimeline() {
+    return 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public HoodieTableType getTableType() {
+    return metaClient.getTableType();
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();

Review Comment:
   Let's use the `getBasePathV2` API.



##########
hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java:
##########
@@ -244,10 +244,10 @@ public void testSyncCOWTableWithProperties(boolean 
useSchemaFromCommitMetadata,
         put("tp_1", "p1");
       }
     };
-    hiveSyncProps.setProperty(HiveSyncConfig.HIVE_SYNC_MODE.key(), syncMode);
-    
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_SYNC_AS_DATA_SOURCE_TABLE.key(), 
String.valueOf(syncAsDataSourceTable));
-    
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_TABLE_SERDE_PROPERTIES.key(), 
ConfigUtils.configToString(serdeProperties));
-    hiveSyncProps.setProperty(HiveSyncConfig.HIVE_TABLE_PROPERTIES.key(), 
ConfigUtils.configToString(tableProperties));
+    hiveSyncProps.setProperty(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), 
syncMode);

Review Comment:
   optional: static imports wherever possible?



##########
hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import org.apache.hudi.hive.testutils.TestCluster;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
+import static org.apache.hudi.hive.replication.GlobalHiveSyncConfig.META_SYNC_GLOBAL_REPLICATE_TIMESTAMP;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_BASE_PATH;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_HIVE_SERVER_JDBC_URLS;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_HIVE_SITE_URI;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_BASE_PATH;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_HIVE_SERVER_JDBC_URLS;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_HIVE_SITE_URI;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHiveSyncGlobalCommitTool {
+
+  @RegisterExtension
+  public static TestCluster localCluster = new TestCluster();
+  @RegisterExtension
+  public static TestCluster remoteCluster = new TestCluster();
+
+  private static final String DB_NAME = "foo";
+  private static final String TBL_NAME = "bar";
+
+  private HiveSyncGlobalCommitParams getGlobalCommitConfig(String commitTime) throws Exception {
+    HiveSyncGlobalCommitParams params = new HiveSyncGlobalCommitParams();
+    params.loadedProps.setProperty(LOCAL_HIVE_SITE_URI, localCluster.getHiveSiteXmlLocation());
+    params.loadedProps.setProperty(REMOTE_HIVE_SITE_URI, remoteCluster.getHiveSiteXmlLocation());
+    params.loadedProps.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, localCluster.getHiveJdBcUrl());
+    params.loadedProps.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, remoteCluster.getHiveJdBcUrl());
+    params.loadedProps.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(DB_NAME, TBL_NAME));
+    params.loadedProps.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(DB_NAME, TBL_NAME));
+    params.loadedProps.setProperty(META_SYNC_GLOBAL_REPLICATE_TIMESTAMP.key(), commitTime);
+    params.loadedProps.setProperty(HIVE_USER.key(), System.getProperty("user.name"));
+    params.loadedProps.setProperty(HIVE_PASS.key(), "");
+    params.loadedProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME);
+    params.loadedProps.setProperty(META_SYNC_TABLE_NAME.key(), TBL_NAME);
+    params.loadedProps.setProperty(META_SYNC_BASE_PATH.key(), localCluster.tablePath(DB_NAME, TBL_NAME));
+    params.loadedProps.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
+    params.loadedProps.setProperty(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
+    params.loadedProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
+    return params;
+  }
+
+  private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitParams config) throws Exception {
+    assertEquals(localCluster.getHMSClient()
+        .getTable(DB_NAME, TBL_NAME).getParameters()
+        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), remoteCluster.getHMSClient()
+        .getTable(DB_NAME, TBL_NAME).getParameters()
+        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), "compare replicated timestamps");
+  }
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    localCluster.forceCreateDb(DB_NAME);

Review Comment:
   Since we're creating the db before each test here, why not drop the db (with cascade) after each test in the `clear` method?
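   A rough sketch of what that could look like, assuming the client returned by `getHMSClient()` exposes the standard metastore `dropDatabase(name, deleteData, ignoreUnknownDb, cascade)` overload (not verified against `TestCluster` here):

```java
@AfterEach
public void clear() throws Exception {
  // drop the per-test database with cascade so tables don't leak between tests
  localCluster.getHMSClient().dropDatabase(DB_NAME, true, true, true);
  remoteCluster.getHMSClient().dropDatabase(DB_NAME, true, true, true);
}
```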



##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sync.common.model.FieldSchema;
+import org.apache.hudi.sync.common.model.Partition;
+
+import org.apache.parquet.schema.MessageType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface HoodieMetaSyncOperations {
+
+  String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
+
+  /**
+   * Create the table.
+   *
+   * @param tableName         The table name.
+   * @param storageSchema     The table schema.
+   * @param inputFormatClass  The input format class of this table.
+   * @param outputFormatClass The output format class of this table.
+   * @param serdeClass        The serde class of this table.
+   * @param serdeProperties   The serde properties of this table.
+   * @param tableProperties   The table properties for this table.
+   */
+  default void createTable(String tableName,
+                           MessageType storageSchema,
+                           String inputFormatClass,
+                           String outputFormatClass,
+                           String serdeClass,
+                           Map<String, String> serdeProperties,
+                           Map<String, String> tableProperties) {
+
+  }
+
+  default boolean tableExists(String tableName) {
+    return false;
+  }
+
+  default void dropTable(String tableName) {
+
+  }
+
+  default void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {

Review Comment:
   Let's add javadoc for all these methods? It's very helpful, especially when there are multiple arguments.
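   For example, something along these lines (the `@param` descriptions are a guess at the intended semantics and would need to be confirmed):

```java
/**
 * Add partitions to the table in the metastore.
 *
 * @param tableName       The table name.
 * @param partitionsToAdd The partition paths (relative to the table base path) to register.
 */
default void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {

}
```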


