Author: gates
Date: Mon Jan 26 21:21:45 2015
New Revision: 1654899

URL: http://svn.apache.org/r1654899
Log:
HIVE-8966 Delta files created by hive hcatalog streaming cannot be compacted (Jihong Liu and Alan Gates, reviewed by Owen O'Malley)
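
For context, a short usage sketch of the two ValidTxnList implementations this patch introduces: ValidReadTxnList (reader view, only committed transactions are valid) and ValidCompactorTxnList (compactor view, which also carries the minimum open transaction so nothing above an open transaction gets compacted). The values mirror the unit tests added below; the class name TxnListSketch and the main() wrapper are illustrative only and not part of the patch.

import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;

public class TxnListSketch {
  public static void main(String[] args) {
    // Reader view: txns 2 and 4 are open/aborted exceptions, high watermark is 5.
    ValidTxnList readList = new ValidReadTxnList(new long[]{2L, 4L}, 5);
    System.out.println(readList.writeToString()); // "5:2:4"
    System.out.println(readList.isTxnValid(3));   // true  (committed)
    System.out.println(readList.isTxnValid(4));   // false (in the exception list)

    // Compactor view: the serialized form carries the minimum open txn (9 here)
    // right after the high watermark, and any range reaching up to or past that
    // minimum open txn is reported as NONE so it is never chosen for compaction.
    ValidCompactorTxnList compactList =
        new ValidCompactorTxnList(new long[]{9L, 7L, 10L}, 9, 37);
    System.out.println(compactList.writeToString()); // "37:9:7:9:10"
    System.out.println(
        compactList.isTxnRangeValid(7, 9) == ValidTxnList.RangeResponse.NONE); // true
  }
}
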
Added: hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java Added: hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java?rev=1654899&view=auto ============================================================================== --- hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java (added) +++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java Mon Jan 26 21:21:45 2015 @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.common; + +import java.util.Arrays; + +/** + * An implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers. + * This class will view a transaction as valid only if it is committed. Both open and aborted + * transactions will be seen as invalid. + */ +public class ValidReadTxnList implements ValidTxnList { + + protected long[] exceptions; + protected long highWatermark; + + public ValidReadTxnList() { + this(new long[0], Long.MAX_VALUE); + } + + public ValidReadTxnList(long[] exceptions, long highWatermark) { + if (exceptions.length == 0) { + this.exceptions = exceptions; + } else { + this.exceptions = exceptions.clone(); + Arrays.sort(this.exceptions); + } + this.highWatermark = highWatermark; + } + + public ValidReadTxnList(String value) { + readFromString(value); + } + + @Override + public boolean isTxnValid(long txnid) { + if (highWatermark < txnid) { + return false; + } + return Arrays.binarySearch(exceptions, txnid) < 0; + } + + @Override + public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { + // check the easy cases first + if (highWatermark < minTxnId) { + return RangeResponse.NONE; + } else if (exceptions.length > 0 && exceptions[0] > maxTxnId) { + return RangeResponse.ALL; + } + + // since the exceptions and the range in question overlap, count the + // exceptions in the range + long count = Math.max(0, maxTxnId - highWatermark); + for(long txn: exceptions) { + if (minTxnId <= txn && txn <= maxTxnId) { + count += 1; + } + } + + if (count == 0) { + return RangeResponse.ALL; + } else if (count == (maxTxnId - minTxnId + 1)) { + return RangeResponse.NONE; + } else { + return RangeResponse.SOME; + } + } + + @Override + public String toString() { + return writeToString(); + } + + @Override + public String writeToString() { + StringBuilder buf = new StringBuilder(); + buf.append(highWatermark); + if (exceptions.length == 0) { + buf.append(':'); + } else { + for(long except: exceptions) { + buf.append(':'); + buf.append(except); + } + } + return buf.toString(); + } + + @Override + public void readFromString(String src) { + if (src == null) { + highWatermark = Long.MAX_VALUE; + exceptions = new long[0]; + } else { + String[] values = src.split(":"); + highWatermark = Long.parseLong(values[0]); + exceptions = new long[values.length - 1]; + for(int i = 1; i < values.length; ++i) { + exceptions[i-1] = Long.parseLong(values[i]); + } + } + } + + @Override + public long getHighWatermark() { + return highWatermark; + } + + @Override + public long[] getInvalidTransactions() { + return exceptions; + } +} + Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java (original) +++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java Mon Jan 26 21:21:45 2015 @@ -38,21 +38,23 @@ public interface ValidTxnList { public enum RangeResponse {NONE, SOME, ALL}; /** - * Indicates whether a given transaction has been committed and should be - * viewed as valid for read. + * Indicates whether a given transaction is valid. 
Note that valid may have different meanings + * for different implementations, as some will only want to see committed transactions and some + * both committed and aborted. * @param txnid id for the transaction - * @return true if committed, false otherwise + * @return true if valid, false otherwise */ - public boolean isTxnCommitted(long txnid); + public boolean isTxnValid(long txnid); /** - * Find out if a range of transaction ids have been committed. + * Find out if a range of transaction ids are valid. Note that valid may have different meanings + * for different implementations, as some will only want to see committed transactions and some + * both committed and aborted. * @param minTxnId minimum txnid to look for, inclusive * @param maxTxnId maximum txnid to look for, inclusive - * @return Indicate whether none, some, or all of these transactions have been - * committed. + * @return Indicate whether none, some, or all of these transactions are valid. */ - public RangeResponse isTxnRangeCommitted(long minTxnId, long maxTxnId); + public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId); /** * Write this validTxnList into a string. This should produce a string that @@ -74,9 +76,10 @@ public interface ValidTxnList { public long getHighWatermark(); /** - * Get the list of transactions under the high water mark that are still - * open. - * @return a list of open transaction ids + * Get the list of transactions under the high water mark that are not valid. Note that invalid + * may have different meanings for different implementations, as some will only want to see open + * transactions and some both open and aborted. + * @return a list of invalid transaction ids */ - public long[] getOpenTransactions(); + public long[] getInvalidTransactions(); } Added: hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java URL: http://svn.apache.org/viewvc/hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java?rev=1654899&view=auto ============================================================================== --- hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java (added) +++ hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java Mon Jan 26 21:21:45 2015 @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.common; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; + +/** + * Tests for {@link ValidReadTxnList} + */ +public class TestValidReadTxnList { + + @Test + public void noExceptions() throws Exception { + ValidTxnList txnList = new ValidReadTxnList(new long[0], 1); + String str = txnList.writeToString(); + Assert.assertEquals("1:", str); + ValidTxnList newList = new ValidReadTxnList(); + newList.readFromString(str); + Assert.assertTrue(newList.isTxnValid(1)); + Assert.assertFalse(newList.isTxnValid(2)); + } + + @Test + public void exceptions() throws Exception { + ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5); + String str = txnList.writeToString(); + Assert.assertEquals("5:2:4", str); + ValidTxnList newList = new ValidReadTxnList(); + newList.readFromString(str); + Assert.assertTrue(newList.isTxnValid(1)); + Assert.assertFalse(newList.isTxnValid(2)); + Assert.assertTrue(newList.isTxnValid(3)); + Assert.assertFalse(newList.isTxnValid(4)); + Assert.assertTrue(newList.isTxnValid(5)); + Assert.assertFalse(newList.isTxnValid(6)); + } + + @Test + public void longEnoughToCompress() throws Exception { + long[] exceptions = new long[1000]; + for (int i = 0; i < 1000; i++) exceptions[i] = i + 100; + ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000); + String str = txnList.writeToString(); + ValidTxnList newList = new ValidReadTxnList(); + newList.readFromString(str); + for (int i = 0; i < 100; i++) Assert.assertTrue(newList.isTxnValid(i)); + for (int i = 100; i < 1100; i++) Assert.assertFalse(newList.isTxnValid(i)); + for (int i = 1100; i < 2001; i++) Assert.assertTrue(newList.isTxnValid(i)); + Assert.assertFalse(newList.isTxnValid(2001)); + } + + @Test + public void readWriteConfig() throws Exception { + long[] exceptions = new long[1000]; + for (int i = 0; i < 1000; i++) exceptions[i] = i + 100; + ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000); + String str = txnList.writeToString(); + Configuration conf = new Configuration(); + conf.set(ValidTxnList.VALID_TXNS_KEY, str); + File tmpFile = File.createTempFile("TestValidTxnImpl", "readWriteConfig"); + DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpFile)); + conf.write(out); + out.close(); + DataInputStream in = new DataInputStream(new FileInputStream(tmpFile)); + Configuration newConf = new Configuration(); + newConf.readFields(in); + Assert.assertEquals(str, newConf.get(ValidTxnList.VALID_TXNS_KEY)); + } +} Modified: hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java (original) +++ hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java Mon Jan 26 21:21:45 2015 @@ -107,7 +107,7 @@ public class StreamingIntegrationTester .hasArgs() .withArgName("partition-values") .withDescription("partition values, must be provided in order of partition columns, " + - "if not 
provided table is assumed to not be partitioned") + "if not provided table is assumed to not be partitioned") .withLongOpt("partition") .withValueSeparator(',') .create('p')); @@ -264,7 +264,7 @@ public class StreamingIntegrationTester this.batches = batches; this.writerNumber = writerNumber; this.recordsPerTxn = recordsPerTxn; - this.frequency = frequency; + this.frequency = frequency * 1000; this.abortPct = abortPct; this.cols = cols; this.types = types; @@ -279,8 +279,8 @@ public class StreamingIntegrationTester conn = endPoint.newConnection(true); RecordWriter writer = new DelimitedInputWriter(cols, ",", endPoint); - long start = System.currentTimeMillis(); for (int i = 0; i < batches; i++) { + long start = System.currentTimeMillis(); LOG.info("Starting batch " + i); TransactionBatch batch = conn.fetchTransactionBatch(txnsPerBatch, writer); try { Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java (original) +++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java Mon Jan 26 21:21:45 2015 @@ -19,12 +19,8 @@ package org.apache.hadoop.hive.metastore import junit.framework.Assert; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidTxnListImpl; +import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.LockComponentBuilder; -import org.apache.hadoop.hive.metastore.LockRequestBuilder; import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.LockState; @@ -69,10 +65,10 @@ public class TestHiveMetaStoreTxns { client.rollbackTxn(1); client.commitTxn(2); ValidTxnList validTxns = client.getValidTxns(); - Assert.assertFalse(validTxns.isTxnCommitted(1)); - Assert.assertTrue(validTxns.isTxnCommitted(2)); - Assert.assertFalse(validTxns.isTxnCommitted(3)); - Assert.assertFalse(validTxns.isTxnCommitted(4)); + Assert.assertFalse(validTxns.isTxnValid(1)); + Assert.assertTrue(validTxns.isTxnValid(2)); + Assert.assertFalse(validTxns.isTxnValid(3)); + Assert.assertFalse(validTxns.isTxnValid(4)); } @Test @@ -84,17 +80,17 @@ public class TestHiveMetaStoreTxns { client.rollbackTxn(1); client.commitTxn(2); ValidTxnList validTxns = client.getValidTxns(3); - Assert.assertFalse(validTxns.isTxnCommitted(1)); - Assert.assertTrue(validTxns.isTxnCommitted(2)); - Assert.assertTrue(validTxns.isTxnCommitted(3)); - Assert.assertFalse(validTxns.isTxnCommitted(4)); + Assert.assertFalse(validTxns.isTxnValid(1)); + Assert.assertTrue(validTxns.isTxnValid(2)); + Assert.assertTrue(validTxns.isTxnValid(3)); + Assert.assertFalse(validTxns.isTxnValid(4)); } @Test public void testTxnRange() throws Exception { ValidTxnList validTxns = client.getValidTxns(); Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeCommitted(1L, 3L)); + validTxns.isTxnRangeValid(1L, 3L)); List<Long> tids = 
client.openTxns("me", 5).getTxn_ids(); HeartbeatTxnRangeResponse rsp = client.heartbeatTxnRange(1, 5); @@ -108,43 +104,43 @@ public class TestHiveMetaStoreTxns { validTxns = client.getValidTxns(); System.out.println("validTxns = " + validTxns); Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeCommitted(2L, 2L)); + validTxns.isTxnRangeValid(2L, 2L)); Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeCommitted(2L, 3L)); + validTxns.isTxnRangeValid(2L, 3L)); Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeCommitted(2L, 4L)); + validTxns.isTxnRangeValid(2L, 4L)); Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeCommitted(3L, 4L)); + validTxns.isTxnRangeValid(3L, 4L)); Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeCommitted(1L, 4L)); + validTxns.isTxnRangeValid(1L, 4L)); Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeCommitted(2L, 5L)); + validTxns.isTxnRangeValid(2L, 5L)); Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeCommitted(1L, 2L)); + validTxns.isTxnRangeValid(1L, 2L)); Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeCommitted(4L, 5L)); + validTxns.isTxnRangeValid(4L, 5L)); Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeCommitted(1L, 1L)); + validTxns.isTxnRangeValid(1L, 1L)); Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeCommitted(5L, 10L)); + validTxns.isTxnRangeValid(5L, 10L)); - validTxns = new ValidTxnListImpl("10:4:5:6"); + validTxns = new ValidReadTxnList("10:4:5:6"); Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeCommitted(4,6)); + validTxns.isTxnRangeValid(4,6)); Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeCommitted(7, 10)); + validTxns.isTxnRangeValid(7, 10)); Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeCommitted(7, 11)); + validTxns.isTxnRangeValid(7, 11)); Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeCommitted(3, 6)); + validTxns.isTxnRangeValid(3, 6)); Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeCommitted(4, 7)); + validTxns.isTxnRangeValid(4, 7)); Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeCommitted(1, 12)); + validTxns.isTxnRangeValid(1, 12)); Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeCommitted(1, 3)); + validTxns.isTxnRangeValid(1, 3)); } @Test @@ -219,32 +215,32 @@ public class TestHiveMetaStoreTxns { @Test public void stringifyValidTxns() throws Exception { // Test with just high water mark - ValidTxnList validTxns = new ValidTxnListImpl("1:"); + ValidTxnList validTxns = new ValidReadTxnList("1:"); String asString = validTxns.toString(); Assert.assertEquals("1:", asString); - validTxns = new ValidTxnListImpl(asString); + validTxns = new ValidReadTxnList(asString); Assert.assertEquals(1, validTxns.getHighWatermark()); - Assert.assertNotNull(validTxns.getOpenTransactions()); - Assert.assertEquals(0, validTxns.getOpenTransactions().length); + Assert.assertNotNull(validTxns.getInvalidTransactions()); + Assert.assertEquals(0, validTxns.getInvalidTransactions().length); asString = validTxns.toString(); Assert.assertEquals("1:", asString); - validTxns = new ValidTxnListImpl(asString); + validTxns = new ValidReadTxnList(asString); Assert.assertEquals(1, validTxns.getHighWatermark()); - 
Assert.assertNotNull(validTxns.getOpenTransactions()); - Assert.assertEquals(0, validTxns.getOpenTransactions().length); + Assert.assertNotNull(validTxns.getInvalidTransactions()); + Assert.assertEquals(0, validTxns.getInvalidTransactions().length); // Test with open transactions - validTxns = new ValidTxnListImpl("10:5:3"); + validTxns = new ValidReadTxnList("10:5:3"); asString = validTxns.toString(); if (!asString.equals("10:3:5") && !asString.equals("10:5:3")) { Assert.fail("Unexpected string value " + asString); } - validTxns = new ValidTxnListImpl(asString); + validTxns = new ValidReadTxnList(asString); Assert.assertEquals(10, validTxns.getHighWatermark()); - Assert.assertNotNull(validTxns.getOpenTransactions()); - Assert.assertEquals(2, validTxns.getOpenTransactions().length); + Assert.assertNotNull(validTxns.getInvalidTransactions()); + Assert.assertEquals(2, validTxns.getInvalidTransactions().length); boolean sawThree = false, sawFive = false; - for (long tid : validTxns.getOpenTransactions()) { + for (long tid : validTxns.getInvalidTransactions()) { if (tid == 3) sawThree = true; else if (tid == 5) sawFive = true; else Assert.fail("Unexpected value " + tid); Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java (original) +++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java Mon Jan 26 21:21:45 2015 @@ -1,8 +1,12 @@ package org.apache.hadoop.hive.ql.txn.compactor; -import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -15,20 +19,28 @@ import org.apache.hadoop.hive.metastore. 
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.streaming.DelimitedInputWriter; import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.apache.hive.hcatalog.streaming.StreamingConnection; +import org.apache.hive.hcatalog.streaming.StreamingException; import org.apache.hive.hcatalog.streaming.TransactionBatch; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -184,7 +196,7 @@ public class TestCompactor { Assert.assertEquals("numNdv a", 1, colAStats.getNumDVs()); StringColumnStatsData colBStats = colStats.get(1).getStatsData().getStringStats(); Assert.assertEquals("maxColLen b", 3, colBStats.getMaxColLen()); - Assert.assertEquals("avgColLen b", 3.0, colBStats.getAvgColLen()); + Assert.assertEquals("avgColLen b", 3.0, colBStats.getAvgColLen(), 0.01); Assert.assertEquals("numNulls b", 0, colBStats.getNumNulls()); Assert.assertEquals("nunDVs", 2, colBStats.getNumDVs()); @@ -268,6 +280,299 @@ public class TestCompactor { colBStatsPart2, colStats.get(1).getStatsData().getStringStats()); } + @Test + public void minorCompactWhileStreaming() throws Exception { + String dbName = "default"; + String tblName = "cws"; + List<String> colNames = Arrays.asList("a", "b"); + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC", driver); + + HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); + StreamingConnection connection = endPt.newConnection(false); + try { + // Write a couple of batches + for (int i = 0; i < 2; i++) { + writeBatch(connection, writer, false); + } + + // Start a third batch, but don't close it. 
+ writeBatch(connection, writer, true); + + // Now, compact + CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] names = new String[stat.length]; + Path resultFile = null; + for (int i = 0; i < names.length; i++) { + names[i] = stat[i].getPath().getName(); + if (names[i].equals("delta_0000001_0000004")) { + resultFile = stat[i].getPath(); + } + } + Arrays.sort(names); + Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002", + "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}); + checkExpectedTxnsPresent(null, new Path[]{resultFile}, 0, 1L, 4L); + + } finally { + connection.close(); + } + } + + @Test + public void majorCompactWhileStreaming() throws Exception { + String dbName = "default"; + String tblName = "cws"; + List<String> colNames = Arrays.asList("a", "b"); + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC", driver); + + HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); + StreamingConnection connection = endPt.newConnection(false); + try { + // Write a couple of batches + for (int i = 0; i < 2; i++) { + writeBatch(connection, writer, false); + } + + // Start a third batch, but don't close it. 
+ writeBatch(connection, writer, true); + + // Now, compact + CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); + Assert.assertEquals(1, stat.length); + String name = stat[0].getPath().getName(); + Assert.assertEquals(name, "base_0000004"); + checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L); + } finally { + connection.close(); + } + } + + @Test + public void minorCompactAfterAbort() throws Exception { + String dbName = "default"; + String tblName = "cws"; + List<String> colNames = Arrays.asList("a", "b"); + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC", driver); + + HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); + StreamingConnection connection = endPt.newConnection(false); + try { + // Write a couple of batches + for (int i = 0; i < 2; i++) { + writeBatch(connection, writer, false); + } + + // Start a third batch, abort everything, don't properly close it + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.abort(); + txnBatch.beginNextTransaction(); + txnBatch.abort(); + + // Now, compact + CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] names = new String[stat.length]; + Path resultDelta = null; + for (int i = 0; i < names.length; i++) { + names[i] = stat[i].getPath().getName(); + if (names[i].equals("delta_0000001_0000006")) { + resultDelta = stat[i].getPath(); + } + } + Arrays.sort(names); + Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002", + "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"}); + checkExpectedTxnsPresent(null, new Path[]{resultDelta}, 0, 1L, 4L); + } finally { + connection.close(); + } + } + + @Test + public void majorCompactAfterAbort() throws Exception { + String dbName = "default"; + String tblName = "cws"; + List<String> colNames = Arrays.asList("a", "b"); + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) 
INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC", driver); + + HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); + StreamingConnection connection = endPt.newConnection(false); + try { + // Write a couple of batches + for (int i = 0; i < 2; i++) { + writeBatch(connection, writer, false); + } + + // Start a third batch, but don't close it. + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.abort(); + txnBatch.beginNextTransaction(); + txnBatch.abort(); + + + // Now, compact + CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); + Assert.assertEquals(1, stat.length); + String name = stat[0].getPath().getName(); + Assert.assertEquals(name, "base_0000006"); + checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L); + } finally { + connection.close(); + } + } + private void writeBatch(StreamingConnection connection, DelimitedInputWriter writer, + boolean closeEarly) + throws InterruptedException, StreamingException { + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("50,Kiev".getBytes()); + txnBatch.write("51,St. 
Petersburg".getBytes()); + txnBatch.write("44,Boston".getBytes()); + txnBatch.commit(); + + if (!closeEarly) { + txnBatch.beginNextTransaction(); + txnBatch.write("52,Tel Aviv".getBytes()); + txnBatch.write("53,Atlantis".getBytes()); + txnBatch.write("53,Boston".getBytes()); + txnBatch.commit(); + + txnBatch.close(); + } + } + + private void checkExpectedTxnsPresent(Path base, Path[] deltas, int bucket, long min, long max) + throws IOException { + ValidTxnList txnList = new ValidTxnList() { + @Override + public boolean isTxnValid(long txnid) { + return true; + } + + @Override + public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { + return RangeResponse.ALL; + } + + @Override + public String writeToString() { + return ""; + } + + @Override + public void readFromString(String src) { + + } + + @Override + public long getHighWatermark() { + return Long.MAX_VALUE; + } + + @Override + public long[] getInvalidTransactions() { + return new long[0]; + } + }; + + OrcInputFormat aif = new OrcInputFormat(); + + AcidInputFormat.RawReader<OrcStruct> reader = + aif.getRawReader(new Configuration(), false, bucket, txnList, base, deltas); + RecordIdentifier identifier = reader.createKey(); + OrcStruct value = reader.createValue(); + long currentTxn = min; + boolean seenCurrentTxn = false; + while (reader.next(identifier, value)) { + if (!seenCurrentTxn) { + Assert.assertEquals(currentTxn, identifier.getTransactionId()); + seenCurrentTxn = true; + } + if (currentTxn != identifier.getTransactionId()) { + Assert.assertEquals(currentTxn + 1, identifier.getTransactionId()); + currentTxn++; + } + } + Assert.assertEquals(max, currentTxn); + } + /** * convenience method to execute a select stmt and dump results to log file */ Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java Mon Jan 26 21:21:45 2015 @@ -1747,12 +1747,12 @@ public class HiveMetaStoreClient impleme @Override public ValidTxnList getValidTxns() throws TException { - return TxnHandler.createValidTxnList(client.get_open_txns(), 0); + return TxnHandler.createValidReadTxnList(client.get_open_txns(), 0); } @Override public ValidTxnList getValidTxns(long currentTxn) throws TException { - return TxnHandler.createValidTxnList(client.get_open_txns(), currentTxn); + return TxnHandler.createValidReadTxnList(client.get_open_txns(), currentTxn); } @Override Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Mon Jan 26 21:21:45 2015 @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; 
+import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.util.StringUtils; @@ -576,6 +577,25 @@ public class CompactionTxnHandler extend return findColumnsWithStats(ci); } } + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a + * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to + * compact the files, and thus treats only open transactions as invalid. + * @param txns txn list from the metastore + * @return a valid txn list. + */ + public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) { + long highWater = txns.getTxn_high_water_mark(); + long minOpenTxn = Long.MAX_VALUE; + long[] exceptions = new long[txns.getOpen_txnsSize()]; + int i = 0; + for (TxnInfo txn : txns.getOpen_txns()) { + if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId()); + exceptions[i++] = txn.getId(); + } + return new ValidCompactorTxnList(exceptions, minOpenTxn, highWater); + } } Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Mon Jan 26 21:21:45 2015 @@ -29,7 +29,7 @@ import org.apache.commons.dbcp.PoolingDa import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidTxnListImpl; +import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.shims.ShimLoader; @@ -249,14 +249,15 @@ public class TxnHandler { /** * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a - * {@link org.apache.hadoop.hive.common.ValidTxnList}. + * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to + * read the files, and thus treats both open and aborted transactions as invalid. * @param txns txn list from the metastore * @param currentTxn Current transaction that the user has open. If this is greater than 0 it * will be removed from the exceptions list so that the user sees his own * transaction as valid. * @return a valid txn list. */ - public static ValidTxnList createValidTxnList(GetOpenTxnsResponse txns, long currentTxn) { + public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) { long highWater = txns.getTxn_high_water_mark(); Set<Long> open = txns.getOpen_txns(); long[] exceptions = new long[open.size() - (currentTxn > 0 ? 
1 : 0)]; @@ -265,7 +266,7 @@ public class TxnHandler { if (currentTxn > 0 && currentTxn == txn) continue; exceptions[i++] = txn; } - return new ValidTxnListImpl(exceptions, highWater); + return new ValidReadTxnList(exceptions, highWater); } public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { Added: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java?rev=1654899&view=auto ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java (added) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java Mon Jan 26 21:21:45 2015 @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.txn; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hive.common.ValidReadTxnList; + +import java.util.Arrays; + +/** + * And implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor. + * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it + * is committed or aborted. Additionally it will return none if there are any open transactions + * below the max transaction given, since we don't want to compact above open transactions. For + * {@link #isTxnValid} it will still view a transaction as valid only if it is committed. These + * produce the logic we need to assure that the compactor only sees records less than the lowest + * open transaction when choosing which files to compact, but that it still ignores aborted + * records when compacting. + */ +public class ValidCompactorTxnList extends ValidReadTxnList { + + // The minimum open transaction id + private long minOpenTxn; + + public ValidCompactorTxnList() { + super(); + minOpenTxn = -1; + } + + /** + * + * @param exceptions list of all open and aborted transactions + * @param minOpen lowest open transaction + * @param highWatermark highest committed transaction + */ + public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) { + super(exceptions, highWatermark); + minOpenTxn = minOpen; + } + + public ValidCompactorTxnList(String value) { + super(value); + } + + @Override + public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { + if (highWatermark < minTxnId) { + return RangeResponse.NONE; + } else if (minOpenTxn < 0) { + return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE; + } else { + return minOpenTxn > maxTxnId ? 
RangeResponse.ALL : RangeResponse.NONE; + } + } + + @Override + public String writeToString() { + StringBuilder buf = new StringBuilder(); + buf.append(highWatermark); + buf.append(':'); + buf.append(minOpenTxn); + if (exceptions.length == 0) { + buf.append(':'); + } else { + for(long except: exceptions) { + buf.append(':'); + buf.append(except); + } + } + return buf.toString(); + } + + @Override + public void readFromString(String src) { + if (src == null) { + highWatermark = Long.MAX_VALUE; + exceptions = new long[0]; + } else { + String[] values = src.split(":"); + highWatermark = Long.parseLong(values[0]); + minOpenTxn = Long.parseLong(values[1]); + exceptions = new long[values.length - 2]; + for(int i = 2; i < values.length; ++i) { + exceptions[i-2] = Long.parseLong(values[i]); + } + } + } + + @VisibleForTesting + long getMinOpenTxn() { + return minOpenTxn; + } +} Added: hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java?rev=1654899&view=auto ============================================================================== --- hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java (added) +++ hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java Mon Jan 26 21:21:45 2015 @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.common.ValidTxnList; +import org.junit.Assert; +import org.junit.Test; + +public class TestValidCompactorTxnList { + + @Test + public void minTxnHigh() { + ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 4}, 3, 5); + ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); + Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp); + } + + @Test + public void maxTxnLow() { + ValidTxnList txns = new ValidCompactorTxnList(new long[]{13, 14}, 13, 15); + ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); + Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp); + } + + @Test + public void minTxnHighNoExceptions() { + ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 5); + ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); + Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp); + } + + @Test + public void maxTxnLowNoExceptions() { + ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 15); + ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); + Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp); + } + + @Test + public void exceptionsAllBelow() { + ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 6}, 3, 15); + ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); + Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp); + } + + @Test + public void exceptionsInMidst() { + ValidTxnList txns = new ValidCompactorTxnList(new long[]{8}, 8, 15); + ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); + Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp); + } + + @Test + public void writeToString() { + ValidTxnList txns = new ValidCompactorTxnList(new long[]{9, 7, 10}, 9, 37); + Assert.assertEquals("37:9:7:9:10", txns.writeToString()); + txns = new ValidCompactorTxnList(); + Assert.assertEquals(Long.toString(Long.MAX_VALUE) + ":-1:", txns.writeToString()); + txns = new ValidCompactorTxnList(new long[0], -1, 23); + Assert.assertEquals("23:-1:", txns.writeToString()); + } + + @Test + public void readFromString() { + ValidCompactorTxnList txns = new ValidCompactorTxnList("37:9:7:9:10"); + Assert.assertEquals(37L, txns.getHighWatermark()); + Assert.assertEquals(9L, txns.getMinOpenTxn()); + Assert.assertArrayEquals(new long[]{7L, 9L, 10L}, txns.getInvalidTransactions()); + txns = new ValidCompactorTxnList("21:-1:"); + Assert.assertEquals(21L, txns.getHighWatermark()); + Assert.assertEquals(-1L, txns.getMinOpenTxn()); + Assert.assertEquals(0, txns.getInvalidTransactions().length); + + } +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Mon Jan 26 21:21:45 2015 @@ -43,7 +43,14 @@ public class AcidUtils { // This key will be put in the conf file when planning an acid operation public static final String CONF_ACID_KEY = "hive.doing.acid"; public static final String BASE_PREFIX = "base_"; + public static final PathFilter baseFileFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(BASE_PREFIX); + } + }; public static final String DELTA_PREFIX = "delta_"; + 
public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length"; public static final PathFilter deltaFileFilter = new PathFilter() { @Override public boolean accept(Path path) { @@ -54,7 +61,8 @@ public class AcidUtils { public static final PathFilter bucketFileFilter = new PathFilter() { @Override public boolean accept(Path path) { - return path.getName().startsWith(BUCKET_PREFIX); + return path.getName().startsWith(BUCKET_PREFIX) && + !path.getName().endsWith(DELTA_SIDE_FILE_SUFFIX); } }; public static final String BUCKET_DIGITS = "%05d"; @@ -369,7 +377,7 @@ public class AcidUtils { } } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) { ParsedDelta delta = parseDelta(child); - if (txnList.isTxnRangeCommitted(delta.minTransaction, + if (txnList.isTxnRangeValid(delta.minTransaction, delta.maxTransaction) != ValidTxnList.RangeResponse.NONE) { working.add(delta); @@ -402,7 +410,7 @@ public class AcidUtils { for(ParsedDelta next: working) { if (next.maxTransaction > current) { // are any of the new transactions ones that we care about? - if (txnList.isTxnRangeCommitted(current+1, next.maxTransaction) != + if (txnList.isTxnRangeValid(current+1, next.maxTransaction) != ValidTxnList.RangeResponse.NONE) { deltas.add(next); current = next.maxTransaction; Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Mon Jan 26 21:21:45 2015 @@ -30,7 +30,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidTxnListImpl; +import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -408,7 +408,7 @@ public class OrcInputFormat implements } String value = conf.get(ValidTxnList.VALID_TXNS_KEY, Long.MAX_VALUE + ":"); - transactionList = new ValidTxnListImpl(value); + transactionList = new ValidReadTxnList(value); } int getSchedulers() { @@ -1132,7 +1132,7 @@ public class OrcInputFormat implements } String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY, Long.MAX_VALUE + ":"); - ValidTxnList validTxnList = new ValidTxnListImpl(txnString); + ValidTxnList validTxnList = new ValidReadTxnList(txnString); final OrcRawRecordMerger records = new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket, validTxnList, readOptions, deltas); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java Mon Jan 26 21:21:45 2015 @@ -575,7 +575,7 @@ public class OrcRawRecordMerger implemen } // if this transaction isn't ok, 
skip over it - if (!validTxnList.isTxnCommitted( + if (!validTxnList.isTxnValid( ((ReaderKey) recordIdentifier).getCurrentTransactionId())) { continue; } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java Mon Jan 26 21:21:45 2015 @@ -129,7 +129,7 @@ public class OrcRecordUpdater implements } static Path getSideFile(Path main) { - return new Path(main + "_flush_length"); + return new Path(main + AcidUtils.DELTA_SIDE_FILE_SUFFIX); } static int getOperation(OrcStruct struct) { Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java Mon Jan 26 21:21:45 2015 @@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.lockmg import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidTxnListImpl; +import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.Context; @@ -199,7 +199,7 @@ class DummyTxnManager extends HiveTxnMan @Override public ValidTxnList getValidTxns() throws LockException { - return new ValidTxnListImpl(); + return new ValidReadTxnList(); } @Override Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java Mon Jan 26 21:21:45 2015 @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidTxnListImpl; +import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; @@ -183,7 +183,7 @@ public class Cleaner extends CompactorTh // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open // transactions. This assures that all deltas are treated as valid and all we return are // obsolete files. 
- final ValidTxnList txnList = new ValidTxnListImpl(); + final ValidTxnList txnList = new ValidReadTxnList(); if (runJobAsSelf(ci.runAs)) { removeFiles(location, txnList); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java Mon Jan 26 21:21:45 2015 @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidTxnListImpl; +import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -159,7 +159,7 @@ public class CompactorMR { if (parsedDeltas == null || parsedDeltas.size() == 0) { // Seriously, no deltas? Can't compact that. - LOG.error("No delta files found to compact in " + sd.getLocation()); + LOG.error( "No delta files found to compact in " + sd.getLocation()); return; } @@ -505,7 +505,7 @@ public class CompactorMR { AcidInputFormat<WritableComparable, V> aif = instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME)); ValidTxnList txnList = - new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); + new ValidReadTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); boolean isMajor = jobConf.getBoolean(IS_MAJOR, false); AcidInputFormat.RawReader<V> reader = Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1654899&r1=1654898&r2=1654899&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Mon Jan 26 21:21:45 2015 @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -77,7 +78,8 @@ public class Initiator extends Compactor // don't doom the entire thread. 
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Mon Jan 26 21:21:45 2015
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -77,7 +78,8 @@ public class Initiator extends Compactor
           // don't doom the entire thread.
           try {
             ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
-            ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0);
+            ValidTxnList txns =
+                CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
             Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
             LOG.debug("Found " + potentials.size() + " potential compactions, " +
                 "checking to see if we should compact any of them");

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java Mon Jan 26 21:21:45 2015
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
@@ -121,7 +122,8 @@ public class Worker extends CompactorThr
         final boolean isMajor = ci.isMajorCompaction();
         final ValidTxnList txns =
-            TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0);
+            CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+        LOG.debug("ValidCompactTxnList: " + txns.writeToString());
         final StringBuffer jobName = new StringBuffer(name);
         jobName.append("-compactor-");
         jobName.append(ci.getFullPartitionName());
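
The Initiator and Worker hunks above switch the compactor from the plain open-transaction list to a snapshot built from getOpenTxnsInfo(), which also carries each transaction's state (open vs. aborted). The behavioural consequence is exercised by the new TestAcidUtils tests further down. A small sketch of the same call chain; the wrapper method and its name are illustrative only, the two calls and the debug line come straight from the hunks above:

import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;

public class CompactorSnapshotSketch {
  // txnHandler stands in for the compactor thread's handler; it is a
  // parameter here only to keep the sketch self-contained.
  static ValidTxnList snapshotForCompaction(TxnHandler txnHandler) throws Exception {
    // getOpenTxnsInfo() (rather than getOpenTxns()) reports which txns are
    // open and which are aborted, and createValidCompactTxnList() turns that
    // into a snapshot the compactor can safely compact against.
    ValidTxnList txns =
        CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
    System.out.println("ValidCompactTxnList: " + txns.writeToString());
    return txns;
  }
}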
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java Mon Jan 26 21:21:45 2015
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -51,7 +51,6 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -114,7 +113,7 @@ public class TestFileSinkOperator {
         "testFileSinkOperator");
     tmpdir.mkdir();
     tmpdir.deleteOnExit();
-    txnList = new ValidTxnListImpl(new long[]{}, 2);
+    txnList = new ValidReadTxnList(new long[]{}, 2);
   }
 
   @Test

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java Mon Jan 26 21:21:45 2015
@@ -20,7 +20,8 @@ package org.apache.hadoop.hive.ql.io;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
@@ -91,7 +92,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0]));
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(new MockPath(fs, "/tbl/part1"), conf,
-            new ValidTxnListImpl("100:"));
+            new ValidReadTxnList("100:"));
     assertEquals(null, dir.getBaseDirectory());
     assertEquals(0, dir.getCurrentDirectories().size());
     assertEquals(0, dir.getObsolete().size());
@@ -121,7 +122,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_101_101/bucket_0", 0, new byte[0]));
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
-            "mock:/tbl/part1"), conf, new ValidTxnListImpl("100:"));
+            "mock:/tbl/part1"), conf, new ValidReadTxnList("100:"));
     assertEquals(null, dir.getBaseDirectory());
     List<FileStatus> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
@@ -162,7 +163,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_90_120/bucket_0", 0, new byte[0]));
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
-            "mock:/tbl/part1"), conf, new ValidTxnListImpl("100:"));
+            "mock:/tbl/part1"), conf, new ValidReadTxnList("100:"));
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
     List<FileStatus> obsolete = dir.getObsolete();
     assertEquals(5, obsolete.size());
@@ -191,7 +192,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:"));
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
     assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString());
     List<FileStatus> obsoletes = dir.getObsolete();
     assertEquals(4, obsoletes.size());
@@ -202,7 +203,7 @@ public class TestAcidUtils {
     assertEquals(0, dir.getOriginalFiles().size());
     assertEquals(0, dir.getCurrentDirectories().size());
     // we should always get the latest base
-    dir = AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("10:"));
+    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:"));
     assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString());
   }
 
@@ -216,7 +217,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/000001_1", 500, new byte[0]));
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:"));
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
     // The two original buckets won't be in the obsolete list because we don't look at those
     // until we have determined there is no base.
     List<FileStatus> obsolete = dir.getObsolete();
@@ -239,7 +240,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "mock:/tbl/part1");
     AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("100:"));
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
     List<FileStatus> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
@@ -252,4 +253,51 @@ public class TestAcidUtils {
     assertEquals("mock:/tbl/part1/delta_000062_62", delts.get(2).getPath().toString());
     assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(3).getPath().toString());
   }
+
+  @Test
+  public void deltasWithOpenTxnInRead() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4"));
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(2, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
+  }
+
+  @Test
+  public void deltasWithOpenTxnsNotInCompact() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4"));
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(1, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+  }
+
+  @Test
+  public void deltasWithOpenTxnsNotInCompact2() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0" + AcidUtils.DELTA_SIDE_FILE_SUFFIX, 500,
+            new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4"));
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(1, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+  }
+
+
 }
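
Taken together, the three new tests pin down the selection rule: with the snapshot "100:4" (high water mark 100, transaction 4 open), a reader still sees delta_2_5 as current, while the compactor keeps only delta_1_1 and also passes over delta_6_10. The helper below is not ValidCompactorTxnList's real implementation (that code is not in this section); it merely restates, as a standalone sketch, the behaviour the assertions encode:

public class CompactableDeltaSketch {

  // A delta_min_max directory may be compacted only if it ends below the
  // high water mark and below the first still-open transaction; minTxn is
  // kept in the signature only to mirror the directory name.
  static boolean compactable(long minTxn, long maxTxn, long highWatermark, long minOpenTxn) {
    return maxTxn <= highWatermark && maxTxn < minOpenTxn;
  }

  public static void main(String[] args) {
    // Mirrors the assertions above for the "100:4" snapshot.
    System.out.println(compactable(1, 1, 100, 4));    // true  -> delta_1_1 stays
    System.out.println(compactable(2, 5, 100, 4));    // false -> delta_2_5 skipped
    System.out.println(compactable(6, 10, 100, 4));   // false -> delta_6_10 skipped
  }
}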
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java Mon Jan 26 21:21:45 2015
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -301,7 +301,7 @@ public class TestOrcRawRecordMerger {
   }
 
   private static ValidTxnList createMaximalTxnList() {
-    return new ValidTxnListImpl(Long.MAX_VALUE + ":");
+    return new ValidReadTxnList(Long.MAX_VALUE + ":");
   }
 
   @Test
@@ -492,7 +492,7 @@ public class TestOrcRawRecordMerger {
         .maximumTransactionId(100);
     of.getRecordUpdater(root, options).close(false);
 
-    ValidTxnList txnList = new ValidTxnListImpl("200:");
+    ValidTxnList txnList = new ValidReadTxnList("200:");
     AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
 
     Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
@@ -550,7 +550,7 @@ public class TestOrcRawRecordMerger {
     ru.delete(200, new MyRow("", 8, 0, BUCKET));
     ru.close(false);
 
-    ValidTxnList txnList = new ValidTxnListImpl("200:");
+    ValidTxnList txnList = new ValidReadTxnList("200:");
     AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
 
     assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
@@ -734,7 +734,7 @@ public class TestOrcRawRecordMerger {
     merger.close();
 
     // try ignoring the 200 transaction and make sure it works still
-    ValidTxnList txns = new ValidTxnListImpl("2000:200");
+    ValidTxnList txns = new ValidReadTxnList("2000:200");
     merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, txns,
             new Reader.Options(),

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java Mon Jan 26 21:21:45 2015
@@ -51,6 +51,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.Stack;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -156,6 +157,11 @@ public abstract class CompactorTest {
     addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
   }
 
+  protected void addLengthFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords)
+      throws Exception {
+    addFile(t, p, minTxn, maxTxn, numRecords, FileType.LENGTH_FILE, 2, true);
+  }
+
   protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords) throws Exception {
     addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
   }
@@ -185,9 +191,21 @@ public abstract class CompactorTest {
     return paths;
   }
 
-  protected void burnThroughTransactions(int num) throws MetaException, NoSuchTxnException, TxnAbortedException {
+  protected void burnThroughTransactions(int num)
+      throws MetaException, NoSuchTxnException, TxnAbortedException {
+    burnThroughTransactions(num, null, null);
+  }
+
+  protected void burnThroughTransactions(int num, Set<Long> open, Set<Long> aborted)
+      throws MetaException, NoSuchTxnException, TxnAbortedException {
     OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", "localhost"));
-    for (long tid : rsp.getTxn_ids()) txnHandler.commitTxn(new CommitTxnRequest(tid));
+    for (long tid : rsp.getTxn_ids()) {
+      if (aborted != null && aborted.contains(tid)) {
+        txnHandler.abortTxn(new AbortTxnRequest(tid));
+      } else if (open == null || (open != null && !open.contains(tid))) {
+        txnHandler.commitTxn(new CommitTxnRequest(tid));
+      }
+    }
   }
 
   protected void stopThread() {
@@ -249,7 +267,7 @@ public abstract class CompactorTest {
     return location;
   }
 
-  private enum FileType {BASE, DELTA, LEGACY};
+  private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE};
 
   private void addFile(Table t, Partition p, long minTxn, long maxTxn,
                        int numRecords, FileType type, int numBuckets,
@@ -259,6 +277,7 @@
     String filename = null;
     switch (type) {
       case BASE: filename = "base_" + maxTxn; break;
+      case LENGTH_FILE: // Fall through to delta
       case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
       case LEGACY: break; // handled below
     }
@@ -273,12 +292,19 @@
       Path dir = new Path(location, filename);
       fs.mkdirs(dir);
       partFile = AcidUtils.createBucketFile(dir, bucket);
+      if (type == FileType.LENGTH_FILE) {
+        partFile = new Path(partFile.toString() + AcidUtils.DELTA_SIDE_FILE_SUFFIX);
+      }
     }
     FSDataOutputStream out = fs.create(partFile);
-    for (int i = 0; i < numRecords; i++) {
-      RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i);
-      ri.write(out);
-      out.writeBytes("mary had a little lamb its fleece was white as snow\n");
+    if (type == FileType.LENGTH_FILE) {
+      out.writeInt(numRecords);
+    } else {
+      for (int i = 0; i < numRecords; i++) {
+        RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i);
+        ri.write(out);
+        out.writeBytes("mary had a little lamb its fleece was white as snow\n");
+      }
     }
     out.close();
   }
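
Finally, a sketch of how a compactor test might combine the two new CompactorTest helpers. Everything other than burnThroughTransactions(), addLengthFile() and AcidUtils.DELTA_SIDE_FILE_SUFFIX (the class name, the addDeltaFile() call, the table/partition parameters, and the assumption that CompactorTest can be extended directly) is illustrative scaffolding, not taken from this patch:

import java.util.Collections;
import java.util.Set;

import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;

// Hypothetical subclass; assumes CompactorTest has no further abstract members.
public class StreamingCompactionSketch extends CompactorTest {

  void streamingDeltaWithSideFile(Table t, Partition p) throws Exception {
    // Open 25 transactions; leave id 23 open, abort id 24, commit the rest
    // (ids are assumed to start at 1 in a fresh test metastore).
    Set<Long> open = Collections.singleton(23L);
    Set<Long> aborted = Collections.singleton(24L);
    burnThroughTransactions(25, open, aborted);

    // A delta written by hcatalog streaming carries a side file next to each
    // bucket; addLengthFile() creates that file using
    // AcidUtils.DELTA_SIDE_FILE_SUFFIX so the compactor code paths see it.
    addDeltaFile(t, p, 21, 25, 10);   // assumed pre-existing helper for the bucket file
    addLengthFile(t, p, 21, 25, 10);  // new helper added by this patch

    // ...then run the Initiator/Worker and assert that the delta reaching
    // into open txn 23 is not compacted away (outline only).
  }
}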