[GitHub] [hudi] danny0405 commented on a diff in pull request #8568: [HUDI-6134] prevent two clean run concurrently in flink.
danny0405 commented on code in PR #8568:
URL: https://github.com/apache/hudi/pull/8568#discussion_r1194859993

## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java:

@@ -179,7 +179,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
     TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
 // whether to clean up the input base parquet files used for clustering
-if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
   LOG.info("Running inline clean");

Review Comment:
After some analysis I find that there is an option `hoodie.clean.allow.multiple` for multiple-writer cleaning, and it is true by default. I also find that the clean action executor tries to clean all the pending cleaning instants on the timeline when `hoodie.clean.allow.multiple` is enabled. In Flink streaming, cleaning is scheduled and executed in a single-worker thread pool, which means at most one cleaning task runs for a streaming pipeline. But for a batch ingestion job there is a possibility that the `#open` method and the `#doCommit` method trigger the cleaning in separate threads. Is that your case here?

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
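The race described above (cleaning triggered from two threads at once) can be closed with a compare-and-set guard. This is a minimal, hypothetical sketch, not Hudi's actual `CleanFunction` code: the class and method names here are illustrative only, and the real implementation uses a simple `isCleaning` flag rather than this helper.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a compare-and-set guard so that two threads
// (e.g. one triggering clean from #open and one from #doCommit)
// cannot both start a cleaning run at the same time.
public class CleanGuard {
  private final AtomicBoolean isCleaning = new AtomicBoolean(false);

  /** Runs the clean action if no other thread is cleaning; returns true if it ran. */
  public boolean tryStartClean(Runnable cleanAction) {
    if (!isCleaning.compareAndSet(false, true)) {
      return false; // another thread already won the flag; skip this run
    }
    try {
      cleanAction.run();
    } finally {
      isCleaning.set(false); // always release, even if the clean fails
    }
    return true;
  }
}
```

A nested or concurrent caller sees `tryStartClean` return `false` while a clean is in flight, which is exactly the semantics the `&& !isCleaning` check in the diff is after.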
danny0405 commented on code in PR #8568:
URL: https://github.com/apache/hudi/pull/8568#discussion_r1194806781

## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java:

@@ -179,7 +179,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
     TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
 // whether to clean up the input base parquet files used for clustering
-if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
   LOG.info("Running inline clean");

Review Comment:
> Multiple cleaning tasks maybe running the same clean instant

Can you explain why the same cleaning instant is executed by multiple tasks? You mean the cleaning service would try to execute the existing pending cleaning instant? Can you show us the code there?
danny0405 commented on code in PR #8568:
URL: https://github.com/apache/hudi/pull/8568#discussion_r1177902562

## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java:

@@ -179,7 +179,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
     TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
 // whether to clean up the input base parquet files used for clustering
-if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
   LOG.info("Running inline clean");

Review Comment:
> Multiple cleaning tasks maybe running the same clean instant, it's unnecessary.

That's not possible: the cleaning service may have a clean plan, and one file can only belong to one cleaning plan.
danny0405 commented on code in PR #8568:
URL: https://github.com/apache/hudi/pull/8568#discussion_r1177309173

## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java:

@@ -179,7 +179,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
     TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
 // whether to clean up the input base parquet files used for clustering
-if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
   LOG.info("Running inline clean");

Review Comment:
What's the problem if there are multiple cleaning tasks running here?
danny0405 commented on code in PR #8568:
URL: https://github.com/apache/hudi/pull/8568#discussion_r1177250127

## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java:

@@ -64,7 +64,19 @@ public void open(Configuration parameters) throws Exception {
   this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
   String instantTime = HoodieActiveTimeline.createNewInstantTime();
   LOG.info(String.format("exec clean with instant time %s...", instantTime));
-  executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");
+  executor.execute(() -> {
+    if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+      this.isCleaning = true;
+      try {
+        this.writeClient.clean(instantTime);
+      } catch (Throwable throwable) {

Review Comment:
We need to execute the cleaning no matter whether async cleaning is enabled or not. And since this is a `NonThrownExecutor`, there is no need to catch the exception.
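The reviewer's point about the redundant try/catch can be illustrated with a generic sketch. This is not Hudi's `NonThrownExecutor` API; it is a hypothetical stand-in showing the pattern of an executor that catches and logs failures itself, so submitted tasks do not need their own exception handling.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a "non-throwing" executor: failures are caught and
// logged inside the executor, so the submitted task needs no try/catch and
// callers never see the exception.
public class NonThrowingRunner {
  private final ExecutorService pool = Executors.newSingleThreadExecutor();

  public void execute(Runnable task, String description) {
    pool.submit(() -> {
      try {
        task.run();
      } catch (Throwable t) {
        // Log and swallow: the pipeline keeps running after a failed task.
        System.err.println("Task failed: " + description + ": " + t);
      }
    });
  }

  public void shutdown() {
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Under this pattern, the cleaning lambda submitted in `#open` can call `writeClient.clean(instantTime)` directly: a thrown exception is handled by the executor, matching the review suggestion to drop the inner catch block.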