Repository: hive
Updated Branches:
  refs/heads/master c358ef5af -> 8d5e8a60a
HIVE-19499: Bootstrap REPL LOAD shall add tasks to create checkpoints for db/tables/partitions. (Sankar Hariappan, reviewed by Mahesh Kumar Behera,Anishek Agarwal) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d5e8a60 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d5e8a60 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d5e8a60 Branch: refs/heads/master Commit: 8d5e8a60ae6eeaaac8369a718fa7d5c04d142f57 Parents: c358ef5 Author: Anishek Agarwal <anis...@gmail.com> Authored: Fri May 25 13:13:22 2018 +0530 Committer: Anishek Agarwal <anis...@gmail.com> Committed: Fri May 25 13:13:22 2018 +0530 ---------------------------------------------------------------------- ...TestReplicationScenariosAcrossInstances.java | 46 ++++++++++++++++ .../hadoop/hive/ql/parse/WarehouseInstance.java | 15 +++++ .../hadoop/hive/ql/exec/repl/ReplUtils.java | 58 ++++++++++++++++++++ .../ql/exec/repl/bootstrap/ReplLoadTask.java | 3 +- .../ql/exec/repl/bootstrap/ReplLoadWork.java | 2 + .../exec/repl/bootstrap/load/LoadDatabase.java | 6 ++ .../exec/repl/bootstrap/load/TaskTracker.java | 19 ++++++- .../bootstrap/load/table/LoadPartitions.java | 40 +++++++------- .../repl/bootstrap/load/table/LoadTable.java | 34 +++++------- .../exec/repl/bootstrap/load/util/Context.java | 4 +- .../hadoop/hive/metastore/txn/TxnHandler.java | 49 ++++++++--------- 11 files changed, 203 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 8caa55c..bcbf113 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -25,9 +25,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.util.DependencyResolver; import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -47,6 +51,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.equalTo; @@ -54,6 +59,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class TestReplicationScenariosAcrossInstances { @Rule @@ -757,4 +763,44 @@ public class TestReplicationScenariosAcrossInstances { FileSystem fs = 
cSerdesTableDumpLocation.getFileSystem(primary.hiveConf); assertFalse(fs.exists(cSerdesTableDumpLocation)); } + + private void verifyIfCkptSet(Map<String, String> props, String dumpDir) { + assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY)); + assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir)); + } + + @Test + public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("insert into table t1 values (10)") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='uk') values ('london')") + .run("insert into table t2 partition(country='us') values ('sfo')") + .dump(primaryDbName, null); + + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select country from t2") + .verifyResults(Arrays.asList("india", "uk", "us")); + + Database db = replica.getDatabase(replicatedDbName); + verifyIfCkptSet(db.getParameters(), tuple.dumpLocation); + Table t1 = replica.getTable(replicatedDbName, "t1"); + verifyIfCkptSet(t1.getParameters(), tuple.dumpLocation); + Table t2 = replica.getTable(replicatedDbName, "t2"); + verifyIfCkptSet(t2.getParameters(), tuple.dumpLocation); + Partition india = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("india")); + verifyIfCkptSet(india.getParameters(), tuple.dumpLocation); + Partition us = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("us")); + verifyIfCkptSet(us.getParameters(), tuple.dumpLocation); + Partition uk = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("uk")); + verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index dc31e92..17fd799 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -30,6 +30,9 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.IDriver; @@ -319,6 +322,18 @@ public class WarehouseInstance implements Closeable { } } + public Database getDatabase(String dbName) throws Exception { + return client.getDatabase(dbName); + } + + public Table getTable(String dbName, String tableName) throws Exception { + return client.getTable(dbName, tableName); + } + + public Partition getPartition(String 
dbName, String tableName, List<String> partValues) throws Exception { + return client.getPartition(dbName, tableName, partValues); + } + ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip) { return new ReplicationV1CompatRule(client, hiveConf, testsToSkip); } http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java new file mode 100644 index 0000000..cbec3ad --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.plan.AlterTableDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.ImportTableDesc; +import org.apache.hadoop.hive.ql.stats.StatsUtils; + +import java.util.HashMap; +import java.util.HashSet; + + +public class ReplUtils { + + public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key"; + + public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf) + throws SemanticException { + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType()); + return TaskFactory.get(replLogWork, conf); + } + + public static Task<?> getTableCheckpointTask(ImportTableDesc tableDesc, HashMap<String, String> partSpec, + String dumpRoot, HiveConf conf) throws SemanticException { + HashMap<String, String> mapProp = new HashMap<>(); + mapProp.put(REPL_CHECKPOINT_KEY, dumpRoot); + + AlterTableDesc alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS); + alterTblDesc.setProps(mapProp); + alterTblDesc.setOldName( + StatsUtils.getFullyQualifiedTableName(tableDesc.getDatabaseName(), tableDesc.getTableName())); + if (partSpec != null) { + alterTblDesc.setPartSpec(partSpec); + } + return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index 748d318..97917f8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -72,7 +72,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { protected int execute(DriverContext driverContext) { try { int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); - Context context = new Context(conf, getHive(), work.sessionStateLineageState, driverContext.getCtx()); + Context context = new Context(work.dumpDirectory, conf, getHive(), + work.sessionStateLineageState, driverContext.getCtx()); TaskTracker loadTaskTracker = new TaskTracker(maxTasks); /* for now for simplicity we are doing just one directory ( one database ), come back to use http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java index c1a9a62..048727f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -33,6 +33,7 @@ import java.io.Serializable; public class ReplLoadWork implements Serializable { final String dbNameToLoadIn; final String tableNameToLoadIn; + final String dumpDirectory; private final BootstrapEventsIterator iterator; private final ConstraintEventsIterator constraintsIterator; private int loadTaskRunCount = 0; @@ -50,6 +51,7 @@ public class ReplLoadWork implements Serializable { throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; sessionStateLineageState = lineageState; + this.dumpDirectory = dumpDirectory; this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index 537c5e9..c5f2779 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.PrincipalDesc; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; import java.io.Serializable; import java.util.HashMap; @@ -84,6 +85,11 @@ public class LoadDatabase { */ Map<String, String> parameters = new HashMap<>(dbObj.getParameters()); parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + + // Add the checkpoint key to the Database binding it to current dump directory. + // So, if retry using same dump, we shall skip Database object update. 
+ parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, context.dumpDirectory); + createDbDesc.setDatabaseProperties(parameters); // note that we do not set location - for repl load, we want that auto-created. createDbDesc.setIfNotExists(false); http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java index 95f484b..f8f0801 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,8 +66,21 @@ public class TaskTracker { updateTaskCount(task, visited); } - public void updateTaskCount(Task<? extends Serializable> task, - List <Task<? extends Serializable>> visited) { + // This method is used to traverse the DAG created in tasks list and add the dependent task to + // the tail of each task chain. + public void addDependentTask(Task<? extends Serializable> dependent) { + if (tasks.isEmpty()) { + addTask(dependent); + } else { + DAGTraversal.traverse(tasks, new AddDependencyToLeaves(dependent)); + + List<Task<? extends Serializable>> visited = new ArrayList<>(); + updateTaskCount(dependent, visited); + } + } + + private void updateTaskCount(Task<? extends Serializable> task, + List <Task<? 
extends Serializable>> visited) { numberOfTasks += 1; visited.add(task); if (task.getChildTasks() != null) { http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index df7f30d..870f70a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -26,14 +26,12 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; -import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -55,8 +53,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -112,21 +110,6 @@ public class LoadPartitions { } } - private void createTableReplLogTask() throws SemanticException { - ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, - tableDesc.getTableName(), tableDesc.tableType()); - Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf); - - if (tracker.tasks().isEmpty()) { - tracker.addTask(replLogTask); - } else { - DAGTraversal.traverse(tracker.tasks(), new AddDependencyToLeaves(replLogTask)); - - List<Task<? extends Serializable>> visited = new ArrayList<>(); - tracker.updateTaskCount(replLogTask, visited); - } - } - public TaskTracker tasks() throws SemanticException { try { /* @@ -143,7 +126,9 @@ public class LoadPartitions { updateReplicationState(initialReplicationState()); if (!forNewTable().hasReplicationState()) { // Add ReplStateLogTask only if no pending table load tasks left for next cycle - createTableReplLogTask(); + Task<? extends Serializable> replLogTask + = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf); + tracker.addDependentTask(replLogTask); } return tracker; } @@ -155,7 +140,9 @@ public class LoadPartitions { updateReplicationState(initialReplicationState()); if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) { // Add ReplStateLogTask only if no pending table load tasks left for next cycle - createTableReplLogTask(); + Task<? 
extends Serializable> replLogTask + = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf); + tracker.addDependentTask(replLogTask); } return tracker; } @@ -229,8 +216,19 @@ public class LoadPartitions { Task<?> movePartitionTask = movePartitionTask(table, partSpec, tmpPath); + // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for + // bootstrap, we skip current partition update. + Task<?> ckptTask = ReplUtils.getTableCheckpointTask( + tableDesc, + (HashMap<String, String>)partSpec.getPartSpec(), + context.dumpDirectory, + context.hiveConf + ); + copyTask.addDependentTask(addPartTask); addPartTask.addDependentTask(movePartitionTask); + movePartitionTask.addDependentTask(ckptTask); + return copyTask; } http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index d10ca76..f2b7fa4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; @@ -27,13 +26,11 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; -import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; @@ -52,7 +49,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -79,21 +75,6 @@ public class LoadTable { this.tracker = new TaskTracker(limiter); } - private void createTableReplLogTask(String tableName, TableType tableType) throws SemanticException { - ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,tableName, tableType); - Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork); - DAGTraversal.traverse(tracker.tasks(), new AddDependencyToLeaves(replLogTask)); - - if (tracker.tasks().isEmpty()) { - tracker.addTask(replLogTask); - } else { - DAGTraversal.traverse(tracker.tasks(), new AddDependencyToLeaves(replLogTask)); - - List<Task<? 
extends Serializable>> visited = new ArrayList<>(); - tracker.updateTaskCount(replLogTask, visited); - } - } - public TaskTracker tasks() throws SemanticException { // Path being passed to us is a table dump location. We go ahead and load it in as needed. // If tblName is null, then we default to the table name specified in _metadata, which is good. @@ -159,9 +140,20 @@ public class LoadTable { existingTableTasks(tableDesc, table, replicationSpec); } + // Set Checkpoint task as dependant to create table task. So, if same dump is retried for + // bootstrap, we skip current table update. + Task<?> ckptTask = ReplUtils.getTableCheckpointTask( + tableDesc, + null, + context.dumpDirectory, + context.hiveConf + ); if (!isPartitioned(tableDesc)) { - createTableReplLogTask(tableDesc.getTableName(), tableDesc.tableType()); + Task<? extends Serializable> replLogTask + = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf); + ckptTask.addDependentTask(replLogTask); } + tracker.addDependentTask(ckptTask); return tracker; } catch (Exception e) { throw new SemanticException(e); http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java index 7eae1ea..b90da06 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.session.LineageState; public class Context { + public final String dumpDirectory; public final HiveConf hiveConf; public final Hive hiveDb; public final Warehouse warehouse; @@ -38,9 +39,10 @@ public class Context { public final LineageState sessionStateLineageState; - public Context(HiveConf hiveConf, Hive hiveDb, + public Context(String dumpDirectory, HiveConf hiveConf, Hive hiveDb, LineageState lineageState, org.apache.hadoop.hive.ql.Context nestedContext) throws MetaException { + this.dumpDirectory = dumpDirectory; this.hiveConf = hiveConf; this.hiveDb = hiveDb; this.warehouse = new Warehouse(hiveConf); http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 21b9865..a2b5897 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1136,11 +1136,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - // Clean the txn to writeid map/TXN_COMPONENTS for the given table as we bootstrap here - String sql = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName) - + " and t2w_table = " + quoteString(tblName); - LOG.debug("Going to execute delete <" + sql + ">"); - stmt.executeUpdate(sql); + // Check if this txn state is 
already replicated for this given table. If yes, then it is + // idempotent case and just return. + String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) + + " and nwi_table = " + quoteString(tblName); + LOG.debug("Going to execute query <" + sql + ">"); + + rs = stmt.executeQuery(sql); + if (rs.next()) { + LOG.info("Idempotent flow: WriteId state <" + validWriteIdList + "> is already applied for the table: " + + dbName + "." + tblName); + rollbackDBConn(dbConn); + return; + } if (numAbortedWrites > 0) { // Allocate/Map one txn per aborted writeId and abort the txn to mark writeid as aborted. @@ -1173,30 +1181,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { // There are some txns in the list which has no write id allocated and hence go ahead and do it. // Get the next write id for the given table and update it with new next write id. - // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID - sql = sqlGenerator.addForUpdateClause( - "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) - + " and nwi_table = " + quoteString(tblName)); - LOG.debug("Going to execute query <" + sql + ">"); - + // It is expected NEXT_WRITE_ID doesn't have entry for this table and hence directly insert it. long nextWriteId = validWriteIdList.getHighWatermark() + 1; - rs = stmt.executeQuery(sql); - if (!rs.next()) { - // First allocation of write id (hwm+1) should add the table to the next_write_id meta table. - sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" - + quoteString(dbName) + "," + quoteString(tblName) + "," - + Long.toString(nextWriteId) + ")"; - LOG.debug("Going to execute insert <" + sql + ">"); - stmt.execute(sql); - } else { - // Update the NEXT_WRITE_ID for the given table with hwm+1 from source - sql = "update NEXT_WRITE_ID set nwi_next = " + (nextWriteId) - + " where nwi_database = " + quoteString(dbName) - + " and nwi_table = " + quoteString(tblName); - LOG.debug("Going to execute update <" + sql + ">"); - stmt.executeUpdate(sql); - } + // First allocation of write id (hwm+1) should add the table to the next_write_id meta table. + sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" + + quoteString(dbName) + "," + quoteString(tblName) + "," + + Long.toString(nextWriteId) + ")"; + LOG.debug("Going to execute insert <" + sql + ">"); + stmt.execute(sql); + + LOG.info("WriteId state <" + validWriteIdList + "> is applied for the table: " + dbName + "." + tblName); LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) {
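
For context on the checkpoint mechanism this patch introduces: bootstrap REPL LOAD now chains an ALTER ... SET TBLPROPERTIES task (built in ReplUtils.getTableCheckpointTask) after each create/move task, stamping the object with ReplUtils.REPL_CHECKPOINT_KEY ("hive.repl.ckpt.key") set to the dump directory, so a retried load from the same dump can recognize already-loaded databases, tables, and partitions. The sketch below is a hypothetical helper, not part of this commit: only the property key and the standard HiveMetaStoreClient/Table calls come from the patch and the metastore API; the class name, method name, and the skip-on-retry usage are illustrative assumptions.

// Hypothetical helper (not part of this patch): reads back the checkpoint
// property that bootstrap REPL LOAD writes on a replicated table.
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public class ReplCheckpointInspector {
  // Mirrors ReplUtils.REPL_CHECKPOINT_KEY introduced by this commit.
  private static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";

  /**
   * Returns true when the table already carries a checkpoint pointing at the
   * given dump directory, i.e. a retried bootstrap load from the same dump
   * could skip re-creating it.
   */
  public static boolean isCheckpointed(HiveConf conf, String dbName, String tblName,
      String dumpDirectory) throws Exception {
    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    try {
      Table table = client.getTable(dbName, tblName);
      return dumpDirectory.equals(table.getParameters().get(REPL_CHECKPOINT_KEY));
    } finally {
      client.close();
    }
  }
}

Note the ordering chosen in the patch: for partitions the checkpoint task is made dependent on the move-partition task (copy -> add partition -> move -> checkpoint), so the property is only written once the partition data is fully in place; the same reasoning applies to the table-level checkpoint task added in LoadTable.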