yihua commented on a change in pull request #3857:
URL: https://github.com/apache/hudi/pull/3857#discussion_r752819581
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -59,15 +69,15 @@
   public static final ConfigProperty<String> PLAN_STRATEGY_CLASS_NAME = ConfigProperty
       .key("hoodie.clustering.plan.strategy.class")
-      .defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy")
+      .noDefaultValue()

Review comment:
   Looks like I don't have to make this change to have engine-specific configs here, so I reverted it. Regarding overriding `getWriteConfig()`, I think it is better to keep all the configs in this class so changes can be made in one place without confusion. For the docs, do we need to mention that the default class names are different for the Java engine?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -99,6 +99,12 @@
       .withDocumentation("Controls how compaction scheduling is triggered, by time or num delta commits or combination of both. "
           + "Valid options: " + Arrays.stream(CompactionTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));

+  public static final ConfigProperty<String> ASYNC_COMPACT_ENABLE = ConfigProperty

Review comment:
   Good point. I'll check this.

##########
File path: hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
##########
@@ -105,6 +111,16 @@ public String startCommit() {
   public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
     javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
     LOG.info("Ending Hudi commit " + commitTime);
+
+    // Schedule clustering and compaction as needed.
+    if (writeConfig.isAsyncClusteringEnabled()) {
+      javaClient.scheduleClustering(Option.empty()).ifPresent(
+          instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs));
+    }
+    if (writeConfig.isAsyncCompactionEnabled()) {
+      javaClient.scheduleCompaction(Option.empty()).ifPresent(
+          instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
+    }

Review comment:
   @rmahindra123 @vinothchandar Currently, I guard the scheduling of compaction and clustering with these flags. `writeConfig.isAsyncCompactionEnabled()` is a newly added API.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -430,9 +446,37 @@ public Builder withDataOptimizeDataSkippingEnable(boolean dataSkipping) {
     }

     public HoodieClusteringConfig build() {
+      clusteringConfig.setDefaultValue(
+          PLAN_STRATEGY_CLASS_NAME, getDefaultPlanStrategyClassName(engineType));
+      clusteringConfig.setDefaultValue(
+          EXECUTION_STRATEGY_CLASS_NAME, getDefaultExecutionStrategyClassName(engineType));
       clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
       return clusteringConfig;
     }
+
+    private String getDefaultPlanStrategyClassName(EngineType engineType) {
+      switch (engineType) {
+        case SPARK:
+          return SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+        case FLINK:
+        case JAVA:
+          return JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;

Review comment:
   Yes, since the Flink client is Java-based. However, Flink does not currently have the clustering action integrated.
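The fall-through in `getDefaultPlanStrategyClassName()` above (Flink resolving to the Java strategy) can be illustrated with a self-contained sketch. All class, enum, and constant names here are hypothetical stand-ins, not Hudi's API; the fully-qualified strategy class names are assumed from the constant names in the diff.

```java
// Hypothetical sketch of resolving an engine-specific default at build() time.
// Names are illustrative; only the switch/fall-through logic mirrors the diff.
class EngineDefaultsSketch {

  enum EngineType { SPARK, FLINK, JAVA }

  static final String SPARK_PLAN_STRATEGY =
      "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
  static final String JAVA_PLAN_STRATEGY =
      "org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";

  // Flink falls through to the Java strategy because the Flink client is
  // Java-based and clustering is not yet integrated there.
  static String defaultPlanStrategyClassName(EngineType engineType) {
    switch (engineType) {
      case SPARK:
        return SPARK_PLAN_STRATEGY;
      case FLINK:
      case JAVA:
        return JAVA_PLAN_STRATEGY;
      default:
        throw new IllegalArgumentException("Unknown engine type: " + engineType);
    }
  }
}
```

Setting these defaults in `build()` rather than on the `ConfigProperty` itself keeps `PLAN_STRATEGY_CLASS_NAME` engine-agnostic while still giving each engine a sensible default.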
##########
File path: hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
##########
@@ -54,15 +55,20 @@
   private final Option<HoodieTableMetaClient> tableMetaClient;
   private final Configuration hadoopConf;
+  private final HoodieWriteConfig writeConfig;
+  private final KafkaConnectConfigs connectConfigs;
   private final String tableBasePath;
   private final String tableName;
   private final HoodieEngineContext context;
   private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;

   public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException {
-    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
-        .withProperties(connectConfigs.getProps()).build();
+    this.connectConfigs = connectConfigs;
+    this.writeConfig = HoodieWriteConfig.newBuilder()
+        .withEngineType(EngineType.JAVA)
+        .withProperties(connectConfigs.getProps())

Review comment:
   @rmahindra123 @vinothchandar Do we want to explicitly turn off inline compaction and clustering here?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
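The last review comment asks whether inline compaction and clustering should be explicitly disabled in the connector's write config, since `endCommit()` schedules both asynchronously. One way to do that is to default the inline flags off while letting user-supplied properties override. A minimal self-contained sketch (the class and method names are hypothetical; only the two config keys, `hoodie.compact.inline` and `hoodie.clustering.inline`, are Hudi's):

```java
// Hypothetical sketch, not the actual Hudi/Kafka Connect API: default inline
// table services off for the connector path, where compaction and clustering
// are scheduled asynchronously after each commit instead.
import java.util.HashMap;
import java.util.Map;

class ConnectWriteConfigSketch {
  private final Map<String, String> props = new HashMap<>();

  ConnectWriteConfigSketch(Map<String, String> userProps) {
    // Disable inline services by default for this write path.
    props.put("hoodie.compact.inline", "false");
    props.put("hoodie.clustering.inline", "false");
    // User-supplied properties are applied last, so an explicit opt-in wins.
    props.putAll(userProps);
  }

  boolean isInlineCompactionEnabled() {
    return Boolean.parseBoolean(props.get("hoodie.compact.inline"));
  }

  boolean isInlineClusteringEnabled() {
    return Boolean.parseBoolean(props.get("hoodie.clustering.inline"));
  }
}
```

Applying defaults before `putAll(userProps)` keeps the connector's behavior predictable without taking the choice away from users who deliberately enable inline services.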