HIVE-17205 - add functional support for unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)
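For context, this patch drops the requirement that ACID tables be bucketed, so the hcatalog streaming API (exercised in the new TestStreaming.testNoBuckets test below) can now write to a table declared without a CLUSTERED BY clause. A minimal client sketch, not part of this patch, assuming a reachable metastore URI and a pre-created table like the one used in that test (create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true')); the URI and agent name are placeholders:

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class UnbucketedStreamingSketch {
  public static void main(String[] args) throws Exception {
    // assumption: metastore reachable at this URI; table default.streamingnobuckets exists,
    // is transactional, and has no CLUSTERED BY clause
    String metaStoreUri = "thrift://localhost:9083";
    HiveEndPoint endPt = new HiveEndPoint(metaStoreUri, "default", "streamingnobuckets", null);
    StreamingConnection conn = endPt.newConnection(false, "example-agent");
    DelimitedInputWriter writer = new DelimitedInputWriter(new String[]{"a", "b"}, ",", endPt, conn);

    TransactionBatch txnBatch = conn.fetchTransactionBatch(2, writer);
    txnBatch.beginNextTransaction();
    txnBatch.write("a1,b2".getBytes()); // all rows land in delta_x_y/bucket_00000 of the unbucketed table
    txnBatch.write("a3,b4".getBytes());
    txnBatch.commit();
    txnBatch.close();
    conn.close();
  }
}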
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6be50b76 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6be50b76 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6be50b76 Branch: refs/heads/hive-14535 Commit: 6be50b76be5956b3c52ed6024fd7b4a3dee65fb6 Parents: 262d8f9 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Fri Aug 25 20:14:57 2017 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Fri Aug 25 20:15:26 2017 -0700 ---------------------------------------------------------------------- .../streaming/AbstractRecordWriter.java | 44 +- .../hive/hcatalog/streaming/HiveEndPoint.java | 4 +- .../apache/hive/hcatalog/streaming/package.html | 21 +- .../hive/hcatalog/streaming/TestStreaming.java | 86 +- .../hive/metastore/TestHiveMetaStore.java | 8 +- .../apache/hadoop/hive/ql/TestAcidOnTez.java | 367 +++- .../hive/ql/TestAcidOnTezWithSplitUpdate.java | 28 - .../test/resources/testconfiguration.properties | 4 +- .../TransactionalValidationListener.java | 42 +- .../org/apache/hadoop/hive/ql/ErrorMsg.java | 4 +- .../hadoop/hive/ql/exec/FileSinkOperator.java | 22 +- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 60 +- .../hive/ql/io/orc/OrcRawRecordMerger.java | 225 +- .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 14 +- .../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 10 +- .../io/orc/VectorizedOrcAcidRowBatchReader.java | 26 +- .../optimizer/SortedDynPartitionOptimizer.java | 1 + .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 6 - .../hive/ql/txn/compactor/CompactorMR.java | 3 +- .../apache/hadoop/hive/ql/TestTxnCommands.java | 108 +- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 12 +- .../hadoop/hive/ql/TestTxnCommandsBase.java | 162 ++ .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 297 +++ .../hive/ql/io/orc/TestInputOutputFormat.java | 28 +- .../hive/ql/io/orc/TestOrcRawRecordMerger.java | 86 +- .../queries/clientnegative/create_not_acid.q | 2 +- .../queries/clientpositive/acid_no_buckets.q | 210 ++ .../clientnegative/create_not_acid.q.out | 4 +- .../clientnegative/delete_non_acid_table.q.out | 2 +- .../clientnegative/update_non_acid_table.q.out | 2 +- .../clientpositive/llap/acid_no_buckets.q.out | 1976 ++++++++++++++++++ 31 files changed, 3534 insertions(+), 330 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index e409e75..4ec10ad 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -46,6 +46,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Properties; @@ -54,19 +55,23 @@ import java.util.Properties; public abstract class AbstractRecordWriter implements RecordWriter { static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName()); - final HiveConf conf; - final HiveEndPoint endPoint; + private final 
HiveConf conf; + private final HiveEndPoint endPoint; final Table tbl; - final IMetaStoreClient msClient; - protected final List<Integer> bucketIds; - ArrayList<RecordUpdater> updaters = null; + private final IMetaStoreClient msClient; + final List<Integer> bucketIds; + private ArrayList<RecordUpdater> updaters = null; - public final int totalBuckets; + private final int totalBuckets; + /** + * Indicates whether target table is bucketed + */ + private final boolean isBucketed; private final Path partitionPath; - final AcidOutputFormat<?,?> outf; + private final AcidOutputFormat<?,?> outf; private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write. private Long curBatchMinTxnId; private Long curBatchMaxTxnId; @@ -109,16 +114,22 @@ public abstract class AbstractRecordWriter implements RecordWriter { this.tbl = twp.tbl; this.partitionPath = twp.partitionPath; } - this.totalBuckets = tbl.getSd().getNumBuckets(); - if (totalBuckets <= 0) { - throw new StreamingException("Cannot stream to table that has not been bucketed : " - + endPoint); + this.isBucketed = tbl.getSd().getNumBuckets() > 0; + /** + * For unbucketed tables we have exactly 1 RecrodUpdater for each AbstractRecordWriter which + * ends up writing to a file bucket_000000 + * See also {@link #getBucket(Object)} + */ + this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1; + if(isBucketed) { + this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()); + this.bucketFieldData = new Object[bucketIds.size()]; + } + else { + bucketIds = Collections.emptyList(); } - this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()); - this.bucketFieldData = new Object[bucketIds.size()]; String outFormatName = this.tbl.getSd().getOutputFormat(); outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf); - bucketFieldData = new Object[bucketIds.size()]; } catch(InterruptedException e) { throw new StreamingException(endPoint2.toString(), e); } catch (MetaException | NoSuchObjectException e) { @@ -169,6 +180,9 @@ public abstract class AbstractRecordWriter implements RecordWriter { // returns the bucket number to which the record belongs to protected int getBucket(Object row) throws SerializationError { + if(!isBucketed) { + return 0; + } ObjectInspector[] inspectors = getBucketObjectInspectors(); Object[] bucketFields = getBucketFields(row); return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets); @@ -204,7 +218,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { curBatchMaxTxnId = maxTxnID; updaters = new ArrayList<RecordUpdater>(totalBuckets); for (int bucket = 0; bucket < totalBuckets; bucket++) { - updaters.add(bucket, null); + updaters.add(bucket, null);//so that get(i) returns null rather than ArrayOutOfBounds } } http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 81f6155..28c98bd 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -20,6 +20,8 @@ package org.apache.hive.hcatalog.streaming; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.cli.CliSessionState; @@ -338,7 +340,7 @@ public class HiveEndPoint { // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table Map<String, String> params = t.getParameters(); if (params != null) { - String transactionalProp = params.get("transactional"); + String transactionalProp = params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); if (transactionalProp == null || !transactionalProp.equalsIgnoreCase("true")) { LOG.error("'transactional' property is not set on Table " + endPoint); throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property" + http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html index ed4d307..a879b97 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html @@ -30,7 +30,7 @@ partition. Once data is committed it becomes immediately visible to all Hive queries initiated subsequently.</p> <p> -This API is intended for streaming clients such as Flume and Storm, +This API is intended for streaming clients such as NiFi, Flume and Storm, which continuously generate data. Streaming support is built on top of ACID based insert/update support in Hive.</p> @@ -56,10 +56,7 @@ A few things are currently required to use streaming. <ol> <li> Currently, only ORC storage format is supported. So '<b>stored as orc</b>' must be specified during table creation.</li> - <li> The hive table must be bucketed, but not sorted. So something like - '<b>clustered by (<i>colName</i>) into <i>10</i> buckets</b>' must - be specified during table creation. The number of buckets - is ideally the same as the number of streaming writers.</li> + <li> The hive table may be bucketed but must not be sorted. </li> <li> User of the client streaming process must have the necessary permissions to write to the table or partition and create partitions in the table.</li> @@ -67,7 +64,6 @@ A few things are currently required to use streaming. <ol> <li><b>hive.input.format = org.apache.hadoop.hive.ql.io.HiveInputFormat</b></li> - <li><b>hive.vectorized.execution.enabled = false</b></li> </ol></li> The above client settings are a temporary requirement and the intention is to drop the need for them in the near future. @@ -165,8 +161,21 @@ additional implementations of the <b>RecordWriter</b> interface. - Delimited text input.</li> <li> <a href="StrictJsonWriter.html"><b>StrictJsonWriter</b></a> - JSON text input.</li> + <li> <a href="StrictRegexWriter.html"><b>StrictRegexWriter</b></a> + - text input with regex.</li> </ul></p> +<h2>Performance, Concurrency, Etc.</h2> +<p> + Each StreamingConnection is writing data at the rate the underlying + FileSystem can accept it. If that is not sufficient, multiple StreamingConnection objects can + be created concurrently. 
+</p> +<p> + Each StreamingConnection can have at most 1 outstanding TransactionBatch and each TransactionBatch + may have at most 2 threads operaing on it. + See <a href="TransactionBatch.html"><b>TransactionBatch</b></a> +</p> </body> </html> http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index f3ef92b..49520ef 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -52,6 +53,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; @@ -65,6 +67,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; @@ -74,6 +77,7 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReader; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService; +import org.apache.hadoop.hive.ql.txn.compactor.Worker; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; @@ -193,7 +197,6 @@ public class TestStreaming { conf = new HiveConf(this.getClass()); conf.set("fs.raw.impl", RawFileSystem.class.getName()); - conf.set("hive.enforce.bucketing", "true"); conf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); @@ -339,6 +342,83 @@ public class TestStreaming { } } + /** + * Test that streaming can write to unbucketed table. + */ + @Test + public void testNoBuckets() throws Exception { + queryTable(driver, "drop table if exists default.streamingnobuckets"); + //todo: why does it need transactional_properties? 
+ queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')"); + queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')"); + List<String> rs = queryTable(driver, "select * from default.streamingnobuckets"); + Assert.assertEquals(1, rs.size()); + Assert.assertEquals("foo\tbar", rs.get(0)); + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null); + String[] colNames1 = new String[] { "a", "b" }; + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt, connection); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr); + txnBatch.beginNextTransaction(); + txnBatch.write("a1,b2".getBytes()); + txnBatch.write("a3,b4".getBytes()); + txnBatch.commit(); + txnBatch.beginNextTransaction(); + txnBatch.write("a5,b6".getBytes()); + txnBatch.write("a7,b8".getBytes()); + txnBatch.commit(); + txnBatch.close(); + + Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); + rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); + + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000016_0000016_0000/bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000")); + Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8")); + Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000")); + + queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'"); + queryTable(driver, "delete from default.streamingnobuckets where a='a1'"); + rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b"); + int row = 0; + Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++)); + Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++)); + Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++)); + Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++)); + + queryTable(driver, "alter table default.streamingnobuckets compact 'major'"); + runWorker(conf); + rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); + + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000022/bucket_00000")); + Assert.assertTrue(rs.get(1), 
rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000022/bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000022/bucket_00000")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000022/bucket_00000")); + } + + /** + * this is a clone from TestTxnStatement2.... + */ + public static void runWorker(HiveConf hiveConf) throws MetaException { + AtomicBoolean stop = new AtomicBoolean(true); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(hiveConf); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + } // stream data into streaming table with N buckets, then copy the data into another bucketed table // check if bucketing in both was done in the same way @@ -453,8 +533,8 @@ public class TestStreaming { } /** - * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, String...)} - there is - * little value in using InputFormat directly + * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, boolean, String...)} - + * there is little value in using InputFormat directly */ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, String... records) throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java index 8bd23cc..50e5274 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java @@ -3018,7 +3018,7 @@ public abstract class TestHiveMetaStore extends TestCase { Table t = createTable(dbName, tblName, owner, params, null, sd, 0); Assert.assertTrue("Expected exception", false); } catch (MetaException e) { - Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage()); + Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage()); } // Fail - "transactional" is set to true, and the table is bucketed, but doesn't use ORC @@ -3031,7 +3031,7 @@ public abstract class TestHiveMetaStore extends TestCase { Table t = createTable(dbName, tblName, owner, params, null, sd, 0); Assert.assertTrue("Expected exception", false); } catch (MetaException e) { - Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage()); + Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage()); } // Succeed - "transactional" is set to true, and the table is bucketed, and uses ORC @@ -3064,13 +3064,14 @@ public abstract class TestHiveMetaStore extends TestCase { tblName += "1"; params.clear(); sd.unsetBucketCols(); + 
sd.setInputFormat("org.apache.hadoop.mapred.FileInputFormat"); t = createTable(dbName, tblName, owner, params, null, sd, 0); params.put("transactional", "true"); t.setParameters(params); client.alter_table(dbName, tblName, t); Assert.assertTrue("Expected exception", false); } catch (MetaException e) { - Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage()); + Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage()); } // Succeed - trying to set "transactional" to "true", and satisfies bucketing and Input/OutputFormat requirement @@ -3078,6 +3079,7 @@ public abstract class TestHiveMetaStore extends TestCase { params.clear(); sd.setNumBuckets(1); sd.setBucketCols(bucketCols); + sd.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); t = createTable(dbName, tblName, owner, params, null, sd, 0); params.put("transactional", "true"); t.setParameters(params); http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index 2bf9871..d0b5cf6 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -22,7 +22,10 @@ import java.io.File; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; @@ -30,23 +33,26 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.txn.compactor.Cleaner; -import org.apache.hadoop.hive.ql.txn.compactor.Worker; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class resides in itests to facilitate running query using Tez engine, since the jars are * fully loaded here, which is not the case if it stays in ql. 
*/ public class TestAcidOnTez { + static final private Logger LOG = LoggerFactory.getLogger(TestAcidOnTez.class); private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + TestAcidOnTez.class.getCanonicalName() + "-" + System.currentTimeMillis() @@ -61,8 +67,10 @@ public class TestAcidOnTez { private static enum Table { ACIDTBL("acidTbl"), ACIDTBLPART("acidTblPart"), + ACIDNOBUCKET("acidNoBucket"), NONACIDORCTBL("nonAcidOrcTbl"), - NONACIDPART("nonAcidPart"); + NONACIDPART("nonAcidPart"), + NONACIDNONBUCKET("nonAcidNonBucket"); private final String name; @Override @@ -159,6 +167,359 @@ public class TestAcidOnTez { testJoin("tez", "MapJoin"); } + /** + * Tests non acid to acid conversion where starting table has non-standard layout, i.e. + * where "original" files are not immediate children of the partition dir + */ + @Test + public void testNonStandardConversion01() throws Exception { + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + setupTez(confForTez); + //CTAS with non-ACID target table + runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored as ORC TBLPROPERTIES('transactional'='false') as " + + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); + + List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME"); + String expected0[][] = { + {"1\t2", "/1/000000_0"}, + {"3\t4", "/1/000000_0"}, + {"5\t6", "/1/000000_0"}, + {"5\t6", "/2/000000_0"}, + {"7\t8", "/2/000000_0"}, + {"9\t10", "/2/000000_0"}, + }; + Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected0.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected0[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1])); + } + //make the table ACID + runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true')"); + + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + LOG.warn("after ctas:"); + for (String s : rs) { + LOG.warn(s); + } + Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); + /* + * Expected result 0th entry i the RecordIdentifier + data. 
1st entry file before compact*/ + String expected[][] = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"}, + }; + Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + //perform some Update/Delete + runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set a = 70, b = 80 where a = 7"); + runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 5"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + LOG.warn("after update/delete:"); + for (String s : rs) { + LOG.warn(s); + } + String[][] expected2 = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000021_0000021_0000/bucket_00000"} + }; + Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1])); + } + //now make sure delete deltas are present + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + String[] expectedDelDelta = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000"}; + for(FileStatus stat : status) { + for(int i = 0; i < expectedDelDelta.length; i++) { + if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) { + expectedDelDelta[i] = null; + } + } + } + for(int i = 0; i < expectedDelDelta.length; i++) { + Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]); + } + //run Minor compaction + runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'minor'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + LOG.warn("after compact minor:"); + for (String s : rs) { + LOG.warn(s); + } + Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); + //verify the data is the same + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); + //todo: since HIVE-16669 is not done, Minor compact 
compacts insert delta as well - it should not + //Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1])); + } + //check we have right delete delta files after minor compaction + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + String[] expectedDelDelta2 = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000", "delete_delta_0000021_0000022"}; + for(FileStatus stat : status) { + for(int i = 0; i < expectedDelDelta2.length; i++) { + if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) { + expectedDelDelta2[i] = null; + break; + } + } + } + for(int i = 0; i < expectedDelDelta2.length; i++) { + Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]); + } + //run Major compaction + runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + LOG.warn("after compact major:"); + for (String s : rs) { + LOG.warn(s); + } + Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size()); + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); + //everything is now in base/ + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000022/bucket_00000")); + } + } + /** + * Tests non acid to acid conversion where starting table has non-standard layout, i.e. + * where "original" files are not immediate children of the partition dir - partitioned table + * + * How to do this? CTAS is the only way to create data files which are not immediate children + * of the partition dir. CTAS/Union/Tez doesn't support partition tables. The only way is to copy + * data files in directly. 
+ */ + @Ignore("HIVE-17214") + @Test + public void testNonStandardConversion02() throws Exception { + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + confForTez.setBoolean("mapred.input.dir.recursive", true); + setupTez(confForTez); + runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored as ORC " + + "TBLPROPERTIES('transactional'='false') as " + + "select a, b from " + Table.ACIDTBL + " where a <= 3 union all " + + "select a, b from " + Table.NONACIDORCTBL + " where a >= 7 " + + "union all select a, b from " + Table.ACIDTBL + " where a = 5", confForTez); + + List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + + Table.NONACIDNONBUCKET + " order by a, b"); + String expected0[][] = { + {"1\t2", "/1/000000_0"}, + {"3\t4", "/1/000000_0"}, + {"5\t6", "/3/000000_0"}, + {"7\t8", "/2/000000_0"}, + {"9\t10", "/2/000000_0"}, + }; + Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected0.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected0[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1])); + } + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + //ensure there is partition dir + runStatementOnDriver("insert into " + Table.NONACIDPART + " partition (p=1) values (100,110)"); + //creates more files in that partition + for(FileStatus stat : status) { + int limit = 5; + Path p = stat.getPath();//dirs 1/, 2/, 3/ + Path to = new Path(TEST_WAREHOUSE_DIR + "/" + Table.NONACIDPART+ "/p=1/" + p.getName()); + while(limit-- > 0 && !fs.rename(p, to)) { + Thread.sleep(200); + } + if(limit <= 0) { + throw new IllegalStateException("Could not rename " + p + " to " + to); + } + }
+ /*
+ This is what we expect on disk
+ ekoifman:warehouse ekoifman$ tree nonacidpart/
+ nonacidpart/
+ └── p=1
+     ├── 000000_0
+     ├── 1
+     │   └── 000000_0
+     ├── 2
+     │   └── 000000_0
+     └── 3
+         └── 000000_0
+
+4 directories, 4 files
+ **/
+ //make the table ACID + runStatementOnDriver("alter table " + Table.NONACIDPART + " SET TBLPROPERTIES ('transactional'='true')"); + rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID"); + LOG.warn("after acid conversion:"); + for (String s : rs) { + LOG.warn(s); + } + String[][] expected = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t100\t110\t1", "nonacidpart/p=1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/3/000000_0"} + }; + Assert.assertEquals("Wrong row count", expected.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " +
rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + + //run Major compaction + runStatementOnDriver("alter table " + Table.NONACIDPART + " partition (p=1) compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID"); + LOG.warn("after major compaction:"); + for (String s : rs) { + LOG.warn(s); + } + //verify data and layout + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(file) " + i + " ac: " + + rs.get(i), rs.get(i).endsWith("nonacidpart/p=1/base_-9223372036854775808/bucket_00000")); + } + + } + /** + * CTAS + Tez + Union creates a non-standard layout in table dir + * Each leg of the union places data into a subdir of the table/partition. Subdirs are named 1/, 2/, etc + * The way this currently works is that CTAS creates an Acid table but the insert statement writes + * the data in non-acid layout. Then on read, it's treated like an non-acid to acid conversion. + * Longer term CTAS should create acid layout from the get-go. + */ + @Test + public void testCtasTezUnion() throws Exception { + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + setupTez(confForTez); + //CTAS with ACID target table + runStatementOnDriver("create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " + + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); + List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + LOG.warn("after ctas:"); + for (String s : rs) { + LOG.warn(s); + } + Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); + /* + * Expected result 0th entry i the RecordIdentifier + data. 
1st entry file before compact*/ + String expected[][] = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"}, + }; + Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + //perform some Update/Delete + runStatementOnDriver("update " + Table.ACIDNOBUCKET + " set a = 70, b = 80 where a = 7"); + runStatementOnDriver("delete from " + Table.ACIDNOBUCKET + " where a = 5"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + LOG.warn("after update/delete:"); + for (String s : rs) { + LOG.warn(s); + } + String[][] expected2 = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000019_0000019_0000/bucket_00000"} + }; + Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1])); + } + //now make sure delete deltas are present + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + String[] expectedDelDelta = {"delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000"}; + for(FileStatus stat : status) { + for(int i = 0; i < expectedDelDelta.length; i++) { + if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) { + expectedDelDelta[i] = null; + } + } + } + for(int i = 0; i < expectedDelDelta.length; i++) { + Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]); + } + //run Minor compaction + runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'minor'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + LOG.warn("after compact minor:"); + for (String s : rs) { + LOG.warn(s); + } + Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); + //verify the data is the same + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); + //todo: since HIVE-16669 is not done, Minor compact compacts insert delta as 
well - it should not + //Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1])); + } + //check we have right delete delta files after minor compaction + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + String[] expectedDelDelta2 = { "delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000", "delete_delta_0000019_0000020"}; + for(FileStatus stat : status) { + for(int i = 0; i < expectedDelDelta2.length; i++) { + if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) { + expectedDelDelta2[i] = null; + break; + } + } + } + for(int i = 0; i < expectedDelDelta2.length; i++) { + Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]); + } + //run Major compaction + runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + LOG.warn("after compact major:"); + for (String s : rs) { + LOG.warn(s); + } + Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size()); + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); + //everything is now in base/ + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000020/bucket_00000")); + } + } // Ideally test like this should be a qfile test. However, the explain output from qfile is always // slightly different depending on where the test is run, specifically due to file size estimation private void testJoin(String engine, String joinType) throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java deleted file mode 100644 index 3dacf08..0000000 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql; - -/** - * Same as parent class but covers Acid 2.0 tables - */ -public class TestAcidOnTezWithSplitUpdate extends TestAcidOnTez { - @Override - String getTblProperties() { - return "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 37a3757..fa6a2aa 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -456,7 +456,9 @@ minillap.query.files=acid_bucket_pruning.q,\ llap_stats.q,\ multi_count_distinct_null.q -minillaplocal.query.files=acid_globallimit.q,\ +minillaplocal.query.files=\ + acid_no_buckets.q, \ + acid_globallimit.q,\ acid_vectorization_missing_cols.q,\ alter_merge_stats_orc.q,\ auto_join30.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 023d703..3a3d184 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -98,14 +98,26 @@ public final class TransactionalValidationListener extends MetaStorePreEventList // that will use it down below. 
} } + Table oldTable = context.getOldTable(); + String oldTransactionalValue = null; + String oldTransactionalPropertiesValue = null; + for (String key : oldTable.getParameters().keySet()) { + if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) { + oldTransactionalValue = oldTable.getParameters().get(key); + } + if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + oldTransactionalPropertiesValue = oldTable.getParameters().get(key); + } + } + if (transactionalValuePresent) { //normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue); } - if ("true".equalsIgnoreCase(transactionalValue)) { + if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) { + //only need to check conformance if alter table enabled aicd if (!conformToAcid(newTable)) { - throw new MetaException("The table must be bucketed and stored using an ACID compliant" + - " format (such as ORC)"); + throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)"); } if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) { @@ -115,17 +127,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList hasValidTransactionalValue = true; } - Table oldTable = context.getOldTable(); - String oldTransactionalValue = null; - String oldTransactionalPropertiesValue = null; - for (String key : oldTable.getParameters().keySet()) { - if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) { - oldTransactionalValue = oldTable.getParameters().get(key); - } - if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { - oldTransactionalPropertiesValue = oldTable.getParameters().get(key); - } - } if (oldTransactionalValue == null ? 
transactionalValue == null @@ -195,8 +196,7 @@ public final class TransactionalValidationListener extends MetaStorePreEventList if ("true".equalsIgnoreCase(transactionalValue)) { if (!conformToAcid(newTable)) { - throw new MetaException("The table must be bucketed and stored using an ACID compliant" + - " format (such as ORC)"); + throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)"); } if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) { @@ -214,14 +214,12 @@ public final class TransactionalValidationListener extends MetaStorePreEventList throw new MetaException("'transactional' property of TBLPROPERTIES may only have value 'true'"); } - // Check if table is bucketed and InputFormatClass/OutputFormatClass should implement - // AcidInputFormat/AcidOutputFormat + /** + * Check that InputFormatClass/OutputFormatClass should implement + * AcidInputFormat/AcidOutputFormat + */ private boolean conformToAcid(Table table) throws MetaException { StorageDescriptor sd = table.getSd(); - if (sd.getBucketColsSize() < 1) { - return false; - } - try { Class inputFormatClass = Class.forName(sd.getInputFormat()); Class outputFormatClass = Class.forName(sd.getOutputFormat()); http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 9c9d4e7..b3ef916 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -418,8 +418,8 @@ public enum ErrorMsg { " does not support these operations."), VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED(10296, "Values clause with table constructor not yet supported"), - ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that does not use " + - "an AcidOutputFormat or is not bucketed", true), + ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that is " + + "not transactional", true), ACID_NO_SORTED_BUCKETS(10298, "ACID insert, update, delete not supported on tables that are " + "sorted, table {0}", true), ALTER_TABLE_TYPE_PARTIAL_PARTITION_SPEC_NO_SUPPORTED(10299, http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 25ad1e9..bc265eb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConfUtil; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -36,6 +37,7 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.HivePartitioner; +import 
org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; import org.apache.hadoop.hive.ql.io.StreamingOutputFormat; @@ -285,6 +287,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements private transient int numFiles; protected transient boolean multiFileSpray; protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>(); + private transient boolean isBucketed = false; private transient ObjectInspector[] partitionObjectInspectors; protected transient HivePartitioner<HiveKey, Object> prtner; @@ -345,6 +348,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements isNativeTable = !conf.getTableInfo().isNonNative(); isTemporary = conf.isTemporary(); multiFileSpray = conf.isMultiFileSpray(); + this.isBucketed = hconf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0; totalFiles = conf.getTotalFiles(); numFiles = conf.getNumFiles(); dpCtx = conf.getDynPartCtx(); @@ -791,9 +795,23 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements * Hive.copyFiles() will make one of them bucket_N_copy_M in the final location. The * reset of acid (read path) doesn't know how to handle copy_N files except for 'original' * files (HIVE-16177)*/ + int writerId = -1; + if(!isBucketed) { + assert !multiFileSpray; + assert writerOffset == 0; + /**For un-bucketed tables, Deletes with ROW__IDs with different 'bucketNum' values can + * be written to the same bucketN file. + * N in this case is writerId and there is no relationship + * between the file name and any property of the data in it. Inserts will be written + * to bucketN file such that all {@link RecordIdentifier#getBucketProperty()} indeed + * contain writerId=N. + * Since taskId is unique (at least per statementId and thus + * per [delete_]delta_x_y_stmtId/) there will not be any copy_N files.*/ + writerId = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId)); + } fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater( - jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[writerOffset], - rowInspector, reporter, 0); + jc, conf.getTableInfo(), writerId >= 0 ? 
writerId : bucketNum, conf, + fpaths.outPaths[writerOffset], rowInspector, reporter, 0); if (LOG.isDebugEnabled()) { LOG.debug("Created updater for bucket number " + bucketNum + " using file " + fpaths.outPaths[writerOffset]); http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 751fca8..69a9f9f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -964,6 +964,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private final boolean allowSyntheticFileIds; private final boolean isDefaultFs; + /** + * @param dir - root of partition dir + */ public BISplitStrategy(Context context, FileSystem fs, Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered, boolean allowSyntheticFileIds, boolean isDefaultFs) { @@ -996,7 +999,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(), entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true, - deltas, -1, logicalLen); + deltas, -1, logicalLen, dir); splits.add(orcSplit); } } @@ -1017,18 +1020,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, * ACID split strategy is used when there is no base directory (when transactions are enabled). */ static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> { - private Path dir; + Path dir; private List<DeltaMetaData> deltas; - private boolean[] covered; - private int numBuckets; private AcidOperationalProperties acidOperationalProperties; - + /** + * @param dir root of partition dir + */ ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered, AcidOperationalProperties acidOperationalProperties) { this.dir = dir; - this.numBuckets = numBuckets; this.deltas = deltas; - this.covered = covered; this.acidOperationalProperties = acidOperationalProperties; } @@ -1234,6 +1235,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private final UserGroupInformation ugi; private final boolean allowSyntheticFileIds; private SchemaEvolution evolution; + //this is the root of the partition in which the 'file' is located + private final Path rootDir; public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs) throws IOException { @@ -1250,6 +1253,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, this.isOriginal = splitInfo.isOriginal; this.deltas = splitInfo.deltas; this.hasBase = splitInfo.hasBase; + this.rootDir = splitInfo.dir; this.projColsUncompressedSize = -1; this.deltaSplits = splitInfo.getSplits(); this.allowSyntheticFileIds = allowSyntheticFileIds; @@ -1361,7 +1365,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, fileKey = new SyntheticFileId(file); } return new OrcSplit(file.getPath(), fileKey, offset, length, hosts, - orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen); + orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen, rootDir); } private static final class 
OffsetAndLength { // Java cruft; pair of long. @@ -1641,7 +1645,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, pathFutures.add(ecs.submit(fileGenerator)); } - boolean isTransactionalTableScan = + boolean isTransactionalTableScan =//this never seems to be set correctly HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); boolean isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION); TypeDescription readerSchema = @@ -1932,16 +1936,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, final OrcSplit split = (OrcSplit) inputSplit; final Path path = split.getPath(); - Path root; if (split.hasBase()) { if (split.isOriginal()) { - root = path.getParent(); + root = split.getRootDir(); } else { root = path.getParent().getParent(); + assert root.equals(split.getRootDir()) : "root mismatch: baseDir=" + split.getRootDir() + + " path.p.p=" + root; } - } else {//here path is a delta/ but above it's a partition/ - root = path; + } else { + throw new IllegalStateException("Split w/o base: " + path); } // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat. @@ -2037,21 +2042,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } }; } - private static Path findOriginalBucket(FileSystem fs, - Path directory, - int bucket) throws IOException { - for(FileStatus stat: fs.listStatus(directory)) { - if(stat.getLen() <= 0) { - continue; - } - AcidOutputFormat.Options bucketInfo = - AcidUtils.parseBaseOrDeltaBucketFilename(stat.getPath(), fs.getConf()); - if(bucketInfo.getBucketId() == bucket) { - return stat.getPath(); - } - } - throw new IllegalArgumentException("Can't find bucket " + bucket + " in " + directory); - } static Reader.Options createOptionsForReader(Configuration conf) { /** @@ -2275,20 +2265,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, ) throws IOException { Reader reader = null; boolean isOriginal = false; - if (baseDirectory != null) { - Path bucketFile; + if (baseDirectory != null) {//this is NULL for minor compaction + Path bucketFile = null; if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) { bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket); } else { + /**we don't know which file to start reading - + * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} does*/ isOriginal = true; - bucketFile = findOriginalBucket(baseDirectory.getFileSystem(conf), - baseDirectory, bucket); } - reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf)); + if(bucketFile != null) { + reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf)); + } } OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options() .isCompacting(true) - .rootPath(baseDirectory); + .rootPath(baseDirectory).isMajorCompaction(baseDirectory != null); return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal, bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions); } http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 97c4e3d..cbbb4c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.BucketCodec; @@ -29,7 +31,6 @@ import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; -import org.apache.orc.impl.AcidStats; import org.apache.orc.impl.OrcAcidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ private final RecordIdentifier maxKey; // an extra value so that we can return it while reading ahead private OrcStruct extraValue; - /** * A RecordIdentifier extended with the current transaction id. This is the * key of our merge sort with the originalTransaction, bucket, and rowId @@ -294,9 +294,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * Running multiple Insert statements on the same partition (of non acid table) creates files * like so: 00000_0, 00000_0_copy1, 00000_0_copy2, etc. So the OriginalReaderPair must treat all * of these files as part of a single logical bucket file. + * + * Also, for unbucketed (non acid) tables, there are no guarantees where data files may be placed. + * For example, CTAS+Tez+Union creates subdirs 1/, 2/, etc for each leg of the Union. Thus the + * data file need not be an immediate child of partition dir. All files for a given writerId are + * treated as one logical unit to assign {@link RecordIdentifier}s to them consistently. * * For Compaction, where each split includes the whole bucket, this means reading over all the * files in order to assign ROW__ID.rowid in one sequence for the entire logical bucket. + * For unbucketed tables, a Compaction split is all files written by a given writerId. * * For a read after the table is marked transactional but before it's rewritten into a base/ * by compaction, each of the original files may be split into many pieces. For each split we @@ -305,7 +311,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * split of the original file and used to filter rows from all the deltas. The ROW__ID.rowid for * the rows of the 'original' file of course, must be assigned from the beginning of logical * bucket. The last split of the logical bucket, i.e. the split that has the end of last file, - * should include all insert events from deltas. + * should include all insert events from deltas (last sentence is obsolete for Acid 2: HIVE-17320) */ private static abstract class OriginalReaderPair implements ReaderPair { OrcStruct nextRecord; @@ -407,18 +413,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ RecordIdentifier newMaxKey = maxKey; recordReader = reader.rowsOptions(options); /** - * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc We don't - * know N a priori so if this is true, then the current split is from 0000_0_copyN file. + * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copy_N. etc We don't + * know N a priori so if this is true, then the current split is from 0000_0_copy_N file. * It's needed to correctly set maxKey. 
In particular, set maxKey==null if this split * is the tail of the last file for this logical bucket to include all deltas written after - non-acid to acid table conversion. + non-acid to acid table conversion (todo: HIVE-17320). + * Also, see comments at {@link OriginalReaderPair} about unbucketed tables. */ - boolean isLastFileForThisBucket = false; + boolean isLastFileForThisBucket = true; boolean haveSeenCurrentFile = false; long rowIdOffsetTmp = 0; - if (mergerOptions.getCopyIndex() > 0) { + { //the split is from something other than the 1st file of the logical bucket - compute offset - AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validTxnList, false, true); for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { @@ -467,23 +473,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ maxKey.setRowId(maxKey.getRowId() + rowIdOffset); } } - } else { - rowIdOffset = 0; - isLastFileForThisBucket = true; - AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), - conf, validTxnList, false, true); - int numFilesInBucket = 0; - for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { - AcidOutputFormat.Options bucketOptions = - AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); - if (bucketOptions.getBucketId() == bucketId) { - numFilesInBucket++; - if (numFilesInBucket > 1) { - isLastFileForThisBucket = false; - break; - } - } - } } if (!isLastFileForThisBucket && maxKey == null) { /* @@ -651,6 +640,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } /** * Find the key range for original bucket files. + * For unbucketed tables the insert event data is still written to bucket_N file except that + * N is just a writer ID - it still matches {@link RecordIdentifier#getBucketProperty()}. For + * 'original' files (unbucketed) the same applies. A file 000000_0 encodes a taskId/writerId and + * at read time we synthesize {@link RecordIdentifier#getBucketProperty()} to match the file name + * and so the same bucketProperty is used here to create minKey/maxKey, i.e. these keys are valid + * to filter data from delete_delta files even for unbucketed tables. * @param reader the reader * @param bucket the bucket number we are reading * @param options the options for reading with @@ -740,7 +735,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ */ static Reader.Options createEventOptions(Reader.Options options) { Reader.Options result = options.clone(); - //result.range(options.getOffset(), Long.MAX_VALUE);WTF? result.include(options.getInclude()); // slide the column names down by 6 for the name array @@ -755,11 +749,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ return result; } + /** + * {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts. + * This makes the "context" explicit.
+ */ static class Options { private int copyIndex = 0; private boolean isCompacting = false; private Path bucketPath; private Path rootPath; + private boolean isMajorCompaction = false; + private boolean isDeleteReader = false; Options copyIndex(int copyIndex) { assert copyIndex >= 0; this.copyIndex = copyIndex; @@ -767,6 +767,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } Options isCompacting(boolean isCompacting) { this.isCompacting = isCompacting; + assert !isDeleteReader; return this; } Options bucketPath(Path bucketPath) { @@ -777,6 +778,16 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ this.rootPath = rootPath; return this; } + Options isMajorCompaction(boolean isMajor) { + this.isMajorCompaction = isMajor; + assert !isDeleteReader; + return this; + } + Options isDeleteReader(boolean isDeleteReader) { + this.isDeleteReader = isDeleteReader; + assert !isCompacting; + return this; + } /** * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix */ @@ -788,7 +799,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } /** * Full path to the data file - * @return */ Path getBucketPath() { return bucketPath; @@ -797,6 +807,22 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * Partition folder (Table folder if not partitioned) */ Path getRootPath() { return rootPath; } + /** + * @return true if major compaction, false if minor + */ + boolean isMajorCompaction() { + return isMajorCompaction && isCompacting; + } + boolean isMinorCompaction() { + return !isMajorCompaction && isCompacting; + } + /** + * true if this is only processing delete deltas to load in-memory table for + * vectorized reader + */ + boolean isDeleteReader() { + return isDeleteReader; + } } /** * Create a reader that merge sorts the ACID events together. @@ -820,6 +846,39 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ this.offset = options.getOffset(); this.length = options.getLength(); this.validTxnList = validTxnList; + /** + * @since Hive 3.0 + * With split update (HIVE-14035) we have base/, delta/ and delete_delta/ - the latter only + * has Delete events and the others only have Insert events. Thus {@link #baseReader} is + * a split of a file in base/ or delta/. + * + * For Compaction, each split (for now) is a logical bucket, i.e. all files from base/ + delta(s)/ + * for a given bucket ID and delete_delta(s)/ + * + * For bucketed tables, the data files are named bucket_N and all rows in this file are such + * that {@link org.apache.hadoop.hive.ql.io.BucketCodec#decodeWriterId(int)} of + * {@link RecordIdentifier#getBucketProperty()} is N. This is currently true for all types of + * files but may not be true for delete_delta/ files in the future. + * + * For un-bucketed tables, the system is designed so that it works when there is no relationship + * between delete_delta file name (bucket_N) and the value of {@link RecordIdentifier#getBucketProperty()}. + * (Later this may be optimized to take advantage of situations where it is known that + * bucket_N matches bucketProperty().) This implies that for a given {@link baseReader} all + * files in delete_delta/ have to be opened ({@link ReaderPair} created). Insert events are + * still written such that N in file name (writerId) matches what's in bucketProperty().
+ * + * Compactor for un-bucketed tables works exactly the same as for bucketed ones though it + * should be optimized (see HIVE-17206). In particular, each split is a set of files + * created by a writer with the same writerId, i.e. all bucket_N files across base/ & + * delta/ for the same N. Unlike bucketed tables, there is no relationship between + * any values in user columns and the file name. + * The maximum N is determined by the number of writers the system chose for the "largest" + * write into a given partition. + * + * In both cases, Compactor should be changed so that Minor compaction is run very often and + * only compacts delete_delta/. Major compaction can do what it does now. + */ + boolean isBucketed = conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0; TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); @@ -829,16 +888,26 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ // modify the options to reflect the event instead of the base row Reader.Options eventOptions = createEventOptions(options); - if (reader == null) { + if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) || + mergerOptions.isDeleteReader()) { + //for minor compaction, there is no progress report and we don't filter deltas baseReader = null; minKey = maxKey = null; + assert reader == null : "unexpected input reader during minor compaction: " + + mergerOptions.getRootPath(); } else { KeyInterval keyInterval; - // find the min/max based on the offset and length (and more for 'original') - if (isOriginal) { - keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf); + if (mergerOptions.isCompacting()) { + assert mergerOptions.isMajorCompaction(); + //compaction doesn't filter deltas but *may* have a reader for 'base' + keyInterval = new KeyInterval(null, null); } else { - keyInterval = discoverKeyBounds(reader, options); + // find the min/max based on the offset and length (and more for 'original') + if (isOriginal) { + keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf); + } else { + keyInterval = discoverKeyBounds(reader, options); + } } LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey()); // use the min/max instead of the byte range @@ -849,8 +918,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ if(mergerOptions.isCompacting()) { pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions, conf, validTxnList); - } - else { + } else { pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList); } @@ -868,35 +936,31 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ baseReader = pair.getRecordReader(); } - // we always want to read all of the deltas - eventOptions.range(0, Long.MAX_VALUE); if (deltaDirectory != null) { + /*whatever SARG may be applicable to base, it's not applicable to delete_delta since it has no + * user columns + * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/ + Reader.Options deltaEventOptions = eventOptions.clone() + .searchArgument(null, null).range(0, Long.MAX_VALUE); for(Path delta: deltaDirectory) { if(!mergerOptions.isCompacting() && !AcidUtils.isDeleteDelta(delta)) { //all inserts should be in baseReader for normal read so this should always be delete delta if not compacting throw new
IllegalStateException(delta + " is not delete delta and is not compacting."); } ReaderKey key = new ReaderKey(); - Path deltaFile = AcidUtils.createBucketFile(delta, bucket); AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta); - FileSystem fs = deltaFile.getFileSystem(conf); - long length = OrcAcidUtils.getLastFlushLength(fs, deltaFile); - if (length != -1 && fs.exists(deltaFile)) { - Reader deltaReader = OrcFile.createReader(deltaFile, - OrcFile.readerOptions(conf).maxLength(length)); - Reader.Options deltaEventOptions = null; - if(eventOptions.getSearchArgument() != null) { - // Turn off the sarg before pushing it to delta. We never want to push a sarg to a delta as - // it can produce wrong results (if the latest valid version of the record is filtered out by - // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record) - // unless the delta only has insert events - AcidStats acidStats = OrcAcidUtils.parseAcidStats(deltaReader); - if(acidStats.deletes > 0 || acidStats.updates > 0) { - deltaEventOptions = eventOptions.clone().searchArgument(null, null); - } + for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) { + FileSystem fs = deltaFile.getFileSystem(conf); + if(!fs.exists(deltaFile)) { + continue; } + /* side files are only created by streaming ingest. If this is a compaction, we may + * have an insert delta/ here with side files there because the original writer died.*/ + long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile)); + assert length >= 0; + Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length)); ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, - deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId()); + deltaEventOptions, deltaDir.getStatementId()); if (deltaPair.nextRecord() != null) { readers.put(key, deltaPair); } @@ -921,6 +985,59 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } } + /** + * This determines the set of {@link ReaderPairAcid} to create for a given delta/. + * For unbucketed tables {@code bucket} can be thought of as a write tranche. + */ + static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Configuration conf, + Options mergerOptions, boolean isBucketed) throws IOException { + if(isBucketed) { + /** + * for bucketed tables (for now) we always trust that the N in bucketN file name means that + * all records have {@link RecordIdentifier#getBucketProperty()} encoding bucketId = N. This + * means that a delete event in bucketN can only modify an insert in another bucketN file for + * the same N. (Down the road we may trust it only in certain delta dirs) + * + * Compactor takes all types of deltas for a given bucket. For regular read, any file that + * contains (only) insert events is treated as base and only + * delete_delta/ are treated as deltas. + */ + assert (!mergerOptions.isCompacting && + deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX) + ) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory; + Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket); + return new Path[]{deltaFile}; + } + /** + * For unbucketed tables insert events are also stored in bucketN files but here N is + * the writer ID. We can trust that N matches info in {@link RecordIdentifier#getBucketProperty()} + * delta_x_y but it's not required since we can't trust N for delete_delta_x_x/bucketN. 
+ * Thus we always have to take all files in a delete_delta. + * For regular read, any file that has (only) insert events is treated as base so + * {@link deltaDirectory} can only be delete_delta and so we take all files in it. + * For compacting, every split contains base/bN + delta(s)/bN + delete_delta(s){all buckets} for + * a given N. + */ + if(deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { + //it's not wrong to take all delete events for bucketed tables but it's more efficient + //to only take those that belong to the 'bucket' assuming we trust the file name + //un-bucketed table - get all files + FileSystem fs = deltaDirectory.getFileSystem(conf); + FileStatus[] dataFiles = fs.listStatus(deltaDirectory, AcidUtils.bucketFileFilter); + Path[] deltaFiles = new Path[dataFiles.length]; + int i = 0; + for (FileStatus stat : dataFiles) { + deltaFiles[i++] = stat.getPath(); + }//todo: need a test where we actually have more than 1 file + return deltaFiles; + } + //if here it must be delta_x_y - insert events only, so we must be compacting + assert mergerOptions.isCompacting() : "Expected to be called as part of compaction"; + Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket); + return new Path[] {deltaFile}; + + } + @VisibleForTesting RecordIdentifier getMinKey() { return minKey; http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 429960b..1e19a91 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -243,7 +243,8 @@ public class OrcRecordUpdater implements RecordUpdater { } if (options.getMinimumTransactionId() != options.getMaximumTransactionId() && !options.isWritingBase()){ - flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), true, 8, + //throw if file already exists as that should never happen + flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), false, 8, options.getReporter()); flushLengths.writeLong(0); OrcInputFormat.SHIMS.hflush(flushLengths); @@ -349,6 +350,12 @@ public class OrcRecordUpdater implements RecordUpdater { return newInspector; } } + + /** + * The INSERT event always uses {@link #bucket} that this {@link RecordUpdater} was created with + * thus even for unbucketed tables, the N in bucket_N file name matches writerId/bucketId even for + * late split + */ private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row) throws IOException { this.operation.set(operation); @@ -394,6 +401,11 @@ public class OrcRecordUpdater implements RecordUpdater { Integer currentBucket = null; if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { + /** + * make sure bucketProperty in the delete event is from the {@link row} rather than whatever + * {@link this#bucket} is. For bucketed tables, the 2 must agree on bucketId encoded in it, + * not necessarily for the whole value. For unbucketed tables there is no relationship. + */ currentBucket = setBucket(bucketInspector.get( recIdInspector.getStructFieldData(rowValue, bucketField)), operation); // Initialize a deleteEventWriter if not yet done. (Lazy initialization)
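
Note on the FileSinkOperator change above: for an unbucketed table the updater's bucket number is simply the writer's task id (Integer.parseInt(Utilities.getTaskIdFromFilename(taskId))), so the N in bucket_N names the writer rather than any property of the row data. The following standalone sketch only illustrates that parsing, assuming a "000004_0"-style task id; WriterIdSketch and parseWriterId are made-up names, not Hive APIs.

    // Illustrative sketch only (not part of this commit): derive a writer id from a
    // Hive-style task id such as "000004_0", analogous to
    // Integer.parseInt(Utilities.getTaskIdFromFilename(taskId)) in the hunk above.
    public class WriterIdSketch {
      static int parseWriterId(String taskId) {
        int underscore = taskId.indexOf('_');
        String prefix = underscore < 0 ? taskId : taskId.substring(0, underscore);
        return Integer.parseInt(prefix);               // "000004" -> 4
      }
      public static void main(String[] args) {
        // the writer with task id 000004_0 owns the bucket_N file with N = 4
        System.out.println(parseWriterId("000004_0")); // prints 4
      }
    }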
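
The new OrcRawRecordMerger.Options flags make the calling "context" explicit. The only call site visible in this diff is the compactor path in OrcInputFormat, which chains them as excerpted below; baseDirectory is null for a minor compaction, so isMajorCompaction(false) selects the delete-delta-only branch, while isDeleteReader(true) is, per its javadoc, reserved for loading delete events into the in-memory table used by the vectorized reader.

    // Excerpt from the OrcInputFormat hunk above (compactor path), not new code:
    OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
        .isCompacting(true)
        .rootPath(baseDirectory)
        .isMajorCompaction(baseDirectory != null); // null baseDirectory => minor compaction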
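
The getDeltaFiles() logic added above reduces to a file-selection rule: a bucketed table reads exactly one bucket_N file per delta, while an unbucketed table must open every bucket file of a delete_delta because a delete's bucketProperty need not match the file name. Below is a simplified, local-filesystem sketch of that rule; it assumes the bucket_%05d and delete_delta_ naming conventions, and the class and method names are invented for illustration (the real code goes through AcidUtils.createBucketFile, AcidUtils.DELETE_DELTA_PREFIX and the Hadoop FileSystem API).

    import java.io.File;
    import java.io.FileFilter;
    import java.util.Locale;

    // Illustrative sketch only (not part of this commit).
    public class DeltaFileSelectionSketch {
      static File[] selectDeltaFiles(File deltaDir, int bucket, boolean isBucketed) {
        File singleBucketFile =
            new File(deltaDir, String.format(Locale.ROOT, "bucket_%05d", bucket));
        if (isBucketed) {
          // bucketed: trust the file name; a delete in bucket_N only affects rows of bucket N
          return new File[]{singleBucketFile};
        }
        if (deltaDir.getName().startsWith("delete_delta_")) {
          // unbucketed: a delete event may carry any writerId in its bucketProperty,
          // so every bucket file of the delete_delta has to be opened
          FileFilter bucketFiles = f -> f.isFile() && f.getName().startsWith("bucket_");
          return deltaDir.listFiles(bucketFiles);
        }
        // unbucketed insert delta (delta_x_y): only the compactor reads it through this
        // path, one writer id (one bucket_N file) per split
        return new File[]{singleBucketFile};
      }
    }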