http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 236e585..5c13781 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -34,8 +34,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StringableMap; -import org.apache.hadoop.hive.common.ValidCompactorTxnList; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -110,7 +110,7 @@ public class CompactorMR { } private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, StorageDescriptor sd, - ValidTxnList txns, CompactionInfo ci) { + ValidWriteIdList writeIds, CompactionInfo ci) { JobConf job = new JobConf(conf); job.setJobName(jobName); job.setOutputKeyClass(NullWritable.class); @@ -135,7 +135,7 @@ public class CompactorMR { job.setBoolean(IS_COMPRESSED, sd.isCompressed()); job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString()); job.setInt(NUM_BUCKETS, sd.getNumBuckets()); - job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); + job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString()); overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable if (ci.properties != null) { overrideTblProps(job, t.getParameters(), ci.properties); @@ -197,12 +197,12 @@ public class CompactorMR { * @param jobName name to run this job with * @param t metastore table * @param sd metastore storage descriptor - * @param txns list of valid transactions + * @param writeIds list of valid write ids * @param ci CompactionInfo * @throws java.io.IOException if the job fails */ - void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, - ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException { + void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWriteIdList writeIds, + CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException { if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) { throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true"); @@ -213,18 +213,18 @@ public class CompactorMR { if (AcidUtils.isInsertOnlyTable(t.getParameters())) { LOG.debug("Going to delete directories for aborted transactions for MM table " + t.getDbName() + "." + t.getTableName()); - removeFiles(conf, sd.getLocation(), txns, t); + removeFiles(conf, sd.getLocation(), writeIds, t); return; } - JobConf job = createBaseJobConf(conf, jobName, t, sd, txns, ci); + JobConf job = createBaseJobConf(conf, jobName, t, sd, writeIds, ci); // Figure out and encode what files we need to read. 
We do this here (rather than in // getSplits below) because as part of this we discover our minimum and maximum transactions, // and discovering that in getSplits is too late as we then have no way to pass it to our // mapper. - AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns, false, true); + AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds, false, true); List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories(); int maxDeltastoHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA); if(parsedDeltas.size() > maxDeltastoHandle) { @@ -242,14 +242,14 @@ public class CompactorMR { "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API."); int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle; for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) { - JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns, ci); + JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, writeIds, ci); launchCompactionJob(jobMinorCompact, null, CompactionType.MINOR, null, parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle), maxDeltastoHandle, -1, conf, txnHandler, ci.id, jobName); } //now recompute state since we've done minor compactions and have different 'best' set of deltas - dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns); + dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds); } StringableList dirsToSearch = new StringableList(); @@ -280,8 +280,8 @@ public class CompactorMR { if (parsedDeltas.size() == 0 && dir.getOriginalFiles().size() == 0) { // Skip compaction if there's no delta files AND there's no original files String minOpenInfo = "."; - if(txns.getMinOpenTxn() != null) { - minOpenInfo = " with min Open " + JavaUtils.txnIdToString(txns.getMinOpenTxn()) + + if(writeIds.getMinOpenWriteId() != null) { + minOpenInfo = " with min Open " + JavaUtils.writeIdToString(writeIds.getMinOpenWriteId()) + ". Compaction cannot compact above this txnid"; } LOG.error("No delta files or original files found to compact in " + sd.getLocation() + @@ -316,8 +316,8 @@ public class CompactorMR { LOG.debug("Adding delta " + delta.getPath() + " to directories to search"); dirsToSearch.add(delta.getPath()); deltaDirs.add(delta.getPath()); - minTxn = Math.min(minTxn, delta.getMinTransaction()); - maxTxn = Math.max(maxTxn, delta.getMaxTransaction()); + minTxn = Math.min(minTxn, delta.getMinWriteId()); + maxTxn = Math.max(maxTxn, delta.getMaxWriteId()); } if (baseDir != null) job.set(BASE_DIR, baseDir.toString()); @@ -379,9 +379,9 @@ public class CompactorMR { } // Remove the directories for aborted transactions only - private void removeFiles(HiveConf conf, String location, ValidTxnList txnList, Table t) + private void removeFiles(HiveConf conf, String location, ValidWriteIdList writeIdList, Table t) throws IOException { - AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList, + AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList, Ref.from(false), false, t.getParameters()); // For MM table, we only want to delete delta dirs for aborted txns. 
List<FileStatus> abortedDirs = dir.getAbortedDirectories(); @@ -718,13 +718,13 @@ public class CompactorMR { @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class AcidInputFormat<WritableComparable, V> aif = instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME)); - ValidTxnList txnList = - new ValidCompactorTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); + ValidWriteIdList writeIdList = + new ValidCompactorWriteIdList(jobConf.get(ValidWriteIdList.VALID_WRITEIDS_KEY)); boolean isMajor = jobConf.getBoolean(IS_MAJOR, false); AcidInputFormat.RawReader<V> reader = aif.getRawReader(jobConf, isMajor, split.getBucket(), - txnList, split.getBaseDir(), split.getDeltaDirs()); + writeIdList, split.getBaseDir(), split.getDeltaDirs()); RecordIdentifier identifier = reader.createKey(); V value = reader.createValue(); getWriter(reporter, reader.getObjectInspector(), split.getBucket()); @@ -779,8 +779,8 @@ public class CompactorMR { .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false)) .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()) .reporter(reporter) - .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) - .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) + .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) + .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) .bucket(bucket) .statementId(-1);//setting statementId == -1 makes compacted delta files use //delta_xxxx_yyyy format @@ -804,8 +804,8 @@ public class CompactorMR { .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false)) .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()) .reporter(reporter) - .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) - .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) + .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) + .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) .bucket(bucket) .statementId(-1);//setting statementId == -1 makes compacted delta files use //delta_xxxx_yyyy format @@ -926,8 +926,8 @@ public class CompactorMR { AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .writingBase(conf.getBoolean(IS_MAJOR, false)) .isCompressed(conf.getBoolean(IS_COMPRESSED, false)) - .minimumTransactionId(conf.getLong(MIN_TXN, Long.MAX_VALUE)) - .maximumTransactionId(conf.getLong(MAX_TXN, Long.MIN_VALUE)) + .minimumWriteId(conf.getLong(MIN_TXN, Long.MAX_VALUE)) + .maximumWriteId(conf.getLong(MAX_TXN, Long.MIN_VALUE)) .bucket(0) .statementId(-1); Path newDeltaDir = AcidUtils.createFilename(finalLocation, options).getParent();
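The CompactorMR.java changes above swap the transaction-id plumbing for write-id plumbing: the job conf now carries ValidWriteIdList.VALID_WRITEIDS_KEY, the map side rebuilds a ValidCompactorWriteIdList from it, and the AcidOutputFormat options are bounded by minimumWriteId/maximumWriteId. A minimal sketch of that mapper-side flow follows; the class name, the MIN_TXN/MAX_TXN key strings, and the logging are illustrative stand-ins (the real keys are private constants of CompactorMR), and only the Hive calls taken from the diff are assumed to exist.

import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class CompactorWriteIdSketch {
  // Stand-ins for the private MIN_TXN/MAX_TXN conf keys used by CompactorMR; values are assumed.
  private static final String MIN_TXN = "compactor.txn.min";
  private static final String MAX_TXN = "compactor.txn.max";

  static AcidOutputFormat.Options writerOptions(JobConf jobConf, HiveConf conf, int bucket) {
    // Rebuild the write-id list that the driver serialized into the job configuration.
    ValidWriteIdList writeIds =
        new ValidCompactorWriteIdList(jobConf.get(ValidWriteIdList.VALID_WRITEIDS_KEY));
    if (writeIds.getMinOpenWriteId() != null) {
      // As the diff's error message notes, compaction cannot compact above the min open write id.
      System.out.println("Min open write id: " + writeIds.getMinOpenWriteId());
    }
    // Output deltas/bases are now named by a write-id range rather than a txn-id range.
    return new AcidOutputFormat.Options(conf)
        .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
        .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
        .bucket(bucket)
        .statementId(-1); // -1 keeps the delta_xxxx_yyyy naming for compacted deltas
  }
}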
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index a52e023..7eda7fb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -21,11 +21,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionResponse; import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; @@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -88,8 +90,6 @@ public class Initiator extends CompactorThread { startedAt = System.currentTimeMillis(); //todo: add method to only get current i.e. skip history - more efficient ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); - ValidTxnList txns = - TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold); LOG.debug("Found " + potentials.size() + " potential compactions, " + "checking to see if we should compact any of them"); @@ -143,12 +143,21 @@ public class Initiator extends CompactorThread { ", assuming it has been dropped and moving on."); continue; } + + // Compaction doesn't work under a transaction and hence pass null for validTxnList + // The response will have one entry per table and hence we get only one ValidWriteIdList + String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); + GetValidWriteIdsRequest rqst + = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName), null); + ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList( + txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); + StorageDescriptor sd = resolveStorageDescriptor(t, p); String runAs = findUserToRunAs(sd.getLocation(), t); /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive. 
* Long term we should consider having a thread pool here and running checkForCompactionS * in parallel*/ - CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, t.getParameters(), runAs); + CompactionType compactionNeeded = checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs); if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded); } catch (Throwable t) { LOG.error("Caught exception while trying to determine if we should compact " + @@ -215,7 +224,7 @@ public class Initiator extends CompactorThread { } private CompactionType checkForCompaction(final CompactionInfo ci, - final ValidTxnList txns, + final ValidWriteIdList writeIds, final StorageDescriptor sd, final Map<String, String> tblproperties, final String runAs) @@ -227,7 +236,7 @@ public class Initiator extends CompactorThread { return CompactionType.MAJOR; } if (runJobAsSelf(runAs)) { - return determineCompactionType(ci, txns, sd, tblproperties); + return determineCompactionType(ci, writeIds, sd, tblproperties); } else { LOG.info("Going to initiate as user " + runAs); UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, @@ -235,7 +244,7 @@ public class Initiator extends CompactorThread { CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() { @Override public CompactionType run() throws Exception { - return determineCompactionType(ci, txns, sd, tblproperties); + return determineCompactionType(ci, writeIds, sd, tblproperties); } }); try { @@ -248,7 +257,7 @@ public class Initiator extends CompactorThread { } } - private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns, + private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdList writeIds, StorageDescriptor sd, Map<String, String> tblproperties) throws IOException, InterruptedException { @@ -259,7 +268,7 @@ public class Initiator extends CompactorThread { boolean noBase = false; Path location = new Path(sd.getLocation()); FileSystem fs = location.getFileSystem(conf); - AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns, false, false); + AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, writeIds, false, false); Path base = dir.getBaseDirectory(); long baseSize = 0; FileStatus stat = null; http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index e5ebf9a..c47e78e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -18,16 +18,17 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; -import 
org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.Driver; @@ -138,10 +139,15 @@ public class Worker extends CompactorThread { } final boolean isMajor = ci.isMajorCompaction(); - final ValidTxnList txns = - TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); - LOG.debug("ValidCompactTxnList: " + txns.writeToString()); - txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark()); + + // Compaction doesn't work under a transaction and hence pass 0 for current txn Id + // The response will have one entry per table and hence we get only one OpenWriteIds + String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName), null); + final ValidWriteIdList tblValidWriteIds = + TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); + LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString()); + txnHandler.setCompactionHighestWriteId(ci, tblValidWriteIds.getHighWatermark()); final StringBuilder jobName = new StringBuilder(name); jobName.append("-compactor-"); jobName.append(ci.getFullPartitionName()); @@ -164,14 +170,14 @@ public class Worker extends CompactorThread { launchedJob = true; try { if (runJobAsSelf(runAs)) { - mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler); + mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler); } else { UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), UserGroupInformation.getLoginUser()); ugi.doAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { - mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler); + mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler); return null; } }); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 2663061..45890ed 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hive.metastore.txn; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -56,7 +58,7 @@ import static junit.framework.Assert.assertTrue; import static junit.framework.Assert.fail; /** - * Tests for TxnHandler. + * Tests for CompactionTxnHandler. 
*/ public class TestCompactionTxnHandler { @@ -478,6 +480,13 @@ public class TestCompactionTxnHandler { String tableName = "adp_table"; OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); long txnId = openTxns.getTxn_ids().get(0); + + AllocateTableWriteIdsResponse writeIds + = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(openTxns.getTxn_ids(), dbName, tableName)); + long writeId = writeIds.getTxnToWriteIds().get(0).getWriteId(); + assertEquals(txnId, writeIds.getTxnToWriteIds().get(0).getTxnId()); + assertEquals(1, writeId); + // lock a table, as in dynamic partitions LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName); lc.setIsDynamicPartitionWrite(true); @@ -489,7 +498,7 @@ public class TestCompactionTxnHandler { LockResponse lock = txnHandler.lock(lr); assertEquals(LockState.ACQUIRED, lock.getState()); - AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName, + AddDynamicPartitions adp = new AddDynamicPartitions(txnId, writeId, dbName, tableName, Arrays.asList("ds=yesterday", "ds=today")); adp.setOperationType(dop); txnHandler.addDynamicPartitions(adp); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 9eaf039..7b510dd 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -21,6 +21,8 @@ import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; import org.apache.hadoop.hive.metastore.api.CheckLockRequest; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; @@ -71,6 +73,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -149,7 +152,14 @@ public class TestTxnHandler { txnHandler.abortTxn(new AbortTxnRequest(1)); List<String> parts = new ArrayList<String>(); parts.add("p=1"); - AddDynamicPartitions adp = new AddDynamicPartitions(3, "default", "T", parts); + + AllocateTableWriteIdsResponse writeIds + = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(3L), "default", "T")); + long writeId = writeIds.getTxnToWriteIds().get(0).getWriteId(); + assertEquals(3, writeIds.getTxnToWriteIds().get(0).getTxnId()); + assertEquals(1, writeId); + + AddDynamicPartitions adp = new AddDynamicPartitions(3, writeId, "default", "T", parts); adp.setOperationType(DataOperationType.INSERT); txnHandler.addDynamicPartitions(adp); GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java 
---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 2a1545f..470856b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -760,14 +760,18 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { Assert.assertEquals(536936448, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1))); Assert.assertEquals("", 4, rs.size()); - Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12")); + Assert.assertTrue(rs.get(0), + rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12")); Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1")); - Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); + Assert.assertTrue(rs.get(1), + rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0")); - Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); + Assert.assertTrue(rs.get(2), + rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1")); - Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); - Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000016_0000016_0000/bucket_00001")); + Assert.assertTrue(rs.get(3), + rs.get(3).startsWith("{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000001_0000001_0000/bucket_00001")); //run Compaction runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); TestTxnCommands2.runWorker(hiveConf); @@ -777,14 +781,18 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { LOG.warn(s); } Assert.assertEquals("", 4, rs.size()); - Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12")); - Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000016/bucket_00000")); - Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); - Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000016/bucket_00001")); - Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); - Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000016/bucket_00001")); - Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); - Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000016/bucket_00001")); + Assert.assertTrue(rs.get(0), + rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000001/bucket_00000")); + Assert.assertTrue(rs.get(1), + rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); + Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("nonacidorctbl/base_0000001/bucket_00001")); + Assert.assertTrue(rs.get(2), + rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000001/bucket_00001")); + Assert.assertTrue(rs.get(3), + rs.get(3).startsWith("{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000001/bucket_00001")); //make sure they are the same before and after compaction } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index bab6d5e..2eead9e 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -34,7 +34,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; @@ -366,15 +366,15 @@ public class TestTxnCommands2 { * Note: order of rows in a file ends up being the reverse of order in values clause (why?!) */ String[][] expected = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13", "bucket_00000"}, - {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"}, - {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"}, - {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "bucket_00001"}, - {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4", "bucket_00001"}, - {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "bucket_00001"}, - {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6", "bucket_00001"}, - {"{\"transactionid\":20,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"} + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13", "bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"}, + {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4", "bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6", "bucket_00001"}, + {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"} }; Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size()); for(int i = 0; i < expected.length; i++) { @@ -759,11 +759,11 @@ public class TestTxnCommands2 { FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Arrays.sort(buckets); if 
(numDelta == 1) { - Assert.assertEquals("delta_0000024_0000024_0000", status[i].getPath().getName()); + Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName()); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } else if (numDelta == 2) { - Assert.assertEquals("delta_0000025_0000025_0000", status[i].getPath().getName()); + Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName()); Assert.assertEquals(1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } @@ -772,7 +772,7 @@ public class TestTxnCommands2 { FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Arrays.sort(buckets); if (numDeleteDelta == 1) { - Assert.assertEquals("delete_delta_0000024_0000024_0000", status[i].getPath().getName()); + Assert.assertEquals("delete_delta_0000001_0000001_0000", status[i].getPath().getName()); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } @@ -819,7 +819,7 @@ public class TestTxnCommands2 { Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } else if (numBase == 2) { // The new base dir now has two bucket files, since the delta dir has two bucket files - Assert.assertEquals("base_0000025", status[i].getPath().getName()); + Assert.assertEquals("base_0000002", status[i].getPath().getName()); Assert.assertEquals(1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } @@ -845,7 +845,7 @@ public class TestTxnCommands2 { status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Assert.assertEquals(1, status.length); - Assert.assertEquals("base_0000025", status[0].getPath().getName()); + Assert.assertEquals("base_0000002", status[0].getPath().getName()); FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); Arrays.sort(buckets); Assert.assertEquals(1, buckets.length); @@ -861,7 +861,7 @@ public class TestTxnCommands2 { public void testValidTxnsBookkeeping() throws Exception { // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf runStatementOnDriver("select * from " + Table.NONACIDORCTBL); - String value = hiveConf.get(ValidTxnList.VALID_TXNS_KEY); + String value = hiveConf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY); Assert.assertNull("The entry should be null for query that doesn't involve ACID tables", value); } @@ -874,9 +874,9 @@ public class TestTxnCommands2 { //this will cause next txn to be marked aborted but the data is still written to disk hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(tableData2)); - assert hiveConf.get(ValidTxnList.VALID_TXNS_KEY) == null : "previous txn should've cleaned it"; + assert hiveConf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY) == null : "previous txn should've cleaned it"; //so now if HIVEFETCHTASKCONVERSION were to use a stale value, it would use a - //ValidTxnList with HWM=MAX_LONG, i.e. include the data for aborted txn + //ValidWriteIdList with HWM=MAX_LONG, i.e. 
include the data for aborted txn List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL); Assert.assertEquals("Extra data", 2, rs.size()); } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java index 3a3272f..0a305a4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java @@ -105,13 +105,13 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][]{ - {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000020_0000020_0000/000000_0"}, - {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"}}; + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}}; checkResult(expected, testQuery, isVectorized, "load data inpath"); runStatementOnDriver("update T set b = 17 where a = 1"); String[][] expected2 = new String[][]{ - {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"}, - {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000023_0000023_0000/bucket_00000"} + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"} }; checkResult(expected2, testQuery, isVectorized, "update"); @@ -121,15 +121,15 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("alter table T compact 'minor'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected3 = new String[][] { - {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000020_0000027/bucket_00000"}, - {"{\"transactionid\":26,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000020_0000027/bucket_00000"} + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004/bucket_00000"}, + {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004/bucket_00000"} }; checkResult(expected3, testQuery, isVectorized, "delete compact minor"); runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' overwrite into table T"); String[][] expected4 = new String[][]{ - {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000031/000000_0"}, - {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000031/000000_0"}}; + {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000005/000000_0"}, + {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000005/000000_0"}}; checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite"); //load same data again (additive) @@ -138,9 +138,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("delete from T where a = 
3");//matches 2 rows runStatementOnDriver("insert into T values(2,2)"); String[][] expected5 = new String[][]{ - {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"}, - {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"}, - {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000037_0000037_0000/bucket_00000"} + {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"}, + {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"}, + {"{\"transactionid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000009_0000009_0000/bucket_00000"} }; checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update"); @@ -148,9 +148,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("alter table T compact 'major'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected6 = new String[][]{ - {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000037/bucket_00000"}, - {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000037/bucket_00000"}, - {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000037/bucket_00000"} + {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009/bucket_00000"}, + {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009/bucket_00000"}, + {"{\"transactionid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009/bucket_00000"} }; checkResult(expected6, testQuery, isVectorized, "load data inpath compact major"); } @@ -173,22 +173,22 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { String testQuery = isVectorized ? 
"select ROW__ID, a, b from T order by ROW__ID" : "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][] { - //normal insert - {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000016_0000/bucket_00000"}, - {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000016_0000/bucket_00000"}, - //Load Data - {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000021_0000021_0000/000000_0"}, - {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000021_0000021_0000/000000_0"}}; + //normal insert + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000001_0000/bucket_00000"}, + //Load Data + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000002_0000002_0000/000000_0"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000002_0000002_0000/000000_0"}}; checkResult(expected, testQuery, isVectorized, "load data inpath"); //test minor compaction runStatementOnDriver("alter table T compact 'minor'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected1 = new String[][] { - {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000021/bucket_00000"}, - {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000021/bucket_00000"}, - {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000016_0000021/bucket_00000"}, - {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000016_0000021/bucket_00000"} + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000002/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000002/bucket_00000"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002/bucket_00000"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002/bucket_00000"} }; checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)"); @@ -197,11 +197,11 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("alter table T compact 'major'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected2 = new String[][] { - {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000027/bucket_00000"}, - {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000027/bucket_00000"}, - {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000027/bucket_00000"}, - {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000027/bucket_00000"}, - {"{\"transactionid\":27,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000027/bucket_00000"} + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000003/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000003/bucket_00000"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000003/bucket_00000"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000003/bucket_00000"}, + {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000003/bucket_00000"} }; 
checkResult(expected2, testQuery, isVectorized, "load data inpath (major)"); @@ -210,8 +210,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'"); runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/2/data' overwrite into table T"); String[][] expected3 = new String[][] { - {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/000000_0"}, - {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/000000_0"}}; + {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000004/000000_0"}, + {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000004/000000_0"}}; checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite"); //one more major compaction @@ -219,9 +219,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("alter table T compact 'major'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected4 = new String[][] { - {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000036/bucket_00000"}, - {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000036/bucket_00000"}, - {"{\"transactionid\":36,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000036/bucket_00000"}}; + {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000005/bucket_00000"}, + {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000005/bucket_00000"}, + {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000005/bucket_00000"}}; checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)"); } /** @@ -254,24 +254,23 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { /* {"transactionid":0,"bucketid":536870912,"rowid":0} 0 2/000000_0 {"transactionid":0,"bucketid":536870912,"rowid":1} 0 4/000000_0 -{"transactionid":24,"bucketid":536870912,"rowid":0} 4 4/delta_0000024_0000024_0000/000000_0 -{"transactionid":24,"bucketid":536870912,"rowid":1} 5 5/delta_0000024_0000024_0000/000000_0 +{"transactionid":1,"bucketid":536870912,"rowid":0} 4 4/delta_0000001_0000001_0000/000000_0 +{"transactionid":1,"bucketid":536870912,"rowid":1} 5 5/delta_0000001_0000001_0000/000000_0 */ String[][] expected = new String[][] { - //from pre-acid insert - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"}, - //from Load Data into acid converted table - {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000024_0000024_0000/000000_0"}, - {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000024_0000024_0000/000000_0"}, - {"{\"transactionid\":24,\"bucketid\":536936448,\"rowid\":0}\t2\t2", "t/delta_0000024_0000024_0000/000001_0"}, - {"{\"transactionid\":24,\"bucketid\":536936448,\"rowid\":1}\t3\t3", "t/delta_0000024_0000024_0000/000001_0"}, - {"{\"transactionid\":24,\"bucketid\":537001984,\"rowid\":0}\t4\t4", "t/delta_0000024_0000024_0000/000002_0"}, - {"{\"transactionid\":24,\"bucketid\":537001984,\"rowid\":1}\t5\t5", "t/delta_0000024_0000024_0000/000002_0"}, + //from pre-acid insert + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"}, + //from Load Data into 
acid converted table + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t2", "t/delta_0000001_0000001_0000/000001_0"}, + {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3", "t/delta_0000001_0000001_0000/000001_0"}, + {"{\"transactionid\":1,\"bucketid\":537001984,\"rowid\":0}\t4\t4", "t/delta_0000001_0000001_0000/000002_0"}, + {"{\"transactionid\":1,\"bucketid\":537001984,\"rowid\":1}\t5\t5", "t/delta_0000001_0000001_0000/000002_0"}, }; checkResult(expected, testQuery, isVectorized, "load data inpath"); - //create more staging data with copy_N files and do LD+Overwrite runStatementOnDriver("insert into Tstage values(5,6),(7,8)"); runStatementOnDriver("insert into Tstage values(8,8)"); @@ -279,9 +278,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' overwrite into table T"); String[][] expected2 = new String[][] { - {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000030/000000_0"}, - {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000030/000000_0"}, - {"{\"transactionid\":30,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000030/000001_0"} + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000002/000000_0"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000002/000000_0"}, + {"{\"transactionid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000002/000001_0"} }; checkResult(expected2, testQuery, isVectorized, "load data inpath overwrite"); @@ -291,11 +290,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { TestTxnCommands2.runWorker(hiveConf); String[][] expected3 = new String[][] { - {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/bucket_00000"}, - {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/bucket_00000"}, - {"{\"transactionid\":30,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000033/bucket_00001"}, - {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000033/bucket_00000"} - + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000003/bucket_00000"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000003/bucket_00000"}, + {"{\"transactionid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000003/bucket_00001"}, + {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000003/bucket_00000"} }; checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)"); } @@ -326,12 +324,12 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { List<String> rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); String[][] expected = new String[][] { - {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"}, - {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"}, - {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000024_0000024_0000/000000_0"}, - 
{"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000024_0000024_0000/000000_0"}, - {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000028_0000028_0000/000000_0"}, - {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000028_0000028_0000/000000_0"}}; + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000002_0000002_0000/000000_0"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000002_0000002_0000/000000_0"}, + {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000003_0000003_0000/000000_0"}, + {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000003_0000003_0000/000000_0"}}; checkExpected(rs, expected, "load data inpath partitioned"); @@ -340,10 +338,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("truncate table Tstage"); runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/4/data' overwrite into table T partition(p=1)"); String[][] expected2 = new String[][] { - {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"}, - {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"}, - {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000033/000000_0"}, - {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000033/000000_0"}}; + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000004/000000_0"}, + {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000004/000000_0"}}; rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); checkExpected(rs, expected2, "load data inpath partitioned overwrite"); } @@ -405,20 +403,20 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { String testQuery = isVectorized ? 
"select ROW__ID, a, b from T order by ROW__ID" : "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][] { - {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"}, - {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"}, - {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000019_0000019_0001/000000_0"}, - {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000019_0000019_0001/000000_0"} + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000001_0000001_0001/000000_0"}, + {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000001_0000001_0001/000000_0"} }; checkResult(expected, testQuery, isVectorized, "load data inpath"); runStatementOnDriver("alter table T compact 'major'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected2 = new String[][] { - {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000019/bucket_00000"}, - {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000019/bucket_00000"}, - {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000019/bucket_00000"}, - {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000019/bucket_00000"} + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000001/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000001/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000001/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000001/bucket_00000"} }; checkResult(expected2, testQuery, isVectorized, "load data inpath (major)"); //at lest for now, Load Data w/Overwrite is not allowed in a txn: HIVE-18154 @@ -444,8 +442,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][] { - {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"}, - {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"} + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"} }; checkResult(expected, testQuery, isVectorized, "load data inpath"); }