[GitHub] [hudi] danny0405 commented on a diff in pull request #8568: [HUDI-6134] prevent two clean run concurrently in flink.

2023-05-16 Thread via GitHub


danny0405 commented on code in PR #8568:
URL: https://github.com/apache/hudi/pull/8568#discussion_r1194859993


##
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java:
##
@@ -179,7 +179,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
 TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
 
 // whether to clean up the input base parquet files used for clustering
-if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
   LOG.info("Running inline clean");

Review Comment:
   After some analysis, I found that there is an option, `hoodie.clean.allow.multiple`, that permits cleaning by multiple writers, and it is true by default.
   
   I also found that, if `hoodie.clean.allow.multiple` is enabled, the clean action executor tries to execute all pending clean instants on the timeline.
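To make the executor behaviour described above concrete, here is a toy Java model of a clean timeline. It is not Hudi's actual clean action executor: `PendingCleanModel`, `State`, `CleanInstant` and `pendingToExecute` are hypothetical names, and the model simply assumes, as the comment says, that with `hoodie.clean.allow.multiple` enabled a clean call executes every requested or inflight clean instant it finds.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a clean timeline; NOT Hudi's actual clean action executor.
// State, CleanInstant and pendingToExecute are hypothetical names.
final class PendingCleanModel {
  enum State { REQUESTED, INFLIGHT, COMPLETED }

  // One clean instant on the (toy) timeline.
  record CleanInstant(String time, State state) {}

  // Assumption taken from the review comment: with hoodie.clean.allow.multiple
  // enabled, a clean call executes every pending (requested or inflight)
  // clean instant it finds, in timeline order.
  static List<String> pendingToExecute(List<CleanInstant> timeline) {
    List<String> toRun = new ArrayList<>();
    for (CleanInstant instant : timeline) {
      if (instant.state() != State.COMPLETED) {
        toRun.add(instant.time());
      }
    }
    return toRun;
  }
}
```

Two threads that each build this list from the same timeline snapshot would get the same pending instants, which is the overlap the PR is trying to prevent.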
   
   However, in Flink streaming, cleaning is scheduled and executed in a single-threaded worker pool, so at most one cleaning task runs at a time in a streaming pipeline.
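A minimal sketch of why a single worker thread bounds concurrency: tasks submitted to `Executors.newSingleThreadExecutor()` run strictly one after another. `SingleWorkerCleanDemo` and its fake clean task are hypothetical; the sketch assumes the streaming clean path behaves like such a single-threaded executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a single-threaded executor stands in for the assumed
// single worker thread pool of the streaming clean path.
final class SingleWorkerCleanDemo {
  // Submits `tasks` fake clean jobs to one worker thread and returns the
  // highest number of jobs that were ever running at the same moment.
  static int maxConcurrentCleans(int tasks) {
    ExecutorService worker = Executors.newSingleThreadExecutor();
    AtomicInteger running = new AtomicInteger();
    AtomicInteger maxSeen = new AtomicInteger();
    for (int i = 0; i < tasks; i++) {
      worker.execute(() -> {
        int now = running.incrementAndGet();
        maxSeen.accumulateAndGet(now, Math::max);
        try {
          Thread.sleep(5); // pretend to delete some files
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          running.decrementAndGet();
        }
      });
    }
    worker.shutdown();
    try {
      worker.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return maxSeen.get();
  }
}
```

With a pool of two or more worker threads the same measurement could exceed 1, which is the batch-job situation described next.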
   
   A batch ingestion job is different, though: there, the `#open` method and the `#doCommit` method can trigger cleaning in separate threads. Is that your case here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] danny0405 commented on a diff in pull request #8568: [HUDI-6134] prevent two clean run concurrently in flink.

2023-05-16 Thread via GitHub


danny0405 commented on code in PR #8568:
URL: https://github.com/apache/hudi/pull/8568#discussion_r1194806781


##
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java:
##
@@ -179,7 +179,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
 TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
 
 // whether to clean up the input base parquet files used for clustering
-if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
   LOG.info("Running inline clean");

Review Comment:
   > Multiple cleaning tasks maybe running the same clean instant
   
   Can you explain why the same clean instant would be executed by multiple tasks? Do you mean the cleaning service tries to execute an existing pending clean instant? Can you show us the code for that?



[GitHub] [hudi] danny0405 commented on a diff in pull request #8568: [HUDI-6134] prevent two clean run concurrently in flink.

2023-04-26 Thread via GitHub


danny0405 commented on code in PR #8568:
URL: https://github.com/apache/hudi/pull/8568#discussion_r1177902562


##
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java:
##
@@ -179,7 +179,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
 TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
 
 // whether to clean up the input base parquet files used for clustering
-if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
   LOG.info("Running inline clean");

Review Comment:
   > Multiple cleaning tasks maybe running the same clean instant, it's unnecessary.
   
   That's not possible: cleaning is driven by a clean plan, and one file can only belong to one clean plan.
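The invariant stated above (each file belongs to at most one clean plan) can be pictured with a toy scheduler that skips files already claimed by an earlier plan. `DisjointCleanPlans` and `schedule` are hypothetical, not Hudi's clean planner, and claims are never released here, for brevity.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy scheduler (hypothetical): a new plan only takes files that no earlier
// plan has claimed, so no file ever appears in two plans.
final class DisjointCleanPlans {
  private final Set<String> claimed = new HashSet<>();

  // Builds a plan from the candidate files, skipping any file already in a
  // previously scheduled plan, and records the files this plan claims.
  List<String> schedule(List<String> candidates) {
    List<String> plan = new ArrayList<>();
    for (String file : candidates) {
      if (claimed.add(file)) { // add returns false if the file was already claimed
        plan.add(file);
      }
    }
    return plan;
  }
}
```

Using the return value of `Set.add` performs the membership check and the claim in a single call.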



[GitHub] [hudi] danny0405 commented on a diff in pull request #8568: [HUDI-6134] prevent two clean run concurrently in flink.

2023-04-25 Thread via GitHub


danny0405 commented on code in PR #8568:
URL: https://github.com/apache/hudi/pull/8568#discussion_r1177309173


##
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java:
##
@@ -179,7 +179,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
 TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
 
 // whether to clean up the input base parquet files used for clustering
-if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
   LOG.info("Running inline clean");

Review Comment:
   What is the problem if multiple cleaning tasks run here?



[GitHub] [hudi] danny0405 commented on a diff in pull request #8568: [HUDI-6134] prevent two clean run concurrently in flink.

2023-04-25 Thread via GitHub


danny0405 commented on code in PR #8568:
URL: https://github.com/apache/hudi/pull/8568#discussion_r1177250127


##
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java:
##
@@ -64,7 +64,19 @@ public void open(Configuration parameters) throws Exception {
 this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
 String instantTime = HoodieActiveTimeline.createNewInstantTime();
 LOG.info(String.format("exec clean with instant time %s...", instantTime));
-executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");
+executor.execute(() -> {
+  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+this.isCleaning = true;
+try {
+  this.writeClient.clean(instantTime);
+} catch (Throwable throwable) {

Review Comment:
   We need to execute the cleaning regardless of whether async cleaning is enabled.
   Also, this is a `NonThrownExecutor`, so there is no need to catch the exception.
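The point above is that the executor already handles failures, so the task body can call the clean directly and unconditionally. A minimal sketch of that shape, assuming an executor that catches and records any `Throwable` itself; `SwallowingExecutor` and `openLikeDemo` are hypothetical stand-ins, not Hudi's `NonThrownExecutor` API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an executor that never lets a task's exception
// escape: failures are caught and recorded instead of being rethrown.
final class SwallowingExecutor {
  private final ExecutorService worker = Executors.newSingleThreadExecutor();
  private final List<String> errors = new CopyOnWriteArrayList<>();

  void execute(Runnable action, String actionName) {
    worker.execute(() -> {
      try {
        action.run();
      } catch (Throwable t) {
        // Stand-in for logging: remember which action failed and why.
        errors.add(actionName + ": " + t.getMessage());
      }
    });
  }

  // Waits for submitted tasks to finish, then returns the recorded failures.
  List<String> closeAndGetErrors() {
    worker.shutdown();
    try {
      worker.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return List.copyOf(errors);
  }

  // Shape of the suggestion: always submit the clean, with no async-flag
  // condition and no try/catch inside the task itself.
  static List<String> openLikeDemo(Runnable clean) {
    SwallowingExecutor executor = new SwallowingExecutor();
    executor.execute(clean, "wait for cleaning finish");
    return executor.closeAndGetErrors();
  }
}
```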


