[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1191850116 ## hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestOfflineHoodieCompactor.java: ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.offlinejob; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieLayoutConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner; +import org.apache.hudi.table.storage.HoodieStorageLayout; +import org.apache.hudi.utilities.HoodieCompactor; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; + +public class TestOfflineHoodieCompactor extends HoodieOfflineJobTestBase { + + protected HoodieCompactor initialHoodieCompactorClean(String tableBasePath, Boolean runSchedule, String scheduleAndExecute, + Boolean isAutoClean) { Review Comment: There seems to be no testing class for org.apache.hudi.utilities.HoodieCompactor before -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1191848563 ## hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestOfflineHoodieCompactor.java: ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.offlinejob; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieLayoutConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner; +import org.apache.hudi.table.storage.HoodieStorageLayout; +import org.apache.hudi.utilities.HoodieCompactor; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; + +public class TestOfflineHoodieCompactor extends HoodieOfflineJobTestBase { + + protected HoodieCompactor initialHoodieCompactorClean(String tableBasePath, Boolean runSchedule, String scheduleAndExecute, + Boolean isAutoClean) { Review Comment: > Can we move the tests to `TestHoodieCompactor` ? This test class belongs to the project hudi-spark-client(not hudi-utilities), and is mainly used to test online compaction. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1186656966 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java: ## @@ -295,4 +301,11 @@ private String getSchemaFromLatestInstant() throws Exception { Schema schema = schemaUtil.getTableAvroSchema(false); return schema.toString(); } + + private void cleanAfterCompact(SparkRDDWriteClient client) { +if (client.getConfig().isAutoClean()) { + LOG.info("Start to clean synchronously."); Review Comment: Auto-clean can be changed through the config parameters passed to the job constructor, as the unit tests do. This is also reasonable, since it allows users to turn off cleaning manually. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1186656966 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java: ## @@ -295,4 +301,11 @@ private String getSchemaFromLatestInstant() throws Exception { Schema schema = schemaUtil.getTableAvroSchema(false); return schema.toString(); } + + private void cleanAfterCompact(SparkRDDWriteClient client) { +if (client.getConfig().isAutoClean()) { + LOG.info("Start to clean synchronously."); Review Comment: Auto-clean can be changed through the config parameters passed to the job constructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1186146172 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java: ## @@ -269,13 +272,14 @@ private int doCompact(JavaSparkContext jsc) throws Exception { } } HoodieWriteMetadata> compactionMetadata = client.compact(cfg.compactionInstantTime); + cleanAfterCompact(client); return UtilHelpers.handleErrors(compactionMetadata.getCommitMetadata().get(), cfg.compactionInstantTime); } } private Option doSchedule(JavaSparkContext jsc) { try (SparkRDDWriteClient client = - UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.of(cfg.strategyClassName), props)) { + UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.of(cfg.strategyClassName), props, cfg.asyncSerivceEanble)) { if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) { LOG.warn("No instant time is provided for scheduling compaction."); Review Comment: Removed this method and added the clean config to props instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1185742751 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java: ## @@ -256,7 +279,16 @@ private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception { LOG.info("The schedule instant time is " + instantTime.get()); LOG.info("Step 2: Do cluster"); Option metadata = client.cluster(instantTime.get()).getCommitMetadata(); + cleanAfterCluster(client); return UtilHelpers.handleErrors(metadata.get(), instantTime.get()); } } + + private void cleanAfterCluster(SparkRDDWriteClient client) { +client.waitForAsyncServiceCompletion(); +if (client.getConfig().isAutoClean() && !client.getConfig().isAsyncClean()) { Review Comment: > We only need to add a sync cleaning if it is **enabled**, does that make sense to you? I originally intended to stay consistent with the Flink job by retaining both cleaning modes. I have now adjusted it to keep only the synchronous cleaning mode. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1185642315 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java: ## @@ -256,7 +279,16 @@ private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception { LOG.info("The schedule instant time is " + instantTime.get()); LOG.info("Step 2: Do cluster"); Option metadata = client.cluster(instantTime.get()).getCommitMetadata(); + cleanAfterCluster(client); return UtilHelpers.handleErrors(metadata.get(), instantTime.get()); } } + + private void cleanAfterCluster(SparkRDDWriteClient client) { +client.waitForAsyncServiceCompletion(); +if (client.getConfig().isAutoClean() && !client.getConfig().isAsyncClean()) { Review Comment: > I think we need to trigger a sync clean if it is enabled. If isAsyncClean is enabled, the Spark offline job will start an async cleaning in preWrite, like the Flink job. So a synchronous cleanup is added only when isAsyncClean is disabled. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1185642208 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java: ## @@ -256,7 +279,16 @@ private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception { LOG.info("The schedule instant time is " + instantTime.get()); LOG.info("Step 2: Do cluster"); Option metadata = client.cluster(instantTime.get()).getCommitMetadata(); + cleanAfterCluster(client); return UtilHelpers.handleErrors(metadata.get(), instantTime.get()); } } + + private void cleanAfterCluster(SparkRDDWriteClient client) { +client.waitForAsyncServiceCompletion(); +if (client.getConfig().isAutoClean() && !client.getConfig().isAsyncClean()) { Review Comment: > I think we need to trigger a sync clean if it is enabled. If isAsyncClean is enabled, the Spark offline job will start an async cleaning in preWrite, like the Flink job. So a synchronous cleanup is added only when isAsyncClean is disabled. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1185641155 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java: ## @@ -256,7 +279,16 @@ private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception { LOG.info("The schedule instant time is " + instantTime.get()); LOG.info("Step 2: Do cluster"); Option metadata = client.cluster(instantTime.get()).getCommitMetadata(); + cleanAfterCluster(client); return UtilHelpers.handleErrors(metadata.get(), instantTime.get()); } } + + private void cleanAfterCluster(SparkRDDWriteClient client) { +client.waitForAsyncServiceCompletion(); +if (client.getConfig().isAutoClean() && !client.getConfig().isAsyncClean()) { Review Comment: > What kind of async table service do we want to wait for here? Async cleaning and async archiving. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1177963180 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java: ## @@ -269,6 +269,7 @@ private int doCompact(JavaSparkContext jsc) throws Exception { } } HoodieWriteMetadata> compactionMetadata = client.compact(cfg.compactionInstantTime); + cleanAfterCompact(client); return UtilHelpers.handleErrors(compactionMetadata.getCommitMetadata().get(), cfg.compactionInstantTime); Review Comment: How about adding an asyncEnable config like the Flink offline job has, with a default of false? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1177963180 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java: ## @@ -269,6 +269,7 @@ private int doCompact(JavaSparkContext jsc) throws Exception { } } HoodieWriteMetadata> compactionMetadata = client.compact(cfg.compactionInstantTime); + cleanAfterCompact(client); return UtilHelpers.handleErrors(compactionMetadata.getCommitMetadata().get(), cfg.compactionInstantTime); Review Comment: How about adding an asyncEnable config like the Flink offline job has? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1177963180 ## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java: ## @@ -269,6 +269,7 @@ private int doCompact(JavaSparkContext jsc) throws Exception { } } HoodieWriteMetadata> compactionMetadata = client.compact(cfg.compactionInstantTime); + cleanAfterCompact(client); return UtilHelpers.handleErrors(compactionMetadata.getCommitMetadata().get(), cfg.compactionInstantTime); Review Comment: How about adding an asyncEnable config like the Flink offline job has? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java: ## @@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION) ); } +waitForAsyncServiceCompletion(); LOG.info("Clustering successfully on commit " + clusteringCommitTime); Review Comment: Without this change, if the config ASYNC_CLEAN = true, AsyncCleanerService will be used to do the cleaning in the offline job. ![image](https://user-images.githubusercontent.com/34104400/234192874-e369bead-cc4a-4c8e-ab0a-c4791c8bd0ef.png) In my unit testing of the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the async cleaning job to be closed. ![image](https://user-images.githubusercontent.com/34104400/234192499-dabdbd5f-8df8-476f-9812-72151e5d6873.png) So I added this wait, making the entire task wait for the clean to complete before exiting smoothly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java: ## @@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION) ); } +waitForAsyncServiceCompletion(); LOG.info("Clustering successfully on commit " + clusteringCommitTime); Review Comment: Without this change, if the config ASYNC_CLEAN = true, AsyncCleanerService will be used to do the cleaning in the offline job. In my unit testing of the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the async cleaning job to be closed. So I added this wait, making the entire task wait for the clean to complete before exiting smoothly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java: ## @@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION) ); } +waitForAsyncServiceCompletion(); LOG.info("Clustering successfully on commit " + clusteringCommitTime); Review Comment: Without this change, if the config ASYNC_CLEAN = true, AsyncCleanerService will be used to do the cleaning in the offline job. In my unit testing of the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the async cleaning job to be closed, which causes an interrupt exception and ends the cleaning. So I added this wait, making the entire task wait for the clean to complete before exiting smoothly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java: ## @@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION) ); } +waitForAsyncServiceCompletion(); LOG.info("Clustering successfully on commit " + clusteringCommitTime); Review Comment: Without this change, if the config ASYNC_CLEAN = true, AsyncCleanerService will be used to do the cleaning in the offline job. In my unit testing of the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the async cleaning job to be closed, which causes an interrupt exception and ends the cleaning. So I added this wait, making the entire task wait for the clean to complete before exiting smoothly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java: ## @@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION) ); } +waitForAsyncServiceCompletion(); LOG.info("Clustering successfully on commit " + clusteringCommitTime); Review Comment: Without this change, if the config ASYNC_CLEAN = true, AsyncCleanerService will be used to do the cleaning in the offline job. In my unit testing of the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the async cleaning job to be closed, which causes an interrupt exception. So I added this wait, making the entire task wait for the clean to complete before exiting smoothly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java: ## @@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION) ); } +waitForAsyncServiceCompletion(); LOG.info("Clustering successfully on commit " + clusteringCommitTime); Review Comment: Without this change, if the config ASYNC_CLEAN = true, AsyncCleanerService will be used to do the cleaning in the offline job. In my unit testing of the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the async cleaning job to be closed, which causes an interrupt exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1175193258 ## hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieCompactor.java: ## @@ -0,0 +1,177 @@ +package org.apache.hudi.utilities; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieLayoutConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner; +import org.apache.hudi.table.storage.HoodieStorageLayout; +import org.apache.hudi.utilities.testutils.UtilitiesTestBase; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestHoodieCompactor extends UtilitiesTestBase { + private static final Logger LOG = LoggerFactory.getLogger(UtilitiesTestBase.class); + private HoodieTestDataGenerator dataGen; + private SparkRDDWriteClient client; + private HoodieTableMetaClient metaClient; + + @BeforeAll + public static void initClass() throws Exception { +UtilitiesTestBase.initTestServices(false, false, false); + } + + @BeforeEach + public void setup() { +dataGen = new HoodieTestDataGenerator(); + } + + protected HoodieCompactor initialHoodieCompactorSyncClean(String tableBasePath, Boolean runSchedule, String scheduleAndExecute) { +HoodieCompactor.Config compactionConfig = buildHoodieCompactionUtilConfig(tableBasePath, + runSchedule, scheduleAndExecute); +List configs = new ArrayList<>(); Review Comment: Rename to TestOfflineHoodieCompactor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job
zhuanshenbsj1 commented on code in PR #8505: URL: https://github.com/apache/hudi/pull/8505#discussion_r1174758600 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ## @@ -292,7 +292,9 @@ protected void completeCompaction(HoodieCommitMetadata metadata, protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) { HoodieSparkTable table = HoodieSparkTable.create(config, context); preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient()); -return tableServiceClient.compact(compactionInstantTime, shouldComplete); +HoodieWriteMetadata> compactionMetadata = tableServiceClient.compact(compactionInstantTime, shouldComplete); +autoCleanOnCommit(); +return compactionMetadata; Review Comment: Move the clean operation to offline && Add UT. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org