Repository: hive Updated Branches: refs/heads/branch-1 f4020cfce -> e654efeb3
HIVE-11540 Too many delta files during Compaction - OOM (Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e654efeb Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e654efeb Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e654efeb Branch: refs/heads/branch-1 Commit: e654efeb32c62fb5cd56214b823526173cb009bb Parents: f4020cf Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Sat Oct 24 22:01:20 2015 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Sat Oct 24 22:01:20 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 + .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 15 +- .../hive/ql/txn/compactor/CompactorMR.java | 143 ++++++++++++------- .../hadoop/hive/ql/txn/compactor/Worker.java | 6 +- .../hive/ql/txn/compactor/CompactorTest.java | 4 + .../hive/ql/txn/compactor/TestWorker.java | 120 ++++++++++++++-- 6 files changed, 225 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 2febd39..4724523 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1500,6 +1500,8 @@ public class HiveConf extends Configuration { HIVE_COMPACTOR_DELTA_PCT_THRESHOLD("hive.compactor.delta.pct.threshold", 0.1f, "Percentage (fractional) size of the delta files relative to the base that will trigger\n" + "a major compaction. (1.0 = 100%, so the default 0.1 = 10%.)"), + COMPACTOR_MAX_NUM_DELTA("hive.compactor.max.num.delta", 500, "Maximum number of delta files that " + + "the compactor will attempt to handle in a single job."), HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000, "Number of aborted transactions involving a given table or partition that will trigger\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index c7e0780..8f60e9d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -129,6 +129,9 @@ public class AcidUtils { return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); } + public static String baseDir(long txnId) { + return BASE_PREFIX + String.format(DELTA_DIGITS, txnId); + } /** * Create a filename for a bucket file. * @param directory the partition directory @@ -218,14 +221,16 @@ public class AcidUtils { Path getBaseDirectory(); /** - * Get the list of original files. + * Get the list of original files. Not {@code null}. * @return the list of original files (eg. 000000_0) */ List<FileStatus> getOriginalFiles(); /** * Get the list of base and delta directories that are valid and not - * obsolete. + * obsolete. Not {@code null}. List must be sorted in a specific way.
+ * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta)} + * for details. * @return the minimal list of current directories */ List<ParsedDelta> getCurrentDirectories(); @@ -234,7 +239,7 @@ public class AcidUtils { * Get the list of obsolete directories. After filtering out bases and * deltas that are not selected by the valid transaction list, return the * list of original files, bases, and deltas that have been replaced by - * more up to date ones. + * more up to date ones. Not {@code null}. */ List<FileStatus> getObsolete(); } @@ -281,6 +286,7 @@ public class AcidUtils { * happens in a different process; thus it's possible to have bases/deltas with * overlapping txnId boundaries. The sort order helps figure out the "best" set of files * to use to get data. + * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20) */ @Override public int compareTo(ParsedDelta parsedDelta) { @@ -493,6 +499,9 @@ public class AcidUtils { } Collections.sort(working); + //so now, 'working' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example + //and we want to end up with the best set containing all relevant data: delta_5_20 delta_51_60, + //subject to list of 'exceptions' in 'txnList' (not show in above example). long current = bestBaseTxn; int lastStmtId = -1; for(ParsedDelta next: working) { http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index a45536e..436c36d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; @@ -93,18 +94,8 @@ public class CompactorMR { public CompactorMR() { } - /** - * Run a compactor job. - * @param conf Hive configuration file - * @param jobName name to run this job with - * @param t metastore table - * @param sd metastore storage descriptor - * @param txns list of valid transactions - * @param isMajor is this a major compaction? 
- * @throws java.io.IOException if the job fails - */ - void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, - ValidTxnList txns, boolean isMajor, Worker.StatsUpdater su) throws IOException { + private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, StorageDescriptor sd, + ValidTxnList txns) { JobConf job = new JobConf(conf); job.setJobName(jobName); job.setOutputKeyClass(NullWritable.class); @@ -116,7 +107,7 @@ public class CompactorMR { job.setInputFormat(CompactorInputFormat.class); job.setOutputFormat(NullOutputFormat.class); job.setOutputCommitter(CompactorOutputCommitter.class); - + String queueName = conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE); if(queueName != null && queueName.length() > 0) { job.setQueueName(queueName); @@ -126,12 +117,26 @@ public class CompactorMR { job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString()); job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat()); job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat()); - job.setBoolean(IS_MAJOR, isMajor); job.setBoolean(IS_COMPRESSED, sd.isCompressed()); job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString()); job.setInt(NUM_BUCKETS, sd.getNumBuckets()); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); setColumnTypes(job, sd.getCols()); + return job; + } + /** + * Run Compaction which may consist of several jobs on the cluster. + * @param conf Hive configuration file + * @param jobName name to run this job with + * @param t metastore table + * @param sd metastore storage descriptor + * @param txns list of valid transactions + * @param ci CompactionInfo + * @throws java.io.IOException if the job fails + */ + void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, + ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException { + JobConf job = createBaseJobConf(conf, jobName, t, sd, txns); // Figure out and encode what files we need to read. We do this here (rather than in // getSplits below) because as part of this we discover our minimum and maximum transactions, @@ -139,9 +144,36 @@ public class CompactorMR { // mapper. AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns); + List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories(); + int maxDeltastoHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA); + if(parsedDeltas.size() > maxDeltastoHandle) { + /** + * if here, that means we have very high number of delta files. This may be sign of a temporary + * glitch or a real issue. For example, if transaction batch size or transaction size is set too + * low for the event flow rate in Streaming API, it may generate lots of delta files very + * quickly. Another possibility is that Compaction is repeatedly failing and not actually compacting. + * Thus, force N minor compactions first to reduce number of deltas and then follow up with + * the compaction actually requested in {@link ci} which now needs to compact a lot fewer deltas + */ + LOG.warn(parsedDeltas.size() + " delta files found for " + ci.getFullPartitionName() + + " located at " + sd.getLocation() + "! This is likely a sign of misconfiguration, " + + "especially if this message repeats. Check that compaction is running properly. 
Check for any " + + "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API."); + int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle; + for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) { + JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns); + launchCompactionJob(jobMinorCompact, + null, CompactionType.MINOR, null, + parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle), + maxDeltastoHandle, -1); + } + //now recompute state since we've done minor compactions and have different 'best' set of deltas + dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns); + } + StringableList dirsToSearch = new StringableList(); Path baseDir = null; - if (isMajor) { + if (ci.isMajorCompaction()) { // There may not be a base dir if the partition was empty before inserts or if this // partition is just now being converted to ACID. baseDir = dir.getBaseDirectory(); @@ -163,14 +195,26 @@ public class CompactorMR { } } - List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories(); - - if (parsedDeltas == null || parsedDeltas.size() == 0) { + if (parsedDeltas.size() == 0) { // Seriously, no deltas? Can't compact that. LOG.error( "No delta files found to compact in " + sd.getLocation()); + //couldn't someone want to run a Major compaction to convert old table to ACID? return; } + launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(), + dir.getCurrentDirectories().size(), dir.getObsolete().size()); + + su.gatherStats(); + } + private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType, + StringableList dirsToSearch, + List<AcidUtils.ParsedDelta> parsedDeltas, + int curDirNumber, int obsoleteDirNumber) throws IOException { + job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR); + if(dirsToSearch == null) { + dirsToSearch = new StringableList(); + } StringableList deltaDirs = new StringableList(); long minTxn = Long.MAX_VALUE; long maxTxn = Long.MIN_VALUE; @@ -187,18 +231,15 @@ public class CompactorMR { job.set(DIRS_TO_SEARCH, dirsToSearch.toString()); job.setLong(MIN_TXN, minTxn); job.setLong(MAX_TXN, maxTxn); - LOG.debug("Setting minimum transaction to " + minTxn); - LOG.debug("Setting maximume transaction to " + maxTxn); + LOG.info("Submitting " + compactionType + " compaction job '" + + job.getJobName() + "' to " + job.getQueueName() + " queue. " + + "(current delta dirs count=" + curDirNumber + + ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]"); RunningJob rj = JobClient.runJob(job); - LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" + - jobName + "' with jobID=" + rj.getID() + " to " + job.getQueueName() + " queue. " + - "(current delta dirs count=" + dir.getCurrentDirectories().size() + - ", obsolete delta dirs count=" + dir.getObsolete()); + LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID()); rj.waitForCompletion(); - su.gatherStats(); } - /** * Set the column names and types into the job conf for the input format * to use. 
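The core of this patch is the batching shown in the hunk above: when a partition has accumulated more deltas than hive.compactor.max.num.delta (default 500), run() first launches parsedDeltas.size() / maxDeltastoHandle preliminary MINOR compaction jobs over fixed-size slices of the sorted delta list, re-reads the ACID directory state, and only then submits the compaction that was actually requested, which by then sees far fewer deltas. The standalone sketch below illustrates just that slicing step under simplified assumptions; ParsedDelta here is a stripped-down stand-in for AcidUtils.ParsedDelta and planPreCompactions is a hypothetical helper, not code from this commit.

import java.util.ArrayList;
import java.util.List;

/** Minimal sketch of the delta-batching idea; not the actual CompactorMR code. */
public class DeltaBatchingSketch {

  /** Simplified stand-in for org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta. */
  static class ParsedDelta {
    final long minTxn;
    final long maxTxn;
    ParsedDelta(long minTxn, long maxTxn) { this.minTxn = minTxn; this.maxTxn = maxTxn; }
    @Override
    public String toString() { return "delta_" + minTxn + "_" + maxTxn; }
  }

  /**
   * Hypothetical helper: plan the preliminary minor compactions using the same
   * integer division and subList() slicing as the patch. Any remainder smaller
   * than maxPerJob is left for the compaction that was originally requested.
   */
  static List<List<ParsedDelta>> planPreCompactions(List<ParsedDelta> sortedDeltas, int maxPerJob) {
    List<List<ParsedDelta>> batches = new ArrayList<List<ParsedDelta>>();
    if (sortedDeltas.size() <= maxPerJob) {
      return batches;                                    // small enough for a single job
    }
    int numMinorJobs = sortedDeltas.size() / maxPerJob;  // e.g. 5 deltas / 2 per job = 2 jobs
    for (int job = 0; job < numMinorJobs; job++) {
      batches.add(sortedDeltas.subList(job * maxPerJob, (job + 1) * maxPerJob));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
    for (long txn = 1; txn <= 5; txn++) {
      deltas.add(new ParsedDelta(txn, txn));             // delta_1_1 ... delta_5_5
    }
    // Prints [[delta_1_1, delta_2_2], [delta_3_3, delta_4_4]]; delta_5_5 is left over.
    System.out.println(planPreCompactions(deltas, 2));
  }
}

With hive.compactor.max.num.delta set to 2, as in the new compactNoBaseLotsOfDeltas test below, the same arithmetic yields two preliminary minor jobs followed by the MINOR or MAJOR compaction that was originally requested.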
@@ -245,8 +286,8 @@ public class CompactorMR { * @throws IOException */ CompactorInputSplit(Configuration hadoopConf, int bucket, List<Path> files, Path base, - Path[] deltas) - throws IOException { + Path[] deltas) + throws IOException { bucketNum = bucket; this.base = base; this.deltas = deltas; @@ -396,7 +437,7 @@ public class CompactorMR { // If this is a base or delta directory, then we need to be looking for the bucket files. // But if it's a legacy file then we need to add it directly. if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) || - dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) { + dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) { boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX); FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter); for(FileStatus f : files) { @@ -414,7 +455,7 @@ public class CompactorMR { for (Map.Entry<Integer, BucketTracker> e : splitToBucketMap.entrySet()) { BucketTracker bt = e.getValue(); splits.add(new CompactorInputSplit(entries, e.getKey(), bt.buckets, - bt.sawBase ? baseDir : null, deltaDirs)); + bt.sawBase ? baseDir : null, deltaDirs)); } LOG.debug("Returning " + splits.size() + " splits"); @@ -423,7 +464,7 @@ public class CompactorMR { @Override public RecordReader<NullWritable, CompactorInputSplit> getRecordReader( - InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { + InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { return new CompactorRecordReader((CompactorInputSplit)inputSplit); } @@ -431,7 +472,7 @@ public class CompactorMR { Map<Integer, BucketTracker> splitToBucketMap) { if (!matcher.find()) { LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " + - file.toString() + " Matcher=" + matcher.toString()); + file.toString() + " Matcher=" + matcher.toString()); } int bucketNum = Integer.valueOf(matcher.group()); BucketTracker bt = splitToBucketMap.get(bucketNum); @@ -456,7 +497,7 @@ public class CompactorMR { } static class CompactorRecordReader - implements RecordReader<NullWritable, CompactorInputSplit> { + implements RecordReader<NullWritable, CompactorInputSplit> { private CompactorInputSplit split; CompactorRecordReader(CompactorInputSplit split) { @@ -501,7 +542,7 @@ public class CompactorMR { } static class CompactorMap<V extends Writable> - implements Mapper<WritableComparable, CompactorInputSplit, NullWritable, NullWritable> { + implements Mapper<WritableComparable, CompactorInputSplit, NullWritable, NullWritable> { JobConf jobConf; RecordWriter writer; @@ -515,15 +556,15 @@ public class CompactorMR { // Based on the split we're passed we go instantiate the real reader and then iterate on it // until it finishes. 
@SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class - AcidInputFormat<WritableComparable, V> aif = - instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME)); + AcidInputFormat<WritableComparable, V> aif = + instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME)); ValidTxnList txnList = - new ValidReadTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); + new ValidReadTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); boolean isMajor = jobConf.getBoolean(IS_MAJOR, false); AcidInputFormat.RawReader<V> reader = - aif.getRawReader(jobConf, isMajor, split.getBucket(), - txnList, split.getBaseDir(), split.getDeltaDirs()); + aif.getRawReader(jobConf, isMajor, split.getBucket(), + txnList, split.getBaseDir(), split.getDeltaDirs()); RecordIdentifier identifier = reader.createKey(); V value = reader.createValue(); getWriter(reporter, reader.getObjectInspector(), split.getBucket()); @@ -551,20 +592,20 @@ public class CompactorMR { if (writer == null) { AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf); options.inspector(inspector) - .writingBase(jobConf.getBoolean(IS_MAJOR, false)) - .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false)) - .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()) - .reporter(reporter) - .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) - .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) - .bucket(bucket) - .statementId(-1);//setting statementId == -1 makes compacted delta files use + .writingBase(jobConf.getBoolean(IS_MAJOR, false)) + .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false)) + .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()) + .reporter(reporter) + .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) + .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) + .bucket(bucket) + .statementId(-1);//setting statementId == -1 makes compacted delta files use //delta_xxxx_yyyy format // Instantiate the underlying output format @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class - AcidOutputFormat<WritableComparable, V> aof = - instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME)); + AcidOutputFormat<WritableComparable, V> aof = + instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME)); writer = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options); } @@ -719,7 +760,7 @@ public class CompactorMR { Path finalLocation = new Path(conf.get(FINAL_LOCATION)); FileSystem fs = tmpLocation.getFileSystem(conf); LOG.debug("Moving contents of " + tmpLocation.toString() + " to " + - finalLocation.toString()); + finalLocation.toString()); FileStatus[] contents = fs.listStatus(tmpLocation); for (int i = 0; i < contents.length; i++) { @@ -738,4 +779,4 @@ public class CompactorMR { fs.delete(tmpLocation, true); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 0548117..cc7441a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -77,7 +77,7 @@ public class Worker extends CompactorThread { 
// Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. try { - CompactionInfo ci = txnHandler.findNextToCompact(name); + final CompactionInfo ci = txnHandler.findNextToCompact(name); if (ci == null && !stop.get()) { try { @@ -158,14 +158,14 @@ public class Worker extends CompactorThread { launchedJob = true; try { if (runJobAsSelf(runAs)) { - mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su); + mr.run(conf, jobName.toString(), t, sd, txns, ci, su); } else { UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), UserGroupInformation.getLoginUser()); ugi.doAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { - mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su); + mr.run(conf, jobName.toString(), t, sd, txns, ci, su); return null; } }); http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 5a8c932..39c0571 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -516,6 +516,10 @@ public abstract class CompactorTest { abstract boolean useHive130DeltaDirName(); String makeDeltaDirName(long minTxnId, long maxTxnId) { + if(minTxnId != maxTxnId) { + //covers both streaming api and post compaction style. + return makeDeltaDirNameCompacted(minTxnId, maxTxnId); + } return useHive130DeltaDirName() ? AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0) : AcidUtils.deltaSubdir(minTxnId, maxTxnId); } http://git-wip-us.apache.org/repos/asf/hive/blob/e654efeb/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 11e5333..245e839 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.*; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -29,6 +30,7 @@ import org.junit.Test; import java.io.*; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -37,6 +39,10 @@ import java.util.Map; /** * Tests for the worker thread and its MR jobs. + * todo: most delta files in this test suite use txn id range, i.e. [N,N+M] + * That means that they all look like they were created by compaction or by streaming api. + * Delta files created by SQL should have [N,N] range (and a suffix in v1.3 and later) + * Need to change some of these to have better test coverage. 
*/ public class TestWorker extends CompactorTest { static final private String CLASS_NAME = TestWorker.class.getName(); @@ -325,18 +331,14 @@ public class TestWorker extends CompactorTest { // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - boolean is130 = this instanceof TestWorker2; - Assert.assertEquals(is130 ? 5 : 4, stat.length); + Assert.assertEquals(4, stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); Assert.assertEquals("base_20", stat[0].getPath().getName()); - if(is130) {//in1.3.0 orig delta is delta_00021_00022_0000 and compacted one is delta_00021_00022... - Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName()); - } - Assert.assertEquals(makeDeltaDirName(21, 22), stat[1 + (is130 ? 1 : 0)].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[2 + (is130 ? 1 : 0)].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[3 + (is130 ? 1 : 0)].getPath().getName()); + Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName()); + Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName()); + Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName()); } @Test @@ -508,6 +510,108 @@ public class TestWorker extends CompactorTest { } @Test + public void minorNoBaseLotsOfDeltas() throws Exception { + compactNoBaseLotsOfDeltas(CompactionType.MINOR); + } + @Test + public void majorNoBaseLotsOfDeltas() throws Exception { + compactNoBaseLotsOfDeltas(CompactionType.MAJOR); + } + private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception { + conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 2); + Table t = newTable("default", "mapwb", true); + Partition p = newPartition(t, "today"); + +// addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 21L, 2); + addDeltaFile(t, p, 23L, 23L, 2); + //make it look like streaming API use case + addDeltaFile(t, p, 25L, 29L, 2); + addDeltaFile(t, p, 31L, 32L, 3); + //make it looks like 31-32 has been compacted, but not cleaned + addDeltaFile(t, p, 31L, 33L, 5); + addDeltaFile(t, p, 35L, 35L, 1); + + /*since COMPACTOR_MAX_NUM_DELTA=2, + we expect files 1,2 to be minor compacted by 1 job to produce delta_21_23 + * 3,5 to be minor compacted by 2nd job (file 4 is obsolete) to make delta_25_33 (4th is skipped) + * + * and then the 'requested' + * minor compaction to combine delta_21_23, delta_25_33 and delta_35_35 to make delta_21_35 + * or major compaction to create base_35*/ + burnThroughTransactions(35); + CompactionRequest rqst = new CompactionRequest("default", "mapwb", type); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + + startWorker(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation())); + Assert.assertEquals(9, stat.length); + + // Find the new delta file and make sure it has the right contents + BitSet matchesFound = new BitSet(9); + for (int i = 0; i < stat.length; i++) { + if(stat[i].getPath().getName().equals(makeDeltaDirName(21,21))) { + matchesFound.set(0); + } + else 
if(stat[i].getPath().getName().equals(makeDeltaDirName(23, 23))) { + matchesFound.set(1); + } + else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25, 29))) { + matchesFound.set(2); + } + else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 32))) { + matchesFound.set(3); + } + else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 33))) { + matchesFound.set(4); + } + else if(stat[i].getPath().getName().equals(makeDeltaDirName(35, 35))) { + matchesFound.set(5); + } + else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,23))) { + matchesFound.set(6); + } + else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25,33))) { + matchesFound.set(7); + } + switch (type) { + //yes, both do set(8) + case MINOR: + if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,35))) { + matchesFound.set(8); + } + break; + case MAJOR: + if(stat[i].getPath().getName().equals(AcidUtils.baseDir(35))) { + matchesFound.set(8); + } + break; + default: + throw new IllegalStateException(); + } + } + StringBuilder sb = null; + for(int i = 0; i < stat.length; i++) { + if(!matchesFound.get(i)) { + if(sb == null) { + sb = new StringBuilder("Some files are missing at index: "); + } + sb.append(i).append(","); + } + } + if (sb != null) { + Assert.assertTrue(sb.toString(), false); + } + } + @Test public void majorPartitionWithBase() throws Exception { LOG.debug("Starting majorPartitionWithBase"); Table t = newTable("default", "mapwb", true);
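The directory names the new test checks for all follow from the delta ordering and selection rules this commit documents in AcidUtils: deltas sort by ascending start transaction, with "wider" deltas before "narrower" ones (delta_5_20 before delta_5_10 and delta_11_20), so a single pass can keep the widest coverage and mark everything it subsumes as obsolete. The sketch below is a simplified standalone illustration of that ordering and best-set selection; it is not Hive code and deliberately ignores statement IDs and the ValidTxnList "exceptions" the commit's comment mentions.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Simplified illustration of the "wider before narrower" delta ordering; not Hive code. */
public class DeltaOrderingSketch {

  static class Delta {
    final long min;
    final long max;
    Delta(long min, long max) { this.min = min; this.max = max; }
    @Override
    public String toString() { return "delta_" + min + "_" + max; }
  }

  // Ascending by start txn; for the same start txn the wider delta (larger max) sorts first.
  static final Comparator<Delta> WIDER_FIRST = new Comparator<Delta>() {
    @Override
    public int compare(Delta a, Delta b) {
      if (a.min != b.min) {
        return Long.compare(a.min, b.min);
      }
      return Long.compare(b.max, a.max);   // reversed so the wider range wins
    }
  };

  /** Keep only deltas that extend coverage past everything already selected. */
  static List<Delta> bestSet(List<Delta> sorted, long bestBaseTxn) {
    List<Delta> result = new ArrayList<Delta>();
    long current = bestBaseTxn;
    for (Delta d : sorted) {
      if (d.max > current) {               // contributes transactions not yet covered
        result.add(d);
        current = d.max;
      }                                    // otherwise already covered by a wider delta -> obsolete
    }
    return result;
  }

  public static void main(String[] args) {
    List<Delta> deltas = new ArrayList<Delta>(Arrays.asList(
        new Delta(5, 10), new Delta(5, 20), new Delta(11, 20), new Delta(51, 60)));
    Collections.sort(deltas, WIDER_FIRST);
    // Sorted: delta_5_20 delta_5_10 delta_11_20 delta_51_60
    System.out.println(bestSet(deltas, 0)); // prints [delta_5_20, delta_51_60]
  }
}

Sorting the example yields delta_5_20 delta_5_10 delta_11_20 delta_51_60, and the selection keeps delta_5_20 and delta_51_60, which matches the example spelled out in the comment added next to Collections.sort(working) in AcidUtils.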