yihua commented on a change in pull request #3857:
URL: https://github.com/apache/hudi/pull/3857#discussion_r752819581



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -59,15 +69,15 @@
 
   public static final ConfigProperty<String> PLAN_STRATEGY_CLASS_NAME = ConfigProperty
       .key("hoodie.clustering.plan.strategy.class")
-      .defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy")
+      .noDefaultValue()

Review comment:
       It looks like I don't need this change to have engine-specific 
configs here, so I reverted it.  Regarding overriding 
`getWriteConfig()`, I think it is better to keep all configs in this class 
so that changes can be made in one place without confusion.
   
   For the docs, do we need to mention that the default class names are 
different for the Java engine?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -99,6 +99,12 @@
       .withDocumentation("Controls how compaction scheduling is triggered, by time or num delta commits or combination of both. "
           + "Valid options: " + Arrays.stream(CompactionTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));
 
+  public static final ConfigProperty<String> ASYNC_COMPACT_ENABLE = ConfigProperty

Review comment:
       Good point.  I'll check this.

##########
File path: hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
##########
@@ -105,6 +111,16 @@ public String startCommit() {
   public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
     javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
     LOG.info("Ending Hudi commit " + commitTime);
+
+    // Schedule clustering and compaction as needed.
+    if (writeConfig.isAsyncClusteringEnabled()) {
+      javaClient.scheduleClustering(Option.empty()).ifPresent(
+          instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs));
+    }
+    if (writeConfig.isAsyncCompactionEnabled()) {
+      javaClient.scheduleCompaction(Option.empty()).ifPresent(
+          instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
+    }

Review comment:
       @rmahindra123 @vinothchandar For now, I have put the scheduling of 
compaction and clustering behind flags. 
`writeConfig.isAsyncCompactionEnabled()` is a newly added API.
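
To make the flag-gating above concrete, here is a minimal standalone sketch of the same control flow. It does not use the Hudi classes: `TableServiceClient` is a hypothetical stand-in for the write client, `java.util.Optional` stands in for Hudi's `Option`, and the two booleans stand in for `writeConfig.isAsyncClusteringEnabled()` and `writeConfig.isAsyncCompactionEnabled()`. Messages are collected in a list instead of being logged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class ScheduleAfterCommitSketch {
  // Hypothetical stand-in for the write client; not a Hudi interface.
  interface TableServiceClient {
    Optional<String> scheduleClustering();
    Optional<String> scheduleCompaction();
  }

  // Schedules each table service only when its flag is on, and records a
  // message only when the client actually returned an instant time.
  static List<String> scheduleTableServices(
      boolean asyncClusteringEnabled, boolean asyncCompactionEnabled, TableServiceClient client) {
    List<String> messages = new ArrayList<>();
    if (asyncClusteringEnabled) {
      client.scheduleClustering().ifPresent(
          instantTs -> messages.add("Scheduled clustering at instant time:" + instantTs));
    }
    if (asyncCompactionEnabled) {
      client.scheduleCompaction().ifPresent(
          instantTs -> messages.add("Scheduled compaction at instant time:" + instantTs));
    }
    return messages;
  }
}
```

With both flags off, the client is never called, which is the behavior the flags are meant to guarantee.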

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -430,9 +446,37 @@ public Builder withDataOptimizeDataSkippingEnable(boolean dataSkipping) {
     }
 
     public HoodieClusteringConfig build() {
+      clusteringConfig.setDefaultValue(
+          PLAN_STRATEGY_CLASS_NAME, getDefaultPlanStrategyClassName(engineType));
+      clusteringConfig.setDefaultValue(
+          EXECUTION_STRATEGY_CLASS_NAME, getDefaultExecutionStrategyClassName(engineType));
       clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
       return clusteringConfig;
     }
+
+    private String getDefaultPlanStrategyClassName(EngineType engineType) {
+      switch (engineType) {
+        case SPARK:
+          return SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+        case FLINK:
+        case JAVA:
+          return JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;

Review comment:
       Yes, since the Flink client is Java-based.  However, Flink does not 
currently have the clustering action integrated.
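
For readers following the thread, a rough standalone sketch of the engine-dependent default discussed here. It uses plain `java.util.Properties` rather than `HoodieClusteringConfig`; the nested `EngineType` enum is a simplified stand-in, and the Java strategy class name is an assumption formed by analogy with the Spark one shown in the diff.

```java
import java.util.Properties;

public class EngineDefaultsSketch {
  // Simplified stand-in for Hudi's EngineType enum.
  enum EngineType { SPARK, FLINK, JAVA }

  static final String PLAN_STRATEGY_KEY = "hoodie.clustering.plan.strategy.class";
  static final String SPARK_PLAN_STRATEGY =
      "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
  // Assumed class name, formed by analogy with the Spark one above.
  static final String JAVA_PLAN_STRATEGY =
      "org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";

  // Flink falls through to the Java default because its client is Java-based.
  static String defaultPlanStrategy(EngineType engineType) {
    switch (engineType) {
      case SPARK:
        return SPARK_PLAN_STRATEGY;
      case FLINK:
      case JAVA:
        return JAVA_PLAN_STRATEGY;
      default:
        throw new IllegalArgumentException("Unsupported engine type: " + engineType);
    }
  }

  // Copies the user's properties and fills in the engine default only when
  // the user has not set the key explicitly.
  static Properties withEngineDefaults(Properties userProps, EngineType engineType) {
    Properties resolved = new Properties();
    resolved.putAll(userProps);
    resolved.putIfAbsent(PLAN_STRATEGY_KEY, defaultPlanStrategy(engineType));
    return resolved;
  }
}
```

Resolving the default at build time, rather than in the static `ConfigProperty` declaration, is what lets one config class serve several engines; an explicit user setting still wins.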

##########
File path: hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
##########
@@ -54,15 +55,20 @@
 
   private final Option<HoodieTableMetaClient> tableMetaClient;
   private final Configuration hadoopConf;
+  private final HoodieWriteConfig writeConfig;
+  private final KafkaConnectConfigs connectConfigs;
   private final String tableBasePath;
   private final String tableName;
   private final HoodieEngineContext context;
 
   private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;
 
   public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException {
-    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
-        .withProperties(connectConfigs.getProps()).build();
+    this.connectConfigs = connectConfigs;
+    this.writeConfig = HoodieWriteConfig.newBuilder()
+        .withEngineType(EngineType.JAVA)
+        .withProperties(connectConfigs.getProps())

Review comment:
       @rmahindra123 @vinothchandar Do we want to explicitly turn off inline 
compaction and clustering here?
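
If the answer is yes, one possible shape is sketched below. This is an illustration only, not what the PR does: it works on plain `java.util.Properties`, the class and method names are hypothetical, and the two keys are what I believe the inline compaction and inline clustering switches are called, so they should be checked against `HoodieCompactionConfig` and `HoodieClusteringConfig` before use.

```java
import java.util.Properties;

public class InlineServicesOffSketch {
  // Assumed config keys; verify against the Hudi config classes.
  static final String INLINE_COMPACT_KEY = "hoodie.compact.inline";
  static final String INLINE_CLUSTERING_KEY = "hoodie.clustering.inline";

  // Returns a copy of the connector properties with both inline table
  // services forced off, overriding whatever the user supplied.
  static Properties withInlineServicesOff(Properties connectProps) {
    Properties props = new Properties();
    props.putAll(connectProps);
    props.setProperty(INLINE_COMPACT_KEY, "false");
    props.setProperty(INLINE_CLUSTERING_KEY, "false");
    return props;
  }
}
```

The overrides are applied after copying the user properties so that they take precedence; applying them first would let a user setting re-enable inline execution.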




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

