danny0405 commented on code in PR #9087:
URL: https://github.com/apache/hudi/pull/9087#discussion_r1248757750
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java: ##########
@@ -190,7 +202,18 @@ public static boolean needsAsyncClustering(Configuration conf) {
    * @param conf The flink configuration.
    */
   public static boolean needsScheduleClustering(Configuration conf) {
-    return isInsertOperation(conf) && conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED);
+    if (!conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED)) {
+      return false;
+    }
+    WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+    if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
+      // Write pipelines for table with consistent bucket index would detect whether clustering service occurs,
+      // and automatically adjust the partitioner and write function if clustering service happens.
+      // So it could handle UPSERT and INSERT case.

Review Comment:
What is the behavior of UPSERT with the consistent hashing index? For the INSERT operation we do not expect deduplication, but the log file readers always merge the record payloads.
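To make the INSERT concern concrete, here is a minimal Java sketch, assuming a reader that replays log records in write order and keeps the latest payload per record key (roughly an overwrite-with-latest merge). `LogMergeSketch`, `LogRecord` and `mergeByKey` are hypothetical names for illustration only, not Hudi APIs; actual payload merging in Hudi depends on the configured payload/merger.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the review concern, not Hudi code: INSERT does not
// deduplicate on write, but a reader that merges log records by key keeps
// only one payload per record key.
public class LogMergeSketch {

  /** A hypothetical log record: a record key plus an opaque payload. */
  public static final class LogRecord {
    final String key;
    final String payload;

    public LogRecord(String key, String payload) {
      this.key = key;
      this.payload = payload;
    }
  }

  /** Replays records in write order; a later record for the same key replaces the earlier one. */
  public static Map<String, String> mergeByKey(List<LogRecord> log) {
    Map<String, String> merged = new LinkedHashMap<>();
    for (LogRecord r : log) {
      merged.put(r.key, r.payload);
    }
    return merged;
  }

  public static void main(String[] args) {
    // Assumption: two INSERTs with the same key land in the log; the writer keeps both.
    List<LogRecord> log = new ArrayList<>();
    log.add(new LogRecord("id-1", "v1"));
    log.add(new LogRecord("id-1", "v2"));
    log.add(new LogRecord("id-2", "v1"));

    Map<String, String> merged = mergeByKey(log);
    System.out.println("written=" + log.size() + " read=" + merged.size());
    System.out.println(merged);
  }
}
```

Running it prints `written=3 read=2` followed by `{id-1=v2, id-2=v1}`: the INSERT path wrote three records, but a key-merging read returns only two, which is exactly the deduplication that INSERT semantics did not ask for.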