nsivabalan commented on a change in pull request #2092:
URL: https://github.com/apache/hudi/pull/2092#discussion_r504028617

##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
##########

```
@@ -49,6 +54,11 @@ public void execute(ExecutionContext executionContext) throws Exception {
     Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
     if (lastInstant.isPresent()) {
       log.info("Rolling back last instant {}", lastInstant.get());
+      log.info("Cleaning up generated data for the instant being rolled back {}", lastInstant.get());
+      ValidationUtils.checkArgument(executionContext.getWriterContext().getProps().getOrDefault(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR,
+          DFSPathSelector.class.getName()).toString().equalsIgnoreCase(DFSTestSuitePathSelector.class.getName()),
+          "Test Suite only supports DFSTestSuitePathSelector");
+      metaClient.getFs().delete(new Path(executionContext.getWriterContext().getCfg().inputBasePath,
```

Review comment:
Minor: can we first roll back from Hudi and then delete from the input data? Just in case the rollback from Hudi fails, we will have the input data intact for any debugging.
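To illustrate the ordering being asked for, a minimal, self-contained sketch follows. The class and the two helper methods are hypothetical stand-ins for the real RollbackNode, write client, and FileSystem calls, not the actual API:

```java
import java.io.IOException;

// Minimal sketch (not the actual RollbackNode) of the ordering suggested in the
// comment above: roll back from Hudi first, delete input data second, so a
// failed rollback leaves the generated input intact for debugging.
class RollbackOrderingSketch {

  void execute(String instantToRollback) throws IOException {
    // 1. Roll back the commit in Hudi first. If this throws, we never reach
    //    the delete below and the input batch survives for inspection.
    rollbackFromHudi(instantToRollback);

    // 2. Only after a successful rollback, clean up the generated input data.
    deleteGeneratedInput(instantToRollback);
  }

  // Hypothetical stand-in for the write client's rollback(instant) call.
  private void rollbackFromHudi(String instant) throws IOException {
    System.out.println("rolling back " + instant);
  }

  // Hypothetical stand-in for metaClient.getFs().delete(batchPath, true).
  private void deleteGeneratedInput(String instant) throws IOException {
    System.out.println("deleting input batch for " + instant);
  }
}
```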
##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
##########

```
@@ -62,19 +67,26 @@ public DFSTestSuitePathSelector(TypedProperties props, Configuration hadoopConf)
       lastBatchId = 0;
       nextBatchId = 1;
     }
-
-    log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit
-        + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
     // obtain all eligible files for the batch
     List<FileStatus> eligibleFiles = new ArrayList<>();
     FileStatus[] fileStatuses = fs.globStatus(
         new Path(props.getString(Config.ROOT_INPUT_PATH_PROP), "*"));
+    // Say input data is as follow input/1, input/2, input/5 since 3,4 was rolled back and 5 is new generated data
+    // checkpoint from the latest commit metadata will be 2 since 3,4 has been rolled back. We need to set the
+    // next batch id correctly as 5 instead of 3
+    Optional<String> correctBatchIdDueToRollback = Arrays.stream(fileStatuses)
+        .map(f -> f.getPath().toString().split("/")[f.getPath().toString().split("/").length - 1])
+        .min((bid1, bid2) -> Integer.min(Integer.parseInt(bid1), Integer.parseInt(bid2)));
```

Review comment:
Sorry, are we not looking for the max value here? Why min? For e.g. input/1, input/2, input/5: in this, we are interested in "5", right?
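Assuming max is indeed what is intended, a minimal, self-contained sketch of the selection (plain path strings stand in for the FileStatus objects; note also that a Comparator must return a signed comparison result, which Integer.min does not):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Self-contained sketch of picking the *largest* batch id, as the comment
// suggests. Plain path strings stand in for the FileStatus objects.
class MaxBatchIdSketch {
  public static void main(String[] args) {
    List<String> inputPaths = Arrays.asList("input/1", "input/2", "input/5");

    // Take the last path segment (the batch id) and select the max by its
    // integer value. Comparator.comparingInt yields a proper signed
    // comparison, unlike the Integer.min lambda in the diff above.
    Optional<String> latestBatchId = inputPaths.stream()
        .map(p -> p.substring(p.lastIndexOf('/') + 1))
        .max(Comparator.comparingInt(Integer::parseInt));

    System.out.println(latestBatchId.orElse("none")); // prints 5
  }
}
```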
##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
##########

```
@@ -109,12 +109,16 @@ private void executeNode(DagNode node) {
       throw new RuntimeException("DagNode already completed! Cannot re-execute");
     }
     try {
-      log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
-      node.execute(executionContext);
-      node.setCompleted(true);
-      log.info("Finished executing {}", node.getName());
+      int repeatCount = node.getConfig().getRepeatCount();
+      while (repeatCount > 0) {
+        log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
+        node.execute(executionContext);
+        node.setCompleted(true);
```

Review comment:
Am I missing something here? Once the node is set to completed, how can we execute it in the next cycle if repeatCount > 1? Shouldn't we set it outside of the while loop? (See the sketch at the end of this thread.)

##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
##########

```
@@ -49,6 +54,11 @@ public void execute(ExecutionContext executionContext) throws Exception {
     Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
     if (lastInstant.isPresent()) {
```

Review comment:
Looks like our rollback only supports rolling back the last commit. I assume we need to fix this in some later patch.

##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
##########

```
@@ -109,12 +109,16 @@ private void executeNode(DagNode node) {
       throw new RuntimeException("DagNode already completed! Cannot re-execute");
     }
     try {
-      log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
-      node.execute(executionContext);
-      node.setCompleted(true);
-      log.info("Finished executing {}", node.getName());
+      int repeatCount = node.getConfig().getRepeatCount();
+      while (repeatCount > 0) {
+        log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
+        node.execute(executionContext);
+        node.setCompleted(true);
```

Review comment:
In general, repeatCount is confusing. For e.g.:

```
first_insert:
  config:
    record_size: 70000
    num_insert_partitions: 1
    repeat_count: 5
    num_records_insert: 1000
  type: InsertNode
  deps: none
```

In this dag yaml, I expect 5*1000 = 5000 records to be inserted since the repeat count is 5. If 5000 records are to be expected, don't we need to fix all the yaml dags (complex-dag-mow.yaml etc.) for the Hive query results? I don't see any fixes in this patch for the expected record counts though.
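For the two DagScheduler comments above, a minimal sketch of how the repeat loop could be restructured so completion is set only once, after all repetitions. The Node interface and the repeatCount decrement are assumptions for illustration; the actual DagNode API may differ:

```java
// Minimal sketch of the loop restructuring the two DagScheduler comments point
// at: mark the node completed only once, after all repetitions have run.
// Node is a hypothetical stand-in for the test suite's DagNode.
class RepeatLoopSketch {

  interface Node {
    String getName();
    int getRepeatCount();
    void execute() throws Exception;
    void setCompleted(boolean completed);
  }

  void executeNode(Node node) {
    try {
      int repeatCount = node.getRepeatCount();
      while (repeatCount > 0) {
        System.out.println("executing node: " + node.getName());
        node.execute();
        repeatCount--;
      }
      // Set completion exactly once, after every repetition has finished.
      node.setCompleted(true);
      System.out.println("Finished executing " + node.getName());
    } catch (Exception e) {
      throw new RuntimeException("Failed to execute node " + node.getName(), e);
    }
  }
}
```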