This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9f18a1c Fixing bugs found while running the hoodie demo (#760)
9f18a1c is described below
commit 9f18a1ca80ec1d08253688d9b4d4538a86068559
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Fri Jun 28 17:49:23 2019 -0700
Fixing bugs found while running the hoodie demo (#760)
---
.../table/view/AbstractTableFileSystemView.java | 2 +-
.../hoodie/utilities/deltastreamer/DeltaSync.java | 8 +++----
.../deltastreamer/HoodieDeltaStreamer.java | 26 ++++++++++++++++++++--
3 files changed, 29 insertions(+), 7 deletions(-)
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
index 129a584..8b05b69 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
@@ -216,7 +216,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
log.info("Building file system view for partition (" +
partitionPathStr + ")");
// Create the path if it does not exist already
- Path partitionPath = new Path(metaClient.getBasePath(),
partitionPathStr);
+ Path partitionPath =
FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr);
FSUtils.createPathIfNotExists(metaClient.getFs(), partitionPath);
long beginLsTs = System.currentTimeMillis();
FileStatus[] statuses = metaClient.getFs().listStatus(partitionPath);
diff --git
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
index c372b3c..89e5c73 100644
---
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
+++
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
@@ -151,6 +151,7 @@ public class DeltaSync implements Serializable {
*/
private final HoodieTableType tableType;
+
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieTableType tableType,
TypedProperties props,
JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
@@ -359,9 +360,8 @@ public class DeltaSync implements Serializable {
log.info("Commit " + commitTime + " successful!");
// Schedule compaction if needed
- if (tableType.equals(HoodieTableType.MERGE_ON_READ) &&
cfg.continuousMode) {
- scheduledCompactionInstant = writeClient
- .scheduleCompaction(Optional.of(checkpointCommitMetadata));
+ if (cfg.isAsyncCompactionEnabled()) {
+ scheduledCompactionInstant =
writeClient.scheduleCompaction(Optional.of(checkpointCommitMetadata));
}
// Sync to hive if enabled
@@ -458,7 +458,7 @@ public class DeltaSync implements Serializable {
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass(cfg.payloadClassName)
// Inline compaction is disabled for continuous mode.
otherwise enabled for MOR
- .withInlineCompaction(!cfg.continuousMode &&
tableType.equals(HoodieTableType.MERGE_ON_READ)).build())
+ .withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
.forTable(cfg.targetTableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withAutoCommit(false);
diff --git
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index 6159414..c49f3f8 100644
---
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -24,6 +24,7 @@ import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
+import com.google.common.base.Preconditions;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.OverwriteWithLatestAvroPayload;
import com.uber.hoodie.SimpleKeyGenerator;
@@ -252,8 +253,26 @@ public class HoodieDeltaStreamer implements Serializable {
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer compactSchedulingMinShare = 0;
+ /**
+ * Compaction is enabled for MoR table by default. This flag disables it
+ */
+ @Parameter(names = {"--disable-compaction"}, description = "Compaction is
enabled for MoR table by default."
+ + "This flag disables it ")
+ public Boolean forceDisableCompaction = false;
+
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
+
+
+ public boolean isAsyncCompactionEnabled() {
+ return continuousMode && !forceDisableCompaction
+ &&
HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(storageType));
+ }
+
+ public boolean isInlineCompactionEnabled() {
+ return !continuousMode && !forceDisableCompaction
+ &&
HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(storageType));
+ }
}
public static void main(String[] args) throws Exception {
@@ -325,6 +344,9 @@ public class HoodieDeltaStreamer implements Serializable {
HoodieTableMetaClient meta = new HoodieTableMetaClient(
new Configuration(fs.getConf()), cfg.targetBasePath, false);
tableType = meta.getTableType();
+ // This will guarantee there is no surprise with table type
+
Preconditions.checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.storageType)),
+ "Hoodie table is of type " + tableType + " but passed in CLI
argument is " + cfg.storageType);
} else {
tableType = HoodieTableType.valueOf(cfg.storageType);
}
@@ -350,7 +372,7 @@ public class HoodieDeltaStreamer implements Serializable {
ExecutorService executor = Executors.newFixedThreadPool(1);
return Pair.of(CompletableFuture.supplyAsync(() -> {
boolean error = false;
- if (cfg.continuousMode &&
tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+ if (cfg.isAsyncCompactionEnabled()) {
// set Scheduler Pool.
log.info("Setting Spark Pool name for delta-sync to " +
SchedulerConfGenerator.DELTASYNC_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool",
SchedulerConfGenerator.DELTASYNC_POOL_NAME);
@@ -395,7 +417,7 @@ public class HoodieDeltaStreamer implements Serializable {
* @return
*/
protected Boolean onInitializingWriteClient(HoodieWriteClient writeClient)
{
- if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+ if (cfg.isAsyncCompactionEnabled()) {
asyncCompactService = new AsyncCompactService(jssc, writeClient);
// Enqueue existing pending compactions first
HoodieTableMetaClient meta = new HoodieTableMetaClient(