aasha commented on a change in pull request #1529:
URL: https://github.com/apache/hive/pull/1529#discussion_r510009825



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
##########
@@ -207,119 +220,88 @@ private void addConsolidatedPartitionDesc() throws 
Exception {
         tableDesc.getTableName(), true, partitions);
 
       //don't need to add ckpt task separately. Added as part of add partition 
task
-      addPartition((toPartitionCount < totalPartitionCount), 
consolidatedPartitionDesc, null);
-      if (partitions.size() > 0) {
-        LOG.info("Added {} partitions", partitions.size());
+      addPartition((toPartitionCount < totalPartitionCount), 
consolidatedPartitionDesc);
+      if (!tracker.canAddMoreTasks()) {
+        //No need to do processing as no more tasks can be added. Will be 
processed in next run. State is already
+        //updated in add partition task
+        return;
       }
       currentPartitionCount = toPartitionCount;
     }
   }
 
   private TaskTracker forNewTable() throws Exception {
-    if (isMetaDataOp() || 
TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-      // Place all partitions in single task to reduce load on HMS.
-      addConsolidatedPartitionDesc();
-      return tracker;
-    }
-
-    Iterator<AlterTableAddPartitionDesc> iterator = 
event.partitionDescriptions(tableDesc).iterator();
-    while (iterator.hasNext() && tracker.canAddMoreTasks()) {
-      AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();
-      /*
-       the currentPartitionDesc cannot be inlined as we need the hasNext() to 
be evaluated post the
-       current retrieved lastReplicatedPartition
-      */
-      addPartition(iterator.hasNext(), currentPartitionDesc, null);
-    }
+    // Place all partitions in single task to reduce load on HMS.
+    addConsolidatedPartitionDesc(null);
     return tracker;
   }
 
-  private void addPartition(boolean hasMorePartitions, 
AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+  private void addPartition(boolean hasMorePartitions, 
AlterTableAddPartitionDesc addPartitionDesc)
           throws Exception {
-    tracker.addTask(tasksForAddPartition(table, addPartitionDesc, 
ptnRootTask));
-    if (hasMorePartitions && !tracker.canAddMoreTasks()) {
+    boolean processingComplete = addTasksForPartition(table, addPartitionDesc, 
null,
+      PartitionState.Stage.PARTITION);
+    //If processing is not complete, means replication state is already 
updated with copy or move tasks which need
+    //to be processed
+    if (processingComplete && hasMorePartitions && !tracker.canAddMoreTasks()) 
{
       ReplicationState currentReplicationState =
           new ReplicationState(new PartitionState(table.getTableName(), 
addPartitionDesc));
       updateReplicationState(currentReplicationState);
     }
   }
 
   /**
-   * returns the root task for adding a partition
+   * returns the root task for adding all partitions in a batch
    */
-  private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc 
addPartitionDesc, Task<?> ptnRootTask)
+  private boolean addTasksForPartition(Table table, AlterTableAddPartitionDesc 
addPartitionDesc,
+                                    AlterTableAddPartitionDesc.PartitionDesc 
lastPartSpec,
+                                    PartitionState.Stage lastStage)
           throws MetaException, HiveException {
     Task<?> addPartTask = TaskFactory.get(
       new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc,
               true, (new Path(context.dumpDirectory)).getParent().toString(), 
this.metricCollector),
       context.hiveConf
     );
-    //checkpointing task already added as part of add batch of partition in 
case for metadata only and external tables
+    //checkpointing task already added as part of add batch of partition
     if (isMetaDataOp() || 
TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-      if (ptnRootTask == null) {
-        ptnRootTask = addPartTask;
-      } else {
-        ptnRootTask.addDependentTask(addPartTask);
-      }
-      return ptnRootTask;
+      tracker.addTask(addPartTask);
+      return true;
     }
-
-    AlterTableAddPartitionDesc.PartitionDesc partSpec = 
addPartitionDesc.getPartitions().get(0);
-    Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
-    Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, 
partSpec);
-    partSpec.setLocation(replicaWarehousePartitionLocation.toString());
-    LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
-      + partSpecToString(partSpec.getPartSpec()) + " with source location: "
-      + partSpec.getLocation());
-    Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
-      tableDesc,
-      (HashMap<String, String>)partSpec.getPartSpec(),
-      context.dumpDirectory,
-      this.metricCollector,
-      context.hiveConf
-    );
-
-    Path stagingDir = replicaWarehousePartitionLocation;
-    // if move optimization is enabled, copy the files directly to the target 
path. No need to create the staging dir.
-    LoadFileType loadFileType;
-    if (event.replicationSpec().isInReplicationScope() &&
-            context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
-      loadFileType = LoadFileType.IGNORE;
-    } else {
-      loadFileType = event.replicationSpec().isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
-      stagingDir = 
PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, 
context.pathInfo);
-    }
-    boolean copyAtLoad = 
context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
-    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+    //Add Copy task for all partitions
+    //If last stage was move, that means all copy tasks processed, go to 
processing move tasks
+    boolean lastProcessedStageFound = false;
+    for (AlterTableAddPartitionDesc.PartitionDesc partSpec : 
addPartitionDesc.getPartitions()) {
+      if (!tracker.canAddMoreTasks()) {
+        //update replication state with the copy task added with which it 
needs to proceed next
+        ReplicationState currentReplicationState =
+          new ReplicationState(new PartitionState(table.getTableName(), 
addPartitionDesc,
+            partSpec, PartitionState.Stage.COPY));
+        updateReplicationState(currentReplicationState);
+        return false;
+      }
+      Path replicaWarehousePartitionLocation = 
locationOnReplicaWarehouse(table, partSpec);
+      partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+      LOG.debug("adding dependent CopyWork for partition "
+        + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+        + partSpec.getLocation());
+      if (!lastProcessedStageFound && lastPartSpec != null &&
+        lastPartSpec.getLocation() != partSpec.getLocation()) {
+        //Don't process copy task if already processed as part of previous run
+        continue;
+      }
+      lastProcessedStageFound = true;
+      boolean copyAtLoad = 
context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
+      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
         event.replicationSpec(),
-        new Path(event.dataPath() + Path.SEPARATOR + 
getPartitionName(sourceWarehousePartitionLocation)),
-        stagingDir,
+        new Path(event.dataPath() + Path.SEPARATOR + 
Warehouse.makePartPath(partSpec.getPartSpec())),
+        replicaWarehousePartitionLocation,
         context.hiveConf, copyAtLoad, false, (new 
Path(context.dumpDirectory)).getParent().toString(),
         this.metricCollector
-    );
-
-    Task<?> movePartitionTask = null;
-    if (loadFileType != LoadFileType.IGNORE) {
-      // no need to create move task, if file is moved directly to target 
location.
-      movePartitionTask = movePartitionTask(table, partSpec, stagingDir, 
loadFileType);
-    }
-
-    if (ptnRootTask == null) {
-      ptnRootTask = copyTask;
-    } else {
-      ptnRootTask.addDependentTask(copyTask);
-    }
-
-    // Set Checkpoint task as dependant to the tail of add partition tasks. 
So, if same dump is
-    // retried for bootstrap, we skip current partition update.
-    copyTask.addDependentTask(addPartTask);
-    if (movePartitionTask != null) {
-      addPartTask.addDependentTask(movePartitionTask);
-      movePartitionTask.addDependentTask(ckptTask);
-    } else {
-      addPartTask.addDependentTask(ckptTask);
+      );
+      tracker.addTask(copyTask);
     }
-    return ptnRootTask;
+    //add partition metadata task once all the copy tasks are added
+    tracker.addDependentTask(addPartTask);

Review comment:
       Will not do this now, as we don't have checkpointing info at the copy-task level.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to