HIVE-18988: Support bootstrap replication of ACID tables (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)
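This patch adds the hive.repl.bootstrap.dump.open.txn.timeout setting (default "1h", see the HiveConf.java hunk below), which bounds how long a bootstrap REPL DUMP waits for transactions that were opened before the dump to close before it force-aborts them and proceeds. A minimal sketch of reading the new setting, using only the HiveConf calls the patch itself relies on; the class name and the "30m" override are illustrative assumptions:

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ReplBootstrapTimeoutSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Hypothetical override of the default "1h" timeout introduced by this patch.
        conf.set("hive.repl.bootstrap.dump.open.txn.timeout", "30m");
        // Same lookup that ReplDumpTask.getValidTxnListForReplDump() performs below.
        long timeoutMs = HiveConf.getTimeVar(conf,
            HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
        System.out.println("REPL DUMP will wait up to " + timeoutMs + " ms for pre-existing open txns");
      }
    }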
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4c0475ff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4c0475ff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4c0475ff
Branch: refs/heads/storage-branch-2.6
Commit: 4c0475ffb4a3157270cd5244cf39823085a68f57
Parents: dcc733b
Author: Sankar Hariappan <sank...@apache.org>
Authored: Wed May 2 22:15:29 2018 +0530
Committer: Deepak Jaiswal <djais...@apache.org>
Committed: Tue May 8 11:35:04 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java | 5 +
 .../listener/DbNotificationListener.java | 2 +-
 .../TestReplicationScenariosAcidTables.java | 167 +
 .../java/org/apache/hadoop/hive/ql/Driver.java | 1 +
 .../hadoop/hive/ql/exec/ReplCopyTask.java | 8 +-
 .../apache/hadoop/hive/ql/exec/ReplTxnTask.java | 27 +-
 .../apache/hadoop/hive/ql/exec/ReplTxnWork.java | 106 -
 .../apache/hadoop/hive/ql/exec/TaskFactory.java | 1 +
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 86 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java | 4 +-
 .../ql/exec/repl/bootstrap/ReplLoadTask.java | 9 +-
 .../ql/exec/repl/bootstrap/ReplLoadWork.java | 8 +-
 .../exec/repl/bootstrap/events/TableEvent.java | 3 +
 .../events/filesystem/FSPartitionEvent.java | 6 +
 .../events/filesystem/FSTableEvent.java | 16 +
 .../bootstrap/load/table/LoadPartitions.java | 49 +-
 .../repl/bootstrap/load/table/LoadTable.java | 51 +-
 .../exec/repl/bootstrap/load/util/Context.java | 4 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 36 +
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 10 +
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 6 +
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 17 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java | 5 +-
 .../hive/ql/parse/LoadSemanticAnalyzer.java | 2 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java | 7 +-
 .../hadoop/hive/ql/parse/ReplicationSpec.java | 28 +-
 .../hadoop/hive/ql/parse/repl/CopyUtils.java | 98 +-
 .../dump/BootStrapReplicationSpecFunction.java | 10 +-
 .../hive/ql/parse/repl/dump/HiveWrapper.java | 6 +-
 .../ql/parse/repl/dump/PartitionExport.java | 6 +-
 .../hive/ql/parse/repl/dump/TableExport.java | 14 +-
 .../hadoop/hive/ql/parse/repl/dump/Utils.java | 10 +
 .../ql/parse/repl/dump/io/FileOperations.java | 69 +-
 .../parse/repl/dump/io/FunctionSerializer.java | 2 +-
 .../repl/load/message/AbortTxnHandler.java | 2 +-
 .../repl/load/message/AllocWriteIdHandler.java | 2 +-
 .../repl/load/message/CommitTxnHandler.java | 2 +-
 .../parse/repl/load/message/OpenTxnHandler.java | 2 +-
 .../hadoop/hive/ql/plan/ImportTableDesc.java | 11 +
 .../apache/hadoop/hive/ql/plan/ReplTxnWork.java | 124 +
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2632 +++++----
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.h | 107 +
 .../ThriftHiveMetastore_server.skeleton.cpp | 5 +
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3303 +++++------
 .../gen/thrift/gen-cpp/hive_metastore_types.h | 75 +
 .../metastore/api/AddDynamicPartitions.java | 32 +-
 .../api/AllocateTableWriteIdsRequest.java | 68 +-
 .../api/AllocateTableWriteIdsResponse.java | 36 +-
 .../metastore/api/ClearFileMetadataRequest.java | 32 +-
 .../hive/metastore/api/ClientCapabilities.java | 32 +-
 .../hive/metastore/api/CompactionRequest.java | 44 +-
 .../hive/metastore/api/CreationMetadata.java | 32 +-
 .../metastore/api/FindSchemasByColsResp.java | 36 +-
 .../hive/metastore/api/FireEventRequest.java | 32 +-
 .../metastore/api/GetAllFunctionsResponse.java | 36 +-
.../api/GetFileMetadataByExprRequest.java | 32 +- .../api/GetFileMetadataByExprResult.java | 48 +- .../metastore/api/GetFileMetadataRequest.java | 32 +- .../metastore/api/GetFileMetadataResult.java | 44 +- .../hive/metastore/api/GetTablesRequest.java | 32 +- .../hive/metastore/api/GetTablesResult.java | 36 +- .../metastore/api/GetValidWriteIdsRequest.java | 32 +- .../metastore/api/GetValidWriteIdsResponse.java | 36 +- .../api/HeartbeatTxnRangeResponse.java | 64 +- .../metastore/api/InsertEventRequestData.java | 64 +- .../hadoop/hive/metastore/api/LockRequest.java | 36 +- .../hive/metastore/api/Materialization.java | 32 +- .../api/NotificationEventResponse.java | 36 +- .../metastore/api/PutFileMetadataRequest.java | 64 +- .../api/ReplTblWriteIdStateRequest.java | 952 ++++ .../hive/metastore/api/SchemaVersion.java | 36 +- .../hive/metastore/api/ShowCompactResponse.java | 36 +- .../hive/metastore/api/ShowLocksResponse.java | 36 +- .../hive/metastore/api/TableValidWriteIds.java | 32 +- .../hive/metastore/api/ThriftHiveMetastore.java | 5145 ++++++++++-------- .../hive/metastore/api/WMFullResourcePlan.java | 144 +- .../api/WMGetAllResourcePlanResponse.java | 36 +- .../WMGetTriggersForResourePlanResponse.java | 36 +- .../api/WMValidateResourcePlanResponse.java | 64 +- .../gen-php/metastore/ThriftHiveMetastore.php | 2066 +++---- .../src/gen/thrift/gen-php/metastore/Types.php | 1028 ++-- .../hive_metastore/ThriftHiveMetastore-remote | 7 + .../hive_metastore/ThriftHiveMetastore.py | 1482 ++--- .../gen/thrift/gen-py/hive_metastore/ttypes.py | 676 ++- .../gen/thrift/gen-rb/hive_metastore_types.rb | 31 + .../gen/thrift/gen-rb/thrift_hive_metastore.rb | 52 + .../hadoop/hive/metastore/HiveMetaStore.java | 5 + .../hive/metastore/HiveMetaStoreClient.java | 27 + .../hadoop/hive/metastore/IMetaStoreClient.java | 19 +- .../hive/metastore/ReplChangeManager.java | 63 +- .../hadoop/hive/metastore/txn/TxnHandler.java | 326 +- .../hadoop/hive/metastore/txn/TxnStore.java | 11 +- .../hadoop/hive/metastore/txn/TxnUtils.java | 51 +- .../src/main/thrift/hive_metastore.thrift | 10 + .../HiveMetaStoreClientPreCatalog.java | 27 + .../hadoop/hive/common/ValidReadTxnList.java | 3 - 96 files changed, 12249 insertions(+), 8264 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 049a594..668750c 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -469,6 +469,11 @@ public class HiveConf extends Configuration { + "metadata for acid tables which do not require the corresponding transaction \n" + "semantics to be applied on target. This can be removed when ACID table \n" + "replication is supported."), + REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT("hive.repl.bootstrap.dump.open.txn.timeout", "1h", + new TimeValidator(TimeUnit.HOURS), + "Indicates the timeout for all transactions which are opened before triggering bootstrap REPL DUMP. 
" + + "If these open transactions are not closed within the timeout value, then REPL DUMP will " + + "forcefully abort those transactions and continue with bootstrap dump."), //https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser REPL_ADD_RAW_RESERVED_NAMESPACE("hive.repl.add.raw.reserved.namespace", false, "For TDE with same encryption keys on source and target, allow Distcp super user to access \n" http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 59d1e3a..7835691 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -240,7 +240,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener FileStatus file = files[i]; i++; return ReplChangeManager.encodeFileUri(file.getPath().toString(), - ReplChangeManager.checksumFor(file.getPath(), fs)); + ReplChangeManager.checksumFor(file.getPath(), fs), null); } catch (IOException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 2ad83b6..8ad507f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -19,12 +19,21 @@ package org.apache.hadoop.hive.ql.parse; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.shims.Utils; import org.junit.rules.TestName; import org.junit.rules.TestRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -32,7 +41,9 @@ import org.junit.BeforeClass; import org.junit.AfterClass; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; /** * TestReplicationScenariosAcidTables - test replication for ACID tables @@ -59,6 +70,9 @@ public class TestReplicationScenariosAcidTables { put("fs.defaultFS", 
miniDFSCluster.getFileSystem().getUri().toString()); put("hive.support.concurrency", "true"); put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + put("hive.repl.dump.include.acid.tables", "true"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); }}; primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); @@ -66,6 +80,8 @@ public class TestReplicationScenariosAcidTables { put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); put("hive.support.concurrency", "false"); put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + put("hive.repl.dump.include.acid.tables", "true"); + put("hive.metastore.client.capability.check", "false"); }}; replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1); } @@ -92,6 +108,157 @@ public class TestReplicationScenariosAcidTables { } @Test + public void testAcidTablesBootstrap() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary + .run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)") + .run("insert into t1 values(2)") + .run("create table t2 (place string) partitioned by (country string) clustered by(place) " + + "into 3 buckets stored as orc tblproperties (\"transactional\"=\"true\")") + .run("insert into t2 partition(country='india') values ('bangalore')") + .run("insert into t2 partition(country='us') values ('austin')") + .run("insert into t2 partition(country='france') values ('paris')") + .run("alter table t2 add partition(country='italy')") + .run("create table t3 (rank int) tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")") + .run("insert into t3 values(11)") + .run("insert into t3 values(22)") + .run("create table t4 (id int)") + .run("insert into t4 values(111), (222)") + .run("create table t5 (id int) stored as orc ") + .run("insert into t5 values(1111), (2222)") + .run("alter table t5 set tblproperties (\"transactional\"=\"true\")") + .run("insert into t5 values(3333)") + .dump(primaryDbName, null); + + replica.load(replicatedDbName, bootstrapDump.dumpLocation) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"t1", "t2", "t3", "t4", "t5"}) + .run("repl status " + replicatedDbName) + .verifyResult(bootstrapDump.lastReplicationId) + .run("select id from t1 order by id") + .verifyResults(new String[]{"1", "2"}) + .run("select country from t2 order by country") + .verifyResults(new String[] {"france", "india", "us"}) + .run("select rank from t3 order by rank") + .verifyResults(new String[] {"11", "22"}) + .run("select id from t4 order by id") + .verifyResults(new String[] {"111", "222"}) + .run("select id from t5 order by id") + .verifyResults(new String[] {"1111", "2222", "3333"}); + } + + @Test + public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable { + // Open 5 txns + HiveConf primaryConf = primary.getConf(); + TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf()); + OpenTxnsResponse otResp = txnHandler.openTxns(new OpenTxnRequest(5, "u1", "localhost")); + List<Long> txns = otResp.getTxn_ids(); + String txnIdRange = " txn_id >= " + txns.get(0) + " and txn_id <= " + txns.get(4); + 
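    // (Illustrative assumption, not part of the patch: in the metastore backing tables
    //  queried by this test, TXNS.txn_state is a one-character code where 'o' marks an
    //  open transaction and 'a' an aborted one; the counts asserted before and after the
    //  bootstrap dump are rows in those two states.)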
Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"), + 5, TxnDbUtil.countQueryAgent(primaryConf, + "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange)); + + // Create 2 tables, one partitioned and other not. Also, have both types of full ACID and MM tables. + primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)") + .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")") + .run("insert into t2 partition(name='Bob') values(11)") + .run("insert into t2 partition(name='Carl') values(10)"); + // Allocate write ids for both tables t1 and t2 for all txns + // t1=5+1(insert) and t2=5+2(insert) + AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(primaryDbName, "t1"); + rqst.setTxnIds(txns); + txnHandler.allocateTableWriteIds(rqst); + rqst.setTableName("t2"); + txnHandler.allocateTableWriteIds(rqst); + Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXN_TO_WRITE_ID"), + 6, TxnDbUtil.countQueryAgent(primaryConf, + "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + primaryDbName.toLowerCase() + + "' and t2w_table = 't1'")); + Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXN_TO_WRITE_ID"), + 7, TxnDbUtil.countQueryAgent(primaryConf, + "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + primaryDbName.toLowerCase() + + "' and t2w_table = 't2'")); + + // Bootstrap dump with open txn timeout as 1s. + List<String> withConfigs = Arrays.asList( + "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'"); + WarehouseInstance.Tuple bootstrapDump = primary + .run("use " + primaryDbName) + .dump(primaryDbName, null, withConfigs); + + // After bootstrap dump, all the opened txns should be aborted. Verify it. + Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"), + 0, TxnDbUtil.countQueryAgent(primaryConf, + "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange)); + Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"), + 5, TxnDbUtil.countQueryAgent(primaryConf, + "select count(*) from TXNS where txn_state = 'a' and " + txnIdRange)); + + // Verify the next write id + String[] nextWriteId = TxnDbUtil.queryToString(primaryConf, "select nwi_next from NEXT_WRITE_ID where " + + " nwi_database = '" + primaryDbName.toLowerCase() + "' and nwi_table = 't1'") + .split("\n"); + Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 7L); + nextWriteId = TxnDbUtil.queryToString(primaryConf, "select nwi_next from NEXT_WRITE_ID where " + + " nwi_database = '" + primaryDbName.toLowerCase() + "' and nwi_table = 't2'") + .split("\n"); + Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 8L); + + // Bootstrap load which should also replicate the aborted write ids on both tables. 
+ HiveConf replicaConf = replica.getConf(); + replica.load(replicatedDbName, bootstrapDump.dumpLocation) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"t1", "t2"}) + .run("repl status " + replicatedDbName) + .verifyResult(bootstrapDump.lastReplicationId) + .run("select id from t1") + .verifyResults(new String[]{"1"}) + .run("select rank from t2 order by rank") + .verifyResults(new String[] {"10", "11"}); + + // Verify if HWM is properly set after REPL LOAD + nextWriteId = TxnDbUtil.queryToString(replicaConf, "select nwi_next from NEXT_WRITE_ID where " + + " nwi_database = '" + replicatedDbName.toLowerCase() + "' and nwi_table = 't1'") + .split("\n"); + Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 7L); + nextWriteId = TxnDbUtil.queryToString(replicaConf, "select nwi_next from NEXT_WRITE_ID where " + + " nwi_database = '" + replicatedDbName.toLowerCase() + "' and nwi_table = 't2'") + .split("\n"); + Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 8L); + + // Verify if all the aborted write ids are replicated to the replicated DB + Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from TXN_TO_WRITE_ID"), + 5, TxnDbUtil.countQueryAgent(replicaConf, + "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + replicatedDbName.toLowerCase() + + "' and t2w_table = 't1'")); + Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from TXN_TO_WRITE_ID"), + 5, TxnDbUtil.countQueryAgent(replicaConf, + "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + replicatedDbName.toLowerCase() + + "' and t2w_table = 't2'")); + + // Verify if entries added in COMPACTION_QUEUE for each table/partition + // t1-> 1 entry and t2-> 2 entries (1 per partition) + Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from COMPACTION_QUEUE"), + 1, TxnDbUtil.countQueryAgent(replicaConf, + "select count(*) from COMPACTION_QUEUE where cq_database = '" + replicatedDbName + + "' and cq_table = 't1'")); + Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from COMPACTION_QUEUE"), + 2, TxnDbUtil.countQueryAgent(replicaConf, + "select count(*) from COMPACTION_QUEUE where cq_database = '" + replicatedDbName + + "' and cq_table = 't2'")); + } + + @Test public void testOpenTxnEvent() throws Throwable { String tableName = testName.getMethodName(); WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 41ad002..9f4e6f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -591,6 +591,7 @@ public class Driver implements IDriver { setTriggerContext(queryId); } + ctx.setHiveTxnManager(queryTxnMgr); ctx.setStatsSource(statsSource); ctx.setCmd(command); ctx.setHDFSCleanup(true); http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index de270cf..24e7324 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -83,7 +83,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) { String[] result = ReplChangeManager.getFileWithChksumFromURI(fromPath.toString()); ReplChangeManager.FileInfo sourceInfo = ReplChangeManager - .getFileInfo(new Path(result[0]), result[1], conf); + .getFileInfo(new Path(result[0]), result[1], result[2], conf); if (FileUtils.copy( sourceInfo.getSrcFs(), sourceInfo.getSourcePath(), dstFs, toPath, false, false, conf)) { @@ -130,7 +130,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { console.printInfo("Copying file: " + oneSrc.getPath().toString()); LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath); srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), - oneSrc.getPath())); + oneSrc.getPath(), null)); } } @@ -183,14 +183,14 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing)))) { // TODO : verify if skipping charset here is okay - String line = null; + String line; while ((line = br.readLine()) != null) { LOG.debug("ReplCopyTask :_filesReadLine: {}", line); String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line); try { ReplChangeManager.FileInfo f = ReplChangeManager - .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], conf); + .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], fileWithChksum[2], conf); filePaths.add(f); } catch (MetaException e) { // issue warning for missing file and throw exception http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java index 2615072..5bbc25a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.plan.ReplTxnWork; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; @@ -46,18 +47,13 @@ public class ReplTxnTask extends Task<ReplTxnWork> { @Override public int execute(DriverContext driverContext) { String replPolicy = work.getReplPolicy(); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Executing ReplTxnTask " + work.getOperationType().toString() + - " for txn ids : " + work.getTxnIds().toString() + " replPolicy : " + replPolicy); - } - - String tableName = work.getTableName() == null || work.getTableName().isEmpty() ? 
null : work.getTableName(); - if (tableName != null) { + String tableName = work.getTableName(); + ReplicationSpec replicationSpec = work.getReplicationSpec(); + if ((tableName != null) && (replicationSpec != null)) { Table tbl; try { tbl = Hive.get().getTable(work.getDbName(), tableName); - ReplicationSpec replicationSpec = work.getReplicationSpec(); - if (replicationSpec != null && !replicationSpec.allowReplacementInto(tbl.getParameters())) { + if (!replicationSpec.allowReplacementInto(tbl.getParameters())) { // if the event is already replayed, then no need to replay it again. LOG.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " + replicationSpec.getReplicationState() + "Event Type: " + work.getOperationType()); @@ -75,7 +71,6 @@ public class ReplTxnTask extends Task<ReplTxnWork> { try { HiveTxnManager txnManager = driverContext.getCtx().getHiveTxnManager(); String user = UserGroupInformation.getCurrentUser().getUserName(); - LOG.debug("Replaying " + work.getOperationType().toString() + " Event for policy " + replPolicy); switch(work.getOperationType()) { case REPL_OPEN_TXN: List<Long> txnIds = txnManager.replOpenTxn(replPolicy, work.getTxnIds(), user); @@ -98,11 +93,17 @@ public class ReplTxnTask extends Task<ReplTxnWork> { case REPL_ALLOC_WRITE_ID: assert work.getTxnToWriteIdList() != null; String dbName = work.getDbName(); - String tblName = work.getTableName(); List <TxnToWriteId> txnToWriteIdList = work.getTxnToWriteIdList(); - txnManager.replAllocateTableWriteIdsBatch(dbName, tblName, replPolicy, txnToWriteIdList); + txnManager.replAllocateTableWriteIdsBatch(dbName, tableName, replPolicy, txnToWriteIdList); LOG.info("Replayed alloc write Id Event for repl policy: " + replPolicy + " db Name : " + dbName + - " txnToWriteIdList: " +txnToWriteIdList.toString() + " table name: " + tblName); + " txnToWriteIdList: " +txnToWriteIdList.toString() + " table name: " + tableName); + return 0; + case REPL_WRITEID_STATE: + txnManager.replTableWriteIdState(work.getValidWriteIdList(), + work.getDbName(), tableName, work.getPartNames()); + LOG.info("Replicated WriteId state for DbName: " + work.getDbName() + + " TableName: " + tableName + + " ValidWriteIdList: " + work.getValidWriteIdList()); return 0; default: LOG.error("Operation Type " + work.getOperationType() + " is not supported "); http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java deleted file mode 100644 index 530e9be..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec; - -import java.io.Serializable; - -import org.apache.hadoop.hive.metastore.api.TxnToWriteId; -import org.apache.hadoop.hive.ql.parse.ReplicationSpec; -import org.apache.hadoop.hive.ql.plan.Explain; -import org.apache.hadoop.hive.ql.plan.Explain.Level; -import java.util.Collections; -import java.util.List; - -/** - * ReplTxnTask. - * Used for replaying the transaction related events. - */ -@Explain(displayName = "Replication Transaction", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) -public class ReplTxnWork implements Serializable { - private static final long serialVersionUID = 1L; - private String dbName; - private String tableName; - private String replPolicy; - private List<Long> txnIds; - private List<TxnToWriteId> txnToWriteIdList; - private ReplicationSpec replicationSpec; - - /** - * OperationType. - * Different kind of events supported for replaying. - */ - public enum OperationType { - REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN, REPL_ALLOC_WRITE_ID - } - - OperationType operation; - - public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type, - List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) { - this.txnIds = txnIds; - this.dbName = dbName; - this.tableName = tableName; - this.operation = type; - this.replPolicy = replPolicy; - this.txnToWriteIdList = txnToWriteIdList; - this.replicationSpec = replicationSpec; - } - - public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type, - ReplicationSpec replicationSpec) { - this(replPolicy, dbName, tableName, txnIds, type, null, replicationSpec); - } - - public ReplTxnWork(String replPolicy, String dbName, String tableName, Long txnId, - OperationType type, ReplicationSpec replicationSpec) { - this(replPolicy, dbName, tableName, Collections.singletonList(txnId), type, null, replicationSpec); - } - - public ReplTxnWork(String replPolicy, String dbName, String tableName, OperationType type, - List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) { - this(replPolicy, dbName, tableName, null, type, txnToWriteIdList, replicationSpec); - } - - public List<Long> getTxnIds() { - return txnIds; - } - - public String getDbName() { - return dbName; - } - - public String getTableName() { - return tableName; - } - - public String getReplPolicy() { - return replPolicy; - } - - public OperationType getOperationType() { - return operation; - } - - public List<TxnToWriteId> getTxnToWriteIdList() { - return txnToWriteIdList; - } - - public ReplicationSpec getReplicationSpec() { - return replicationSpec; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 2da6b0f..3a107b7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.MapredLocalWork; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.ReplCopyWork; +import 
org.apache.hadoop.hive.ql.plan.ReplTxnWork; import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.plan.TezWork; http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index ce0757c..88d352b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -21,6 +21,8 @@ import com.google.common.primitives.Ints; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; @@ -39,6 +41,8 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFil import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; @@ -65,9 +69,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; @@ -90,6 +97,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } } + private static long sleepTime = 60000; private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class); private ReplLogger replLogger; @@ -204,20 +212,21 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { // bootstrap case Hive hiveDb = getHive(); Long bootDumpBeginReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); + String validTxnList = getValidTxnListForReplDump(hiveDb); for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(), Utils.getAllTables(getHive(), dbName).size(), getHive().getAllFunctions().size()); replLogger.startLog(); - Path dbRoot = dumpDbMetadata(dbName, dumpRoot); + Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId); dumpFunctionMetadata(dbName, dumpRoot); String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName); for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) { LOG.debug( "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); - dumpTable(dbName, tblName, dbRoot); + dumpTable(dbName, tblName, validTxnList, dbRoot); dumpConstraintMetadata(dbName, tblName, dbRoot); } Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey); @@ -257,17 +266,17 @@ public class ReplDumpTask extends 
Task<ReplDumpWork> implements Serializable { return bootDumpBeginReplId; } - private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception { + private Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throws Exception { Path dbRoot = new Path(dumpRoot, dbName); // TODO : instantiating FS objects are generally costly. Refactor FileSystem fs = dbRoot.getFileSystem(conf); Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); - HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName).database(); + HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName, lastReplId).database(); EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec); return dbRoot; } - private void dumpTable(String dbName, String tblName, Path dbRoot) throws Exception { + private void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot) throws Exception { try { Hive db = getHive(); HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName); @@ -276,6 +285,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true); String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false + if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) { + tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList)); + } new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write(); replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); @@ -286,6 +298,70 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } } + private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException { + if ((validTxnString == null) || validTxnString.isEmpty()) { + return null; + } + String fullTableName = AcidUtils.getFullTableName(dbName, tblName); + ValidWriteIdList validWriteIds = getTxnMgr() + .getValidWriteIds(Collections.singletonList(fullTableName), validTxnString) + .getTableValidWriteIdList(fullTableName); + return ((validWriteIds != null) ? validWriteIds.toString() : null); + } + + private List<Long> getOpenTxns(ValidTxnList validTxnList) { + long[] invalidTxns = validTxnList.getInvalidTransactions(); + List<Long> openTxns = new ArrayList<>(); + for (long invalidTxn : invalidTxns) { + if (!validTxnList.isTxnAborted(invalidTxn)) { + openTxns.add(invalidTxn); + } + } + return openTxns; + } + + private String getValidTxnListForReplDump(Hive hiveDb) throws HiveException { + // Key design point for REPL DUMP is to not have any txns older than current txn in which dump runs. + // This is needed to ensure that Repl dump doesn't copy any data files written by any open txns + // mainly for streaming ingest case where one delta file shall have data from committed/aborted/open txns. + // It may also have data inconsistency if the on-going txns doesn't have corresponding open/write + // events captured which means, catch-up incremental phase won't be able to replicate those txns. + // So, the logic is to wait for configured amount of time to see if all open txns < current txn is + // getting aborted/committed. If not, then we forcefully abort those txns just like AcidHouseKeeperService. 
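    // (Illustrative walk-through, not part of the patch: with the timeout left at its 1h default,
    //  a dump that starts while txns 8 and 9 are open re-reads the ValidTxnList roughly once a
    //  minute; if txn 8 commits at t+10min but txn 9 is still open when the timeout expires,
    //  txn 9 is force-aborted and the dump proceeds with a snapshot that records it as aborted.)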
+ ValidTxnList validTxnList = getTxnMgr().getValidTxns(); + long timeoutInMs = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); + long waitUntilTime = System.currentTimeMillis() + timeoutInMs; + while (System.currentTimeMillis() < waitUntilTime) { + // If there are no txns which are open for the given ValidTxnList snapshot, then just return it. + if (getOpenTxns(validTxnList).isEmpty()) { + return validTxnList.toString(); + } + + // Wait for 1 minute and check again. + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.info("REPL DUMP thread sleep interrupted", e); + } + validTxnList = getTxnMgr().getValidTxns(); + } + + // After the timeout just force abort the open txns + List<Long> openTxns = getOpenTxns(validTxnList); + if (!openTxns.isEmpty()) { + hiveDb.abortTransactions(openTxns); + validTxnList = getTxnMgr().getValidTxns(); + if (validTxnList.getMinOpenTxn() != null) { + openTxns = getOpenTxns(validTxnList); + LOG.warn("REPL DUMP unable to force abort all the open txns: {} after timeout due to unknown reasons. " + + "However, this is rare case that shouldn't happen.", openTxns); + throw new IllegalStateException("REPL DUMP triggered abort txns failed for unknown reasons."); + } + } + return validTxnList.toString(); + } + private ReplicationSpec getNewReplicationSpec(String evState, String objState, boolean isMetadataOnly) { return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true, true); http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index 323c73d..61fa424 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -39,8 +39,8 @@ public class ReplDumpWork implements Serializable { } public ReplDumpWork(String dbNameOrPattern, String tableNameOrPattern, - Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit, - String resultTempPath) { + Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit, + String resultTempPath) { this.dbNameOrPattern = dbNameOrPattern; this.tableNameOrPattern = tableNameOrPattern; this.eventFrom = eventFrom; http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index 6f217cf..748d318 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -72,8 +72,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { protected int execute(DriverContext driverContext) { try { int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); - Context context = new Context(conf, getHive(), work.sessionStateLineageState, - work.currentTransactionId, driverContext.getCtx()); + Context context = new Context(conf, getHive(), work.sessionStateLineageState, 
driverContext.getCtx()); TaskTracker loadTaskTracker = new TaskTracker(maxTasks); /* for now for simplicity we are doing just one directory ( one database ), come back to use @@ -127,7 +126,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn); TableEvent tableEvent = (TableEvent) next; LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(), - tableContext, loadTaskTracker, getTxnMgr()); + tableContext, loadTaskTracker); tableTracker = loadTable.tasks(); if (!scope.database) { scope.rootTasks.addAll(tableTracker.tasks()); @@ -145,7 +144,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { // for a table we explicitly try to load partitions as there is no separate partitions events. LoadPartitions loadPartitions = new LoadPartitions(context, iterator.replLogger(), loadTaskTracker, tableEvent, - work.dbNameToLoadIn, tableContext, getTxnMgr()); + work.dbNameToLoadIn, tableContext); TaskTracker partitionsTracker = loadPartitions.tasks(); partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, partitionsTracker); @@ -163,7 +162,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { work.tableNameToLoadIn); LoadPartitions loadPartitions = new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker, - event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), getTxnMgr()); + event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated()); /* the tableTracker here should be a new instance and not an existing one as this can only happen when we break in between loading partitions. http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java index 91ec93e..c1a9a62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -44,22 +44,20 @@ public class ReplLoadWork implements Serializable { taken care when using other methods. 
*/ final LineageState sessionStateLineageState; - public final long currentTransactionId; public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, - String tableNameToLoadIn, LineageState lineageState, long currentTransactionId) + String tableNameToLoadIn, LineageState lineageState) throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; sessionStateLineageState = lineageState; - this.currentTransactionId = currentTransactionId; this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; } public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern, - LineageState lineageState, long currentTransactionId) throws IOException { - this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, currentTransactionId); + LineageState lineageState) throws IOException { + this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState); } public BootstrapEventsIterator iterator() { http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java index e817f5f..3dcc1d7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java @@ -31,6 +31,9 @@ public interface TableEvent extends BootstrapEvent { List<AddPartitionDesc> partitionDescriptions(ImportTableDesc tblDesc) throws SemanticException; + List<String> partitions(ImportTableDesc tblDesc) + throws SemanticException; + ReplicationSpec replicationSpec(); boolean shouldNotReplicate(); http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java index ef73d89..ee804e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java @@ -68,6 +68,12 @@ public class FSPartitionEvent implements PartitionEvent { } @Override + public List<String> partitions(ImportTableDesc tblDesc) + throws SemanticException { + return tableEvent.partitions(tblDesc); + } + + @Override public ReplicationSpec replicationSpec() { return tableEvent.replicationSpec(); } http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index cfd1640..0fabf5a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.metadata.Table; @@ -88,6 +89,21 @@ public class FSTableEvent implements TableEvent { return descs; } + @Override + public List<String> partitions(ImportTableDesc tblDesc) + throws SemanticException { + List<String> partitions = new ArrayList<>(); + try { + for (Partition partition : metadata.getPartitions()) { + String partName = Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues()); + partitions.add(partName); + } + } catch (MetaException e) { + throw new SemanticException(e); + } + return partitions; + } + private AddPartitionDesc partitionDesc(Path fromPath, ImportTableDesc tblDesc, Partition partition) throws SemanticException { try { http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index a42c299..df7f30d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; @@ -45,10 +45,10 @@ import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; +import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType; import org.apache.hadoop.hive.ql.plan.MoveWork; -import org.apache.hadoop.hive.ql.session.SessionState; import org.datanucleus.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -77,19 +78,16 @@ public class LoadPartitions { private final ImportTableDesc tableDesc; private Table table; - private final HiveTxnManager txnMgr; public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker, TableEvent event, String dbNameToLoadIn, - TableContext tableContext, - HiveTxnManager txnMgr) throws 
HiveException, IOException { - this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null, txnMgr); + TableContext tableContext) throws HiveException, IOException { + this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null); } public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext, TaskTracker limiter, TableEvent event, String dbNameToLoadIn, - AddPartitionDesc lastReplicatedPartition, - HiveTxnManager txnMgr) throws HiveException, IOException { + AddPartitionDesc lastReplicatedPartition) throws HiveException, IOException { this.tracker = new TaskTracker(limiter); this.event = event; this.context = context; @@ -99,7 +97,6 @@ public class LoadPartitions { this.tableDesc = tableContext.overrideProperties(event.tableDesc(dbNameToLoadIn)); this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb); - this.txnMgr = txnMgr; } private String location() throws MetaException, HiveException { @@ -141,8 +138,7 @@ public class LoadPartitions { if (table == null) { //new table - - table = new Table(tableDesc.getDatabaseName(), tableDesc.getTableName()); + table = tableDesc.toTable(context.hiveConf); if (isPartitioned(tableDesc)) { updateReplicationState(initialReplicationState()); if (!forNewTable().hasReplicationState()) { @@ -153,7 +149,6 @@ public class LoadPartitions { } } else { // existing - if (table.isPartitioned()) { List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc); if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) { @@ -242,18 +237,24 @@ public class LoadPartitions { /** * This will create the move of partition data from temp path to actual path */ - private Task<?> movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, - Path tmpPath) { - // Note: this sets LoadFileType incorrectly for ACID; is that relevant for load? - // See setLoadFileType and setIsAcidIow calls elsewhere for an example. - LoadTableDesc loadTableWork = new LoadTableDesc( - tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), - event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, - txnMgr.getCurrentTxnId() - ); - loadTableWork.setInheritTableSpecs(false); - MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); - return TaskFactory.get(work, context.hiveConf); + private Task<?> movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, Path tmpPath) { + MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); + if (AcidUtils.isTransactionalTable(table)) { + LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( + Collections.singletonList(tmpPath), + Collections.singletonList(new Path(partSpec.getLocation())), + true, null, null); + moveWork.setMultiFilesDesc(loadFilesWork); + } else { + LoadTableDesc loadTableWork = new LoadTableDesc( + tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), + event.replicationSpec().isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L + ); + loadTableWork.setInheritTableSpecs(false); + moveWork.setLoadTableWork(loadTableWork); + } + + return TaskFactory.get(moveWork, context.hiveConf); } private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartitionDesc partSpec) http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index ddb26e5..e2ec4af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; @@ -42,16 +42,18 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; +import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType; import org.apache.hadoop.hive.ql.plan.MoveWork; -import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.plan.ReplTxnWork; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.TreeMap; @@ -66,17 +68,15 @@ public class LoadTable { private final TableContext tableContext; private final TaskTracker tracker; private final TableEvent event; - private final HiveTxnManager txnMgr; public LoadTable(TableEvent event, Context context, ReplLogger replLogger, - TableContext tableContext, TaskTracker limiter, HiveTxnManager txnMgr) + TableContext tableContext, TaskTracker limiter) throws SemanticException, IOException { this.event = event; this.context = context; this.replLogger = replLogger; this.tableContext = tableContext; this.tracker = new TaskTracker(limiter); - this.txnMgr = txnMgr; } private void createTableReplLogTask(String tableName, TableType tableType) throws SemanticException { @@ -189,9 +189,8 @@ public class LoadTable { } } - private void newTableTasks(ImportTableDesc tblDesc) throws SemanticException { - Table table; - table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName()); + private void newTableTasks(ImportTableDesc tblDesc) throws Exception { + Table table = tblDesc.toTable(context.hiveConf); // Either we're dropping and re-creating, or the table didn't exist, and we're creating. 
Task<?> createTableTask = tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf); @@ -199,12 +198,22 @@ public class LoadTable { tracker.addTask(createTableTask); return; } + + Task<?> parentTask = createTableTask; + if (event.replicationSpec().isTransactionalTableDump()) { + List<String> partNames = isPartitioned(tblDesc) ? event.partitions(tblDesc) : null; + ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), tblDesc.getTableName(), partNames, + event.replicationSpec().getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE); + Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf); + createTableTask.addDependentTask(replTxnTask); + parentTask = replTxnTask; + } if (!isPartitioned(tblDesc)) { - LOG.debug("adding dependent CopyWork/MoveWork for table"); + LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table"); Task<?> loadTableTask = loadTableTask(table, event.replicationSpec(), new Path(tblDesc.getLocation()), event.metadataPath()); - createTableTask.addDependentTask(loadTableTask); + parentTask.addDependentTask(loadTableTask); } tracker.addTask(createTableTask); } @@ -229,14 +238,20 @@ public class LoadTable { Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); - LoadTableDesc loadTableWork = new LoadTableDesc( - tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), - replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, - //todo: what is the point of this? If this is for replication, who would have opened a txn? - txnMgr.getCurrentTxnId() - ); - MoveWork moveWork = - new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); + if (AcidUtils.isTransactionalTable(table)) { + LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( + Collections.singletonList(tmpPath), + Collections.singletonList(tgtPath), + true, null, null); + moveWork.setMultiFilesDesc(loadFilesWork); + } else { + LoadTableDesc loadTableWork = new LoadTableDesc( + tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), + replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L + ); + moveWork.setLoadTableWork(loadTableWork); + } Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf); copyTask.addDependentTask(loadTableTask); return copyTask; http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java index 6fbc657..7eae1ea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java @@ -36,18 +36,16 @@ public class Context { taken care when using other methods. 
*/ public final LineageState sessionStateLineageState; - public final long currentTransactionId; public Context(HiveConf hiveConf, Hive hiveDb, - LineageState lineageState, long currentTransactionId, + LineageState lineageState, org.apache.hadoop.hive.ql.Context nestedContext) throws MetaException { this.hiveConf = hiveConf; this.hiveDb = hiveDb; this.warehouse = new Warehouse(hiveConf); this.pathInfo = new PathInfo(hiveConf); sessionStateLineageState = lineageState; - this.currentTransactionId = currentTransactionId; this.nestedContext = nestedContext; } } http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 445e126..183515a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.HiveStatsUtils; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; @@ -1831,6 +1832,41 @@ public class AcidUtils { return fileList; } + public static List<Path> getValidDataPaths(Path dataPath, Configuration conf, String validWriteIdStr) + throws IOException { + List<Path> pathList = new ArrayList<>(); + if ((validWriteIdStr == null) || validWriteIdStr.isEmpty()) { + // If Non-Acid case, then all files would be in the base data path. So, just return it. + pathList.add(dataPath); + return pathList; + } + + // If ACID/MM tables, then need to find the valid state wrt to given ValidWriteIdList. 
+ ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(validWriteIdStr); + Directory acidInfo = AcidUtils.getAcidState(dataPath, conf, validWriteIdList); + + for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) { + pathList.add(hfs.getFileStatus().getPath()); + } + for (ParsedDelta delta : acidInfo.getCurrentDirectories()) { + pathList.add(delta.getPath()); + } + if (acidInfo.getBaseDirectory() != null) { + pathList.add(acidInfo.getBaseDirectory()); + } + return pathList; + } + + public static String getAcidSubDir(Path dataPath) { + String dataDir = dataPath.getName(); + if (dataDir.startsWith(AcidUtils.BASE_PREFIX) + || dataDir.startsWith(AcidUtils.DELTA_PREFIX) + || dataDir.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { + return dataDir; + } + return null; + } + public static boolean isAcidEnabled(HiveConf hiveConf) { String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER); boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 76569d5..515c08b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -690,6 +690,16 @@ public final class DbTxnManager extends HiveTxnManagerImpl { } @Override + public void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames) + throws LockException { + try { + getMS().replTableWriteIdState(validWriteIdList, dbName, tableName, partNames); + } catch (TException e) { + throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); + } + } + + @Override public void heartbeat() throws LockException { List<HiveLock> locks; if(isTxnOpen()) { http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index a74670b..ab9d67e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -235,6 +235,12 @@ class DummyTxnManager extends HiveTxnManagerImpl { } @Override + public void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames) + throws LockException { + // No-op + } + + @Override public void heartbeat() throws LockException { // No-op } http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index f239535..5f68e08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -57,7 +57,7 @@ public interface HiveTxnManager { * @return The new transaction id. 
* @throws LockException in case of failure to start the transaction. */ - List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, String user) throws LockException; + List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, String user) throws LockException; /** * Commit the transaction in target cluster. @@ -65,7 +65,7 @@ public interface HiveTxnManager { * @param srcTxnId The id of the transaction at the source cluster * @throws LockException in case of failure to commit the transaction. */ - void replCommitTxn(String replPolicy, long srcTxnId) throws LockException; + void replCommitTxn(String replPolicy, long srcTxnId) throws LockException; /** * Abort the transaction in target cluster. @@ -73,7 +73,18 @@ public interface HiveTxnManager { * @param srcTxnId The id of the transaction at the source cluster * @throws LockException in case of failure to abort the transaction. */ - void replRollbackTxn(String replPolicy, long srcTxnId) throws LockException; + void replRollbackTxn(String replPolicy, long srcTxnId) throws LockException; + + /** + * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark. + * @param validWriteIdList Snapshot of writeid list when the table/partition is dumped. + * @param dbName Database name + * @param tableName Table which is written. + * @param partNames List of partitions being written. + * @throws LockException in case of failure. + */ + void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames) + throws LockException; /** * Get the lock manager. This must be used rather than instantiating an http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index b850ddc..fa32807 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -327,8 +327,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Long writeId = 0L; // Initialize with 0 for non-ACID and non-MM tables. int stmtId = 0; - if ((tableExists && AcidUtils.isTransactionalTable(table)) - || (!tableExists && AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps()))) { + if (!replicationSpec.isInReplicationScope() + && ((tableExists && AcidUtils.isTransactionalTable(table)) + || (!tableExists && AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())))) { //if importing into existing transactional table or will create a new transactional table //(because Export was done from transactional table), need a writeId // Explain plan doesn't open a txn and hence no need to allocate write id. 
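For illustration only (not part of the patch): under replication scope the import path above no longer allocates a fresh write id, because the write-id state is replayed from the source snapshot instead, via the new HiveTxnManager.replTableWriteIdState(...) hook (driven in LoadTable through ReplTxnWork with OperationType.REPL_WRITEID_STATE). A minimal sketch of a caller of that hook follows; the helper name and the database/table/partition literals are hypothetical, only the interface method itself comes from this patch.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.lockmgr.LockException;

    class WriteIdStateReplayExample {
      // Replays the dumped ValidWriteIdList on the target so aborted write ids and
      // the write-id high water mark match the source before any data is moved.
      static void replayWriteIdState(HiveTxnManager txnMgr, String validWriteIdList)
          throws LockException {
        List<String> partNames = Arrays.asList("ds=2018-05-01", "ds=2018-05-02"); // hypothetical
        txnMgr.replTableWriteIdState(validWriteIdList, "repl_db", "acid_tbl", partNames);
      }
    }

In the patch itself this call is issued by ReplTxnTask as a dependent of the create-table task, so the snapshot is in place before the copied base/delta files become visible.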
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index c07991d..8332bcc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -81,7 +81,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer { } }); if ((srcs != null) && srcs.length == 1) { - if (srcs[0].isDir()) { + if (srcs[0].isDirectory()) { srcs = fs.listStatus(srcs[0].getPath(), new PathFilter() { @Override public boolean accept(Path p) { http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 05eca1f..562f497 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -368,9 +368,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { - ReplLoadWork replLoadWork = - new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern, - queryState.getLineageState(), getTxnMgr().getCurrentTxnId()); + ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, + tblNameOrPattern, queryState.getLineageState()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); return; } @@ -401,7 +400,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - queryState.getLineageState(), getTxnMgr().getCurrentTxnId()); + queryState.getLineageState()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); // // for (FileStatus dir : dirsInLoadPath) { http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 824da21..7d901f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -43,6 +43,7 @@ public class ReplicationSpec { private boolean isNoop = false; private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in. private boolean isReplace = true; // default is that the import mode is insert overwrite + private String validWriteIdList = null; // WriteIds snapshot for replicating ACID/MM tables. 
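For illustration only (not part of the patch): a sketch of how the new validWriteIdList member is meant to travel. The accessors used below (setValidWriteIdList, getValidWriteIdList, isTransactionalTableDump) are the ones added further down in this hunk; the dumpSide/loadSide method names and the snapshot argument are hypothetical.

    import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

    class WriteIdSnapshotFlowExample {
      // Dump side: stamp the table's write-id snapshot (captured from the metastore
      // at dump time) onto the spec; it is serialized under "repl.valid.writeid.list".
      static void dumpSide(ReplicationSpec spec, String tableWriteIdSnapshot) {
        spec.setValidWriteIdList(tableWriteIdSnapshot);
      }

      // Load side: a non-null snapshot marks the dumped object as an ACID/MM table,
      // so the load path can schedule a REPL_WRITEID_STATE task before moving data.
      static void loadSide(ReplicationSpec spec) {
        if (spec.isTransactionalTableDump()) {
          String snapshot = spec.getValidWriteIdList();
          System.out.println("replaying write-id state: " + snapshot);
        }
      }
    }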
private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT // Key definitions related to replication @@ -52,7 +53,8 @@ public class ReplicationSpec { CURR_STATE_ID("repl.last.id"), NOOP("repl.noop"), LAZY("repl.lazy"), - IS_REPLACE("repl.is.replace") + IS_REPLACE("repl.is.replace"), + VALID_WRITEID_LIST("repl.valid.writeid.list") ; private final String keyName; @@ -140,6 +142,7 @@ public class ReplicationSpec { this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString())); this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString())); this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString())); + this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString()); } /** @@ -325,6 +328,27 @@ public class ReplicationSpec { this.isLazy = isLazy; } + /** + * @return the WriteIds snapshot for the current ACID/MM table being replicated + */ + public String getValidWriteIdList() { + return validWriteIdList; + } + + /** + * @param validWriteIdList WriteIds snapshot for the current ACID/MM table being replicated + */ + public void setValidWriteIdList(String validWriteIdList) { + this.validWriteIdList = validWriteIdList; + } + + /** + * @return whether the current replication dumped object related to ACID/Mm table + */ + public boolean isTransactionalTableDump() { + return (validWriteIdList != null); + } + public String get(KEY key) { switch (key){ case REPL_SCOPE: @@ -346,6 +370,8 @@ public class ReplicationSpec { return String.valueOf(isLazy()); case IS_REPLACE: return String.valueOf(isReplace()); + case VALID_WRITEID_LIST: + return getValidWriteIdList(); } return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 4e61280..529ea21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -66,43 +66,52 @@ public class CopyUtils { // Used by replication, copy files from source to destination. 
It is possible source file is // changed/removed during copy, so double check the checksum after copy, // if not match, copy again from cm - public void copyAndVerify(FileSystem destinationFs, Path destination, + public void copyAndVerify(FileSystem destinationFs, Path destRoot, List<ReplChangeManager.FileInfo> srcFiles) throws IOException, LoginException { - Map<FileSystem, List<ReplChangeManager.FileInfo>> map = fsToFileMap(srcFiles); - for (Map.Entry<FileSystem, List<ReplChangeManager.FileInfo>> entry : map.entrySet()) { + Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot); + for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) { FileSystem sourceFs = entry.getKey(); - List<ReplChangeManager.FileInfo> fileInfoList = entry.getValue(); - boolean useRegularCopy = regularCopy(destinationFs, sourceFs, fileInfoList); + Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue(); + for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) { + Path destination = destMapEntry.getKey(); + List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue(); + boolean useRegularCopy = regularCopy(destinationFs, sourceFs, fileInfoList); - doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy); - - // Verify checksum, retry if checksum changed - List<ReplChangeManager.FileInfo> retryFileInfoList = new ArrayList<>(); - for (ReplChangeManager.FileInfo srcFile : srcFiles) { - if(!srcFile.isUseSourcePath()) { - // If already use cmpath, nothing we can do here, skip this file - continue; + if (!destinationFs.exists(destination) + && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { + LOG.error("Failed to create destination directory: " + destination); + throw new IOException("Destination directory creation failed"); } - String sourceChecksumString = srcFile.getCheckSum(); - if (sourceChecksumString != null) { - String verifySourceChecksumString; - try { - verifySourceChecksumString - = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs); - } catch (IOException e) { - // Retry with CM path - verifySourceChecksumString = null; + doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy); + + // Verify checksum, retry if checksum changed + List<ReplChangeManager.FileInfo> retryFileInfoList = new ArrayList<>(); + for (ReplChangeManager.FileInfo srcFile : srcFiles) { + if (!srcFile.isUseSourcePath()) { + // If already use cmpath, nothing we can do here, skip this file + continue; } - if ((verifySourceChecksumString == null) - || !sourceChecksumString.equals(verifySourceChecksumString)) { - // If checksum does not match, likely the file is changed/removed, copy again from cm - srcFile.setIsUseSourcePath(false); - retryFileInfoList.add(srcFile); + String sourceChecksumString = srcFile.getCheckSum(); + if (sourceChecksumString != null) { + String verifySourceChecksumString; + try { + verifySourceChecksumString + = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs); + } catch (IOException e) { + // Retry with CM path + verifySourceChecksumString = null; + } + if ((verifySourceChecksumString == null) + || !sourceChecksumString.equals(verifySourceChecksumString)) { + // If checksum does not match, likely the file is changed/removed, copy again from cm + srcFile.setIsUseSourcePath(false); + retryFileInfoList.add(srcFile); + } } } - } - if (!retryFileInfoList.isEmpty()) { - doCopyRetry(sourceFs, 
retryFileInfoList, destinationFs, destination, useRegularCopy); + if (!retryFileInfoList.isEmpty()) { + doCopyRetry(sourceFs, retryFileInfoList, destinationFs, destination, useRegularCopy); + } } } } @@ -212,7 +221,7 @@ public class CopyUtils { for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) { final FileSystem sourceFs = entry.getKey(); List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(), - path -> { return new ReplChangeManager.FileInfo(sourceFs, path);}); + path -> new ReplChangeManager.FileInfo(sourceFs, path, null)); doCopyOnce(sourceFs, entry.getValue(), destinationFs, destination, regularCopy(destinationFs, sourceFs, fileList)); @@ -287,16 +296,33 @@ public class CopyUtils { return result; } - private Map<FileSystem, List<ReplChangeManager.FileInfo>> fsToFileMap( - List<ReplChangeManager.FileInfo> srcFiles) throws IOException { - Map<FileSystem, List<ReplChangeManager.FileInfo>> result = new HashMap<>(); + // Create map of source file system to destination path to list of files to copy + private Map<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> fsToFileMap( + List<ReplChangeManager.FileInfo> srcFiles, Path destRoot) throws IOException { + Map<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> result = new HashMap<>(); for (ReplChangeManager.FileInfo file : srcFiles) { FileSystem fileSystem = file.getSrcFs(); if (!result.containsKey(fileSystem)) { - result.put(fileSystem, new ArrayList<ReplChangeManager.FileInfo>()); + result.put(fileSystem, new HashMap<>()); } - result.get(fileSystem).add(file); + Path destination = getCopyDestination(file, destRoot); + if (!result.get(fileSystem).containsKey(destination)) { + result.get(fileSystem).put(destination, new ArrayList<>()); + } + result.get(fileSystem).get(destination).add(file); } return result; } + + private Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path destRoot) { + if (fileInfo.getSubDir() == null) { + return destRoot; + } + String[] subDirs = fileInfo.getSubDir().split(Path.SEPARATOR); + Path destination = destRoot; + for (String subDir: subDirs) { + destination = new Path(destination, subDir); + } + return destination; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java index 5c1850c..4b2812e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java @@ -24,16 +24,18 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<ReplicationSpec> { private final Hive db; + private final long currentNotificationId; - BootStrapReplicationSpecFunction(Hive db) { + BootStrapReplicationSpecFunction(Hive db, long currentNotificationId) { this.db = db; + this.currentNotificationId = currentNotificationId; } @Override public ReplicationSpec fromMetaStore() throws HiveException { try { - long currentNotificationId = db.getMSC() - .getCurrentNotificationEventId().getEventId(); + long currentReplicationState = (this.currentNotificationId > 0) + ? 
this.currentNotificationId : db.getMSC().getCurrentNotificationEventId().getEventId();
     ReplicationSpec replicationSpec =
         new ReplicationSpec(
             true,
@@ -45,7 +47,7 @@ class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<Rep
             false
         );
-      replicationSpec.setCurrentReplicationState(String.valueOf(currentNotificationId));
+      replicationSpec.setCurrentReplicationState(String.valueOf(currentReplicationState));
       return replicationSpec;
     } catch (Exception e) {
       throw new SemanticException(e);
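For illustration only (not part of the patch): how the AcidUtils.getValidDataPaths(...) helper added earlier in this commit can be used on the dump side to enumerate only the data directories that are valid under the dumped write-id snapshot. The surrounding method and variable names are hypothetical; the helper's signature and behaviour are taken from the hunk above.

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    class ValidDataPathsExample {
      // Lists the directories a dump would copy for one table/partition location.
      static void listDumpablePaths(Configuration conf, Path location, String validWriteIdStr)
          throws IOException {
        List<Path> paths = AcidUtils.getValidDataPaths(location, conf, validWriteIdStr);
        for (Path p : paths) {
          // For ACID/MM tables these are base_N / delta_X_Y / delete_delta_X_Y dirs valid
          // under the snapshot; a null or empty snapshot returns the location itself,
          // which keeps the non-transactional path unchanged.
          System.out.println("dump candidate: " + p);
        }
      }
    }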