This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c04216d5ca  [HUDI-4228] Clean up literal usage in Hudi CLI argument check (#11042)
4c04216d5ca is described below

commit 4c04216d5cade10f9589e2c0d425109cc502ebcd
Author: Vova Kolmakov <wombatu...@gmail.com>
AuthorDate: Thu Apr 18 09:14:32 2024 +0700

    [HUDI-4228] Clean up literal usage in Hudi CLI argument check (#11042)
---
 .../org/apache/hudi/cli/commands/SparkMain.java    | 202 +++++++--------------
 .../org/apache/hudi/cli/ArchiveExecutorUtils.java  |   2 +-
 2 files changed, 69 insertions(+), 135 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 5e1aec52333..ba3b404474b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -19,14 +19,12 @@ package org.apache.hudi.cli.commands;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.cli.ArchiveExecutorUtils;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -37,7 +35,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieBootstrapConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -99,16 +96,45 @@ public class SparkMain {
    * Commands.
    */
   enum SparkCommand {
-    BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
-    COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
-    CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE,
-    REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION, ARCHIVE
+    BOOTSTRAP(18), ROLLBACK(6), DEDUPLICATE(8), ROLLBACK_TO_SAVEPOINT(6), SAVEPOINT(7),
+    IMPORT(13), UPSERT(13), COMPACT_SCHEDULE(7), COMPACT_RUN(10), COMPACT_SCHEDULE_AND_EXECUTE(9),
+    COMPACT_UNSCHEDULE_PLAN(9), COMPACT_UNSCHEDULE_FILE(10), COMPACT_VALIDATE(7), COMPACT_REPAIR(8),
+    CLUSTERING_SCHEDULE(7), CLUSTERING_RUN(9), CLUSTERING_SCHEDULE_AND_EXECUTE(8), CLEAN(5),
+    DELETE_MARKER(5), DELETE_SAVEPOINT(5), UPGRADE(5), DOWNGRADE(5),
+    REPAIR_DEPRECATED_PARTITION(4), RENAME_PARTITION(6), ARCHIVE(8);
+
+    private final int minArgsCount;
+
+    SparkCommand(int minArgsCount) {
+      this.minArgsCount = minArgsCount;
+    }
+
+    void assertEq(int factArgsCount) {
+      ValidationUtils.checkArgument(factArgsCount == minArgsCount);
+    }
+
+    void assertGtEq(int factArgsCount) {
+      ValidationUtils.checkArgument(factArgsCount >= minArgsCount);
+    }
+
+    List<String> makeConfigs(String[] args) {
+      List<String> configs = new ArrayList<>();
+      if (args.length > minArgsCount) {
+        configs.addAll(Arrays.asList(args).subList(minArgsCount, args.length));
+      }
+      return configs;
+    }
+
+    String getPropsFilePath(String[] args) {
+      return (args.length >= minArgsCount && !StringUtils.isNullOrEmpty(args[minArgsCount - 1]))
+          ? args[minArgsCount - 1] : null;
+    }
   }
 
-  public static void main(String[] args) throws Exception {
+  public static void main(String[] args) {
     ValidationUtils.checkArgument(args.length >= 4);
     final String commandString = args[0];
-    LOG.info("Invoking SparkMain: " + commandString);
+    LOG.info("Invoking SparkMain: {}", commandString);
     final SparkCommand cmd = SparkCommand.valueOf(commandString);
 
     JavaSparkContext jsc = SparkUtil.initJavaSparkContext("hoodie-cli-" + commandString,
@@ -116,193 +142,112 @@ public class SparkMain {
 
     int returnCode = 0;
     try {
+      cmd.assertGtEq(args.length);
+      List<String> configs = cmd.makeConfigs(args);
+      String propsFilePath = cmd.getPropsFilePath(args);
       switch (cmd) {
         case ROLLBACK:
-          assert (args.length == 6);
+          cmd.assertEq(args.length);
           returnCode = rollback(jsc, args[3], args[4], Boolean.parseBoolean(args[5]));
           break;
         case DEDUPLICATE:
-          assert (args.length == 8);
+          cmd.assertEq(args.length);
           returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], Boolean.parseBoolean(args[6]), args[7]);
           break;
         case ROLLBACK_TO_SAVEPOINT:
-          assert (args.length == 6);
+          cmd.assertEq(args.length);
           returnCode = rollbackToSavepoint(jsc, args[3], args[4], Boolean.parseBoolean(args[5]));
           break;
         case IMPORT:
         case UPSERT:
-          assert (args.length >= 13);
-          String propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[12])) {
-            propsFilePath = args[12];
-          }
-          List<String> configs = new ArrayList<>();
-          if (args.length > 13) {
-            configs.addAll(Arrays.asList(args).subList(13, args.length));
-          }
           returnCode = dataLoad(jsc, commandString, args[3], args[4], args[5], args[6], args[7], args[8],
               Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
           break;
         case COMPACT_RUN:
-          assert (args.length >= 10);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[9])) {
-            propsFilePath = args[9];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 10) {
-            configs.addAll(Arrays.asList(args).subList(10, args.length));
-          }
           returnCode = compact(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[7],
               Integer.parseInt(args[8]), HoodieCompactor.EXECUTE, propsFilePath, configs);
           break;
         case COMPACT_SCHEDULE_AND_EXECUTE:
-          assert (args.length >= 9);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[8])) {
-            propsFilePath = args[8];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 9) {
-            configs.addAll(Arrays.asList(args).subList(9, args.length));
-          }
-
           returnCode = compact(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[6],
               Integer.parseInt(args[7]), HoodieCompactor.SCHEDULE_AND_EXECUTE, propsFilePath, configs);
           break;
         case COMPACT_SCHEDULE:
-          assert (args.length >= 7);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[6])) {
-            propsFilePath = args[6];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 7) {
-            configs.addAll(Arrays.asList(args).subList(7, args.length));
-          }
           returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, HoodieCompactor.SCHEDULE, propsFilePath, configs);
           break;
         case COMPACT_VALIDATE:
-          assert (args.length == 7);
+          cmd.assertEq(args.length);
           doCompactValidate(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]));
           returnCode = 0;
           break;
         case COMPACT_REPAIR:
-          assert (args.length == 8);
-          doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
-              Boolean.parseBoolean(args[7]));
+          cmd.assertEq(args.length);
+          doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), Boolean.parseBoolean(args[7]));
           returnCode = 0;
           break;
         case COMPACT_UNSCHEDULE_FILE:
-          assert (args.length == 10);
+          cmd.assertEq(args.length);
           doCompactUnscheduleFile(jsc, args[3], args[4], args[5], args[6], Integer.parseInt(args[7]),
               Boolean.parseBoolean(args[8]), Boolean.parseBoolean(args[9]));
           returnCode = 0;
           break;
         case COMPACT_UNSCHEDULE_PLAN:
-          assert (args.length == 9);
+          cmd.assertEq(args.length);
           doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
               Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
           returnCode = 0;
           break;
         case CLUSTERING_RUN:
-          assert (args.length >= 9);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[8])) {
-            propsFilePath = args[8];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 9) {
-            configs.addAll(Arrays.asList(args).subList(9, args.length));
-          }
           returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
               Integer.parseInt(args[7]), EXECUTE, propsFilePath, configs);
           break;
         case CLUSTERING_SCHEDULE_AND_EXECUTE:
-          assert (args.length >= 8);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[7])) {
-            propsFilePath = args[7];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 8) {
-            configs.addAll(Arrays.asList(args).subList(8, args.length));
-          }
           returnCode = cluster(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[2],
               Integer.parseInt(args[6]), SCHEDULE_AND_EXECUTE, propsFilePath, configs);
           break;
         case CLUSTERING_SCHEDULE:
-          assert (args.length >= 7);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[6])) {
-            propsFilePath = args[6];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 7) {
-            configs.addAll(Arrays.asList(args).subList(7, args.length));
-          }
-          returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2],
-              0, SCHEDULE, propsFilePath, configs);
+          returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2], 0, SCHEDULE, propsFilePath, configs);
           break;
         case CLEAN:
-          assert (args.length >= 5);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[4])) {
-            propsFilePath = args[4];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 5) {
-            configs.addAll(Arrays.asList(args).subList(5, args.length));
-          }
           clean(jsc, args[3], propsFilePath, configs);
           break;
         case SAVEPOINT:
-          assert (args.length == 7);
+          cmd.assertEq(args.length);
           returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]);
           break;
         case DELETE_MARKER:
-          assert (args.length == 5);
+          cmd.assertEq(args.length);
           returnCode = deleteMarker(jsc, args[3], args[4]);
           break;
         case DELETE_SAVEPOINT:
-          assert (args.length == 5);
+          cmd.assertEq(args.length);
           returnCode = deleteSavepoint(jsc, args[3], args[4]);
           break;
         case BOOTSTRAP:
-          assert (args.length >= 18);
-          propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[17])) {
-            propsFilePath = args[17];
-          }
-          configs = new ArrayList<>();
-          if (args.length > 18) {
-            configs.addAll(Arrays.asList(args).subList(18, args.length));
-          }
           returnCode = doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10],
               args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs);
           break;
         case UPGRADE:
         case DOWNGRADE:
-          assert (args.length == 5);
+          cmd.assertEq(args.length);
           returnCode = upgradeOrDowngradeTable(jsc, args[3], args[4]);
           break;
         case REPAIR_DEPRECATED_PARTITION:
-          assert (args.length == 4);
+          cmd.assertEq(args.length);
          returnCode = repairDeprecatedPartition(jsc, args[3]);
           break;
         case RENAME_PARTITION:
-          assert (args.length == 6);
+          cmd.assertEq(args.length);
           returnCode = renamePartition(jsc, args[3], args[4], args[5]);
           break;
         case ARCHIVE:
-          assert (args.length == 8);
+          cmd.assertEq(args.length);
           returnCode = archive(jsc, Integer.parseInt(args[3]), Integer.parseInt(args[4]), Integer.parseInt(args[5]),
               Boolean.parseBoolean(args[6]), args[7]);
           break;
         default:
           break;
       }
-    } catch (Throwable throwable) {
-      LOG.error("Fail to execute commandString", throwable);
+    } catch (Exception exception) {
+      LOG.error("Fail to execute commandString", exception);
       returnCode = -1;
     } finally {
       jsc.stop();
@@ -473,7 +418,7 @@ public class SparkMain {
       try {
         fs.delete(new Path(basePath, oldPartition), true);
       } catch (IOException e) {
-        LOG.warn("Failed to delete older partition " + basePath);
+        LOG.warn("Failed to delete older partition {}", basePath);
       }
     }
     return 0;
@@ -563,10 +508,10 @@ public class SparkMain {
   private static int rollback(JavaSparkContext jsc, String instantTime, String basePath, Boolean rollbackUsingMarkers) throws Exception {
     SparkRDDWriteClient client = createHoodieClient(jsc, basePath, rollbackUsingMarkers, false);
     if (client.rollback(instantTime)) {
-      LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));
+      LOG.info("The commit \"{}\" rolled back.", instantTime);
       return 0;
     } else {
-      LOG.warn(String.format("The commit \"%s\" failed to roll back.", instantTime));
+      LOG.warn("The commit \"{}\" failed to roll back.", instantTime);
       return -1;
     }
   }
@@ -575,10 +520,10 @@ public class SparkMain {
                                        String comments, String basePath) throws Exception {
     try (SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false)) {
       client.savepoint(commitTime, user, comments);
-      LOG.info(String.format("The commit \"%s\" has been savepointed.", commitTime));
+      LOG.info("The commit \"{}\" has been savepointed.", commitTime);
       return 0;
     } catch (HoodieSavepointException se) {
-      LOG.warn(String.format("Failed: Could not create savepoint \"%s\".", commitTime));
+      LOG.warn("Failed: Could not create savepoint \"{}\".", commitTime);
       return -1;
     }
   }
@@ -586,7 +531,7 @@ public class SparkMain {
   private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath, boolean lazyCleanPolicy) throws Exception {
     try (SparkRDDWriteClient client = createHoodieClient(jsc, basePath, lazyCleanPolicy)) {
       client.restoreToSavepoint(savepointTime);
-      LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
+      LOG.info("The commit \"{}\" rolled back.", savepointTime);
       return 0;
     } catch (Exception e) {
       LOG.warn(String.format("The commit \"%s\" failed to roll back.", savepointTime), e);
@@ -597,7 +542,7 @@ public class SparkMain {
   private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
     try (SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false)) {
       client.deleteSavepoint(savepointTime);
-      LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime));
+      LOG.info("Savepoint \"{}\" deleted.", savepointTime);
       return 0;
     } catch (Exception e) {
       LOG.warn(String.format("Failed: Could not delete savepoint \"%s\".", savepointTime), e);
@@ -627,7 +572,7 @@ public class SparkMain {
     try {
       new UpgradeDowngrade(metaClient, updatedConfig, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
           .run(HoodieTableVersion.valueOf(toVersion), null);
-      LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion));
+      LOG.info("Table at \"{}\" upgraded / downgraded to version \"{}\".", basePath, toVersion);
      return 0;
     } catch (Exception e) {
       LOG.warn(String.format("Failed: Could not upgrade/downgrade table at \"%s\" to version \"%s\".", basePath, toVersion), e);
@@ -653,21 +598,10 @@ public class SparkMain {
   }
 
   private static int archive(JavaSparkContext jsc, int minCommits, int maxCommits, int commitsRetained, boolean enableMetadata, String basePath) {
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
-        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,maxCommits).build())
-        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(commitsRetained).build())
-        .withEmbeddedTimelineServerEnabled(false)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
-        .build();
-    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
-    HoodieSparkTable<HoodieAvroPayload> table = HoodieSparkTable.create(config, context);
     try {
-      HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
-      archiver.archiveIfRequired(context,true);
-    } catch (IOException ioe) {
-      LOG.error("Failed to archive with IOException: " + ioe);
-      return -1;
+      return ArchiveExecutorUtils.archive(jsc, minCommits, maxCommits, commitsRetained, enableMetadata, basePath);
+    } catch (IOException ex) {
+      return -1;
     }
-    return 0;
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
index cdd7c08aed6..09afc650191 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
@@ -61,7 +61,7 @@ public final class ArchiveExecutorUtils {
       HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
       archiver.archiveIfRequired(context, true);
     } catch (IOException ioe) {
-      LOG.error("Failed to archive with IOException: " + ioe);
+      LOG.error("Failed to archive with IOException: {}", ioe.getMessage());
       throw ioe;
     }
     return 0;
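
For readers following along, the heart of the patch is the reworked SparkCommand
enum: each constant now carries its minimum CLI argument count, so the scattered
"assert (args.length == N)" literals collapse into four shared helpers
(assertEq, assertGtEq, makeConfigs, getPropsFilePath). A side benefit is that
ValidationUtils.checkArgument always runs, whereas Java's assert is a no-op
unless the JVM is started with -ea. Below is a minimal, self-contained sketch of
the pattern; CliCommand and CliCommandDemo are illustrative names (not Hudi
source), and a plain IllegalArgumentException stands in for Hudi's
ValidationUtils.checkArgument.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative stand-in for SparkMain.SparkCommand: every command knows its
// own minimum argument count instead of repeating the literal at each call site.
enum CliCommand {
  ROLLBACK(6), CLEAN(5), ARCHIVE(8); // counts copied from the patch above

  private final int minArgsCount;

  CliCommand(int minArgsCount) {
    this.minArgsCount = minArgsCount;
  }

  // Fixed-arity commands (ROLLBACK, ARCHIVE, ...) require an exact match.
  void assertEq(int factArgsCount) {
    if (factArgsCount != minArgsCount) {
      throw new IllegalArgumentException("expected " + minArgsCount + " args, got " + factArgsCount);
    }
  }

  // Variadic commands accept trailing config overrides, so only a lower bound.
  void assertGtEq(int factArgsCount) {
    if (factArgsCount < minArgsCount) {
      throw new IllegalArgumentException("expected at least " + minArgsCount + " args, got " + factArgsCount);
    }
  }

  // Everything past the fixed positional arguments is an extra config entry.
  List<String> makeConfigs(String[] args) {
    List<String> configs = new ArrayList<>();
    if (args.length > minArgsCount) {
      configs.addAll(Arrays.asList(args).subList(minArgsCount, args.length));
    }
    return configs;
  }

  // By convention the last fixed positional argument is the props-file path;
  // an empty string means "no props file".
  String getPropsFilePath(String[] args) {
    return (args.length >= minArgsCount && !args[minArgsCount - 1].isEmpty())
        ? args[minArgsCount - 1] : null;
  }
}

public class CliCommandDemo {
  public static void main(String[] args) {
    // Hypothetical invocation: command, master, spark-memory, base-path,
    // props-file slot (empty here), then one trailing config override.
    String[] cli = {"CLEAN", "local[2]", "1g", "/tmp/hudi_table", "", "hoodie.clean.async=true"};
    CliCommand cmd = CliCommand.valueOf(cli[0]);
    cmd.assertGtEq(cli.length);                    // 6 >= 5, passes
    System.out.println(cmd.makeConfigs(cli));      // [hoodie.clean.async=true]
    System.out.println(cmd.getPropsFilePath(cli)); // null (empty props slot)
  }
}

Keeping the count next to the constant also means a newly added command cannot
silently skip validation: the lower-bound check runs once before the switch.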
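The remaining hunks convert string concatenation and String.format in log
statements to SLF4J parameterized logging. A short sketch of the difference,
assuming an SLF4J binding on the classpath (the demo class name is made up):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingStyleDemo {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingStyleDemo.class);

  public static void main(String[] args) {
    String instantTime = "20240418091432000";

    // Before: String.format builds the message eagerly, even when INFO is disabled.
    LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));

    // After: the {} placeholder defers formatting until SLF4J has confirmed
    // that the INFO level is actually enabled.
    LOG.info("The commit \"{}\" rolled back.", instantTime);
  }
}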