nsivabalan commented on a change in pull request #2092:
URL: https://github.com/apache/hudi/pull/2092#discussion_r504028617



##########
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
##########
@@ -49,6 +54,11 @@ public void execute(ExecutionContext executionContext) throws Exception {
     Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
     if (lastInstant.isPresent()) {
       log.info("Rolling back last instant {}", lastInstant.get());
+      log.info("Cleaning up generated data for the instant being rolled back {}", lastInstant.get());
+      ValidationUtils.checkArgument(executionContext.getWriterContext().getProps().getOrDefault(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR,
+          DFSPathSelector.class.getName()).toString().equalsIgnoreCase(DFSTestSuitePathSelector.class.getName()),
+          "Test Suite only supports DFSTestSuitePathSelector");
+      metaClient.getFs().delete(new Path(executionContext.getWriterContext().getCfg().inputBasePath,

Review comment:
       minor: can we first roll back from Hudi and then delete the input data? Just in case the rollback from Hudi fails, we will have the input data intact for debugging.
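The ordering being suggested can be sketched in plain Java. This is a standalone illustration using local temp files and a hypothetical `rollbackLastInstant` stand-in, not Hudi's actual APIs: the reversible rollback step runs first, and the destructive cleanup of the input batch happens only after it succeeds, so a failed rollback leaves the input data intact.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RollbackOrderSketch {

  // Hypothetical stand-in for Hudi's rollback of the last instant;
  // throws to simulate a rollback failure.
  static void rollbackLastInstant(boolean simulateFailure) {
    if (simulateFailure) {
      throw new RuntimeException("rollback failed");
    }
  }

  // Returns true iff the input batch file still exists afterwards.
  // Rollback runs first; the input batch is deleted only once
  // rollback has succeeded.
  static boolean rollbackThenDelete(boolean simulateFailure) {
    try {
      Path inputBatch = Files.createTempFile("input-batch-", ".data");
      try {
        rollbackLastInstant(simulateFailure);
        Files.deleteIfExists(inputBatch); // safe: rollback already succeeded
      } catch (RuntimeException e) {
        // rollback failed: leave the input data intact for debugging
      }
      return Files.exists(inputBatch);
    } catch (IOException io) {
      throw new UncheckedIOException(io);
    }
  }
}
```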

##########
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
##########
@@ -62,19 +67,26 @@ public DFSTestSuitePathSelector(TypedProperties props, Configuration hadoopConf)
         lastBatchId = 0;
         nextBatchId = 1;
       }
-
-      log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit
-          + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
       // obtain all eligible files for the batch
       List<FileStatus> eligibleFiles = new ArrayList<>();
       FileStatus[] fileStatuses = fs.globStatus(
           new Path(props.getString(Config.ROOT_INPUT_PATH_PROP), "*"));
+      // Say input data is as follow input/1, input/2, input/5 since 3,4 was rolled back and 5 is new generated data
+      // checkpoint from the latest commit metadata will be 2 since 3,4 has been rolled back. We need to set the
+      // next batch id correctly as 5 instead of 3
+      Optional<String> correctBatchIdDueToRollback = Arrays.stream(fileStatuses)
+          .map(f -> f.getPath().toString().split("/")[f.getPath().toString().split("/").length - 1])
+          .min((bid1, bid2) -> Integer.min(Integer.parseInt(bid1), Integer.parseInt(bid2)));

Review comment:
       Sorry, aren't we looking for the max value here? Why min?
   For e.g. input/1, input/2, input/5: in this, we are interested in "5", right?
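For what it's worth, if the intent is the latest batch, taking the max with a valid `Comparator` would look like the sketch below. Note also that the lambda in the patch, `(bid1, bid2) -> Integer.min(Integer.parseInt(bid1), Integer.parseInt(bid2))`, is not a valid comparator: it returns one of the parsed values rather than a negative/zero/positive comparison result. This is a standalone sketch over plain path strings, not the Hadoop `FileStatus` code.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

public class BatchIdSketch {

  // Given input paths like "input/1", "input/2", "input/5", pick the
  // highest numeric batch id ("5"), i.e. the latest generated batch.
  static Optional<String> latestBatchId(String[] paths) {
    return Arrays.stream(paths)
        .map(p -> p.substring(p.lastIndexOf('/') + 1))
        .max(Comparator.comparingInt(Integer::parseInt));
  }
}
```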

##########
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
##########
@@ -109,12 +109,16 @@ private void executeNode(DagNode node) {
       throw new RuntimeException("DagNode already completed! Cannot re-execute");
     }
     try {
-      log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
-      node.execute(executionContext);
-      node.setCompleted(true);
-      log.info("Finished executing {}", node.getName());
+      int repeatCount = node.getConfig().getRepeatCount();
+      while (repeatCount > 0) {
+        log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
+        node.execute(executionContext);
+        node.setCompleted(true);

Review comment:
       Am I missing something here? Once the node is set to completed, how can we execute it in the next cycle if repeatCount > 1? Shouldn't we set it outside of the while loop?
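A minimal sketch of the shape being suggested, with the node execution abstracted as a `Runnable` (hypothetical names, not the actual `DagScheduler` types): the counter is decremented inside the loop, and completion is marked only after all repetitions have run.

```java
public class RepeatLoopSketch {

  // Runs the node's work repeatCount times and returns the number of
  // executions. The completion flag would be set after the loop, not
  // inside it, so repeats past the first are not short-circuited.
  static int runWithRepeats(int repeatCount, Runnable node) {
    int executions = 0;
    while (repeatCount > 0) {
      node.run();
      executions++;
      repeatCount--;
    }
    // node.setCompleted(true) would go here, outside the loop
    return executions;
  }
}
```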

##########
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
##########
@@ -49,6 +54,11 @@ public void execute(ExecutionContext executionContext) throws Exception {
     Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
     if (lastInstant.isPresent()) {

Review comment:
       Looks like our rollback only supports rolling back the last commit. I assume we need to fix this in a later patch.

##########
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
##########
@@ -109,12 +109,16 @@ private void executeNode(DagNode node) {
       throw new RuntimeException("DagNode already completed! Cannot re-execute");
     }
     try {
-      log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
-      node.execute(executionContext);
-      node.setCompleted(true);
-      log.info("Finished executing {}", node.getName());
+      int repeatCount = node.getConfig().getRepeatCount();
+      while (repeatCount > 0) {
+        log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
+        node.execute(executionContext);
+        node.setCompleted(true);

Review comment:
       In general, repeatCount is confusing. For example:
   ```
   first_insert:
     config:
       record_size: 70000
       num_insert_partitions: 1
       repeat_count: 5
       num_records_insert: 1000
     type: InsertNode
     deps: none
   ```
   In this dag yaml, I expect 5*1000 records to be inserted since the repeat count is 5. If 5000 records are expected, don't we need to fix all the yaml dags (complex-dag-mow.yaml, etc.) for the Hive query results? I don't see any fixes in this patch for expected record counts, though.
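The expectation in question is simply repeat count times records per run. A trivial sketch with a hypothetical helper, just to pin down the arithmetic the yaml implies:

```java
public class ExpectedRecordsSketch {

  // With repeat_count: 5 and num_records_insert: 1000 in the dag yaml,
  // the total expected insert count is 5 * 1000 = 5000.
  static long expectedInserts(int repeatCount, long numRecordsInsert) {
    return repeatCount * numRecordsInsert;
  }
}
```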




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

