[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-05-11 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1191850116


##
hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestOfflineHoodieCompactor.java:
##
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.offlinejob;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieLayoutConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.apache.hudi.table.storage.HoodieStorageLayout;
+import org.apache.hudi.utilities.HoodieCompactor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+
+public class TestOfflineHoodieCompactor extends HoodieOfflineJobTestBase {
+
+  protected HoodieCompactor initialHoodieCompactorClean(String tableBasePath, Boolean runSchedule, String scheduleAndExecute,
+                                                        Boolean isAutoClean) {

Review Comment:
   There seems to be no test class for org.apache.hudi.utilities.HoodieCompactor yet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-05-11 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1191848563


##
hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestOfflineHoodieCompactor.java:
##
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.offlinejob;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieLayoutConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.apache.hudi.table.storage.HoodieStorageLayout;
+import org.apache.hudi.utilities.HoodieCompactor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+
+public class TestOfflineHoodieCompactor extends HoodieOfflineJobTestBase {
+
+  protected HoodieCompactor initialHoodieCompactorClean(String tableBasePath, Boolean runSchedule, String scheduleAndExecute,
+                                                        Boolean isAutoClean) {

Review Comment:
   > Can we move the tests to `TestHoodieCompactor` ?
   
   This test class belongs to the hudi-spark-client module (not hudi-utilities), and is mainly used to test online compaction.






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-05-06 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1186656966


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
@@ -295,4 +301,11 @@ private String getSchemaFromLatestInstant() throws Exception {
     Schema schema = schemaUtil.getTableAvroSchema(false);
     return schema.toString();
   }
+
+  private void cleanAfterCompact(SparkRDDWriteClient client) {
+    if (client.getConfig().isAutoClean()) {
+      LOG.info("Start to clean synchronously.");

Review Comment:
   Auto-clean can be toggled through the config passed into the job constructor, just like in the unit tests. This is also reasonable, since it allows users to manually turn off cleaning.
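
   A minimal sketch of what that could look like, assuming the standard HoodieCleanConfig.AUTO_CLEAN key ("hoodie.clean.automatic") and a HoodieCompactor constructor that accepts TypedProperties (both assumptions, not quoted from this PR):
   
      // Hypothetical illustration: turn auto-clean off for the offline
      // compaction job via the props handed to the job constructor.
      TypedProperties props = new TypedProperties();
      props.setProperty(HoodieCleanConfig.AUTO_CLEAN.key(), "false");
      HoodieCompactor compactor = new HoodieCompactor(jsc, cfg, props);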






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-05-06 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1186656966


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
@@ -295,4 +301,11 @@ private String getSchemaFromLatestInstant() throws Exception {
     Schema schema = schemaUtil.getTableAvroSchema(false);
     return schema.toString();
   }
+
+  private void cleanAfterCompact(SparkRDDWriteClient client) {
+    if (client.getConfig().isAutoClean()) {
+      LOG.info("Start to clean synchronously.");

Review Comment:
   Auto-clean can be modified through the config passed into the job constructor.






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-05-05 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1186146172


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
@@ -269,13 +272,14 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
         }
       }
       HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(cfg.compactionInstantTime);
+      cleanAfterCompact(client);
       return UtilHelpers.handleErrors(compactionMetadata.getCommitMetadata().get(), cfg.compactionInstantTime);
     }
   }
 
   private Option<String> doSchedule(JavaSparkContext jsc) {
     try (SparkRDDWriteClient client =
-             UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.of(cfg.strategyClassName), props)) {
+             UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.of(cfg.strategyClassName), props, cfg.asyncSerivceEanble)) {
 
       if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
         LOG.warn("No instant time is provided for scheduling compaction.");

Review Comment:
   Removed this method and added the clean config into props instead.
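
   Concretely, "clean config into props" means entries along these lines in the properties supplied to the job (the keys are the standard Hudi clean configs; the values shown are purely illustrative):
   
      hoodie.clean.automatic=true
      hoodie.clean.async=false
      hoodie.cleaner.policy=KEEP_LATEST_COMMITS
      hoodie.cleaner.commits.retained=10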






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-05-05 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1185742751


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java:
##
@@ -256,7 +279,16 @@ private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
       LOG.info("The schedule instant time is " + instantTime.get());
       LOG.info("Step 2: Do cluster");
       Option<HoodieCommitMetadata> metadata = client.cluster(instantTime.get()).getCommitMetadata();
+      cleanAfterCluster(client);
       return UtilHelpers.handleErrors(metadata.get(), instantTime.get());
     }
   }
+
+  private void cleanAfterCluster(SparkRDDWriteClient client) {
+    client.waitForAsyncServiceCompletion();
+    if (client.getConfig().isAutoClean() && !client.getConfig().isAsyncClean()) {

Review Comment:
   > We only need to add a sync cleaning of it is **enabled**, does that make sense to you?
   
   I originally intended to maintain consistency with the Flink job, retaining both cleaning modes. Adjusted to keep only the synchronous cleaning mode.






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-05-04 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1185642315


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java:
##
@@ -256,7 +279,16 @@ private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
       LOG.info("The schedule instant time is " + instantTime.get());
       LOG.info("Step 2: Do cluster");
       Option<HoodieCommitMetadata> metadata = client.cluster(instantTime.get()).getCommitMetadata();
+      cleanAfterCluster(client);
       return UtilHelpers.handleErrors(metadata.get(), instantTime.get());
     }
   }
+
+  private void cleanAfterCluster(SparkRDDWriteClient client) {
+    client.waitForAsyncServiceCompletion();
+    if (client.getConfig().isAutoClean() && !client.getConfig().isAsyncClean()) {

Review Comment:
   > I think we need to trigge a sync clean if it is enabled.
   
   If isAsyncClean is enabled, the Spark offline job will start an async cleaning in preWrite like the Flink job. So if isAsyncClean is disabled, a synchronous cleanup is added.
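
   Put together, the helper would take roughly the following shape (a sketch completing the truncated diff above; using client.clean() as the synchronous clean call is an assumption):
   
      private void cleanAfterCluster(SparkRDDWriteClient client) {
        // Drain any async services (cleaner/archiver) started in preWrite.
        client.waitForAsyncServiceCompletion();
        // If async clean already ran, skip; otherwise clean synchronously.
        if (client.getConfig().isAutoClean() && !client.getConfig().isAsyncClean()) {
          LOG.info("Start to clean synchronously.");
          client.clean();
        }
      }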






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-05-04 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1185642208


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java:
##
@@ -256,7 +279,16 @@ private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
       LOG.info("The schedule instant time is " + instantTime.get());
       LOG.info("Step 2: Do cluster");
       Option<HoodieCommitMetadata> metadata = client.cluster(instantTime.get()).getCommitMetadata();
+      cleanAfterCluster(client);
       return UtilHelpers.handleErrors(metadata.get(), instantTime.get());
     }
   }
+
+  private void cleanAfterCluster(SparkRDDWriteClient client) {
+    client.waitForAsyncServiceCompletion();
+    if (client.getConfig().isAutoClean() && !client.getConfig().isAsyncClean()) {

Review Comment:
   > I think we need to trigge a sync clean if it is enabled.
   
   If isAsyncClean is enabled, the Spark offline job will start an async cleaning in preWrite like the Flink job. So if isAsyncClean is disabled, a synchronous cleanup is added.






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-05-04 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1185641155


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java:
##
@@ -256,7 +279,16 @@ private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
       LOG.info("The schedule instant time is " + instantTime.get());
       LOG.info("Step 2: Do cluster");
       Option<HoodieCommitMetadata> metadata = client.cluster(instantTime.get()).getCommitMetadata();
+      cleanAfterCluster(client);
       return UtilHelpers.handleErrors(metadata.get(), instantTime.get());
     }
   }
+
+  private void cleanAfterCluster(SparkRDDWriteClient client) {
+    client.waitForAsyncServiceCompletion();
+    if (client.getConfig().isAutoClean() && !client.getConfig().isAsyncClean()) {

Review Comment:
   > What kind of async table service do we want to wait for here?
   
   Async cleaning and async archiving.






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-04-26 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1177963180


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
@@ -269,6 +269,7 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
         }
       }
       HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(cfg.compactionInstantTime);
+      cleanAfterCompact(client);
       return UtilHelpers.handleErrors(compactionMetadata.getCommitMetadata().get(), cfg.compactionInstantTime);

Review Comment:
   How about adding an asyncEnable config like the Flink offline job, defaulting to false?
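
   For illustration, such a flag could be declared the way the job's other options are, as a JCommander parameter (a hypothetical sketch; the flag name, description, and wiring are assumptions):
   
      // Hypothetical flag on the job's Config class, mirroring the Flink offline job.
      @Parameter(names = {"--async-service-enable"},
          description = "Whether to start async table services (e.g. async clean) with the job")
      public Boolean asyncServiceEnable = false;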






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-04-26 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1177963180


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
@@ -269,6 +269,7 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
         }
       }
       HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(cfg.compactionInstantTime);
+      cleanAfterCompact(client);
       return UtilHelpers.handleErrors(compactionMetadata.getCommitMetadata().get(), cfg.compactionInstantTime);

Review Comment:
   How about adding an asyncEnable config like the Flink offline job?






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-04-26 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1177963180


##
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##
@@ -269,6 +269,7 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
         }
       }
       HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(cfg.compactionInstantTime);
+      cleanAfterCompact(client);
       return UtilHelpers.handleErrors(compactionMetadata.getCommitMetadata().get(), cfg.compactionInstantTime);

Review Comment:
   How about adding an asyncEnable config like the Flink offline job?






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-04-25 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##
@@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
           metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
       );
     }
+    waitForAsyncServiceCompletion();
     LOG.info("Clustering successfully on commit " + clusteringCommitTime);

Review Comment:
   Without this change, if ASYNC_CLEAN = true is configured, AsyncCleanerService will be used to do the cleaning in the offline job.
   ![image](https://user-images.githubusercontent.com/34104400/234192874-e369bead-cc4a-4c8e-ab0a-c4791c8bd0ef.png)
   In my unit testing for the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the async cleaning job to be closed.
   ![image](https://user-images.githubusercontent.com/34104400/234192499-dabdbd5f-8df8-476f-9812-72151e5d6873.png)
   
   So I added this wait so that the entire task waits for the clean to complete before smoothly exiting.
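
   The wait amounts to something like the following (an illustrative sketch; the AsyncCleanerService/AsyncArchiveService waitForCompletion helpers are assumed from Hudi's async-service package, not quoted from this PR):
   
      // Block until the async cleaner/archiver spawned in preWrite have drained,
      // so BaseHoodieTableServiceClient.close() does not interrupt an in-flight clean.
      public void waitForAsyncServiceCompletion() {
        AsyncCleanerService.waitForCompletion(asyncCleanerService);
        AsyncArchiveService.waitForCompletion(asyncArchiveService);
      }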






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-04-25 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##
@@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
           metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
       );
     }
+    waitForAsyncServiceCompletion();
     LOG.info("Clustering successfully on commit " + clusteringCommitTime);

Review Comment:
   Without this change, if ASYNC_CLEAN = true is configured, AsyncCleanerService will be used to do the cleaning in the offline job. In my unit testing for the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the async cleaning job to be closed.
   So I added this wait so that the entire task waits for the clean to complete before smoothly exiting.






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-04-25 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##
@@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
           metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
       );
     }
+    waitForAsyncServiceCompletion();
     LOG.info("Clustering successfully on commit " + clusteringCommitTime);

Review Comment:
   Without this change, if ASYNC_CLEAN = true is configured, AsyncCleanerService will be used to do the cleaning in the offline job. In my unit testing for the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the asynchronous cleaning job to be closed, which causes an InterruptedException and ends the cleaning.
   So I added this wait so that the entire task waits for the clean to complete before smoothly exiting.






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-04-25 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##
@@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
           metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
       );
     }
+    waitForAsyncServiceCompletion();
     LOG.info("Clustering successfully on commit " + clusteringCommitTime);

Review Comment:
   Without this change, if ASYNC_CLEAN = true is configured, AsyncCleanerService will be used to do the cleaning in the offline job. In my unit testing for the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the asynchronous cleaning job to be closed, which causes an InterruptedException and ends the cleaning.
   So I added this wait so that the entire task waits for the clean to complete before smoothly exiting.






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-04-25 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##
@@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
           metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
       );
     }
+    waitForAsyncServiceCompletion();
     LOG.info("Clustering successfully on commit " + clusteringCommitTime);

Review Comment:
   Without this change, if ASYNC_CLEAN = true is configured, AsyncCleanerService will be used to do the cleaning in the offline job. In my unit testing for the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the asynchronous cleaning job to be closed, which causes an InterruptedException.
   So I added this wait so that the entire task waits for the clean to complete before smoothly exiting.






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-04-25 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1176051746


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##
@@ -245,6 +246,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
           metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
       );
     }
+    waitForAsyncServiceCompletion();
     LOG.info("Clustering successfully on commit " + clusteringCommitTime);

Review Comment:
   Without this change, if ASYNC_CLEAN = true is configured, AsyncCleanerService will be used to do the cleaning in the offline job. In my unit testing for the offline job, if the compact/cluster job completes earlier than the async cleaning job, BaseHoodieTableServiceClient.close() will force the asynchronous cleaning job to be closed, which causes an InterruptedException.






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-04-24 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1175193258


##
hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieCompactor.java:
##
@@ -0,0 +1,177 @@
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieLayoutConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.apache.hudi.table.storage.HoodieStorageLayout;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieCompactor extends UtilitiesTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(UtilitiesTestBase.class);
+  private HoodieTestDataGenerator dataGen;
+  private SparkRDDWriteClient client;
+  private HoodieTableMetaClient metaClient;
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+UtilitiesTestBase.initTestServices(false, false, false);
+  }
+
+  @BeforeEach
+  public void setup() {
+dataGen = new HoodieTestDataGenerator();
+  }
+
+  protected HoodieCompactor initialHoodieCompactorSyncClean(String tableBasePath, Boolean runSchedule, String scheduleAndExecute) {
+    HoodieCompactor.Config compactionConfig = buildHoodieCompactionUtilConfig(tableBasePath,
+        runSchedule, scheduleAndExecute);
+    List<String> configs = new ArrayList<>();

Review Comment:
   Rename to TestOfflineHoodieCompactor.






[GitHub] [hudi] zhuanshenbsj1 commented on a diff in pull request #8505: [HUDI-6106] Spark offline compaction/Clustering Job will do clean like Flink job

2023-04-23 Thread via GitHub


zhuanshenbsj1 commented on code in PR #8505:
URL: https://github.com/apache/hudi/pull/8505#discussion_r1174758600


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##
@@ -292,7 +292,9 @@ protected void completeCompaction(HoodieCommitMetadata metadata,
   protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
-    return tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    autoCleanOnCommit();
+    return compactionMetadata;

Review Comment:
   Moved the clean operation to the offline job and added a UT.


