Repository: hive Updated Branches: refs/heads/master c03001e98 -> 508d7e6f2
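The new TestTxnLoadData below repeatedly uses the same staging pattern: rows are generated through a plain ORC table, EXPORT produces a correctly-schema'd ORC file named 'data', and LOAD DATA moves that file into the transactional table, where it lands in a delta_x_x directory (or a base_N directory with OVERWRITE). A minimal sketch of that pattern is shown here for orientation; it assumes the runStatementOnDriver/getWarehouseDir/getTestDataDir helpers from TxnCommandsBaseForTests that appear later in this diff, and it is illustration only, not part of the commit.

package org.apache.hadoop.hive.ql;

import org.junit.Test;

/**
 * Condensed sketch of the staging pattern exercised by TestTxnLoadData (loadData(),
 * loadDataUpdate(), etc.); assumes the TxnCommandsBaseForTests helpers shown below.
 */
public class LoadDataPatternSketch extends TxnCommandsBaseForTests {
  @Override
  String getTestDataDir() {
    return System.getProperty("java.io.tmpdir") + "/loadDataPatternSketch";
  }
  @Test
  public void loadStagedOrcIntoAcidTable() throws Exception {
    runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
    //Tstage only exists to produce an ORC file with the right schema
    runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
    runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
    //export writes a file named 'data' under <warehouse>/1
    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
    //load data lands that file in a new delta_x_x/ under T's table root
    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
  }
}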
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java new file mode 100644 index 0000000..b98c74a --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java @@ -0,0 +1,467 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.io.NullWritable; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +/** + * Tests related to support of LOAD DATA with Acid tables + * Most tests run in vectorized and non-vectorized mode since we currently have a vectorized and + * a non-vectorized acid readers and it's critical that ROW_IDs are generated the same way. 
+ */ +public class TestTxnLoadData extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnLoadData.class); + private static final String TEST_DATA_DIR = + new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnLoadData.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + @Rule + public TemporaryFolder folder= new TemporaryFolder(); + + @Override + String getTestDataDir() { + return TEST_DATA_DIR; + } + + @Test + public void loadData() throws Exception { + loadData(false); + } + @Test + public void loadDataVectorized() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + loadData(true); + } + @Test + public void loadDataUpdate() throws Exception { + loadDataUpdate(false); + } + @Test + public void loadDataUpdateVectorized() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + loadDataUpdate(true); + } + @Test + public void loadDataNonAcid2AcidConversion() throws Exception { + loadDataNonAcid2AcidConversion(false); + } + @Test + public void loadDataNonAcid2AcidConversionVectorized() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + loadDataNonAcid2AcidConversion(true); + } + @Test + public void testMultiStatement() throws Exception { + testMultiStatement(false); + } + @Test + public void testMultiStatementVectorized() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + testMultiStatement(true); + } + private void loadDataUpdate(boolean isVectorized) throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver( + "create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + //"load data local inpath" doesn't delete source files so clean it here + runStatementOnDriver("truncate table Tstage"); + //and do a Load Data into the same table, which should now land in a delta_x_x. + // 'data' is created by export command/ + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); + + String testQuery = isVectorized ? 
"select ROW__ID, a, b from T order by ROW__ID" : + "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String[][] expected = new String[][]{ + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"}}; + checkResult(expected, testQuery, isVectorized, "load data inpath"); + runStatementOnDriver("update T set b = 17 where a = 1"); + String[][] expected2 = new String[][]{ + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000023_0000023_0000/bucket_00000"} + }; + checkResult(expected2, testQuery, isVectorized, "update"); + + runStatementOnDriver("insert into T values(2,2)"); + runStatementOnDriver("delete from T where a = 3"); + //test minor compaction + runStatementOnDriver("alter table T compact 'minor'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected3 = new String[][] { + {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000020_0000027/bucket_00000"}, + {"{\"transactionid\":26,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000020_0000027/bucket_00000"} + }; + checkResult(expected3, testQuery, isVectorized, "delete compact minor"); + + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' overwrite into table T"); + String[][] expected4 = new String[][]{ + {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000031/000000_0"}, + {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000031/000000_0"}}; + checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite"); + + //load same data again (additive) + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); + runStatementOnDriver("update T set b = 17 where a = 1");//matches 2 rows + runStatementOnDriver("delete from T where a = 3");//matches 2 rows + runStatementOnDriver("insert into T values(2,2)"); + String[][] expected5 = new String[][]{ + {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"}, + {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"}, + {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000037_0000037_0000/bucket_00000"} + }; + checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update"); + + //test major compaction + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected6 = new String[][]{ + {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000037/bucket_00000"}, + {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000037/bucket_00000"}, + {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000037/bucket_00000"} + }; + checkResult(expected6, testQuery, isVectorized, "load data inpath compact major"); + } + private void loadData(boolean isVectorized) throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); + runStatementOnDriver("insert into T 
values(0,2),(0,4)"); + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'"); + //"load data local inpath" doesn't delete source files so clean it here + runStatementOnDriver("truncate table Tstage"); + //and do a Load Data into the same table, which should now land in a delta_x_x. + // 'data' is created by export command/ + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); + + String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : + "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String[][] expected = new String[][] { + //normal insert + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000016_0000/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000016_0000/bucket_00000"}, + //Load Data + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000021_0000021_0000/000000_0"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000021_0000021_0000/000000_0"}}; + checkResult(expected, testQuery, isVectorized, "load data inpath"); + + //test minor compaction + runStatementOnDriver("alter table T compact 'minor'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected1 = new String[][] { + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000021/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000021/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000016_0000021/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000016_0000021/bucket_00000"} + }; + checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)"); + + //test major compaction + runStatementOnDriver("insert into T values(2,2)"); + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected2 = new String[][] { + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000027/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000027/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000027/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000027/bucket_00000"}, + {"{\"transactionid\":27,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000027/bucket_00000"} + }; + checkResult(expected2, testQuery, isVectorized, "load data inpath (major)"); + + //create more staging data and test Load Data Overwrite + runStatementOnDriver("insert into Tstage values(5,6),(7,8)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'"); + runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/2/data' overwrite into table T"); + String[][] expected3 = new String[][] { + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/000000_0"}, + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/000000_0"}}; + checkResult(expected3, testQuery, 
isVectorized, "load data inpath overwrite"); + + //one more major compaction + runStatementOnDriver("insert into T values(6,6)"); + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected4 = new String[][] { + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000036/bucket_00000"}, + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000036/bucket_00000"}, + {"{\"transactionid\":36,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000036/bucket_00000"}}; + checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)"); + } + /** + * Load Data [overwrite] in to an (un-)partitioned acid converted table + */ + private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) stored as orc"); + //per acid write to test nonAcid2acid conversion mixed with load data + runStatementOnDriver("insert into T values(0,2),(0,4)"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); + //make 2 more inserts so that we have 000000_0_copy_1, 000000_0_copy_2 files in export + //export works at file level so if you have copy_N in the table dir, you'll have those in output + runStatementOnDriver("insert into Tstage values(2,2),(3,3)"); + runStatementOnDriver("insert into Tstage values(4,4),(5,5)"); + //create a file we'll import later + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'"); + runStatementOnDriver("truncate table Tstage");//clean the staging table + + //now convert T to acid + runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional' = 'true')"); + //and do a Load Data into the same table, which should now land in a delta/ + // (with 000000_0, 000000_0_copy_1, 000000_0_copy_2) + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); + + String testQuery = isVectorized ? 
"select ROW__ID, a, b from T order by ROW__ID" : + "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + + String[][] expected = new String[][] { + //from pre-acid insert + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"}, + //from Load Data into acid converted table + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000024_0000024_0000/000000_0"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000024_0000024_0000/000000_0"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":2}\t2\t2", "t/delta_0000024_0000024_0000/000000_0_copy_1"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":3}\t3\t3", "t/delta_0000024_0000024_0000/000000_0_copy_1"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":4}\t4\t4", "t/delta_0000024_0000024_0000/000000_0_copy_2"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":5}\t5\t5", "t/delta_0000024_0000024_0000/000000_0_copy_2"}, + }; + checkResult(expected, testQuery, isVectorized, "load data inpath"); + + + //create more staging data with copy_N files and do LD+Overwrite + runStatementOnDriver("insert into Tstage values(5,6),(7,8)"); + runStatementOnDriver("insert into Tstage values(8,8)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'"); + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' overwrite into table T"); + + String[][] expected2 = new String[][] { + {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000030/000000_0"}, + {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000030/000000_0"}, + {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":2}\t8\t8", "t/base_0000030/000000_0_copy_1"} + }; + checkResult(expected2, testQuery, isVectorized, "load data inpath overwrite"); + + //create 1 more delta_x_x so that compactor has > dir file to compact + runStatementOnDriver("insert into T values(9,9)"); + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + + String[][] expected3 = new String[][] { + {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/bucket_00000"}, + {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/bucket_00000"}, + {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":2}\t8\t8", "t/base_0000033/bucket_00000"}, + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000033/bucket_00000"} + + }; + checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)"); + } + /** + * Load Data [overwrite] in to a partitioned transactional table + */ + @Test + public void loadDataPartitioned() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc tblproperties('transactional'='true')"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + + runStatementOnDriver("insert into Tstage values(0,2),(0,4)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'"); + runStatementOnDriver("truncate table Tstage");//because 'local' inpath doesn't delete source files + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T 
partition(p=0)"); + + runStatementOnDriver("insert into Tstage values(1,2),(1,4)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'"); + runStatementOnDriver("truncate table Tstage"); + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' into table T partition(p=1)"); + + runStatementOnDriver("insert into Tstage values(2,2),(2,4)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/3'"); + runStatementOnDriver("truncate table Tstage"); + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/3/data' into table T partition(p=1)"); + + List<String> rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); + String[][] expected = new String[][] { + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000024_0000024_0000/000000_0"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000024_0000024_0000/000000_0"}, + {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000028_0000028_0000/000000_0"}, + {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000028_0000028_0000/000000_0"}}; + checkExpected(rs, expected, "load data inpath partitioned"); + + + runStatementOnDriver("insert into Tstage values(5,2),(5,4)"); + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/4'"); + runStatementOnDriver("truncate table Tstage"); + runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/4/data' overwrite into table T partition(p=1)"); + String[][] expected2 = new String[][] { + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"}, + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000033/000000_0"}, + {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000033/000000_0"}}; + rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); + checkExpected(rs, expected2, "load data inpath partitioned overwrite"); + } + + /** + * By default you can't load into bucketed tables. Things will break badly in acid (data loss, etc) + * if loaded data is not bucketed properly. This test is to capture that this is still the default. + * If the default is changed, Load Data should probably do more validation to ensure data is + * properly distributed into files and files are named correctly. 
+ */ + @Test + public void testValidations() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) clustered by (a) into 2 buckets stored as orc tblproperties('transactional'='true')"); + File createdFile= folder.newFile("myfile.txt"); + FileUtils.writeStringToFile(createdFile, "hello world"); + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("insert into Tstage values(1,2),(3,4)"); + CommandProcessorResponse cpr = runStatementOnDriverNegative("load data local inpath '" + getWarehouseDir() + "' into table T"); + Assert.assertTrue(cpr.getErrorMessage().contains("Load into bucketed tables are disabled")); + } + private void checkExpected(List<String> rs, String[][] expected, String msg) { + super.checkExpected(rs, expected, msg, LOG, true); + } + @Test + public void testMMOrcTable() throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true', 'transactional_properties'='insert_only')"); + int[][] values = {{1,2},{3,4}}; + runStatementOnDriver("insert into T " + makeValuesClause(values)); + List<String> rs = runStatementOnDriver("select a, b from T order by b"); + Assert.assertEquals("", stringifyValues(values), rs); + } + + /** + * Make sure Load Data assigns ROW_IDs correctly when there is statementId suffix on delta dir + * For example, delta_x_x_0001. + */ + private void testMultiStatement(boolean isVectorized) throws Exception { + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("insert into Tstage values(5,5),(6,6)"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + //and do a Load Data into the same table, which should now land in a delta_x_x. + // 'data' is created by export command/ + runStatementOnDriver("START TRANSACTION"); + //statementId = 0 + runStatementOnDriver("insert into T values(1,2),(3,4)"); + //statementId = 1 + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); + runStatementOnDriver("COMMIT"); + + String testQuery = isVectorized ? 
"select ROW__ID, a, b from T order by ROW__ID" : + "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String[][] expected = new String[][] { + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000019_0000019_0001/000000_0"}, + {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000019_0000019_0001/000000_0"} + }; + checkResult(expected, testQuery, isVectorized, "load data inpath"); + + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + String[][] expected2 = new String[][] { + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000019/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000019/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000019/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000019/bucket_00000"} + }; + checkResult(expected2, testQuery, isVectorized, "load data inpath (major)"); + //at lest for now, Load Data w/Overwrite is not allowed in a txn: HIVE-18154 + } + @Test + public void testAbort() throws Exception { + boolean isVectorized = false; + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("drop table if exists Tstage"); + runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); + //Tstage is just a simple way to generate test data + runStatementOnDriver("create table Tstage (a int, b int) stored as orc"); + runStatementOnDriver("insert into Tstage values(5,5),(6,6)"); + //this creates an ORC data file with correct schema under table root + runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); + //and do a Load Data into the same table, which should now land in a delta_x_x. + // 'data' is created by export command/ + runStatementOnDriver("insert into T values(1,2),(3,4)"); + runStatementOnDriver("START TRANSACTION"); + runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); + runStatementOnDriver("ROLLBACK"); + + String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : + "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String[][] expected = new String[][] { + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"} + }; + checkResult(expected, testQuery, isVectorized, "load data inpath"); + } + /** + * We have to use a different query to check results for Vectorized tests because to get the + * file name info we need to use {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#FILENAME} + * which will currently make the query non-vectorizable. This means we can't check the file name + * for vectorized version of the test. + */ + private void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg) throws Exception{ + List<String> rs = runStatementOnDriver(query); + checkExpected(rs, expectedResult, msg + (isVectorized ? 
" vect" : ""), LOG, !isVectorized); + assertVectorized(isVectorized, query); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index 7f5e091..f5f8cc8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -264,18 +264,6 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver }; checkExpected(rs, expected, "Unexpected row count after ctas"); } - private void checkExpected(List<String> rs, String[][] expected, String msg) { - LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); - for(String s : rs) { - LOG.warn(s); - } - Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); - //verify data and layout - for(int i = 0; i < expected.length; i++) { - Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); - Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); - } - } /** * The idea here is to create a non acid table that was written by multiple writers, i.e. * unbucketed table that has 000000_0 & 000001_0, for example. @@ -363,7 +351,9 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver Assert.assertEquals(2, BucketCodec.determineVersion(537001984).decodeWriterId(537001984)); Assert.assertEquals(1, BucketCodec.determineVersion(536936448).decodeWriterId(536936448)); + assertVectorized(true, "update T set b = 88 where b = 80"); runStatementOnDriver("update T set b = 88 where b = 80"); + assertVectorized(true, "delete from T where b = 8"); runStatementOnDriver("delete from T where b = 8"); String expected3[][] = { {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/000002_0"}, @@ -374,7 +364,7 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/000000_0_copy_1"}, {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"}, - {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_0000023_0000023_0000/bucket_00000"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_0000024_0000024_0000/bucket_00000"}, }; rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); checkExpected(rs, expected3,"after converting to acid (no compaction with updates)"); @@ -386,15 +376,15 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver /*Compaction preserves location of rows wrt buckets/tranches (for now)*/ String expected4[][] = { - {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/base_0000024/bucket_00002"}, - {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/base_0000024/bucket_00002"}, - {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/base_0000024/bucket_00001"}, - 
{"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t9\t10", "warehouse/t/base_0000024/bucket_00001"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t10\t20", "warehouse/t/base_0000024/bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/base_0000024/bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/base_0000024/bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/base_0000024/bucket_00000"}, - {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/base_0000024/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/base_0000026/bucket_00002"}, + {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/base_0000026/bucket_00002"}, + {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/base_0000026/bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t9\t10", "warehouse/t/base_0000026/bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t10\t20", "warehouse/t/base_0000026/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/base_0000026/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/base_0000026/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/base_0000026/bucket_00000"}, + {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/base_0000026/bucket_00000"}, }; checkExpected(rs, expected4,"after major compact"); } @@ -635,15 +625,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver //vectorized because there is INPUT__FILE__NAME assertVectorized(false, query); } - private void assertVectorized(boolean vectorized, String query) throws Exception { - List<String> rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query); - for(String line : rs) { - if(line != null && line.contains("Execution mode: vectorized")) { - Assert.assertTrue("Was vectorized when it wasn't expected", vectorized); - return; - } - } - Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized); + private void checkExpected(List<String> rs, String[][] expected, String msg) { + super.checkExpected(rs, expected, msg, LOG, true); } /** * HIVE-17900 http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 9f31eb1..6a2164f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -33,6 +33,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.rules.TestName; +import org.slf4j.Logger; import java.io.File; import java.util.ArrayList; @@ -74,7 +75,6 @@ public abstract class TxnCommandsBaseForTests { hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir()); - hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, 
"nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, @@ -151,6 +151,21 @@ public abstract class TxnCommandsBaseForTests { } throw new RuntimeException("Didn't get expected failure!"); } + + /** + * Runs Vectorized Explain on the query and checks if the plan is vectorized as expected + * @param vectorized {@code true} - assert that it's vectorized + */ + void assertVectorized(boolean vectorized, String query) throws Exception { + List<String> rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query); + for(String line : rs) { + if(line != null && line.contains("Execution mode: vectorized")) { + Assert.assertTrue("Was vectorized when it wasn't expected", vectorized); + return; + } + } + Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized); + } /** * Will assert that actual files match expected. * @param expectedFiles - suffixes of expected Paths. Must be the same length @@ -176,4 +191,18 @@ public abstract class TxnCommandsBaseForTests { } Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles); } + void checkExpected(List<String> rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) { + LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + if(checkFileName) { + Assert.assertTrue("Actual line(file) " + i + " file: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java index d5ab079..afccf64 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java @@ -142,7 +142,7 @@ public class TestExecDriver extends TestCase { db.createTable(src, cols, null, TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class); db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING, - true, false, false, false, null, 0, false); + true, false, false, false, null, 0); i++; } http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index ccd7d8e..5d26524 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -836,16 +836,22 @@ public class TestInputOutputFormat { public void testEtlCombinedStrategy() throws Exception { conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL"); conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS.varname, "1000000"); + AcidUtils.setTransactionalTableScan(conf, true); + conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, 
true); + conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default"); + OrcInputFormat.Context context = new OrcInputFormat.Context(conf); MockFileSystem fs = new MockFileSystem(conf, new MockFile("mock:/a/1/part-00", 1000, new byte[1]), new MockFile("mock:/a/1/part-01", 1000, new byte[1]), new MockFile("mock:/a/2/part-00", 1000, new byte[1]), new MockFile("mock:/a/2/part-01", 1000, new byte[1]), - new MockFile("mock:/a/3/base_0/1", 1000, new byte[1]), - new MockFile("mock:/a/4/base_0/1", 1000, new byte[1]), - new MockFile("mock:/a/5/base_0/1", 1000, new byte[1]), - new MockFile("mock:/a/5/delta_0_25/1", 1000, new byte[1]) + new MockFile("mock:/a/3/base_0/bucket_00001", 1000, new byte[1]), + new MockFile("mock:/a/4/base_0/bucket_00001", 1000, new byte[1]), + new MockFile("mock:/a/5/base_0/bucket_00001", 1000, new byte[1]), + new MockFile("mock:/a/5/delta_0_25/bucket_00001", 1000, new byte[1]), + new MockFile("mock:/a/6/delta_27_29/bucket_00001", 1000, new byte[1]), + new MockFile("mock:/a/6/delete_delta_27_29/bucket_00001", 1000, new byte[1]) ); OrcInputFormat.CombinedCtx combineCtx = new OrcInputFormat.CombinedCtx(); @@ -891,20 +897,27 @@ public class TestInputOutputFormat { assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy); assertEquals(2, etlSs.files.size()); assertEquals(2, etlSs.dirs.size()); - // The fifth will not be combined because of delta files. + // The fifth could be combined again. ss = createOrCombineStrategies(context, fs, "mock:/a/5", combineCtx); + assertTrue(ss.isEmpty()); + assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy); + assertEquals(4, etlSs.files.size()); + assertEquals(3, etlSs.dirs.size()); + + // The sixth will not be combined because of delete delta files. Is that desired? 
HIVE-18110 + ss = createOrCombineStrategies(context, fs, "mock:/a/6", combineCtx); assertEquals(1, ss.size()); assertTrue(ss.get(0) instanceof OrcInputFormat.ETLSplitStrategy); assertNotSame(etlSs, ss); - assertEquals(2, etlSs.files.size()); - assertEquals(2, etlSs.dirs.size()); + assertEquals(4, etlSs.files.size()); + assertEquals(3, etlSs.dirs.size()); } public List<SplitStrategy<?>> createOrCombineStrategies(OrcInputFormat.Context context, MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException { OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path); return OrcInputFormat.determineSplitStrategies(combineCtx, context, - adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, + adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents, null, null, true); } @@ -918,7 +931,7 @@ public class TestInputOutputFormat { OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException { OrcInputFormat.AcidDirInfo adi = gen.call(); return OrcInputFormat.determineSplitStrategies( - null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, + null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents, null, null, true); } @@ -3586,10 +3599,14 @@ public class TestInputOutputFormat { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: open to read data - split 1 => mock:/mocktable8/0_0 - // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001 - // call-3: split 2 - read delta_x_y/bucket_00001 - assertEquals(5, readOpsDelta); + // call-1: open(mock:/mocktable7/0_0) + // call-2: open(mock:/mocktable7/0_0) + // call-3: listLocatedFileStatuses(mock:/mocktable7) + // call-4: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid) + // call-5: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001) + // call-6: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid) + // call-7: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001) + assertEquals(7, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3662,9 +3679,11 @@ public class TestInputOutputFormat { } } // call-1: open to read data - split 1 => mock:/mocktable8/0_0 - // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001 - // call-3: split 2 - read delta_x_y/bucket_00001 - assertEquals(3, readOpsDelta); + // call-2: listLocatedFileStatus(mock:/mocktable8) + // call-3: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid) + // call-4: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid) + // call-5: open(mock:/mocktable8/delta_0000001_0000001_0000/bucket_00001) + assertEquals(5, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 9628a40..030f012 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -298,7 +298,7 @@ public class TestOrcRawRecordMerger { int BUCKET = 10; ReaderKey key = new ReaderKey(); Configuration conf = new Configuration(); - int bucketProperty = 
OrcRawRecordMerger.encodeBucketId(conf, BUCKET); + int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET, 0); Reader reader = createMockOriginalReader(); RecordIdentifier minKey = new RecordIdentifier(0, bucketProperty, 1); RecordIdentifier maxKey = new RecordIdentifier(0, bucketProperty, 3); @@ -308,7 +308,7 @@ public class TestOrcRawRecordMerger { fs.makeQualified(root); fs.create(root); ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, minKey, maxKey, - new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList()); + new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList(), 0); RecordReader recordReader = pair.getRecordReader(); assertEquals(0, key.getTransactionId()); assertEquals(bucketProperty, key.getBucketProperty()); @@ -338,13 +338,13 @@ public class TestOrcRawRecordMerger { ReaderKey key = new ReaderKey(); Reader reader = createMockOriginalReader(); Configuration conf = new Configuration(); - int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET); + int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET, 0); FileSystem fs = FileSystem.getLocal(conf); Path root = new Path(tmpDir, "testOriginalReaderPairNoMin"); fs.makeQualified(root); fs.create(root); ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, null, null, - new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList()); + new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList(), 0); assertEquals("first", value(pair.nextRecord())); assertEquals(0, key.getTransactionId()); assertEquals(bucketProperty, key.getBucketProperty()); @@ -835,6 +835,8 @@ public class TestOrcRawRecordMerger { assertEquals(null, merger.getMaxKey()); assertEquals(true, merger.next(id, event)); + //minor comp, so we ignore 'base_0000100' files so all Deletes end up first since + // they all modify primordial rows assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id); @@ -891,10 +893,10 @@ public class TestOrcRawRecordMerger { baseReader = OrcFile.createReader(basePath, OrcFile.readerOptions(conf)); merger = - new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, + new OrcRawRecordMerger(conf, true, null, false, BUCKET, createMaximalTxnList(), new Reader.Options(), AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options() - .isCompacting(true).isMajorCompaction(true)); + .isCompacting(true).isMajorCompaction(true).baseDir(new Path(root, "base_0000100"))); assertEquals(null, merger.getMinKey()); assertEquals(null, merger.getMaxKey()); http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index b2ac687..95e3463 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -48,7 +48,7 @@ import org.apache.hadoop.mapred.Reporter; import 
org.apache.orc.TypeDescription; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; + /** * This class tests the VectorizedOrcAcidRowBatchReader by creating an actual split and a set * of delete delta files. The split is on an insert delta and there are multiple delete deltas @@ -186,7 +186,7 @@ public class TestVectorizedOrcAcidRowBatchReader { OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null); OrcInputFormat.AcidDirInfo adi = gen.call(); List<OrcInputFormat.SplitStrategy<?>> splitStrategies = OrcInputFormat.determineSplitStrategies( - null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, + null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents, null, null, true); assertEquals(1, splitStrategies.size()); List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits(); http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/queries/clientnegative/load_data_into_acid.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/load_data_into_acid.q b/ql/src/test/queries/clientnegative/load_data_into_acid.q index fba1496..2ac5b56 100644 --- a/ql/src/test/queries/clientnegative/load_data_into_acid.q +++ b/ql/src/test/queries/clientnegative/load_data_into_acid.q @@ -1,7 +1,5 @@ -set hive.strict.checks.bucketing=false; set hive.support.concurrency=true; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; -set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; create table acid_ivot( ctinyint TINYINT, @@ -15,7 +13,7 @@ create table acid_ivot( ctimestamp1 TIMESTAMP, ctimestamp2 TIMESTAMP, cboolean1 BOOLEAN, - cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true'); + cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true'); LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot; http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientnegative/load_data_into_acid.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientnegative/load_data_into_acid.q.out b/ql/src/test/results/clientnegative/load_data_into_acid.q.out index cd829ba..46b5cdd 100644 --- a/ql/src/test/results/clientnegative/load_data_into_acid.q.out +++ b/ql/src/test/results/clientnegative/load_data_into_acid.q.out @@ -10,7 +10,7 @@ PREHOOK: query: create table acid_ivot( ctimestamp1 TIMESTAMP, ctimestamp2 TIMESTAMP, cboolean1 BOOLEAN, - cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true') + cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true') PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@acid_ivot @@ -26,8 +26,8 @@ POSTHOOK: query: create table acid_ivot( ctimestamp1 TIMESTAMP, ctimestamp2 TIMESTAMP, cboolean1 BOOLEAN, - cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true') + cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true') POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@acid_ivot -FAILED: SemanticException [Error 10266]: LOAD DATA... statement is not supported on transactional table default@acid_ivot. 
+FAILED: SemanticException [Error 30023]: alltypesorc file name is not valid in Load Data into Acid table default.acid_ivot. Examples of valid names are: 00000_0, 00000_0_copy_1 http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientpositive/acid_table_stats.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/acid_table_stats.q.out b/ql/src/test/results/clientpositive/acid_table_stats.q.out index d0fbcac..4c8297e 100644 --- a/ql/src/test/results/clientpositive/acid_table_stats.q.out +++ b/ql/src/test/results/clientpositive/acid_table_stats.q.out @@ -38,6 +38,7 @@ Table Parameters: rawDataSize 0 totalSize 0 transactional true + transactional_properties default #### A masked pattern was here #### # Storage Information http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientpositive/autoColumnStats_4.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out index 2bc1789..b3df04f 100644 --- a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out +++ b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out @@ -29,6 +29,7 @@ Table Parameters: rawDataSize 0 totalSize 0 transactional true + transactional_properties default #### A masked pattern was here #### # Storage Information @@ -198,6 +199,7 @@ Table Parameters: rawDataSize 0 totalSize 1798 transactional true + transactional_properties default #### A masked pattern was here #### # Storage Information @@ -241,6 +243,7 @@ Table Parameters: rawDataSize 0 totalSize 2909 transactional true + transactional_properties default #### A masked pattern was here #### # Storage Information http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientpositive/mm_default.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/mm_default.q.out b/ql/src/test/results/clientpositive/mm_default.q.out index ebbcb9d..1345efd 100644 --- a/ql/src/test/results/clientpositive/mm_default.q.out +++ b/ql/src/test/results/clientpositive/mm_default.q.out @@ -324,6 +324,7 @@ Table Parameters: rawDataSize 0 totalSize 0 transactional true + transactional_properties default #### A masked pattern was here #### # Storage Information http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 25caf29..da10313 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -128,7 +128,12 @@ public final class TransactionalValidationListener extends MetaStorePreEventList parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue); } if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) { - //only need to check conformance if alter table enabled aicd + 
if(!isTransactionalPropertiesPresent) { + normalizeTransactionalPropertyDefault(newTable); + isTransactionalPropertiesPresent = true; + transactionalPropertiesValue = DEFAULT_TRANSACTIONAL_PROPERTY; + } + //only need to check conformance if alter table enabled acid if (!conformToAcid(newTable)) { // INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing if (transactionalPropertiesValue == null || !"insert_only".equalsIgnoreCase(transactionalPropertiesValue)) { @@ -232,6 +237,9 @@ public final class TransactionalValidationListener extends MetaStorePreEventList // normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString()); + if(transactionalProperties == null) { + normalizeTransactionalPropertyDefault(newTable); + } initializeTransactionalProperties(newTable); return; } @@ -241,6 +249,16 @@ public final class TransactionalValidationListener extends MetaStorePreEventList } /** + * When a table is marked transactional=true but transactional_properties is not set, then + * transactional_properties should take on the default value. It is easier to make this explicit in + * the table definition than to keep checking everywhere whether it is set or not. + */ + private void normalizeTransactionalPropertyDefault(Table table) { + table.getParameters().put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, + DEFAULT_TRANSACTIONAL_PROPERTY); + + } + /** * Check that InputFormatClass/OutputFormatClass should implement * AcidInputFormat/AcidOutputFormat */
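The ROW__ID expectations throughout TestTxnLoadData and TestTxnNoBuckets above rely on packed bucketid values such as 536870912, 536870913, 536936448 and 537001984. A minimal decoding sketch follows, assuming the BucketCodec V1 layout (3-bit codec version in the top bits, writer/bucket id in bits 16-27, statement id in the low 12 bits); it is illustration only, the tests themselves go through BucketCodec.

public class BucketIdDecodeSketch {
  //Assumed V1 layout: top 3 bits = codec version, bits 16-27 = writer (bucket) id,
  //low 12 bits = statement id. Hive's real decoder is BucketCodec (see TestTxnNoBuckets above).
  static int version(int bucketProperty) { return (bucketProperty >>> 29) & 0x7; }
  static int writerId(int bucketProperty) { return (bucketProperty >>> 16) & 0xFFF; }
  static int statementId(int bucketProperty) { return bucketProperty & 0xFFF; }

  public static void main(String[] args) {
    int[] samples = {536870912, 536870913, 536936448, 537001984};
    for (int b : samples) {
      System.out.printf("%d -> version=%d, writerId=%d, statementId=%d%n",
          b, version(b), writerId(b), statementId(b));
    }
    //536870912 -> version=1, writerId=0, statementId=0 (the delta_x_x_0000 rows above)
    //536870913 -> version=1, writerId=0, statementId=1 (delta_0000019_0000019_0001 in testMultiStatement)
    //536936448 -> version=1, writerId=1, statementId=0
    //537001984 -> version=1, writerId=2, statementId=0 (matches the decodeWriterId asserts in TestTxnNoBuckets)
  }
}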