pkumarsinha commented on a change in pull request #1529:
URL: https://github.com/apache/hive/pull/1529#discussion_r509895178
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
##########
@@ -224,54 +220,19 @@ public String getName() {
}
- public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path
srcPath, Path dstPath,
- HiveConf conf, boolean isAutoPurge,
boolean needRecycle,
- boolean readSourceAsFileList) {
- return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf,
isAutoPurge, needRecycle,
- readSourceAsFileList, false);
- }
-
public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path
srcPath, Path dstPath,
HiveConf conf, boolean isAutoPurge,
boolean needRecycle,
boolean readSourceAsFileList, String
dumpDirectory,
ReplicationMetricCollector
metricCollector) {
return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf,
isAutoPurge, needRecycle,
- readSourceAsFileList, false, dumpDirectory, metricCollector);
+ readSourceAsFileList, false, true, dumpDirectory, metricCollector);
}
- private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path
srcPath, Path dstPath,
- HiveConf conf, boolean isAutoPurge,
boolean needRecycle,
- boolean readSourceAsFileList,
- boolean overWrite) {
- Task<?> copyTask = null;
- LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath);
- if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
- ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false,
overWrite);
- rcwork.setReadSrcAsFilesList(readSourceAsFileList);
- if (replicationSpec.isReplace() &&
(conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION))) {
- rcwork.setDeleteDestIfExist(true);
- rcwork.setAutoPurge(isAutoPurge);
- rcwork.setNeedRecycle(needRecycle);
- }
- // For replace case, duplicate check should not be done. The new base
directory will automatically make the older
- // data invisible. Doing duplicate check and ignoring copy will cause
consistency issue if there are multiple
- // replace events getting replayed in the first incremental load.
- rcwork.setCheckDuplicateCopy(replicationSpec.needDupCopyCheck() &&
!replicationSpec.isReplace());
- LOG.debug("ReplCopyTask:\trcwork");
- String distCpDoAsUser =
conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
- rcwork.setDistCpDoAsUser(distCpDoAsUser);
- copyTask = TaskFactory.get(rcwork, conf);
- } else {
- LOG.debug("ReplCopyTask:\tcwork");
- copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
- }
- return copyTask;
- }
private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path
srcPath, Path dstPath,
HiveConf conf, boolean isAutoPurge,
boolean needRecycle,
boolean readSourceAsFileList,
- boolean overWrite,
+ boolean overWrite, boolean autoPurge,
Review comment:
Why do we need to have both isAutoPurge and autoPurge? Can we simplify
this part?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##########
@@ -204,10 +209,10 @@ public void setResultValues(List<String> resultValues) {
replSpec.setInReplicationScope(true);
EximUtil.DataCopyPath managedTableCopyPath = new
EximUtil.DataCopyPath(replSpec);
managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next());
- Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+ Task<?> copyTask = ReplCopyTask.getDumpCopyTask(
managedTableCopyPath.getReplicationSpec(),
managedTableCopyPath.getSrcPath(),
- managedTableCopyPath.getTargetPath(), conf, false,
shouldOverwrite,
- getCurrentDumpPath().toString(), getMetricCollector());
+ managedTableCopyPath.getTargetPath(), conf, false,
shouldOverwrite, !isBootstrap,
Review comment:
isBootstrap value doesn't influence isAutoPurge as that will default to
false when this method is called. Is there any other usage of this !isBootstrap?
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -655,6 +643,11 @@ private static void populateLlapDaemonVarsSet(Set<String>
llapDaemonVarsSetLocal
"Provide the maximum number of partitions of a table that will be
batched together during \n"
+ "repl load. All the partitions in a batch will make a single
metastore call to update the metadata. \n"
+ "The data for these partitions will be copied before copying the
metadata batch. "),
+
REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE("hive.repl.load.partitions.with.data.copy.batch.size",
+ 1000,
Review comment:
nit: accommodate in previous line
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
##########
@@ -649,8 +644,7 @@ private static ImportTableDesc
getBaseCreateTableDescFromTable(String dbName,
LoadFileType loadFileType;
Path destPath;
- if (replicationSpec.isInReplicationScope() &&
(x.getCtx().getConf().getBoolean(
- REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
+ if (replicationSpec.isInReplicationScope()) {
Review comment:
Will it be any different for import/eexport
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
##########
@@ -207,119 +220,88 @@ private void addConsolidatedPartitionDesc() throws
Exception {
tableDesc.getTableName(), true, partitions);
//don't need to add ckpt task separately. Added as part of add partition
task
- addPartition((toPartitionCount < totalPartitionCount),
consolidatedPartitionDesc, null);
- if (partitions.size() > 0) {
- LOG.info("Added {} partitions", partitions.size());
+ addPartition((toPartitionCount < totalPartitionCount),
consolidatedPartitionDesc);
+ if (!tracker.canAddMoreTasks()) {
+ //No need to do processing as no more tasks can be added. Will be
processed in next run. State is already
+ //updated in add partition task
+ return;
}
currentPartitionCount = toPartitionCount;
}
}
private TaskTracker forNewTable() throws Exception {
- if (isMetaDataOp() ||
TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
- // Place all partitions in single task to reduce load on HMS.
- addConsolidatedPartitionDesc();
- return tracker;
- }
-
- Iterator<AlterTableAddPartitionDesc> iterator =
event.partitionDescriptions(tableDesc).iterator();
- while (iterator.hasNext() && tracker.canAddMoreTasks()) {
- AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();
- /*
- the currentPartitionDesc cannot be inlined as we need the hasNext() to
be evaluated post the
- current retrieved lastReplicatedPartition
- */
- addPartition(iterator.hasNext(), currentPartitionDesc, null);
- }
+ // Place all partitions in single task to reduce load on HMS.
+ addConsolidatedPartitionDesc(null);
return tracker;
}
- private void addPartition(boolean hasMorePartitions,
AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+ private void addPartition(boolean hasMorePartitions,
AlterTableAddPartitionDesc addPartitionDesc)
throws Exception {
- tracker.addTask(tasksForAddPartition(table, addPartitionDesc,
ptnRootTask));
- if (hasMorePartitions && !tracker.canAddMoreTasks()) {
+ boolean processingComplete = addTasksForPartition(table, addPartitionDesc,
null,
+ PartitionState.Stage.PARTITION);
+ //If processing is not complete, means replication state is already
updated with copy or move tasks which need
+ //to be processed
+ if (processingComplete && hasMorePartitions && !tracker.canAddMoreTasks())
{
ReplicationState currentReplicationState =
new ReplicationState(new PartitionState(table.getTableName(),
addPartitionDesc));
updateReplicationState(currentReplicationState);
}
}
/**
- * returns the root task for adding a partition
+ * returns the root task for adding all partitions in a batch
*/
- private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc
addPartitionDesc, Task<?> ptnRootTask)
+ private boolean addTasksForPartition(Table table, AlterTableAddPartitionDesc
addPartitionDesc,
+ AlterTableAddPartitionDesc.PartitionDesc
lastPartSpec,
+ PartitionState.Stage lastStage)
throws MetaException, HiveException {
Task<?> addPartTask = TaskFactory.get(
new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc,
true, (new Path(context.dumpDirectory)).getParent().toString(),
this.metricCollector),
context.hiveConf
);
- //checkpointing task already added as part of add batch of partition in
case for metadata only and external tables
+ //checkpointing task already added as part of add batch of partition
if (isMetaDataOp() ||
TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
- if (ptnRootTask == null) {
- ptnRootTask = addPartTask;
- } else {
- ptnRootTask.addDependentTask(addPartTask);
- }
- return ptnRootTask;
+ tracker.addTask(addPartTask);
+ return true;
}
-
- AlterTableAddPartitionDesc.PartitionDesc partSpec =
addPartitionDesc.getPartitions().get(0);
- Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
- Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table,
partSpec);
- partSpec.setLocation(replicaWarehousePartitionLocation.toString());
- LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
- + partSpecToString(partSpec.getPartSpec()) + " with source location: "
- + partSpec.getLocation());
- Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
- tableDesc,
- (HashMap<String, String>)partSpec.getPartSpec(),
- context.dumpDirectory,
- this.metricCollector,
- context.hiveConf
- );
-
- Path stagingDir = replicaWarehousePartitionLocation;
- // if move optimization is enabled, copy the files directly to the target
path. No need to create the staging dir.
- LoadFileType loadFileType;
- if (event.replicationSpec().isInReplicationScope() &&
- context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
- loadFileType = LoadFileType.IGNORE;
- } else {
- loadFileType = event.replicationSpec().isReplace() ?
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
- stagingDir =
PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation,
context.pathInfo);
- }
- boolean copyAtLoad =
context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
- Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+ //Add Copy task for all partitions
+ //If last stage was move, that means all copy tasks processed, go to
processing move tasks
+ boolean lastProcessedStageFound = false;
+ for (AlterTableAddPartitionDesc.PartitionDesc partSpec :
addPartitionDesc.getPartitions()) {
+ if (!tracker.canAddMoreTasks()) {
+ //update replication state with the copy task added with which it
needs to proceed next
+ ReplicationState currentReplicationState =
+ new ReplicationState(new PartitionState(table.getTableName(),
addPartitionDesc,
+ partSpec, PartitionState.Stage.COPY));
+ updateReplicationState(currentReplicationState);
+ return false;
+ }
+ Path replicaWarehousePartitionLocation =
locationOnReplicaWarehouse(table, partSpec);
+ partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+ LOG.debug("adding dependent CopyWork for partition "
+ + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+ + partSpec.getLocation());
+ if (!lastProcessedStageFound && lastPartSpec != null &&
+ lastPartSpec.getLocation() != partSpec.getLocation()) {
+ //Don't process copy task if already processed as part of previous run
+ continue;
+ }
+ lastProcessedStageFound = true;
+ boolean copyAtLoad =
context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
+ Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
event.replicationSpec(),
- new Path(event.dataPath() + Path.SEPARATOR +
getPartitionName(sourceWarehousePartitionLocation)),
- stagingDir,
+ new Path(event.dataPath() + Path.SEPARATOR +
Warehouse.makePartPath(partSpec.getPartSpec())),
+ replicaWarehousePartitionLocation,
context.hiveConf, copyAtLoad, false, (new
Path(context.dumpDirectory)).getParent().toString(),
this.metricCollector
- );
-
- Task<?> movePartitionTask = null;
- if (loadFileType != LoadFileType.IGNORE) {
- // no need to create move task, if file is moved directly to target
location.
- movePartitionTask = movePartitionTask(table, partSpec, stagingDir,
loadFileType);
- }
-
- if (ptnRootTask == null) {
- ptnRootTask = copyTask;
- } else {
- ptnRootTask.addDependentTask(copyTask);
- }
-
- // Set Checkpoint task as dependant to the tail of add partition tasks.
So, if same dump is
- // retried for bootstrap, we skip current partition update.
- copyTask.addDependentTask(addPartTask);
- if (movePartitionTask != null) {
- addPartTask.addDependentTask(movePartitionTask);
- movePartitionTask.addDependentTask(ckptTask);
- } else {
- addPartTask.addDependentTask(ckptTask);
+ );
+ tracker.addTask(copyTask);
}
- return ptnRootTask;
+ //add partition metadata task once all the copy tasks are added
+ tracker.addDependentTask(addPartTask);
Review comment:
If we will have a check-pointing for stage (COPY,PARTITION), if there is
a restart for some reason, it can avoid re-doing first stage if that was
already done. Should we accommodate that?
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
##########
@@ -27,19 +27,14 @@
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import
org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc;
import
org.apache.hadoop.hive.ql.ddl.table.partition.drop.AlterTableDropPartitionDesc;
-import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.*;
Review comment:
Revert this
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
##########
@@ -207,119 +220,88 @@ private void addConsolidatedPartitionDesc() throws
Exception {
tableDesc.getTableName(), true, partitions);
//don't need to add ckpt task separately. Added as part of add partition
task
- addPartition((toPartitionCount < totalPartitionCount),
consolidatedPartitionDesc, null);
- if (partitions.size() > 0) {
- LOG.info("Added {} partitions", partitions.size());
+ addPartition((toPartitionCount < totalPartitionCount),
consolidatedPartitionDesc);
+ if (!tracker.canAddMoreTasks()) {
+ //No need to do processing as no more tasks can be added. Will be
processed in next run. State is already
+ //updated in add partition task
+ return;
}
currentPartitionCount = toPartitionCount;
}
}
private TaskTracker forNewTable() throws Exception {
- if (isMetaDataOp() ||
TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
- // Place all partitions in single task to reduce load on HMS.
- addConsolidatedPartitionDesc();
- return tracker;
- }
-
- Iterator<AlterTableAddPartitionDesc> iterator =
event.partitionDescriptions(tableDesc).iterator();
- while (iterator.hasNext() && tracker.canAddMoreTasks()) {
- AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();
- /*
- the currentPartitionDesc cannot be inlined as we need the hasNext() to
be evaluated post the
- current retrieved lastReplicatedPartition
- */
- addPartition(iterator.hasNext(), currentPartitionDesc, null);
- }
+ // Place all partitions in single task to reduce load on HMS.
+ addConsolidatedPartitionDesc(null);
return tracker;
}
- private void addPartition(boolean hasMorePartitions,
AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+ private void addPartition(boolean hasMorePartitions,
AlterTableAddPartitionDesc addPartitionDesc)
throws Exception {
- tracker.addTask(tasksForAddPartition(table, addPartitionDesc,
ptnRootTask));
- if (hasMorePartitions && !tracker.canAddMoreTasks()) {
+ boolean processingComplete = addTasksForPartition(table, addPartitionDesc,
null,
+ PartitionState.Stage.PARTITION);
+ //If processing is not complete, means replication state is already
updated with copy or move tasks which need
Review comment:
Do we have move task in this case anymore?
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
##########
@@ -270,10 +269,9 @@ static TableLocationTuple tableLocation(ImportTableDesc
tblDesc, Database parent
Path dataPath = fromURI;
Path tmpPath = tgtPath;
- // if move optimization is enabled, copy the files directly to the target
path. No need to create the staging dir.
+ // if acid tables, copy the files directly to the target path. No need to
create the staging dir.
LoadFileType loadFileType;
- if (replicationSpec.isInReplicationScope() &&
- context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
+ if (replicationSpec.isInReplicationScope() &&
AcidUtils.isTransactionalTable(table)) {
Review comment:
For partitioned table, we always assume that it's an acid table case,
shouldn't we assume the same here as well?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]