This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f18a1c  Fixing bugs found during running hoodie demo (#760)
9f18a1c is described below

commit 9f18a1ca80ec1d08253688d9b4d4538a86068559
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Fri Jun 28 17:49:23 2019 -0700

    Fixing bugs found during running hoodie demo (#760)
---
 .../table/view/AbstractTableFileSystemView.java    |  2 +-
 .../hoodie/utilities/deltastreamer/DeltaSync.java  |  8 +++----
 .../deltastreamer/HoodieDeltaStreamer.java         | 26 ++++++++++++++++++++--
 3 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
index 129a584..8b05b69 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
@@ -216,7 +216,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
           log.info("Building file system view for partition (" + partitionPathStr + ")");
 
           // Create the path if it does not exist already
-          Path partitionPath = new Path(metaClient.getBasePath(), partitionPathStr);
+          Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr);
           FSUtils.createPathIfNotExists(metaClient.getFs(), partitionPath);
           long beginLsTs = System.currentTimeMillis();
           FileStatus[] statuses = metaClient.getFs().listStatus(partitionPath);
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
index c372b3c..89e5c73 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
@@ -151,6 +151,7 @@ public class DeltaSync implements Serializable {
    */
   private final HoodieTableType tableType;
 
+
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
       SchemaProvider schemaProvider, HoodieTableType tableType, TypedProperties props,
       JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
@@ -359,9 +360,8 @@ public class DeltaSync implements Serializable {
         log.info("Commit " + commitTime + " successful!");
 
         // Schedule compaction if needed
-        if (tableType.equals(HoodieTableType.MERGE_ON_READ) && cfg.continuousMode) {
-          scheduledCompactionInstant = writeClient
-              .scheduleCompaction(Optional.of(checkpointCommitMetadata));
+        if (cfg.isAsyncCompactionEnabled()) {
+          scheduledCompactionInstant = writeClient.scheduleCompaction(Optional.of(checkpointCommitMetadata));
         }
 
         // Sync to hive if enabled
@@ -458,7 +458,7 @@ public class DeltaSync implements Serializable {
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                 .withPayloadClass(cfg.payloadClassName)
                 // Inline compaction is disabled for continuous mode. otherwise enabled for MOR
-                .withInlineCompaction(!cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)).build())
+                .withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
             .forTable(cfg.targetTableName)
             .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
             .withAutoCommit(false);
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index 6159414..c49f3f8 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -24,6 +24,7 @@ import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
+import com.google.common.base.Preconditions;
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.OverwriteWithLatestAvroPayload;
 import com.uber.hoodie.SimpleKeyGenerator;
@@ -252,8 +253,26 @@ public class HoodieDeltaStreamer implements Serializable {
         + "https://spark.apache.org/docs/latest/job-scheduling.html")
     public Integer compactSchedulingMinShare = 0;
 
+    /**
+     * Compaction is enabled for MoR table by default. This flag disables it
+     */
+    @Parameter(names = {"--disable-compaction"}, description = "Compaction is enabled for MoR table by default."
+        + "This flag disables it ")
+    public Boolean forceDisableCompaction = false;
+
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
+
+
+    public boolean isAsyncCompactionEnabled() {
+      return continuousMode && !forceDisableCompaction
+          && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(storageType));
+    }
+
+    public boolean isInlineCompactionEnabled() {
+      return !continuousMode && !forceDisableCompaction
+          && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(storageType));
+    }
   }
 
   public static void main(String[] args) throws Exception {
@@ -325,6 +344,9 @@ public class HoodieDeltaStreamer implements Serializable {
         HoodieTableMetaClient meta = new HoodieTableMetaClient(
             new Configuration(fs.getConf()), cfg.targetBasePath, false);
         tableType = meta.getTableType();
+        // This will guarantee there is no surprise with table type
+        Preconditions.checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.storageType)),
+            "Hoodie table is of type " + tableType + " but passed in CLI argument is " + cfg.storageType);
       } else {
         tableType = HoodieTableType.valueOf(cfg.storageType);
       }
@@ -350,7 +372,7 @@ public class HoodieDeltaStreamer implements Serializable {
       ExecutorService executor = Executors.newFixedThreadPool(1);
       return Pair.of(CompletableFuture.supplyAsync(() -> {
         boolean error = false;
-        if (cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+        if (cfg.isAsyncCompactionEnabled()) {
           // set Scheduler Pool.
           log.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME);
           jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
@@ -395,7 +417,7 @@ public class HoodieDeltaStreamer implements Serializable {
      * @return
      */
     protected Boolean onInitializingWriteClient(HoodieWriteClient writeClient) {
-      if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+      if (cfg.isAsyncCompactionEnabled()) {
         asyncCompactService = new AsyncCompactService(jssc, writeClient);
         // Enqueue existing pending compactions first
         HoodieTableMetaClient meta = new HoodieTableMetaClient(

Reply via email to