http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index d61b24b..37aaeb6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -50,6 +50,8 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit private boolean hasFooter; private boolean isOriginal; private boolean hasBase; + //partition root + private Path rootDir; private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>(); private long projColsUncompressedSize; private transient Object fileKey; @@ -70,7 +72,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit public OrcSplit(Path path, Object fileId, long offset, long length, String[] hosts, OrcTail orcTail, boolean isOriginal, boolean hasBase, - List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize, long fileLen) { + List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize, long fileLen, Path rootDir) { super(path, offset, length, hosts); // For HDFS, we could avoid serializing file ID and just replace the path with inode-based // path. However, that breaks bunch of stuff because Hive later looks up things by split path. @@ -79,6 +81,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit hasFooter = this.orcTail != null; this.isOriginal = isOriginal; this.hasBase = hasBase; + this.rootDir = rootDir; this.deltas.addAll(deltas); this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize; // setting file length to Long.MAX_VALUE will let orc reader read file length from file system @@ -129,6 +132,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit ((Writable)fileKey).write(out); } out.writeLong(fileLen); + out.writeUTF(rootDir.toString()); } @Override @@ -168,6 +172,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit this.fileKey = fileId; } fileLen = in.readLong(); + rootDir = new Path(in.readUTF()); } public OrcTail getOrcTail() { @@ -186,6 +191,9 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit return hasBase; } + public Path getRootDir() { + return rootDir; + } public List<AcidInputFormat.DeltaMetaData> getDeltas() { return deltas; }
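The OrcSplit hunks above add a partition-root Path to the split and carry it through the existing Writable serialization (writeUTF in write(), new Path(readUTF()) in readFields()), so downstream readers can recover the table or partition directory a split belongs to. Below is a minimal standalone sketch of that round-trip pattern; PathCarrier is a hypothetical illustration class, not Hive's actual OrcSplit, and it assumes hadoop-common on the classpath plus a non-null root dir (writeUTF would fail on null).

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.Path;

// Hypothetical carrier illustrating the write()/readFields() pattern used for the new rootDir field.
public class PathCarrier {
  private Path rootDir; // partition (or table) root directory for the split

  PathCarrier(Path rootDir) { this.rootDir = rootDir; }
  PathCarrier() { }      // used on the deserializing side

  void write(DataOutput out) throws IOException {
    // assumes rootDir was set by the caller; writeUTF throws on a null argument
    out.writeUTF(rootDir.toString());
  }

  void readFields(DataInput in) throws IOException {
    rootDir = new Path(in.readUTF());
  }

  Path getRootDir() { return rootDir; }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    new PathCarrier(new Path("/warehouse/acidtbl/p=1")).write(new DataOutputStream(bytes));

    PathCarrier copy = new PathCarrier();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy.getRootDir()); // prints /warehouse/acidtbl/p=1
  }
}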
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 8f80710..138e56e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; @@ -357,7 +358,7 @@ public class VectorizedOrcAcidRowBatchReader int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId(); String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); - OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false); + OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isDeleteReader(true); assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly"; this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, validTxnList, readerOptions, deleteDeltas, @@ -530,6 +531,9 @@ public class VectorizedOrcAcidRowBatchReader * For every call to next(), it returns the next smallest record id in the file if available. * Internally, the next() buffers a row batch and maintains an index pointer, reading the * next batch when the previous batch is exhausted. + * + * For unbucketed tables this will currently return all delete events. Once we trust that + * the N in bucketN for "base" spit is reliable, all delete events not matching N can be skipped. */ static class DeleteReaderValue { private VectorizedRowBatch batch; @@ -538,9 +542,10 @@ public class VectorizedOrcAcidRowBatchReader private final int bucketForSplit; // The bucket value should be same for all the records. private final ValidTxnList validTxnList; private boolean isBucketPropertyRepeating; + private final boolean isBucketedTable; public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, - ValidTxnList validTxnList) throws IOException { + ValidTxnList validTxnList, boolean isBucketedTable) throws IOException { this.recordReader = deleteDeltaReader.rowsOptions(readerOptions); this.bucketForSplit = bucket; this.batch = deleteDeltaReader.getSchema().createRowBatch(); @@ -549,6 +554,7 @@ public class VectorizedOrcAcidRowBatchReader } this.indexPtrInBatch = 0; this.validTxnList = validTxnList; + this.isBucketedTable = isBucketedTable; checkBucketId();//check 1st batch } @@ -615,6 +621,13 @@ public class VectorizedOrcAcidRowBatchReader * either the split computation got messed up or we found some corrupted records. 
*/ private void checkBucketId(int bucketPropertyFromRecord) throws IOException { + if(!isBucketedTable) { + /** + * in this case a file inside a delete_delta_x_y/bucketN may contain any value for + * bucketId in {@link RecordIdentifier#getBucketProperty()} + */ + return; + } int bucketIdFromRecord = BucketCodec.determineVersion(bucketPropertyFromRecord) .decodeWriterId(bucketPropertyFromRecord); if(bucketIdFromRecord != bucketForSplit) { @@ -686,14 +699,16 @@ public class VectorizedOrcAcidRowBatchReader this.rowIds = null; this.compressedOtids = null; int maxEventsInMemory = HiveConf.getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY); + final boolean isBucketedTable = conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0; try { final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit); if (deleteDeltaDirs.length > 0) { int totalDeleteEventCount = 0; for (Path deleteDeltaDir : deleteDeltaDirs) { - Path deleteDeltaFile = AcidUtils.createBucketFile(deleteDeltaDir, bucket); - FileSystem fs = deleteDeltaFile.getFileSystem(conf); + FileSystem fs = deleteDeltaDir.getFileSystem(conf); + for(Path deleteDeltaFile : OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket, conf, + new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable)) { // NOTE: Calling last flush length below is more for future-proofing when we have // streaming deletes. But currently we don't support streaming deletes, and this can // be removed if this becomes a performance issue. @@ -721,7 +736,7 @@ public class VectorizedOrcAcidRowBatchReader throw new DeleteEventsOverflowMemoryException(); } DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader, - readerOptions, bucket, validTxnList); + readerOptions, bucket, validTxnList, isBucketedTable); DeleteRecordKey deleteRecordKey = new DeleteRecordKey(); if (deleteReaderValue.next(deleteRecordKey)) { sortMerger.put(deleteRecordKey, deleteReaderValue); @@ -730,6 +745,7 @@ public class VectorizedOrcAcidRowBatchReader } } } + } if (totalDeleteEventCount > 0) { // Initialize the rowId array when we have some delete events. rowIds = new long[totalDeleteEventCount]; http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index 76aa39f..4b11a4a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -206,6 +206,7 @@ public class SortedDynPartitionOptimizer extends Transform { if(!VirtualColumn.ROWID.getTypeInfo().equals(ci.getType())) { throw new IllegalStateException("expected 1st column to be ROW__ID but got wrong type: " + ci.toString()); } + //HIVE-17328: not sure this is correct... I don't think is gets wrapped in UDFToInteger.... 
bucketColumns.add(new ExprNodeColumnDesc(ci)); } else { if (!destTable.getSortCols().isEmpty()) { http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index bc6e0d5..e8acabe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7377,9 +7377,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } // Check constraints on acid tables. This includes - // * no insert overwrites - // * no use of vectorization - // * turns off reduce deduplication optimization, as that sometimes breaks acid // * Check that the table is bucketed // * Check that the table is not sorted // This method assumes you have already decided that this is an Acid write. Don't call it if @@ -7397,9 +7394,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { */ conf.set(AcidUtils.CONF_ACID_KEY, "true"); - if (table.getNumBuckets() < 1) { - throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, table.getTableName()); - } if (table.getSortCols() != null && table.getSortCols().size() > 0) { throw new SemanticException(ErrorMsg.ACID_NO_SORTED_BUCKETS, table.getTableName()); } http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 5e2146e..04ef7fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; @@ -93,7 +94,7 @@ public class CompactorMR { static final private String IS_MAJOR = "hive.compactor.is.major"; static final private String IS_COMPRESSED = "hive.compactor.is.compressed"; static final private String TABLE_PROPS = "hive.compactor.table.props"; - static final private String NUM_BUCKETS = "hive.compactor.num.buckets"; + static final private String NUM_BUCKETS = hive_metastoreConstants.BUCKET_COUNT; static final private String BASE_DIR = "hive.compactor.base.dir"; static final private String DELTA_DIRS = "hive.compactor.delta.dirs"; static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search"; http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index bff9884..0f129fc 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -69,68 +69,17 @@ import java.util.concurrent.TimeUnit; * Tests here are for multi-statement transactions (WIP) and those that don't need to * run with Acid 2.0 (see subclasses of TestTxnCommands2) */ -public class TestTxnCommands { +public class TestTxnCommands extends TestTxnCommandsBase { static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class); private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + TestTxnCommands.class.getCanonicalName() + "-" + System.currentTimeMillis() ).getPath().replaceAll("\\\\", "/"); - private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; - //bucket count for test tables; set it to 1 for easier debugging - private static int BUCKET_COUNT = 2; - @Rule - public TestName testName = new TestName(); - private HiveConf hiveConf; - private Driver d; - private static enum Table { - ACIDTBL("acidTbl"), - ACIDTBLPART("acidTblPart"), - ACIDTBL2("acidTbl2"), - NONACIDORCTBL("nonAcidOrcTbl"), - NONACIDORCTBL2("nonAcidOrcTbl2"); - - private final String name; - @Override - public String toString() { - return name; - } - Table(String name) { - this.name = name; - } + @Override + String getTestDataDir() { + return TEST_DATA_DIR; } - @Before - public void setUp() throws Exception { - tearDown(); - hiveConf = new HiveConf(this.getClass()); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); - hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); - hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); - hiveConf - .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, - "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); - hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); - TxnDbUtil.setConfValues(hiveConf); - TxnDbUtil.prepDb(); - File f = new File(TEST_WAREHOUSE_DIR); - if (f.exists()) { - FileUtil.fullyDelete(f); - } - if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) { - throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR); - } - SessionState.start(new SessionState(hiveConf)); - d = new Driver(hiveConf); - d.setMaxRows(10000); - dropTables(); - runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); - runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); - runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - } private void dropTables() throws Exception { for(Table t : Table.values()) { runStatementOnDriver("drop table if exists " + t); @@ -150,7 +99,7 @@ public 
class TestTxnCommands { FileUtils.deleteDirectory(new File(TEST_DATA_DIR)); } } - @Test + @Test//todo: what is this for? public void testInsertOverwrite() throws Exception { runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2); runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "3(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); @@ -172,7 +121,7 @@ public class TestTxnCommands { if(true) { return; } - Path bucket = AcidUtils.createBucketFile(new Path(new Path(TEST_WAREHOUSE_DIR, table.toString().toLowerCase()), AcidUtils.deltaSubdir(txnId, txnId, stmtId)), bucketNum); + Path bucket = AcidUtils.createBucketFile(new Path(new Path(getWarehouseDir(), table.toString().toLowerCase()), AcidUtils.deltaSubdir(txnId, txnId, stmtId)), bucketNum); FileOutputStream delta = new FileOutputStream(testName.getMethodName() + "_" + bucket.getParent().getName() + "_" + bucket.getName()); // try { // FileDump.printJsonData(hiveConf, bucket.toString(), delta); @@ -446,7 +395,7 @@ public class TestTxnCommands { } } Assert.assertNotNull(txnInfo); - Assert.assertEquals(12, txnInfo.getId()); + Assert.assertEquals(14, txnInfo.getId()); Assert.assertEquals(TxnState.OPEN, txnInfo.getState()); String s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false); String[] vals = s.split("\\s+"); @@ -490,33 +439,6 @@ public class TestTxnCommands { } } - /** - * takes raw data and turns it into a string as if from Driver.getResults() - * sorts rows in dictionary order - */ - private List<String> stringifyValues(int[][] rowsIn) { - return TestTxnCommands2.stringifyValues(rowsIn); - } - private String makeValuesClause(int[][] rows) { - return TestTxnCommands2.makeValuesClause(rows); - } - - private List<String> runStatementOnDriver(String stmt) throws Exception { - CommandProcessorResponse cpr = d.run(stmt); - if(cpr.getResponseCode() != 0) { - throw new RuntimeException(stmt + " failed: " + cpr); - } - List<String> rs = new ArrayList<String>(); - d.getResults(rs); - return rs; - } - private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception { - CommandProcessorResponse cpr = d.run(stmt); - if(cpr.getResponseCode() != 0) { - return cpr; - } - throw new RuntimeException("Didn't get expected failure!"); - } @Test public void exchangePartition() throws Exception { @@ -872,8 +794,8 @@ public class TestTxnCommands { Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0")); Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1")); - Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); - Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000014_0000014_0000/bucket_00001")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000016_0000016_0000/bucket_00001")); //run Compaction runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); TestTxnCommands2.runWorker(hiveConf); @@ -884,13 +806,13 @@ public class TestTxnCommands { } Assert.assertEquals("", 4, rs.size()); 
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12")); - Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000014/bucket_00000")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000016/bucket_00000")); Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); - Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000016/bucket_00001")); Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); - Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001")); - Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); - Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000016/bucket_00001")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000016/bucket_00001")); //make sure they are the same before and after compaction } @@ -940,4 +862,4 @@ public class TestTxnCommands { int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}}; Assert.assertEquals(stringifyValues(expected), r); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 0e0fca3..21b4a2c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -304,8 +304,8 @@ public class TestTxnCommands2 { // 1. Insert five rows to Non-ACID table. runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(3,4),(5,6),(7,8),(9,10)"); - // 2. Convert NONACIDORCTBL to ACID table. - runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + // 2. Convert NONACIDORCTBL to ACID table. //todo: remove trans_prop after HIVE-17089 + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = b*2 where b in (4,10)"); runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 7"); @@ -331,6 +331,7 @@ public class TestTxnCommands2 { /** * see HIVE-16177 * See also {@link TestTxnCommands#testNonAcidToAcidConversion01()} + * {@link TestTxnNoBuckets#testCTAS()} */ @Test public void testNonAcidToAcidConversion02() throws Exception { @@ -341,8 +342,8 @@ public class TestTxnCommands2 { //create 1 row in a file 000001_0_copy2 (and empty 000000_0_copy2?) 
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,6)"); - //convert the table to Acid - runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + //convert the table to Acid //todo: remove trans_prop after HIVE-17089 + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); List<String> rs1 = runStatementOnDriver("describe "+ Table.NONACIDORCTBL); //create a some of delta directories runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,15),(1,16)"); @@ -361,6 +362,7 @@ public class TestTxnCommands2 { * All ROW__IDs are unique on read after conversion to acid * ROW__IDs are exactly the same before and after compaction * Also check the file name (only) after compaction for completeness + * Note: order of rows in a file ends up being the reverse of order in values clause (why?!) */ String[][] expected = { {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13", "bucket_00000"}, @@ -2022,7 +2024,7 @@ public class TestTxnCommands2 { } static String makeValuesClause(int[][] rows) { assert rows.length > 0; - StringBuilder sb = new StringBuilder("values"); + StringBuilder sb = new StringBuilder(" values"); for(int[] row : rows) { assert row.length > 0; if(row.length > 1) { http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java new file mode 100644 index 0000000..d6e709d --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsBase.java @@ -0,0 +1,162 @@ +package org.apache.hadoop.hive.ql; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class TestTxnCommandsBase { + //bucket count for test tables; set it to 1 for easier debugging + final static int BUCKET_COUNT = 2; + @Rule + public TestName testName = new TestName(); + HiveConf hiveConf; + Driver d; + enum Table { + ACIDTBL("acidTbl"), + ACIDTBLPART("acidTblPart"), + ACIDTBL2("acidTbl2"), + NONACIDORCTBL("nonAcidOrcTbl"), + NONACIDORCTBL2("nonAcidOrcTbl2"), + NONACIDNONBUCKET("nonAcidNonBucket"); + + final String name; + @Override + public String toString() { + return name; + } + Table(String name) { + this.name = name; + } + } + + @Before + public void setUp() throws Exception { + setUpInternal(); + } + void setUpInternal() throws Exception { + tearDown(); + hiveConf = new HiveConf(this.getClass()); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + 
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir()); + hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); + hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); + hiveConf + .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); + TxnDbUtil.setConfValues(hiveConf); + TxnDbUtil.prepDb(); + File f = new File(getWarehouseDir()); + if (f.exists()) { + FileUtil.fullyDelete(f); + } + if (!(new File(getWarehouseDir()).mkdirs())) { + throw new RuntimeException("Could not create " + getWarehouseDir()); + } + SessionState.start(new SessionState(hiveConf)); + d = new Driver(hiveConf); + d.setMaxRows(10000); + dropTables(); + runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + "(a int, b int) stored as orc"); + } + private void dropTables() throws Exception { + for(TestTxnCommandsBase.Table t : TestTxnCommandsBase.Table.values()) { + runStatementOnDriver("drop table if exists " + t); + } + } + @After + public void tearDown() throws Exception { + try { + if (d != null) { + dropTables(); + d.destroy(); + d.close(); + d = null; + } + } finally { + TxnDbUtil.cleanDb(); + FileUtils.deleteDirectory(new File(getTestDataDir())); + } + } + String getWarehouseDir() { + return getTestDataDir() + "/warehouse"; + } + abstract String getTestDataDir(); + /** + * takes raw data and turns it into a string as if from Driver.getResults() + * sorts rows in dictionary order + */ + List<String> stringifyValues(int[][] rowsIn) { + return TestTxnCommands2.stringifyValues(rowsIn); + } + String makeValuesClause(int[][] rows) { + return TestTxnCommands2.makeValuesClause(rows); + } + + List<String> runStatementOnDriver(String stmt) throws Exception { + CommandProcessorResponse cpr = d.run(stmt); + if(cpr.getResponseCode() != 0) { + throw new RuntimeException(stmt + " failed: " + cpr); + } + List<String> rs = new ArrayList<String>(); + d.getResults(rs); + return rs; + } + CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception { + CommandProcessorResponse cpr = d.run(stmt); + if(cpr.getResponseCode() != 0) { + return cpr; + } + throw new RuntimeException("Didn't get expected failure!"); + } + /** + * Will assert that actual files match expected. + * @param expectedFiles - suffixes of expected Paths. 
Must be the same length + * @param rootPath - table or patition root where to start looking for actual files, recursively + */ + void assertExpectedFileSet(Set<String> expectedFiles, String rootPath) throws Exception { + int suffixLength = 0; + for(String s : expectedFiles) { + if(suffixLength > 0) { + assert suffixLength == s.length() : "all entries must be the same length. current: " + s; + } + suffixLength = s.length(); + } + FileSystem fs = FileSystem.get(hiveConf); + Set<String> actualFiles = new HashSet<>(); + RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(new Path(rootPath), true); + while (remoteIterator.hasNext()) { + LocatedFileStatus lfs = remoteIterator.next(); + if(!lfs.isDirectory() && org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER.accept(lfs.getPath())) { + String p = lfs.getPath().toString(); + actualFiles.add(p.substring(p.length() - suffixLength, p.length())); + } + } + Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java new file mode 100644 index 0000000..7aca6b2 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -0,0 +1,297 @@ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.BucketCodec; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class TestTxnNoBuckets extends TestTxnCommandsBase { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnNoBuckets.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnNoBuckets.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + @Override + String getTestDataDir() { + return TEST_DATA_DIR; + } + @Override + @Before + public void setUp() throws Exception { + setUpInternal(); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + } + + /** + * Tests that Acid can work with un-bucketed tables. 
+ */ + @Test + public void testNoBuckets() throws Exception { + int[][] sourceVals1 = {{0,0,0},{3,3,3}}; + int[][] sourceVals2 = {{1,1,1},{2,2,2}}; + runStatementOnDriver("drop table if exists tmp"); + runStatementOnDriver("create table tmp (c1 integer, c2 integer, c3 integer) stored as orc"); + runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals1)); + runStatementOnDriver("insert into tmp " + makeValuesClause(sourceVals2)); + runStatementOnDriver("drop table if exists nobuckets"); + runStatementOnDriver("create table nobuckets (c1 integer, c2 integer, c3 integer) stored " + + "as orc tblproperties('transactional'='true', 'transactional_properties'='default')"); + String stmt = "insert into nobuckets select * from tmp"; + runStatementOnDriver(stmt); + List<String> rs = runStatementOnDriver( + "select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by ROW__ID"); + Assert.assertEquals("", 4, rs.size()); + LOG.warn("after insert"); + for(String s : rs) { + LOG.warn(s); + } + /**the insert creates 2 output files (presumably because there are 2 input files) + * The number in the file name is writerId. This is the number encoded in ROW__ID.bucketId - + * see {@link org.apache.hadoop.hive.ql.io.BucketCodec}*/ + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t0\t")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001")); + /*todo: WTF? + RS for update seems to spray randomly... is that OK? maybe as long as all resultant files have different names... will they? + Assuming we name them based on taskId, we should create bucketX and bucketY. + we delete events can be written to bucketX file it could be useful for filter delete for a split by file name since the insert + events seem to be written to a proper bucketX file. In fact this may reduce the number of changes elsewhere like compactor... maybe + But this limits the parallelism - what is worse, you don't know what the parallelism should be until you have a list of all the + input files since bucket count is no longer a metadata property. Also, with late Update split, the file name has already been determined + from taskId so the Insert part won't end up matching the bucketX property necessarily. + With early Update split, the Insert can still be an insert - i.e. go to appropriate bucketX. But deletes will still go wherever (random shuffle) + unless you know all the bucketX files to be read - may not be worth the trouble. + * 2nd: something in FS fails. 
ArrayIndexOutOfBoundsException: 1 at FileSinkOperator.process(FileSinkOperator.java:779)*/ + runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)"); + rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID"); + LOG.warn("after update"); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001")); + //so update has 1 writer which creates bucket0 where both new rows land + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000021_0000021_0000/bucket_00000")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000021_0000021_0000/bucket_00000")); + + Set<String> expectedFiles = new HashSet<>(); + //both delete events land in a single bucket0. Each has a different ROW__ID.bucketId value (even writerId in it is different) + expectedFiles.add("ts/delete_delta_0000021_0000021_0000/bucket_00000"); + expectedFiles.add("nobuckets/delta_0000019_0000019_0000/bucket_00000"); + expectedFiles.add("nobuckets/delta_0000019_0000019_0000/bucket_00001"); + expectedFiles.add("nobuckets/delta_0000021_0000021_0000/bucket_00000"); + //check that we get the right files on disk + assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets"); + //todo: it would be nice to check the contents of the files... 
could use orc.FileDump - it has + // methods to print to a supplied stream but those are package private + + runStatementOnDriver("alter table nobuckets compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID"); + LOG.warn("after major compact"); + for(String s : rs) { + LOG.warn(s); + } + /* +├── base_0000021 +│   ├── bucket_00000 +│   └── bucket_00001 +├── delete_delta_0000021_0000021_0000 +│   └── bucket_00000 +├── delta_0000019_0000019_0000 +│   ├── bucket_00000 +│   └── bucket_00001 +└── delta_0000021_0000021_0000 +    └── bucket_00000 + */ + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3\t")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/base_0000021/bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/base_0000021/bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/base_0000021/bucket_00000")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/base_0000021/bucket_00001")); + + expectedFiles.clear(); + expectedFiles.add("delete_delta_0000021_0000021_0000/bucket_00000"); + expectedFiles.add("uckets/delta_0000019_0000019_0000/bucket_00000"); + expectedFiles.add("uckets/delta_0000019_0000019_0000/bucket_00001"); + expectedFiles.add("uckets/delta_0000021_0000021_0000/bucket_00000"); + expectedFiles.add("/warehouse/nobuckets/base_0000021/bucket_00000"); + expectedFiles.add("/warehouse/nobuckets/base_0000021/bucket_00001"); + assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets"); + + TestTxnCommands2.runCleaner(hiveConf); + rs = runStatementOnDriver("select c1, c2, c3 from nobuckets order by c1, c2, c3"); + int[][] result = {{0,0,17},{1,1,17},{2,2,2},{3,3,3}}; + Assert.assertEquals("Unexpected result after clean", stringifyValues(result), rs); + + expectedFiles.clear(); + expectedFiles.add("nobuckets/base_0000021/bucket_00000"); + expectedFiles.add("nobuckets/base_0000021/bucket_00001"); + assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets"); + } + + /** + * all of these pass but don't do exactly the right thing + * files land as if it's not an acid table "warehouse/myctas4/000000_0" + * even though in {@link org.apache.hadoop.hive.metastore.TransactionalValidationListener} fires + * and sees it as transactional table + * look for QB.isCTAS() and CreateTableDesc() in SemanticAnalyzer + * + * On read, these files are treated like non acid to acid conversion + * + * see HIVE-15899 + * See CTAS tests in TestAcidOnTez + */ + @Test + public void testCTAS() throws Exception { + int[][] values = {{1,2},{3,4}}; + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + makeValuesClause(values)); + runStatementOnDriver("create table myctas stored as ORC TBLPROPERTIES ('transactional" + + "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL); + List<String> rs = runStatementOnDriver("select * from myctas order by a, b"); + Assert.assertEquals(stringifyValues(values), rs); + +
runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values)); + runStatementOnDriver("create table myctas2 stored as ORC TBLPROPERTIES ('transactional" + + "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL); + rs = runStatementOnDriver("select * from myctas2 order by a, b"); + Assert.assertEquals(stringifyValues(values), rs); + + runStatementOnDriver("create table myctas3 stored as ORC TBLPROPERTIES ('transactional" + + "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL + + " union all select a, b from " + Table.ACIDTBL); + rs = runStatementOnDriver("select * from myctas3 order by a, b"); + Assert.assertEquals(stringifyValues(new int[][] {{1,2},{1,2},{3,4},{3,4}}), rs); + + runStatementOnDriver("create table myctas4 stored as ORC TBLPROPERTIES ('transactional" + + "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL + + " union distinct select a, b from " + Table.ACIDTBL); + rs = runStatementOnDriver("select * from myctas4 order by a, b"); + Assert.assertEquals(stringifyValues(values), rs); + } + /** + * see HIVE-16177 + * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()} todo need test with > 1 bucket file + */ + @Test + public void testToAcidConversion02() throws Exception { + //create 2 rows in a file 00000_0 + runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)"); + //create 4 rows in a file 000000_0_copy_1 + runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,12),(0,13),(1,4),(1,5)"); + //create 1 row in a file 000000_0_copy_2 + runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,6)"); + + //convert the table to Acid //todo: remove trans_prop after HIVE-17089 + runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); + List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + LOG.warn("before acid ops (after convert)"); + for(String s : rs) { + LOG.warn(s); + } + //create a some of delta directories + runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,15),(1,16)"); + runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set b = 120 where a = 0 and b = 12"); + runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(0,17)"); + runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 1 and b = 3"); + + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a,b"); + LOG.warn("before compact"); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); + /* + * All ROW__IDs are unique on read after conversion to acid + * ROW__IDs are exactly the same before and after compaction + * Also check the file name (only) after compaction for completeness + */ + String[][] expected = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t0\t13", "bucket_00000", "000000_0_copy_1"}, + {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000", "bucket_00000"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000", "bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000", 
"bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "bucket_00000", "000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t1\t4", "bucket_00000", "000000_0_copy_1"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t1\t5", "bucket_00000", "000000_0_copy_1"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":6}\t1\t6", "bucket_00000", "000000_0_copy_2"}, + {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\t1\t16", "bucket_00000", "bucket_00000"} + }; + Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size()); + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][2])); + } + //run Compaction + runStatementOnDriver("alter table "+ Table.NONACIDNONBUCKET +" compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + /* + nonacidnonbucket/ + âââ 000000_0 + âââ 000000_0_copy_1 + âââ 000000_0_copy_2 + âââ base_0000021 + â  âââ bucket_00000 + âââ delete_delta_0000019_0000019_0000 + â  âââ bucket_00000 + âââ delete_delta_0000021_0000021_0000 + â  âââ bucket_00000 + âââ delta_0000018_0000018_0000 + â  âââ bucket_00000 + âââ delta_0000019_0000019_0000 + â  âââ bucket_00000 + âââ delta_0000020_0000020_0000 + âââ bucket_00000 + + 6 directories, 9 files + */ + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a,b"); + LOG.warn("after compact"); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertEquals("Unexpected row count after compaction", expected.length, rs.size()); + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + //make sure they are the same before and after compaction + } + /** + * Currently CTAS doesn't support bucketed tables. Correspondingly Acid only supports CTAS for + * unbucketed tables. This test is here to make sure that if CTAS is made to support unbucketed + * tables, that it raises a red flag for Acid. 
+ */ + @Test + public void testCtasBucketed() throws Exception { + runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)"); + CommandProcessorResponse cpr = runStatementOnDriverNegative("create table myctas " + + "clustered by (a) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true') as " + + "select a, b from " + Table.NONACIDORCTBL); + int j = ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode();//this code doesn't propagate +// Assert.assertEquals("Wrong msg", ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode(), cpr.getErrorCode()); + Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support")); + } +} + http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index f73d058..4c30732 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -3420,6 +3420,7 @@ public class TestInputOutputFormat { assertTrue(split.toString().contains("hasFooter=false")); assertTrue(split.toString().contains("hasBase=true")); assertTrue(split.toString().contains("deltas=0")); + assertTrue(split.toString().contains("isOriginal=true")); if (split instanceof OrcSplit) { assertFalse("No footer serialize test for non-vector reader, hasFooter is not expected in" + " orc splits.", ((OrcSplit) split).hasFooter()); @@ -3435,11 +3436,13 @@ public class TestInputOutputFormat { } // call-1: open to read footer - split 1 => mock:/mocktable5/0_0 // call-2: open to read data - split 1 => mock:/mocktable5/0_0 - // call-3: open to read footer - split 2 => mock:/mocktable5/0_1 - // call-4: open to read data - split 2 => mock:/mocktable5/0_1 - // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_0 - // call-6: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_1 - assertEquals(6, readOpsDelta); + // call-3: getAcidState - split 1 => mock:/mocktable5 (to compute offset for original read) + // call-4: open to read footer - split 2 => mock:/mocktable5/0_1 + // call-5: open to read data - split 2 => mock:/mocktable5/0_1 + // call-6: getAcidState - split 2 => mock:/mocktable5 (to compute offset for original read) + // call-7: open to read footer - split 2 => mock:/mocktable5/0_0 (to get row count) + // call-8: file status - split 2 => mock:/mocktable5/0_0 + assertEquals(8, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3498,6 +3501,7 @@ public class TestInputOutputFormat { assertTrue(split.toString().contains("hasFooter=true")); assertTrue(split.toString().contains("hasBase=true")); assertTrue(split.toString().contains("deltas=0")); + assertTrue(split.toString().contains("isOriginal=true")); if (split instanceof OrcSplit) { assertTrue("Footer serialize test for ACID reader, hasFooter is expected in" + " orc splits.", ((OrcSplit) split).hasFooter()); @@ -3512,10 +3516,12 @@ public class TestInputOutputFormat { } } // call-1: open to read data - split 1 => mock:/mocktable6/0_0 - // call-2: open to read data - split 2 => mock:/mocktable6/0_1 - // call-3: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_0 - // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_1 - assertEquals(4, readOpsDelta); + // 
call-2: AcidUtils.getAcidState - split 1 => ls mock:/mocktable6 + // call-3: open to read data - split 2 => mock:/mocktable6/0_1 + // call-4: AcidUtils.getAcidState - split 2 => ls mock:/mocktable6 + // call-5: read footer - split 2 => mock:/mocktable6/0_0 (to get offset since it's original file) + // call-6: file stat - split 2 => mock:/mocktable6/0_0 + assertEquals(6, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3883,7 +3889,7 @@ public class TestInputOutputFormat { conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2"); OrcSplit split = new OrcSplit(testFilePath, null, 0, fileLength, new String[0], null, false, true, - new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength); + new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength, workDir); OrcInputFormat inputFormat = new OrcInputFormat(); AcidInputFormat.RowReader<OrcStruct> reader = inputFormat.getReader(split, new AcidInputFormat.Options(conf)); @@ -3911,7 +3917,7 @@ public class TestInputOutputFormat { conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2,3"); split = new OrcSplit(testFilePath, null, 0, fileLength, new String[0], null, false, true, - new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength); + new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength, workDir); inputFormat = new OrcInputFormat(); reader = inputFormat.getReader(split, new AcidInputFormat.Options(conf)); record = 0; http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 82cf108..9628a40 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -826,12 +826,9 @@ public class TestOrcRawRecordMerger { merger.close(); //now run as if it's a minor Compaction so we don't collapse events - //it's not exactly like minor compaction since MC would not have a baseReader //here there is only 1 "split" since we only have data for 1 bucket - baseReader = OrcFile.createReader(basePath, - OrcFile.readerOptions(conf)); merger = - new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET, + new OrcRawRecordMerger(conf, false, null, false, BUCKET, createMaximalTxnList(), new Reader.Options(), AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true)); assertEquals(null, merger.getMinKey()); @@ -844,40 +841,86 @@ public class TestOrcRawRecordMerger { assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.DELETE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id); + assertNull(OrcRecordUpdater.getRow(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.DELETE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id); + assertNull(OrcRecordUpdater.getRow(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.DELETE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id); + 
assertNull(OrcRecordUpdater.getRow(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.DELETE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id); + assertNull(OrcRecordUpdater.getRow(event)); + + //data from delta_200_200 + assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 0), id); - assertEquals("first", getValue(event)); + assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id); + assertEquals("update 1", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id); - assertEquals("second", getValue(event)); + assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id); + assertEquals("update 2", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id); + assertEquals("update 3", getValue(event)); + + assertEquals(false, merger.next(id, event)); + merger.close(); + + //now run as if it's a major Compaction so we collapse events + //here there is only 1 "split" since we only have data for 1 bucket + baseReader = OrcFile.createReader(basePath, + OrcFile.readerOptions(conf)); + merger = + new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, + createMaximalTxnList(), new Reader.Options(), + AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options() + .isCompacting(true).isMajorCompaction(true)); + assertEquals(null, merger.getMinKey()); + assertEquals(null, merger.getMaxKey()); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id); assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 0), id); - assertEquals("third", getValue(event)); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id); + assertEquals("second", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id); assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); - assertEquals(OrcRecordUpdater.INSERT_OPERATION, + assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 0), id); - assertEquals("fourth", getValue(event)); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id); + assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, @@ -904,12 +947,6 @@ public class TestOrcRawRecordMerger { assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); - assertEquals(OrcRecordUpdater.INSERT_OPERATION, - OrcRecordUpdater.getOperation(event)); 
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 0), id); - assertEquals("eighth", getValue(event)); - - assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id); @@ -918,12 +955,6 @@ public class TestOrcRawRecordMerger { assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 0), id); - assertEquals("ninth", getValue(event)); - - assertEquals(true, merger.next(id, event)); - assertEquals(OrcRecordUpdater.INSERT_OPERATION, - OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id); assertEquals("tenth", getValue(event)); @@ -949,7 +980,6 @@ public class TestOrcRawRecordMerger { assertEquals(false, merger.next(id, event)); merger.close(); - // try ignoring the 200 transaction and make sure it works still ValidTxnList txns = new ValidReadTxnList("2000:200:200"); //again 1st split is for base/ http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/queries/clientnegative/create_not_acid.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/create_not_acid.q b/ql/src/test/queries/clientnegative/create_not_acid.q index 8d6c9ac..8ed13fb 100644 --- a/ql/src/test/queries/clientnegative/create_not_acid.q +++ b/ql/src/test/queries/clientnegative/create_not_acid.q @@ -2,5 +2,5 @@ set hive.support.concurrency=true; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; -create table acid_notbucketed(a int, b varchar(128)) stored as orc TBLPROPERTIES ('transactional'='true'); +create table acid_notbucketed(a int, b varchar(128)) TBLPROPERTIES ('transactional'='true'); http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/queries/clientpositive/acid_no_buckets.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/acid_no_buckets.q b/ql/src/test/queries/clientpositive/acid_no_buckets.q new file mode 100644 index 0000000..c2f713e --- /dev/null +++ b/ql/src/test/queries/clientpositive/acid_no_buckets.q @@ -0,0 +1,210 @@ +--this has 4 groups of tests +--Acid tables w/o bucketing +--the tests with bucketing (make sure we get the same results) +--same tests with and w/o vectorization + +set hive.mapred.mode=nonstrict; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.vectorized.execution.enabled=false; +set hive.explain.user=false; +set hive.merge.cardinality.check=true; + +drop table if exists srcpart_acid; +CREATE TABLE srcpart_acid (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default'); +insert into srcpart_acid PARTITION (ds, hr) select * from srcpart; + +--2 rows for 413, 1 row for 43, 2 for 213, 1 for 44 in kv1.txt (in each partition) +select ds, hr, key, value from srcpart_acid where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer); + +analyze table srcpart_acid PARTITION(ds, hr) compute statistics; +analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns; +explain update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and 
hr='11'; +update srcpart_acid set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'; +select ds, hr, key, value from srcpart_acid where value like '%updated' order by ds, hr, cast(key as integer); + +insert into srcpart_acid PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003'); +select ds, hr, key, value from srcpart_acid where cast(key as integer) > 1000 order by ds, hr, cast(key as integer); + +analyze table srcpart_acid PARTITION(ds, hr) compute statistics; +analyze table srcpart_acid PARTITION(ds, hr) compute statistics for columns; +explain delete from srcpart_acid where key in( '1001', '213', '43'); +--delete some rows from initial load, some that were updated and some that were inserted +delete from srcpart_acid where key in( '1001', '213', '43'); + +--make sure we deleted everything that should've been deleted +select count(*) from srcpart_acid where key in( '1001', '213', '43'); +--make sure nothing extra was deleted (2000 + 3 (insert) - 4 - 1 - 8 = 1990) +select count(*) from srcpart_acid; + +--todo: should really have a way to run compactor here.... + +--update should match 1 row in 1 partition +--delete should drop everything from 1 partition +--insert should do nothing +merge into srcpart_acid t using (select distinct ds, hr, key, value from srcpart_acid) s +on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value +when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge') +when matched and s.ds='2008-04-08' and s.hr=='12' then delete +when not matched then insert values('this','should','not','be there'); + +--check results +--should be 0 +select count(*) from srcpart_acid where ds='2008-04-08' and hr=='12'; +--should be 1 row +select ds, hr, key, value from srcpart_acid where value like '%updated by merge'; +--should be 0 +select count(*) from srcpart_acid where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'; +drop table if exists srcpart_acid; + + +drop table if exists srcpart_acidb; +CREATE TABLE srcpart_acidb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default'); +insert into srcpart_acidb PARTITION (ds, hr) select * from srcpart; + +--2 rows for 413, 1 row for 43, 2 for 213, 2 for 12 in kv1.txt (in each partition) +select ds, hr, key, value from srcpart_acidb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer); + +analyze table srcpart_acidb PARTITION(ds, hr) compute statistics; +analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns; +explain update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'; +update srcpart_acidb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'; +select ds, hr, key, value from srcpart_acidb where value like '%updated' order by ds, hr, cast(key as integer); + +insert into srcpart_acidb PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003'); +select ds, hr, key, value from srcpart_acidb where cast(key as integer) > 1000 order by ds, hr, cast(key as integer); + +analyze table srcpart_acidb PARTITION(ds, hr) compute statistics; +analyze table srcpart_acidb PARTITION(ds, hr) compute statistics for columns; +explain delete from srcpart_acidb where key in( 
'1001', '213', '43'); +--delete some rows from initial load, some that were updated and some that were inserted +delete from srcpart_acidb where key in( '1001', '213', '43'); + +--make sure we deleted everything that should've been deleted +select count(*) from srcpart_acidb where key in( '1001', '213', '43'); +--make sure nothing extra was deleted (2000 + 3 (insert) - 4 - 1 - 8 = 1990) +select count(*) from srcpart_acidb; + + +--todo: should really have a way to run compactor here.... + +--update should match 1 row in 1 partition +--delete should drop everything from 1 partition +--insert should do nothing +merge into srcpart_acidb t using (select distinct ds, hr, key, value from srcpart_acidb) s +on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value +when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge') +when matched and s.ds='2008-04-08' and s.hr=='12' then delete +when not matched then insert values('this','should','not','be there'); + +--check results +--should be 0 +select count(*) from srcpart_acidb where ds='2008-04-08' and hr=='12'; +--should be 1 row +select ds, hr, key, value from srcpart_acidb where value like '%updated by merge'; +--should be 0 +select count(*) from srcpart_acidb where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'; +drop table if exists srcpart_acidb; + + + +--now same thing but vectorized +set hive.vectorized.execution.enabled=true; + +drop table if exists srcpart_acidv; +CREATE TABLE srcpart_acidv (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default'); +insert into srcpart_acidv PARTITION (ds, hr) select * from srcpart; + +--2 rows for 413, 1 row for 43, 2 for 213, 2 for 12 in kv1.txt (in each partition) +select ds, hr, key, value from srcpart_acidv where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer); + +analyze table srcpart_acidv PARTITION(ds, hr) compute statistics; +analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns; +explain update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'; +update srcpart_acidv set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'; +select ds, hr, key, value from srcpart_acidv where value like '%updated' order by ds, hr, cast(key as integer); + +insert into srcpart_acidv PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003'); +select ds, hr, key, value from srcpart_acidv where cast(key as integer) > 1000 order by ds, hr, cast(key as integer); + +analyze table srcpart_acidv PARTITION(ds, hr) compute statistics; +analyze table srcpart_acidv PARTITION(ds, hr) compute statistics for columns; +explain delete from srcpart_acidv where key in( '1001', '213', '43'); +--delete some rows from initial load, some that were updated and some that were inserted +delete from srcpart_acidv where key in( '1001', '213', '43'); + +--make sure we deleted everything that should've been deleted +select count(*) from srcpart_acidv where key in( '1001', '213', '43'); +--make sure nothing extra was deleted (2000 + 3 - 4 - 1 - 8 = 1990) +select count(*) from srcpart_acidv; + +--todo: should really have a way to run compactor here.... 
+ +--update should match 1 row in 1 partition +--delete should drop everything from 1 partition +--insert should do nothing +merge into srcpart_acidv t using (select distinct ds, hr, key, value from srcpart_acidv) s +on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value +when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge') +when matched and s.ds='2008-04-08' and s.hr=='12' then delete +when not matched then insert values('this','should','not','be there'); + +--check results +--should be 0 +select count(*) from srcpart_acidv where ds='2008-04-08' and hr=='12'; +--should be 1 row +select ds, hr, key, value from srcpart_acidv where value like '%updated by merge'; +--should be 0 +select count(*) from srcpart_acidv where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'; +drop table if exists srcpart_acidv; + + + +drop table if exists srcpart_acidvb; +CREATE TABLE srcpart_acidvb (key STRING, value STRING) PARTITIONED BY (ds STRING, hr STRING) CLUSTERED BY(key) INTO 2 BUCKETS stored as ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default'); +insert into srcpart_acidvb PARTITION (ds, hr) select * from srcpart; + +--2 rows for 413, 1 row for 43, 2 for 213, 2 for 12 in kv1.txt (in each partition) +select ds, hr, key, value from srcpart_acidvb where cast(key as integer) in(413,43) and hr='11' order by ds, hr, cast(key as integer); + +analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics; +analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics for columns; +explain update srcpart_acidvb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'; +update srcpart_acidvb set value = concat(value, 'updated') where cast(key as integer) in(413,43) and hr='11'; +select ds, hr, key, value from srcpart_acidvb where value like '%updated' order by ds, hr, cast(key as integer); + +insert into srcpart_acidvb PARTITION (ds='2008-04-08', hr=='11') values ('1001','val1001'),('1002','val1002'),('1003','val1003'); +select ds, hr, key, value from srcpart_acidvb where cast(key as integer) > 1000 order by ds, hr, cast(key as integer); + +analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics; +analyze table srcpart_acidvb PARTITION(ds, hr) compute statistics for columns; +explain delete from srcpart_acidvb where key in( '1001', '213', '43'); +--delete some rows from initial load, some that were updated and some that were inserted +delete from srcpart_acidvb where key in( '1001', '213', '43'); + +--make sure we deleted everything that should've been deleted +select count(*) from srcpart_acidvb where key in( '1001', '213', '43'); +--make sure nothing extra was deleted (2000 + 3 (insert) - 4 - 1 - 8 = 1990) +select count(*) from srcpart_acidvb; + + +--todo: should really have a way to run compactor here.... 
+ +--update should match 1 row in 1 partition +--delete should drop everything from 1 partition +--insert should do nothing +merge into srcpart_acidvb t using (select distinct ds, hr, key, value from srcpart_acidvb) s +on s.ds=t.ds and s.hr=t.hr and s.key=t.key and s.value=t.value +when matched and s.ds='2008-04-08' and s.hr=='11' and s.key='44' then update set value=concat(s.value,'updated by merge') +when matched and s.ds='2008-04-08' and s.hr=='12' then delete +when not matched then insert values('this','should','not','be there'); + +--check results +--should be 0 +select count(*) from srcpart_acidvb where ds='2008-04-08' and hr=='12'; +--should be 1 row +select ds, hr, key, value from srcpart_acidvb where value like '%updated by merge'; +--should be 0 +select count(*) from srcpart_acidvb where ds = 'this' and hr = 'should' and key = 'not' and value = 'be there'; +drop table if exists srcpart_acidvb; http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/results/clientnegative/create_not_acid.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientnegative/create_not_acid.q.out b/ql/src/test/results/clientnegative/create_not_acid.q.out index bb8f6c9..4e775e5 100644 --- a/ql/src/test/results/clientnegative/create_not_acid.q.out +++ b/ql/src/test/results/clientnegative/create_not_acid.q.out @@ -1,5 +1,5 @@ -PREHOOK: query: create table acid_notbucketed(a int, b varchar(128)) stored as orc TBLPROPERTIES ('transactional'='true') +PREHOOK: query: create table acid_notbucketed(a int, b varchar(128)) TBLPROPERTIES ('transactional'='true') PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@acid_notbucketed -FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:The table must be bucketed and stored using an ACID compliant format (such as ORC)) +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. 
MetaException(message:The table must be stored using an ACID compliant format (such as ORC)) http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/results/clientnegative/delete_non_acid_table.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientnegative/delete_non_acid_table.q.out b/ql/src/test/results/clientnegative/delete_non_acid_table.q.out index a9b884a..19fd5fb 100644 --- a/ql/src/test/results/clientnegative/delete_non_acid_table.q.out +++ b/ql/src/test/results/clientnegative/delete_non_acid_table.q.out @@ -34,4 +34,4 @@ POSTHOOK: Input: default@not_an_acid_table2 -1070883071 0ruyd6Y50JpdGRf6HqD -1070551679 iUR3Q -1069736047 k17Am8uPHWk02cEf1jet -FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table2 that does not use an AcidOutputFormat or is not bucketed +FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table2 that is not transactional http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/test/results/clientnegative/update_non_acid_table.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientnegative/update_non_acid_table.q.out b/ql/src/test/results/clientnegative/update_non_acid_table.q.out index 381b0db..02946fc 100644 --- a/ql/src/test/results/clientnegative/update_non_acid_table.q.out +++ b/ql/src/test/results/clientnegative/update_non_acid_table.q.out @@ -34,4 +34,4 @@ POSTHOOK: Input: default@not_an_acid_table -1070883071 0ruyd6Y50JpdGRf6HqD -1070551679 iUR3Q -1069736047 k17Am8uPHWk02cEf1jet -FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table that does not use an AcidOutputFormat or is not bucketed +FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table default.not_an_acid_table that is not transactional
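
Taken together, the test and golden-file changes above drop the old requirement that a transactional table be bucketed: create_not_acid.q now only exercises the storage-format check, acid_no_buckets.q exercises update/delete/merge on unbucketed transactional tables, and the error messages now refer to the storage format or the missing 'transactional' property rather than bucketing. A minimal HiveQL sketch of the resulting behavior; the table names here are illustrative and not part of the patch, and it assumes the same session settings used by the new tests:

set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

-- accepted after this patch: ORC + 'transactional'='true' with no CLUSTERED BY clause
create table acid_unbucketed_demo (a int, b varchar(128)) stored as orc TBLPROPERTIES ('transactional'='true');
-- update/delete/merge on such a table are the cases exercised by acid_no_buckets.q
update acid_unbucketed_demo set b = 'updated' where a = 1;

-- still rejected: 'transactional'='true' on the default (non-ACID) file format, as in create_not_acid.q
-- expected failure: MetaException(message:The table must be stored using an ACID compliant format (such as ORC))
create table acid_wrong_format_demo (a int, b varchar(128)) TBLPROPERTIES ('transactional'='true');

-- update/delete on a table that is not transactional at all now fails with
-- SemanticException [Error 10297]: ... that is not transactional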