Repository: hive Updated Branches: refs/heads/master e8e0396c1 -> 7da8f3d36
HIVE-21023 : Add test for replication to a target with hive.strict.managed.tables enabled. (Mahesh Kumar Behera, reviewed by Sankar Hariappan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7da8f3d3 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7da8f3d3 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7da8f3d3 Branch: refs/heads/master Commit: 7da8f3d36ee2b6c508ea2ab8c241df52107ac74e Parents: e8e0396 Author: Mahesh Kumar Behera <mah...@apache.org> Authored: Fri Dec 14 18:26:09 2018 +0530 Committer: Mahesh Kumar Behera <mah...@apache.org> Committed: Fri Dec 14 18:26:09 2018 +0530 ---------------------------------------------------------------------- ...ationScenariosIncrementalLoadAcidTables.java | 128 +------- .../TestReplicationScenariosMigration.java | 33 ++ .../TestReplicationWithTableMigration.java | 328 +++++++++++++++++++ 3 files changed, 363 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7da8f3d3/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java index b71cfa4..97775b3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java @@ -17,20 +17,13 @@ */ package org.apache.hadoop.hive.ql.parse; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.hive.ql.parse.WarehouseInstance; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; -import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable; -import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable; - import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils; import org.junit.rules.TestName; @@ -48,9 +41,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; + import com.google.common.collect.Lists; /** @@ -62,7 +53,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenariosIncrementalLoadAcidTables.class); static WarehouseInstance primary; - private static WarehouseInstance replica, replicaNonAcid, replicaMigration, primaryMigration; + private static WarehouseInstance replica, replicaNonAcid; private static HiveConf conf; private String primaryDbName, replicatedDbName, primaryDbNameExtra; @@ -105,36 +96,6 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { put("hive.metastore.client.capability.check", "false"); }}; replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1); - - HashMap<String, String> overridesForHiveConfReplicaMigration = new HashMap<String, String>() {{ - put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); - put("hive.support.concurrency", "true"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.exec.dynamic.partition.mode", "nonstrict"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.strict.managed.tables", "true"); - }}; - replicaMigration = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConfReplicaMigration); - - HashMap<String, String> overridesForHiveConfPrimaryMigration = new HashMap<String, String>() {{ - put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.exec.dynamic.partition.mode", "nonstrict"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.support.concurrency", "false"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); - put("hive.strict.managed.tables", "false"); - }}; - primaryMigration = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConfPrimaryMigration); } @AfterClass @@ -300,89 +261,4 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { .run("select value from " + tableName + " order by value") .verifyResults(new String[] {"1", "100", "100", "100", "100"}); } - - private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable { - WarehouseInstance.Tuple tuple = primaryMigration.run("use " + primaryDbName) - .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") - .run("insert into tacid values(1)") - .run("insert into tacid values(2)") - .run("insert into tacid values(3)") - .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " + - "into 3 buckets stored as orc ") - .run("alter table tacidpart add partition(country='france')") - .run("insert into tacidpart partition(country='india') values('mumbai')") - .run("insert into tacidpart partition(country='us') values('sf')") - .run("insert into tacidpart partition(country='france') values('paris')") - .run("create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")") - .run("insert into tflat values(11)") - .run("insert into tflat values(22)") - .run("create table tflattext (id int) ") - .run("insert into tflattext values(111), (222)") - .run("create table tflattextpart (id int) partitioned by (country string) ") - .run("insert into tflattextpart partition(country='india') values(1111), (2222)") - .run("insert into tflattextpart partition(country='us') values(3333)") - .run("create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " + - "stored as avro tblproperties ('avro.schema.url'='" + primaryMigration.avroSchemaFile.toUri().toString() + "')") - .run("insert into avro_table values('str1', 10)") - .dump(primaryDbName, fromReplId); - assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tacid"))); - assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tacidpart"))); - assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tflat"))); - assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tflattext"))); - assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tflattextpart"))); - Table avroTable = primaryMigration.getTable(replicatedDbName, "avro_table"); - assertFalse(isTransactionalTable(avroTable)); - assertFalse(MetaStoreUtils.isExternalTable(avroTable)); - return tuple; - } - - private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable { - replicaMigration.run("use " + replicatedDbName) - .run("show tables") - .verifyResults(new String[] {"tacid", "tacidpart", "tflat", "tflattext", "tflattextpart", - "avro_table"}) - .run("repl status " + replicatedDbName) - .verifyResult(lastReplId) - .run("select id from tacid order by id") - .verifyResults(new String[]{"1", "2", "3"}) - .run("select country from tacidpart order by country") - .verifyResults(new String[] {"france", "india", "us"}) - .run("select rank from tflat order by rank") - .verifyResults(new String[] {"11", "22"}) - .run("select id from tflattext order by id") - .verifyResults(new String[] {"111", "222"}) - .run("select id from tflattextpart order by id") - .verifyResults(new String[] {"1111", "2222", "3333"}) - .run("select col1 from avro_table") - .verifyResults(new String[] {"str1"}); - - assertTrue(isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tacid"))); - assertTrue(isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tacidpart"))); - assertTrue(isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tflat"))); - assertTrue(!isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tflattext"))); - assertTrue(!isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tflattextpart"))); - assertTrue(isTransactionalTable(replicaMigration.getTable(replicatedDbName, "tflattext"))); - assertTrue(isTransactionalTable(replicaMigration.getTable(replicatedDbName, "tflattextpart"))); - - Table avroTable = replicaMigration.getTable(replicatedDbName, "avro_table"); - assertTrue(MetaStoreUtils.isExternalTable(avroTable)); - Path tablePath = new PathBuilder(replicaMigration.externalTableWarehouseRoot.toString()).addDescendant(replicatedDbName + ".db") - .addDescendant("avro_table") - .build(); - assertEquals(avroTable.getSd().getLocation().toLowerCase(), tablePath.toUri().toString().toLowerCase()); - } - - @Test - public void testMigrationManagedToAcid() throws Throwable { - WarehouseInstance.Tuple tupleForBootStrap = primaryMigration.dump(primaryDbName, null); - WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null); - WarehouseInstance.Tuple tupleForIncremental = primaryMigration.dump(primaryDbName, tupleForBootStrap.lastReplicationId); - replicaMigration.loadWithoutExplain(replicatedDbName, tuple.dumpLocation); - verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); - - replicaMigration.run("drop database if exists " + replicatedDbName + " cascade"); - replicaMigration.loadWithoutExplain(replicatedDbName, tupleForBootStrap.dumpLocation); - replicaMigration.loadWithoutExplain(replicatedDbName, tupleForIncremental.dumpLocation); - verifyLoadExecution(replicatedDbName, tupleForIncremental.lastReplicationId); - } } http://git-wip-us.apache.org/repos/asf/hive/blob/7da8f3d3/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java new file mode 100644 index 0000000..5b8e424 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import java.util.HashMap; +import org.junit.BeforeClass; + +public class TestReplicationScenariosMigration extends org.apache.hadoop.hive.ql.parse.TestReplicationScenarios { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + HashMap<String, String> overrideProperties = new HashMap<>(); + overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + internalBeforeClassSetup(overrideProperties, true); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/7da8f3d3/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java new file mode 100644 index 0000000..a1911b4 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.ql.parse.WarehouseInstance; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.junit.rules.TestName; +import com.google.common.collect.Lists; +import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable; +import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * TestReplicationWithTableMigration - test replication for Hive2 to Hive3 (Strict managed tables) + */ +public class TestReplicationWithTableMigration { + @Rule + public final TestName testName = new TestName(); + + protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTableMigration.class); + private static WarehouseInstance primary, replica; + private String primaryDbName, replicatedDbName; + private static HiveConf conf; + + @BeforeClass + public static void classLevelSetup() throws Exception { + conf = new HiveConf(TestReplicationWithTableMigration.class); + conf.set("dfs.client.use.datanode.hostname", "true"); + conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); + MiniDFSCluster miniDFSCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{ + put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + put("hive.support.concurrency", "true"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.strict.managed.tables", "true"); + }}; + replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); + + HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{ + put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.support.concurrency", "false"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + put("hive.strict.managed.tables", "false"); + }}; + primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1); + } + + @AfterClass + public static void classLevelTearDown() throws IOException { + primary.close(); + replica.close(); + } + + @Before + public void setup() throws Throwable { + primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); + replicatedDbName = "replicated_" + primaryDbName; + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + } + + @After + public void tearDown() throws Throwable { + primary.run("drop database if exists " + primaryDbName + " cascade"); + replica.run("drop database if exists " + replicatedDbName + " cascade"); + } + + private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable { + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") + .run("insert into tacid values(1)") + .run("insert into tacid values(2)") + .run("insert into tacid values(3)") + .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " + + "into 3 buckets stored as orc ") + .run("alter table tacidpart add partition(country='france')") + .run("insert into tacidpart partition(country='india') values('mumbai')") + .run("insert into tacidpart partition(country='us') values('sf')") + .run("insert into tacidpart partition(country='france') values('paris')") + .run("create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")") + .run("insert into tflat values(11)") + .run("insert into tflat values(22)") + .run("create table tflattext (id int) ") + .run("insert into tflattext values(111), (222)") + .run("create table tflattextpart (id int) partitioned by (country string) ") + .run("insert into tflattextpart partition(country='india') values(1111), (2222)") + .run("insert into tflattextpart partition(country='us') values(3333)") + .run("create table tacidloc (id int) clustered by(id) into 3 buckets stored as orc LOCATION '/tmp' ") + .run("insert into tacidloc values(1)") + .run("insert into tacidloc values(2)") + .run("insert into tacidloc values(3)") + .run("create table tacidpartloc (place string) partitioned by (country string) clustered by(place) " + + "into 3 buckets stored as orc ") + .run("alter table tacidpartloc add partition(country='france') LOCATION '/tmp/part'") + .run("insert into tacidpartloc partition(country='india') values('mumbai')") + .run("insert into tacidpartloc partition(country='us') values('sf')") + .run("insert into tacidpartloc partition(country='france') values('paris')") + .run("create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " + + "stored as avro tblproperties ('avro.schema.url'='" + primary.avroSchemaFile.toUri().toString() + "')") + .run("insert into avro_table values('str1', 10)") + .dump(primaryDbName, fromReplId); + assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacid"))); + assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpart"))); + assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflat"))); + assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflattext"))); + assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflattextpart"))); + assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidloc"))); + assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpartloc"))); + Table avroTable = replica.getTable(replicatedDbName, "avro_table"); + assertFalse(isTransactionalTable(avroTable)); + assertFalse(MetaStoreUtils.isExternalTable(avroTable)); + return tuple; + } + + private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable { + replica.run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"tacid", "tacidpart", "tflat", "tflattext", "tflattextpart", + "tacidloc", "tacidpartloc", "avro_table"}) + .run("repl status " + replicatedDbName) + .verifyResult(lastReplId) + .run("select id from tacid order by id") + .verifyResults(new String[]{"1", "2", "3"}) + .run("select country from tacidpart order by country") + .verifyResults(new String[] {"france", "india", "us"}) + .run("select rank from tflat order by rank") + .verifyResults(new String[] {"11", "22"}) + .run("select id from tflattext order by id") + .verifyResults(new String[] {"111", "222"}) + .run("select id from tflattextpart order by id") + .verifyResults(new String[] {"1111", "2222", "3333"}) + .run("select id from tacidloc order by id") + .verifyResults(new String[]{"1", "2", "3"}) + .run("select country from tacidpartloc order by country") + .verifyResults(new String[] {"france", "india", "us"}) + .run("select col1 from avro_table") + .verifyResults(new String[] {"str1"}); + + assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid"))); + assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpart"))); + assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tflat"))); + assertTrue(!isFullAcidTable(replica.getTable(replicatedDbName, "tflattext"))); + assertTrue(!isFullAcidTable(replica.getTable(replicatedDbName, "tflattextpart"))); + assertTrue(isTransactionalTable(replica.getTable(replicatedDbName, "tflattext"))); + assertTrue(isTransactionalTable(replica.getTable(replicatedDbName, "tflattextpart"))); + assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidloc"))); + assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpartloc"))); + + /*Path databasePath = new Path(replica.warehouseRoot, replica.getDatabase(replicatedDbName).getLocationUri()); + assertEquals(replica.getTable(replicatedDbName, "tacidloc").getSd().getLocation(), + new Path(databasePath,"tacidloc").toUri().toString()); + + Path tablePath = new Path(databasePath, "tacidpartloc"); + List<Partition> partitions = replica.getAllPartitions(replicatedDbName, "tacidpartloc"); + for (Partition part : partitions) { + tablePath.equals(new Path(part.getSd().getLocation()).getParent()); + }*/ + + Table avroTable = replica.getTable(replicatedDbName, "avro_table"); + assertTrue(MetaStoreUtils.isExternalTable(avroTable)); + Path tablePath = new PathBuilder(replica.externalTableWarehouseRoot.toString()).addDescendant(replicatedDbName + ".db") + .addDescendant("avro_table") + .build(); + assertEquals(avroTable.getSd().getLocation().toLowerCase(), tablePath.toUri().toString().toLowerCase()); + } + + private void loadWithFailureInAddNotification(String tbl, String dumpLocation) throws Throwable { + BehaviourInjection<CallerArguments, Boolean> callerVerifier + = new BehaviourInjection<CallerArguments, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable InjectableBehaviourObjectStore.CallerArguments args) { + injectionPathCalled = true; + if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) { + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + + " Constraint Table: " + String.valueOf(args.constraintTblName)); + return false; + } + if (args.tblName != null) { + LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); + return args.tblName.equalsIgnoreCase(tbl); + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + try { + replica.loadFailure(replicatedDbName, dumpLocation); + } finally { + InjectableBehaviourObjectStore.resetCallerVerifier(); + } + callerVerifier.assertInjectionsPerformed(true, false); + } + + @Test + public void testBootstrapLoadMigrationManagedToAcid() throws Throwable { + WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null); + replica.load(replicatedDbName, tuple.dumpLocation); + verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); + } + + @Test + public void testIncrementalLoadMigrationManagedToAcid() throws Throwable { + WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, tuple.dumpLocation); + tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId); + replica.load(replicatedDbName, tuple.dumpLocation); + verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); + } + + @Test + public void testIncrementalLoadMigrationManagedToAcidFailure() throws Throwable { + WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, tuple.dumpLocation); + tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId); + loadWithFailureInAddNotification("tacid", tuple.dumpLocation); + replica.run("use " + replicatedDbName) + .run("show tables like tacid") + .verifyResult(null); + replica.load(replicatedDbName, tuple.dumpLocation); + verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); + } + + @Test + public void testIncrementalLoadMigrationManagedToAcidFailurePart() throws Throwable { + WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, tuple.dumpLocation); + tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId); + loadWithFailureInAddNotification("tacidpart", tuple.dumpLocation); + replica.run("use " + replicatedDbName) + .run("show tables like tacidpart") + .verifyResult(null); + replica.load(replicatedDbName, tuple.dumpLocation); + verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); + } + + @Test + public void testIncrementalLoadMigrationManagedToAcidAllOp() throws Throwable { + WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, bootStrapDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(bootStrapDump.lastReplicationId); + List<String> selectStmtList = new ArrayList<>(); + List<String[]> expectedValues = new ArrayList<>(); + String tableName = testName.getMethodName() + "testInsert"; + String tableNameMM = tableName + "_MM"; + + ReplicationTestUtils.appendInsert(primary, primaryDbName, null, + tableName, tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendTruncate(primary, primaryDbName, + null, selectStmtList, expectedValues); + ReplicationTestUtils.appendInsertIntoFromSelect(primary, primaryDbName, + null, tableName, tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendCreateAsSelect(primary, primaryDbName, + null, tableName, tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendImport(primary, primaryDbName, + null, tableName, tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendInsertOverwrite(primary, primaryDbName, + null, tableName, tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendLoadLocal(primary, primaryDbName, + null, tableName, tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendInsertUnion(primary, primaryDbName, + null, tableName, tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendAlterTable(primary, primaryDbName, + null, selectStmtList, expectedValues); + + ReplicationTestUtils.verifyIncrementalLoad(primary, replica, primaryDbName, + replicatedDbName, selectStmtList, expectedValues, bootStrapDump.lastReplicationId); + } +}