aasha commented on a change in pull request #1529:
URL: https://github.com/apache/hive/pull/1529#discussion_r510009825
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
##########
@@ -207,119 +220,88 @@ private void addConsolidatedPartitionDesc() throws Exception {
           tableDesc.getTableName(), true, partitions);
       //don't need to add ckpt task separately. Added as part of add partition task
-      addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc, null);
-      if (partitions.size() > 0) {
-        LOG.info("Added {} partitions", partitions.size());
+      addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc);
+      if (!tracker.canAddMoreTasks()) {
+        //No need to do processing as no more tasks can be added. Will be processed in next run. State is already
+        //updated in add partition task
+        return;
       }
       currentPartitionCount = toPartitionCount;
     }
   }
 
   private TaskTracker forNewTable() throws Exception {
-    if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-      // Place all partitions in single task to reduce load on HMS.
-      addConsolidatedPartitionDesc();
-      return tracker;
-    }
-
-    Iterator<AlterTableAddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
-    while (iterator.hasNext() && tracker.canAddMoreTasks()) {
-      AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();
-      /*
-       the currentPartitionDesc cannot be inlined as we need the hasNext() to be evaluated post the
-       current retrieved lastReplicatedPartition
-      */
-      addPartition(iterator.hasNext(), currentPartitionDesc, null);
-    }
+    // Place all partitions in single task to reduce load on HMS.
+    addConsolidatedPartitionDesc(null);
     return tracker;
   }
 
-  private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+  private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc addPartitionDesc)
       throws Exception {
-    tracker.addTask(tasksForAddPartition(table, addPartitionDesc, ptnRootTask));
-    if (hasMorePartitions && !tracker.canAddMoreTasks()) {
+    boolean processingComplete = addTasksForPartition(table, addPartitionDesc, null,
+        PartitionState.Stage.PARTITION);
+    //If processing is not complete, means replication state is already updated with copy or move tasks which need
+    //to be processed
+    if (processingComplete && hasMorePartitions && !tracker.canAddMoreTasks()) {
       ReplicationState currentReplicationState =
           new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
       updateReplicationState(currentReplicationState);
     }
   }
 
   /**
-   * returns the root task for adding a partition
+   * returns the root task for adding all partitions in a batch
    */
-  private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+  private boolean addTasksForPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc,
+                                       AlterTableAddPartitionDesc.PartitionDesc lastPartSpec,
+                                       PartitionState.Stage lastStage)
       throws MetaException, HiveException {
     Task<?> addPartTask = TaskFactory.get(
         new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc,
            true, (new Path(context.dumpDirectory)).getParent().toString(), this.metricCollector),
        context.hiveConf
     );
-    //checkpointing task already added as part of add batch of partition in case for metadata only and external tables
+    //checkpointing task already added as part of add batch of partition
     if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-      if (ptnRootTask == null) {
-        ptnRootTask = addPartTask;
-      } else {
-        ptnRootTask.addDependentTask(addPartTask);
-      }
-      return ptnRootTask;
+      tracker.addTask(addPartTask);
+      return true;
     }
-
-    AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
-    Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
-    Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
-    partSpec.setLocation(replicaWarehousePartitionLocation.toString());
-    LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
-        + partSpecToString(partSpec.getPartSpec()) + " with source location: "
-        + partSpec.getLocation());
-    Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
-        tableDesc,
-        (HashMap<String, String>)partSpec.getPartSpec(),
-        context.dumpDirectory,
-        this.metricCollector,
-        context.hiveConf
-    );
-
-    Path stagingDir = replicaWarehousePartitionLocation;
-    // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
-    LoadFileType loadFileType;
-    if (event.replicationSpec().isInReplicationScope() &&
-        context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
-      loadFileType = LoadFileType.IGNORE;
-    } else {
-      loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
-      stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
-    }
-    boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
-    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+    //Add Copy task for all partitions
+    //If last stage was move, that means all copy tasks processed, go to processing move tasks
+    boolean lastProcessedStageFound = false;
+    for (AlterTableAddPartitionDesc.PartitionDesc partSpec : addPartitionDesc.getPartitions()) {
+      if (!tracker.canAddMoreTasks()) {
+        //update replication state with the copy task added with which it needs to proceed next
+        ReplicationState currentReplicationState =
+            new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc,
+                partSpec, PartitionState.Stage.COPY));
+        updateReplicationState(currentReplicationState);
+        return false;
+      }
+      Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
+      partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+      LOG.debug("adding dependent CopyWork for partition "
+          + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+          + partSpec.getLocation());
+      if (!lastProcessedStageFound && lastPartSpec != null &&
+          lastPartSpec.getLocation() != partSpec.getLocation()) {
+        //Don't process copy task if already processed as part of previous run
+        continue;
+      }
+      lastProcessedStageFound = true;
+      boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
+      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
         event.replicationSpec(),
-        new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)),
-        stagingDir,
+          new Path(event.dataPath() + Path.SEPARATOR + Warehouse.makePartPath(partSpec.getPartSpec())),
+          replicaWarehousePartitionLocation,
        context.hiveConf, copyAtLoad, false, (new Path(context.dumpDirectory)).getParent().toString(),
        this.metricCollector
-    );
-
-    Task<?> movePartitionTask = null;
-    if (loadFileType != LoadFileType.IGNORE) {
-      // no need to create move task, if file is moved directly to target location.
-      movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType);
-    }
-
-    if (ptnRootTask == null) {
-      ptnRootTask = copyTask;
-    } else {
-      ptnRootTask.addDependentTask(copyTask);
-    }
-
-    // Set Checkpoint task as dependant to the tail of add partition tasks. So, if same dump is
-    // retried for bootstrap, we skip current partition update.
-    copyTask.addDependentTask(addPartTask);
-    if (movePartitionTask != null) {
-      addPartTask.addDependentTask(movePartitionTask);
-      movePartitionTask.addDependentTask(ckptTask);
-    } else {
-      addPartTask.addDependentTask(ckptTask);
+      );
+      tracker.addTask(copyTask);
     }
-    return ptnRootTask;
+    //add partition metadata task once all the copy tasks are added
+    tracker.addDependentTask(addPartTask);
Review comment:
   Will not do this now, as we don't have checkpointing info at the copy task level.
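For context on the change discussed above, here is a minimal, self-contained sketch of the batch-and-resume pattern that the new addTasksForPartition follows: queue one copy task per partition until the task budget is exhausted, checkpoint the first unprocessed partition at the COPY stage, and let the next run skip everything before that checkpoint. All names in the sketch (TaskBudget, Checkpoint, loadPartitions) are hypothetical stand-ins for Hive's TaskTracker, ReplicationState/PartitionState, and the method in the diff; this is an illustration under those assumptions, not the actual implementation.

    import java.util.List;

    // Sketch only: TaskBudget and Checkpoint are simplified stand-ins,
    // not the Hive classes used in the diff above.
    public class ResumableBatchSketch {

      // Caps how many tasks a single load run may enqueue, like TaskTracker.
      static class TaskBudget {
        private final int limit;
        private int used;
        TaskBudget(int limit) { this.limit = limit; }
        boolean canAddMoreTasks() { return used < limit; }
        void addTask(String task) { used++; System.out.println("queued: " + task); }
      }

      // Records where the previous run stopped: last partition reached and its stage.
      record Checkpoint(String lastPartition, String stage) {}

      // Queues one copy task per partition. Returns a COPY-stage checkpoint when
      // the budget runs out (for the caller to persist), or null when complete.
      static Checkpoint loadPartitions(List<String> partitions, Checkpoint resumeFrom,
                                       TaskBudget budget) {
        boolean resumePointFound = (resumeFrom == null);
        for (String partition : partitions) {
          if (!budget.canAddMoreTasks()) {
            // Budget exhausted: record the first partition that got no copy task.
            return new Checkpoint(partition, "COPY");
          }
          if (!resumePointFound && !partition.equals(resumeFrom.lastPartition())) {
            continue; // already copied in a previous run, skip it
          }
          resumePointFound = true; // the checkpointed partition itself is re-processed
          budget.addTask("copy " + partition);
        }
        // All copy tasks queued; chain the single add-partition metadata task after
        // them (cf. tracker.addDependentTask(addPartTask) in the diff).
        budget.addTask("add-partitions metadata");
        return null;
      }

      public static void main(String[] args) {
        List<String> parts = List.of("p=1", "p=2", "p=3", "p=4");
        Checkpoint ckpt = loadPartitions(parts, null, new TaskBudget(2)); // stops at p=3
        loadPartitions(parts, ckpt, new TaskBudget(10));                  // resumes at p=3
      }
    }

One difference worth noting: the diff locates the resume point by comparing partition locations, whereas the sketch uses String.equals; the sketch also collapses the separate PARTITION and COPY stages into a single pass for brevity.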