pkumarsinha commented on a change in pull request #1529:
URL: https://github.com/apache/hive/pull/1529#discussion_r509895178



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
##########
@@ -224,54 +220,19 @@ public String getName() {
   }
 
 
-  public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path 
srcPath, Path dstPath,
-                                        HiveConf conf, boolean isAutoPurge, 
boolean needRecycle,
-                                        boolean readSourceAsFileList) {
-    return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, 
isAutoPurge, needRecycle,
-            readSourceAsFileList, false);
-  }
-
   public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path 
srcPath, Path dstPath,
                                         HiveConf conf, boolean isAutoPurge, 
boolean needRecycle,
                                         boolean readSourceAsFileList, String 
dumpDirectory,
                                         ReplicationMetricCollector 
metricCollector) {
     return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, 
isAutoPurge, needRecycle,
-            readSourceAsFileList, false, dumpDirectory, metricCollector);
+            readSourceAsFileList, false, true, dumpDirectory, metricCollector);
   }
 
-  private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path 
srcPath, Path dstPath,
-                                        HiveConf conf, boolean isAutoPurge, 
boolean needRecycle,
-                                        boolean readSourceAsFileList,
-                                        boolean overWrite) {
-    Task<?> copyTask = null;
-    LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath);
-    if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
-      ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, 
overWrite);
-      rcwork.setReadSrcAsFilesList(readSourceAsFileList);
-      if (replicationSpec.isReplace() && 
(conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION))) {
-        rcwork.setDeleteDestIfExist(true);
-        rcwork.setAutoPurge(isAutoPurge);
-        rcwork.setNeedRecycle(needRecycle);
-      }
-      // For replace case, duplicate check should not be done. The new base 
directory will automatically make the older
-      // data invisible. Doing duplicate check and ignoring copy will cause 
consistency issue if there are multiple
-      // replace events getting replayed in the first incremental load.
-      rcwork.setCheckDuplicateCopy(replicationSpec.needDupCopyCheck() && 
!replicationSpec.isReplace());
-      LOG.debug("ReplCopyTask:\trcwork");
-      String distCpDoAsUser = 
conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
-      rcwork.setDistCpDoAsUser(distCpDoAsUser);
-      copyTask = TaskFactory.get(rcwork, conf);
-    } else {
-      LOG.debug("ReplCopyTask:\tcwork");
-      copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
-    }
-    return copyTask;
-  }
 
   private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path 
srcPath, Path dstPath,
                                          HiveConf conf, boolean isAutoPurge, 
boolean needRecycle,
                                          boolean readSourceAsFileList,
-                                         boolean overWrite,
+                                         boolean overWrite, boolean autoPurge,

Review comment:
       Why do we need to have both isAutoPurge and autoPurge? Can we simplify 
this part?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##########
@@ -204,10 +209,10 @@ public void setResultValues(List<String> resultValues) {
       replSpec.setInReplicationScope(true);
       EximUtil.DataCopyPath managedTableCopyPath = new 
EximUtil.DataCopyPath(replSpec);
       managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next());
-      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+      Task<?> copyTask = ReplCopyTask.getDumpCopyTask(
               managedTableCopyPath.getReplicationSpec(), 
managedTableCopyPath.getSrcPath(),
-              managedTableCopyPath.getTargetPath(), conf, false, 
shouldOverwrite,
-              getCurrentDumpPath().toString(), getMetricCollector());
+              managedTableCopyPath.getTargetPath(), conf, false, 
shouldOverwrite, !isBootstrap,

Review comment:
       isBootstrap value doesn't influence isAutoPurge as that will default to 
false when this method is called. Is there any other usage of this !isBootstrap?

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -655,6 +643,11 @@ private static void populateLlapDaemonVarsSet(Set<String> 
llapDaemonVarsSetLocal
       "Provide the maximum number of partitions of a table that will be 
batched together during  \n"
         + "repl load. All the partitions in a batch will make a single 
metastore call to update the metadata. \n"
         + "The data for these partitions will be copied before copying the 
metadata batch. "),
+    
REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE("hive.repl.load.partitions.with.data.copy.batch.size",
+      1000,

Review comment:
       nit: accommodate in previous line 

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
##########
@@ -649,8 +644,7 @@ private static ImportTableDesc 
getBaseCreateTableDescFromTable(String dbName,
 
       LoadFileType loadFileType;
       Path destPath;
-      if (replicationSpec.isInReplicationScope() && 
(x.getCtx().getConf().getBoolean(
-        REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
+      if (replicationSpec.isInReplicationScope()) {

Review comment:
       Will it be any different for import/eexport

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
##########
@@ -207,119 +220,88 @@ private void addConsolidatedPartitionDesc() throws 
Exception {
         tableDesc.getTableName(), true, partitions);
 
       //don't need to add ckpt task separately. Added as part of add partition 
task
-      addPartition((toPartitionCount < totalPartitionCount), 
consolidatedPartitionDesc, null);
-      if (partitions.size() > 0) {
-        LOG.info("Added {} partitions", partitions.size());
+      addPartition((toPartitionCount < totalPartitionCount), 
consolidatedPartitionDesc);
+      if (!tracker.canAddMoreTasks()) {
+        //No need to do processing as no more tasks can be added. Will be 
processed in next run. State is already
+        //updated in add partition task
+        return;
       }
       currentPartitionCount = toPartitionCount;
     }
   }
 
   private TaskTracker forNewTable() throws Exception {
-    if (isMetaDataOp() || 
TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-      // Place all partitions in single task to reduce load on HMS.
-      addConsolidatedPartitionDesc();
-      return tracker;
-    }
-
-    Iterator<AlterTableAddPartitionDesc> iterator = 
event.partitionDescriptions(tableDesc).iterator();
-    while (iterator.hasNext() && tracker.canAddMoreTasks()) {
-      AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();
-      /*
-       the currentPartitionDesc cannot be inlined as we need the hasNext() to 
be evaluated post the
-       current retrieved lastReplicatedPartition
-      */
-      addPartition(iterator.hasNext(), currentPartitionDesc, null);
-    }
+    // Place all partitions in single task to reduce load on HMS.
+    addConsolidatedPartitionDesc(null);
     return tracker;
   }
 
-  private void addPartition(boolean hasMorePartitions, 
AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+  private void addPartition(boolean hasMorePartitions, 
AlterTableAddPartitionDesc addPartitionDesc)
           throws Exception {
-    tracker.addTask(tasksForAddPartition(table, addPartitionDesc, 
ptnRootTask));
-    if (hasMorePartitions && !tracker.canAddMoreTasks()) {
+    boolean processingComplete = addTasksForPartition(table, addPartitionDesc, 
null,
+      PartitionState.Stage.PARTITION);
+    //If processing is not complete, means replication state is already 
updated with copy or move tasks which need
+    //to be processed
+    if (processingComplete && hasMorePartitions && !tracker.canAddMoreTasks()) 
{
       ReplicationState currentReplicationState =
           new ReplicationState(new PartitionState(table.getTableName(), 
addPartitionDesc));
       updateReplicationState(currentReplicationState);
     }
   }
 
   /**
-   * returns the root task for adding a partition
+   * returns the root task for adding all partitions in a batch
    */
-  private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc 
addPartitionDesc, Task<?> ptnRootTask)
+  private boolean addTasksForPartition(Table table, AlterTableAddPartitionDesc 
addPartitionDesc,
+                                    AlterTableAddPartitionDesc.PartitionDesc 
lastPartSpec,
+                                    PartitionState.Stage lastStage)
           throws MetaException, HiveException {
     Task<?> addPartTask = TaskFactory.get(
       new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc,
               true, (new Path(context.dumpDirectory)).getParent().toString(), 
this.metricCollector),
       context.hiveConf
     );
-    //checkpointing task already added as part of add batch of partition in 
case for metadata only and external tables
+    //checkpointing task already added as part of add batch of partition
     if (isMetaDataOp() || 
TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-      if (ptnRootTask == null) {
-        ptnRootTask = addPartTask;
-      } else {
-        ptnRootTask.addDependentTask(addPartTask);
-      }
-      return ptnRootTask;
+      tracker.addTask(addPartTask);
+      return true;
     }
-
-    AlterTableAddPartitionDesc.PartitionDesc partSpec = 
addPartitionDesc.getPartitions().get(0);
-    Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
-    Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, 
partSpec);
-    partSpec.setLocation(replicaWarehousePartitionLocation.toString());
-    LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
-      + partSpecToString(partSpec.getPartSpec()) + " with source location: "
-      + partSpec.getLocation());
-    Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
-      tableDesc,
-      (HashMap<String, String>)partSpec.getPartSpec(),
-      context.dumpDirectory,
-      this.metricCollector,
-      context.hiveConf
-    );
-
-    Path stagingDir = replicaWarehousePartitionLocation;
-    // if move optimization is enabled, copy the files directly to the target 
path. No need to create the staging dir.
-    LoadFileType loadFileType;
-    if (event.replicationSpec().isInReplicationScope() &&
-            context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
-      loadFileType = LoadFileType.IGNORE;
-    } else {
-      loadFileType = event.replicationSpec().isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
-      stagingDir = 
PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, 
context.pathInfo);
-    }
-    boolean copyAtLoad = 
context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
-    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+    //Add Copy task for all partitions
+    //If last stage was move, that means all copy tasks processed, go to 
processing move tasks
+    boolean lastProcessedStageFound = false;
+    for (AlterTableAddPartitionDesc.PartitionDesc partSpec : 
addPartitionDesc.getPartitions()) {
+      if (!tracker.canAddMoreTasks()) {
+        //update replication state with the copy task added with which it 
needs to proceed next
+        ReplicationState currentReplicationState =
+          new ReplicationState(new PartitionState(table.getTableName(), 
addPartitionDesc,
+            partSpec, PartitionState.Stage.COPY));
+        updateReplicationState(currentReplicationState);
+        return false;
+      }
+      Path replicaWarehousePartitionLocation = 
locationOnReplicaWarehouse(table, partSpec);
+      partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+      LOG.debug("adding dependent CopyWork for partition "
+        + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+        + partSpec.getLocation());
+      if (!lastProcessedStageFound && lastPartSpec != null &&
+        lastPartSpec.getLocation() != partSpec.getLocation()) {
+        //Don't process copy task if already processed as part of previous run
+        continue;
+      }
+      lastProcessedStageFound = true;
+      boolean copyAtLoad = 
context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
+      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
         event.replicationSpec(),
-        new Path(event.dataPath() + Path.SEPARATOR + 
getPartitionName(sourceWarehousePartitionLocation)),
-        stagingDir,
+        new Path(event.dataPath() + Path.SEPARATOR + 
Warehouse.makePartPath(partSpec.getPartSpec())),
+        replicaWarehousePartitionLocation,
         context.hiveConf, copyAtLoad, false, (new 
Path(context.dumpDirectory)).getParent().toString(),
         this.metricCollector
-    );
-
-    Task<?> movePartitionTask = null;
-    if (loadFileType != LoadFileType.IGNORE) {
-      // no need to create move task, if file is moved directly to target 
location.
-      movePartitionTask = movePartitionTask(table, partSpec, stagingDir, 
loadFileType);
-    }
-
-    if (ptnRootTask == null) {
-      ptnRootTask = copyTask;
-    } else {
-      ptnRootTask.addDependentTask(copyTask);
-    }
-
-    // Set Checkpoint task as dependant to the tail of add partition tasks. 
So, if same dump is
-    // retried for bootstrap, we skip current partition update.
-    copyTask.addDependentTask(addPartTask);
-    if (movePartitionTask != null) {
-      addPartTask.addDependentTask(movePartitionTask);
-      movePartitionTask.addDependentTask(ckptTask);
-    } else {
-      addPartTask.addDependentTask(ckptTask);
+      );
+      tracker.addTask(copyTask);
     }
-    return ptnRootTask;
+    //add partition metadata task once all the copy tasks are added
+    tracker.addDependentTask(addPartTask);

Review comment:
       If we will have a check-pointing for stage (COPY,PARTITION), if there is 
a restart for some reason, it can avoid re-doing first stage if that was  
already done. Should we accommodate that?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
##########
@@ -27,19 +27,14 @@
 import org.apache.hadoop.hive.ql.ddl.DDLWork;
 import 
org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc;
 import 
org.apache.hadoop.hive.ql.ddl.table.partition.drop.AlterTableDropPartitionDesc;
-import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.*;

Review comment:
       Revert this

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
##########
@@ -207,119 +220,88 @@ private void addConsolidatedPartitionDesc() throws 
Exception {
         tableDesc.getTableName(), true, partitions);
 
       //don't need to add ckpt task separately. Added as part of add partition 
task
-      addPartition((toPartitionCount < totalPartitionCount), 
consolidatedPartitionDesc, null);
-      if (partitions.size() > 0) {
-        LOG.info("Added {} partitions", partitions.size());
+      addPartition((toPartitionCount < totalPartitionCount), 
consolidatedPartitionDesc);
+      if (!tracker.canAddMoreTasks()) {
+        //No need to do processing as no more tasks can be added. Will be 
processed in next run. State is already
+        //updated in add partition task
+        return;
       }
       currentPartitionCount = toPartitionCount;
     }
   }
 
   private TaskTracker forNewTable() throws Exception {
-    if (isMetaDataOp() || 
TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-      // Place all partitions in single task to reduce load on HMS.
-      addConsolidatedPartitionDesc();
-      return tracker;
-    }
-
-    Iterator<AlterTableAddPartitionDesc> iterator = 
event.partitionDescriptions(tableDesc).iterator();
-    while (iterator.hasNext() && tracker.canAddMoreTasks()) {
-      AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();
-      /*
-       the currentPartitionDesc cannot be inlined as we need the hasNext() to 
be evaluated post the
-       current retrieved lastReplicatedPartition
-      */
-      addPartition(iterator.hasNext(), currentPartitionDesc, null);
-    }
+    // Place all partitions in single task to reduce load on HMS.
+    addConsolidatedPartitionDesc(null);
     return tracker;
   }
 
-  private void addPartition(boolean hasMorePartitions, 
AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+  private void addPartition(boolean hasMorePartitions, 
AlterTableAddPartitionDesc addPartitionDesc)
           throws Exception {
-    tracker.addTask(tasksForAddPartition(table, addPartitionDesc, 
ptnRootTask));
-    if (hasMorePartitions && !tracker.canAddMoreTasks()) {
+    boolean processingComplete = addTasksForPartition(table, addPartitionDesc, 
null,
+      PartitionState.Stage.PARTITION);
+    //If processing is not complete, means replication state is already 
updated with copy or move tasks which need

Review comment:
       Do we have move task in this case anymore?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
##########
@@ -270,10 +269,9 @@ static TableLocationTuple tableLocation(ImportTableDesc 
tblDesc, Database parent
     Path dataPath = fromURI;
     Path tmpPath = tgtPath;
 
-    // if move optimization is enabled, copy the files directly to the target 
path. No need to create the staging dir.
+    // if acid tables, copy the files directly to the target path. No need to 
create the staging dir.
     LoadFileType loadFileType;
-    if (replicationSpec.isInReplicationScope() &&
-            context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
+    if (replicationSpec.isInReplicationScope() && 
AcidUtils.isTransactionalTable(table)) {

Review comment:
       For partitioned table, we always assume that it's an acid table case, 
shouldn't we assume the same here as well?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to