This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c04216d5ca [HUDI-4228] Clean up literal usage in Hudi CLI argument check (#11042)
4c04216d5ca is described below

commit 4c04216d5cade10f9589e2c0d425109cc502ebcd
Author: Vova Kolmakov <wombatu...@gmail.com>
AuthorDate: Thu Apr 18 09:14:32 2024 +0700

    [HUDI-4228] Clean up literal usage in Hudi CLI argument check (#11042)
---
 .../org/apache/hudi/cli/commands/SparkMain.java    | 202 +++++++--------------
 .../org/apache/hudi/cli/ArchiveExecutorUtils.java  |   2 +-
 2 files changed, 69 insertions(+), 135 deletions(-)
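
The change replaces the per-command magic numbers ("assert (args.length == 6)" and the repeated propsFilePath/configs extraction) with a minimum argument count carried by each SparkCommand enum constant, plus shared assertEq/assertGtEq/makeConfigs/getPropsFilePath helpers. Below is a minimal, self-contained sketch of that pattern, not the committed class: the class name and sample CLI values are hypothetical, and a local checkArgument stands in for Hudi's ValidationUtils. The authoritative code is in the diff that follows.

// Minimal, compilable sketch of the argument-count pattern this commit introduces
// (hypothetical class name; checkArgument stands in for Hudi's ValidationUtils,
// and the sample CLI values in main are made up).
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SparkCommandSketch {

  enum SparkCommand {
    ROLLBACK(6), CLEAN(5), ARCHIVE(8);  // subset of the constants in the diff below

    private final int minArgsCount;

    SparkCommand(int minArgsCount) {
      this.minArgsCount = minArgsCount;
    }

    void assertEq(int factArgsCount) {
      checkArgument(factArgsCount == minArgsCount);
    }

    void assertGtEq(int factArgsCount) {
      checkArgument(factArgsCount >= minArgsCount);
    }

    // Anything beyond the fixed positional arguments is collected as extra configs.
    List<String> makeConfigs(String[] args) {
      List<String> configs = new ArrayList<>();
      if (args.length > minArgsCount) {
        configs.addAll(Arrays.asList(args).subList(minArgsCount, args.length));
      }
      return configs;
    }

    // The optional properties-file path occupies the last fixed position; null when blank.
    String getPropsFilePath(String[] args) {
      return (args.length >= minArgsCount
          && args[minArgsCount - 1] != null && !args[minArgsCount - 1].isEmpty())
          ? args[minArgsCount - 1] : null;
    }
  }

  private static void checkArgument(boolean condition) {
    if (!condition) {
      throw new IllegalArgumentException("argument count check failed");
    }
  }

  public static void main(String[] args) {
    // Hypothetical CLEAN invocation: command, master, memory, base path, props file, extra config.
    String[] cli = {"CLEAN", "local[2]", "1g", "/tmp/hudi_table", "", "hoodie.cleaner.commits.retained=3"};
    SparkCommand cmd = SparkCommand.valueOf(cli[0]);
    cmd.assertGtEq(cli.length);                     // CLEAN needs at least 5 arguments
    System.out.println(cmd.getPropsFilePath(cli));  // null, because the props-file slot is blank
    System.out.println(cmd.makeConfigs(cli));       // [hoodie.cleaner.commits.retained=3]
  }
}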

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 5e1aec52333..ba3b404474b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -19,14 +19,12 @@
 package org.apache.hudi.cli.commands;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.cli.ArchiveExecutorUtils;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -37,7 +35,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieBootstrapConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -99,16 +96,45 @@ public class SparkMain {
    * Commands.
    */
   enum SparkCommand {
-    BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
-    COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
-    CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE,
-    REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION, ARCHIVE
+    BOOTSTRAP(18), ROLLBACK(6), DEDUPLICATE(8), ROLLBACK_TO_SAVEPOINT(6), SAVEPOINT(7),
+    IMPORT(13), UPSERT(13), COMPACT_SCHEDULE(7), COMPACT_RUN(10), COMPACT_SCHEDULE_AND_EXECUTE(9),
+    COMPACT_UNSCHEDULE_PLAN(9), COMPACT_UNSCHEDULE_FILE(10), COMPACT_VALIDATE(7), COMPACT_REPAIR(8),
+    CLUSTERING_SCHEDULE(7), CLUSTERING_RUN(9), CLUSTERING_SCHEDULE_AND_EXECUTE(8), CLEAN(5),
+    DELETE_MARKER(5), DELETE_SAVEPOINT(5), UPGRADE(5), DOWNGRADE(5),
+    REPAIR_DEPRECATED_PARTITION(4), RENAME_PARTITION(6), ARCHIVE(8);
+
+    private final int minArgsCount;
+
+    SparkCommand(int minArgsCount) {
+      this.minArgsCount = minArgsCount;
+    }
+
+    void assertEq(int factArgsCount) {
+      ValidationUtils.checkArgument(factArgsCount == minArgsCount);
+    }
+
+    void assertGtEq(int factArgsCount) {
+      ValidationUtils.checkArgument(factArgsCount >= minArgsCount);
+    }
+
+    List<String> makeConfigs(String[] args) {
+      List<String> configs = new ArrayList<>();
+      if (args.length > minArgsCount) {
+        configs.addAll(Arrays.asList(args).subList(minArgsCount, args.length));
+      }
+      return configs;
+    }
+
+    String getPropsFilePath(String[] args) {
+      return (args.length >= minArgsCount && !StringUtils.isNullOrEmpty(args[minArgsCount - 1]))
+          ? args[minArgsCount - 1] : null;
+    }
   }
 
-  public static void main(String[] args) throws Exception {
+  public static void main(String[] args) {
     ValidationUtils.checkArgument(args.length >= 4);
     final String commandString = args[0];
-    LOG.info("Invoking SparkMain: " + commandString);
+    LOG.info("Invoking SparkMain: {}", commandString);
     final SparkCommand cmd = SparkCommand.valueOf(commandString);
 
     JavaSparkContext jsc = SparkUtil.initJavaSparkContext("hoodie-cli-" + commandString,
@@ -116,193 +142,112 @@ public class SparkMain {
 
     int returnCode = 0;
     try {
+      cmd.assertGtEq(args.length);
+      List<String> configs = cmd.makeConfigs(args);
+      String propsFilePath = cmd.getPropsFilePath(args);
       switch (cmd) {
         case ROLLBACK:
-          assert (args.length == 6);
+          cmd.assertEq(args.length);
           returnCode = rollback(jsc, args[3], args[4], Boolean.parseBoolean(args[5]));
           break;
         case DEDUPLICATE:
-          assert (args.length == 8);
+          cmd.assertEq(args.length);
           returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]);
           break;
         case ROLLBACK_TO_SAVEPOINT:
-          assert (args.length == 6);
+          cmd.assertEq(args.length);
           returnCode = rollbackToSavepoint(jsc, args[3], args[4], Boolean.parseBoolean(args[5]));
           break;
         case IMPORT:
         case UPSERT:
-          assert (args.length >= 13);
-          String propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[12])) {
-            propsFilePath = args[12];
-          }
-          List<String> configs = new ArrayList<>();
-          if (args.length > 13) {
-            configs.addAll(Arrays.asList(args).subList(13, args.length));
-          }
           returnCode = dataLoad(jsc, commandString, args[3], args[4], args[5], args[6], args[7], args[8],
               Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
           break;
         case COMPACT_RUN:
-          assert (args.length >= 10);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[9])) {
-            propsFilePath = args[9];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 10) {
-            configs.addAll(Arrays.asList(args).subList(10, args.length));
-          }
           returnCode = compact(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[7],
               Integer.parseInt(args[8]), HoodieCompactor.EXECUTE, propsFilePath, configs);
           break;
         case COMPACT_SCHEDULE_AND_EXECUTE:
-          assert (args.length >= 9);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[8])) {
-            propsFilePath = args[8];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 9) {
-            configs.addAll(Arrays.asList(args).subList(9, args.length));
-          }
-
           returnCode = compact(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[6],
               Integer.parseInt(args[7]), HoodieCompactor.SCHEDULE_AND_EXECUTE, propsFilePath, configs);
           break;
         case COMPACT_SCHEDULE:
-          assert (args.length >= 7);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[6])) {
-            propsFilePath = args[6];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 7) {
-            configs.addAll(Arrays.asList(args).subList(7, args.length));
-          }
           returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, HoodieCompactor.SCHEDULE, propsFilePath, configs);
           break;
         case COMPACT_VALIDATE:
-          assert (args.length == 7);
+          cmd.assertEq(args.length);
           doCompactValidate(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]));
           returnCode = 0;
           break;
         case COMPACT_REPAIR:
-          assert (args.length == 8);
-          doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
-              Boolean.parseBoolean(args[7]));
+          cmd.assertEq(args.length);
+          doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), Boolean.parseBoolean(args[7]));
           returnCode = 0;
           break;
         case COMPACT_UNSCHEDULE_FILE:
-          assert (args.length == 10);
+          cmd.assertEq(args.length);
           doCompactUnscheduleFile(jsc, args[3], args[4], args[5], args[6], Integer.parseInt(args[7]),
               Boolean.parseBoolean(args[8]), Boolean.parseBoolean(args[9]));
           returnCode = 0;
           break;
         case COMPACT_UNSCHEDULE_PLAN:
-          assert (args.length == 9);
+          cmd.assertEq(args.length);
           doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
               Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
           returnCode = 0;
           break;
         case CLUSTERING_RUN:
-          assert (args.length >= 9);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[8])) {
-            propsFilePath = args[8];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 9) {
-            configs.addAll(Arrays.asList(args).subList(9, args.length));
-          }
           returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
               Integer.parseInt(args[7]), EXECUTE, propsFilePath, configs);
           break;
         case CLUSTERING_SCHEDULE_AND_EXECUTE:
-          assert (args.length >= 8);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[7])) {
-            propsFilePath = args[7];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 8) {
-            configs.addAll(Arrays.asList(args).subList(8, args.length));
-          }
           returnCode = cluster(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[2],
               Integer.parseInt(args[6]), SCHEDULE_AND_EXECUTE, propsFilePath, configs);
           break;
         case CLUSTERING_SCHEDULE:
-          assert (args.length >= 7);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[6])) {
-            propsFilePath = args[6];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 7) {
-            configs.addAll(Arrays.asList(args).subList(7, args.length));
-          }
-          returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2],
-              0, SCHEDULE, propsFilePath, configs);
+          returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2], 0, SCHEDULE, propsFilePath, configs);
           break;
         case CLEAN:
-          assert (args.length >= 5);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[4])) {
-            propsFilePath = args[4];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 5) {
-            configs.addAll(Arrays.asList(args).subList(5, args.length));
-          }
           clean(jsc, args[3], propsFilePath, configs);
           break;
         case SAVEPOINT:
-          assert (args.length == 7);
+          cmd.assertEq(args.length);
           returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]);
           break;
         case DELETE_MARKER:
-          assert (args.length == 5);
+          cmd.assertEq(args.length);
           returnCode = deleteMarker(jsc, args[3], args[4]);
           break;
         case DELETE_SAVEPOINT:
-          assert (args.length == 5);
+          cmd.assertEq(args.length);
           returnCode = deleteSavepoint(jsc, args[3], args[4]);
           break;
         case BOOTSTRAP:
-          assert (args.length >= 18);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[17])) {
-            propsFilePath = args[17];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 18) {
-            configs.addAll(Arrays.asList(args).subList(18, args.length));
-          }
           returnCode = doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10],
               args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs);
           break;
         case UPGRADE:
         case DOWNGRADE:
-          assert (args.length == 5);
+          cmd.assertEq(args.length);
           returnCode = upgradeOrDowngradeTable(jsc, args[3], args[4]);
           break;
         case REPAIR_DEPRECATED_PARTITION:
-          assert (args.length == 4);
+          cmd.assertEq(args.length);
           returnCode = repairDeprecatedPartition(jsc, args[3]);
           break;
         case RENAME_PARTITION:
-          assert (args.length == 6);
+          cmd.assertEq(args.length);
           returnCode = renamePartition(jsc, args[3], args[4], args[5]);
           break;
         case ARCHIVE:
-          assert (args.length == 8);
+          cmd.assertEq(args.length);
           returnCode = archive(jsc, Integer.parseInt(args[3]), Integer.parseInt(args[4]), Integer.parseInt(args[5]), Boolean.parseBoolean(args[6]), args[7]);
           break;
         default:
           break;
       }
-    } catch (Throwable throwable) {
-      LOG.error("Fail to execute commandString", throwable);
+    } catch (Exception exception) {
+      LOG.error("Fail to execute commandString", exception);
       returnCode = -1;
     } finally {
       jsc.stop();
@@ -473,7 +418,7 @@ public class SparkMain {
       try {
         fs.delete(new Path(basePath, oldPartition), true);
       } catch (IOException e) {
-        LOG.warn("Failed to delete older partition " + basePath);
+        LOG.warn("Failed to delete older partition {}", basePath);
       }
     }
     return 0;
@@ -563,10 +508,10 @@ public class SparkMain {
   private static int rollback(JavaSparkContext jsc, String instantTime, String basePath, Boolean rollbackUsingMarkers) throws Exception {
     SparkRDDWriteClient client = createHoodieClient(jsc, basePath, rollbackUsingMarkers, false);
     if (client.rollback(instantTime)) {
-      LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));
+      LOG.info("The commit \"{}\" rolled back.", instantTime);
       return 0;
     } else {
-      LOG.warn(String.format("The commit \"%s\" failed to roll back.", instantTime));
+      LOG.warn("The commit \"{}\" failed to roll back.", instantTime);
       return -1;
     }
   }
@@ -575,10 +520,10 @@ public class SparkMain {
                                      String comments, String basePath) throws Exception {
     try (SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false)) {
       client.savepoint(commitTime, user, comments);
-      LOG.info(String.format("The commit \"%s\" has been savepointed.", commitTime));
+      LOG.info("The commit \"{}\" has been savepointed.", commitTime);
       return 0;
     } catch (HoodieSavepointException se) {
-      LOG.warn(String.format("Failed: Could not create savepoint \"%s\".", commitTime));
+      LOG.warn("Failed: Could not create savepoint \"{}\".", commitTime);
       return -1;
     }
   }
@@ -586,7 +531,7 @@ public class SparkMain {
   private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath, boolean lazyCleanPolicy) throws Exception {
     try (SparkRDDWriteClient client = createHoodieClient(jsc, basePath, lazyCleanPolicy)) {
       client.restoreToSavepoint(savepointTime);
-      LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
+      LOG.info("The commit \"{}\" rolled back.", savepointTime);
       return 0;
     } catch (Exception e) {
       LOG.warn(String.format("The commit \"%s\" failed to roll back.", savepointTime), e);
@@ -597,7 +542,7 @@ public class SparkMain {
   private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
     try (SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false)) {
       client.deleteSavepoint(savepointTime);
-      LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime));
+      LOG.info("Savepoint \"{}\" deleted.", savepointTime);
       return 0;
     } catch (Exception e) {
       LOG.warn(String.format("Failed: Could not delete savepoint \"%s\".", savepointTime), e);
@@ -627,7 +572,7 @@ public class SparkMain {
     try {
       new UpgradeDowngrade(metaClient, updatedConfig, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
           .run(HoodieTableVersion.valueOf(toVersion), null);
-      LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion));
+      LOG.info("Table at \"{}\" upgraded / downgraded to version \"{}\".", basePath, toVersion);
       return 0;
     } catch (Exception e) {
       LOG.warn(String.format("Failed: Could not upgrade/downgrade table at \"%s\" to version \"%s\".", basePath, toVersion), e);
@@ -653,21 +598,10 @@ public class SparkMain {
   }
 
   private static int archive(JavaSparkContext jsc, int minCommits, int maxCommits, int commitsRetained, boolean enableMetadata, String basePath) {
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
-        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,maxCommits).build())
-        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(commitsRetained).build())
-        .withEmbeddedTimelineServerEnabled(false)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
-        .build();
-    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
-    HoodieSparkTable<HoodieAvroPayload> table = HoodieSparkTable.create(config, context);
     try {
-      HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
-      archiver.archiveIfRequired(context,true);
-    } catch (IOException ioe) {
-      LOG.error("Failed to archive with IOException: " + ioe);
-      return  -1;
+      return ArchiveExecutorUtils.archive(jsc, minCommits, maxCommits, commitsRetained, enableMetadata, basePath);
+    } catch (IOException ex) {
+      return -1;
     }
-    return 0;
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
index cdd7c08aed6..09afc650191 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
@@ -61,7 +61,7 @@ public final class ArchiveExecutorUtils {
       HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
       archiver.archiveIfRequired(context, true);
     } catch (IOException ioe) {
-      LOG.error("Failed to archive with IOException: " + ioe);
+      LOG.error("Failed to archive with IOException: {}", ioe.getMessage());
       throw ioe;
     }
     return 0;
