HIVE-16077 UPDATE/DELETE fails with numBuckets > numReducers (Eugene Koifman, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f53d07b8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f53d07b8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f53d07b8 Branch: refs/heads/master Commit: f53d07b86e98f6a4f85b90a78701b8cad60244af Parents: 5fc4900 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Fri Jul 28 17:56:03 2017 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Fri Jul 28 17:56:03 2017 -0700 ---------------------------------------------------------------------- .../test/resources/testconfiguration.properties | 2 + .../hadoop/hive/ql/exec/FileSinkOperator.java | 51 +- .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 1 + .../apache/hadoop/hive/ql/TestTxnCommands.java | 46 + .../clientpositive/bucket_num_reducers_acid.q | 30 + .../clientpositive/bucket_num_reducers_acid2.q | 33 + .../bucket_num_reducers_acid.q.out | 33 + .../bucket_num_reducers_acid2.q.out | 44 + .../dynpart_sort_optimization_acid.q.out | 1475 ------------------ 9 files changed, 225 insertions(+), 1490 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 6e88a4e..ba6c3e4 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -12,6 +12,8 @@ minimr.query.files=infer_bucket_sort_map_operators.q,\ index_bitmap_auto.q,\ scriptfile1.q,\ bucket_num_reducers2.q,\ + bucket_num_reducers_acid.q,\ + bucket_num_reducers_acid2.q,\ scriptfile1_win.q # These tests are disabled for minimr 
http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 4d46d65..8999f6f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -250,6 +250,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException { + //should this close updaters[]? for (int idx = 0; idx < outWriters.length; idx++) { if (outWriters[idx] != null) { try { @@ -750,16 +751,15 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements LOG.info(toString() + ": records written - " + numRows); } - // This should always be 0 for the final result file - int writerOffset = findWriterOffset(row); + int writerOffset; // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same // for a given operator branch prediction should work quite nicely on it. // RecordUpdateer expects to get the actual row, not a serialized version of it. Thus we // pass the row rather than recordValue. if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) { - rowOutWriters[writerOffset].write(recordValue); + rowOutWriters[findWriterOffset(row)].write(recordValue); } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) { - fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row); + fpaths.updaters[findWriterOffset(row)].insert(conf.getTransactionId(), row); } else { // TODO I suspect we could skip much of the stuff above this in the function in the case // of update and delete. 
But I don't understand all of the side effects of the above @@ -772,22 +772,39 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField)); int bucketNum = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty); - if (fpaths.acidLastBucket != bucketNum) { - fpaths.acidLastBucket = bucketNum; - // Switch files - fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 : ++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater( - jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset], - rowInspector, reporter, 0); + writerOffset = 0; + if (multiFileSpray) { + //bucket_num_reducers_acid.q, TestTxnCommands.testMoreBucketsThanReducers() + if (!bucketMap.containsKey(bucketNum)) { + String extraMsg = " (no path info/)" + recId; + if (fpaths != null && fpaths.finalPaths != null && fpaths.finalPaths.length > 0) { + extraMsg = " (finalPaths[0]=" + fpaths.finalPaths[0] + ")/" + recId; + } + throw new IllegalStateException("Found bucketNum=" + bucketNum + + " from data but no mapping in 'bucketMap'." + extraMsg); + } + writerOffset = bucketMap.get(bucketNum); + } + if (fpaths.updaters[writerOffset] == null) { + /*data for delete commands always have ROW__ID which implies that the bucket ID + * for each row is known. RecordUpdater creates bucket_N file based on 'bucketNum' thus + * delete events always land in the proper bucket_N file. This could even handle + * cases where multiple writers are writing bucket_N file for the same N in which case + * Hive.copyFiles() will make one of them bucket_N_copy_M in the final location. 
The + * rest of acid (read path) doesn't know how to handle copy_N files except for 'original' + * files (HIVE-16177)*/ fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater( jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[writerOffset], rowInspector, reporter, 0); if (LOG.isDebugEnabled()) { LOG.debug("Created updater for bucket number " + bucketNum + " using file " + - fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset]); + fpaths.outPaths[writerOffset]); } } - if (conf.getWriteType() == AcidUtils.Operation.UPDATE) { - fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].update(conf.getTransactionId(), row); + fpaths.updaters[writerOffset].update(conf.getTransactionId(), row); } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) { - fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].delete(conf.getTransactionId(), row); + fpaths.updaters[writerOffset].delete(conf.getTransactionId(), row); } else { throw new HiveException("Unknown write type " + conf.getWriteType().toString()); } @@ -817,6 +834,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements if (!multiFileSpray) { return 0; } else { + assert getConf().getWriteType() != AcidUtils.Operation.DELETE && + getConf().getWriteType() != AcidUtils.Operation.UPDATE : + "Unexpected operation type: " + getConf().getWriteType(); + //this is not used for DELETE/UPDATE commands (partitionEval is not set up correctly + // (or needed) for them) Object[] bucketFieldValues = new Object[partitionEval.length]; for(int i = 0; i < partitionEval.length; i++) { bucketFieldValues[i] = partitionEval[i].evaluate(row); @@ -826,7 +848,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements int bucketNum = prtner.getBucket(key, null, totalFiles); return
bucketMap.get(bucketNum); } - } /** http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index d40b89a..c30e8fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -479,6 +479,7 @@ public class OrcRecordUpdater implements RecordUpdater { } } else { if (writer == null) { + //so that we create empty bucket files when needed (but see HIVE-17138) writer = OrcFile.createWriter(path, writerOptions); } writer.close(); // normal close. http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index c531aeb..c50c1a8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -888,4 +888,50 @@ public class TestTxnCommands { //make sure they are the same before and after compaction } + //@Ignore("see bucket_num_reducers_acid.q") + @Test + public void testMoreBucketsThanReducers() throws Exception { + //see bucket_num_reducers.q bucket_num_reducers2.q + // todo: try using set VerifyNumReducersHook.num.reducers=10; + d.destroy(); + HiveConf hc = new HiveConf(hiveConf); + hc.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 1); + //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others + hc.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 1); + hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); + d = new Driver(hc); + 
d.setMaxRows(10000); + runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,1)");//txn X write to bucket1 + runStatementOnDriver("insert into " + Table.ACIDTBL + " values(0,0),(3,3)");// txn X + 1 write to bucket0 + bucket1 + runStatementOnDriver("update " + Table.ACIDTBL + " set b = -1"); + List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBL + " order by a, b"); + int[][] expected = {{0, -1}, {1, -1}, {3, -1}}; + Assert.assertEquals(stringifyValues(expected), r); + } + //@Ignore("see bucket_num_reducers_acid2.q") + @Test + public void testMoreBucketsThanReducers2() throws Exception { + //todo: try using set VerifyNumReducersHook.num.reducers=10; + //see bucket_num_reducers.q bucket_num_reducers2.q + d.destroy(); + HiveConf hc = new HiveConf(hiveConf); + hc.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2); + //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others + hc.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 2); + d = new Driver(hc); + d.setMaxRows(10000); + runStatementOnDriver("create table fourbuckets (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + //below value for a is bucket id, for b - txn id (logically) + runStatementOnDriver("insert into fourbuckets values(0,1),(1,1)");//txn X write to b0 + b1 + runStatementOnDriver("insert into fourbuckets values(2,2),(3,2)");// txn X + 1 write to b2 + b3 + runStatementOnDriver("insert into fourbuckets values(0,3),(1,3)");//txn X + 2 write to b0 + b1 + runStatementOnDriver("insert into fourbuckets values(2,4),(3,4)");//txn X + 3 write to b2 + b3 + //so with 2 FileSinks and 4 buckets, FS1 should see (0,1),(2,2),(0,3),(2,4) since data is sorted by ROW__ID where txnid is the first component + //FS2 should see (1,1),(3,2),(1,3),(3,4) + + runStatementOnDriver("update fourbuckets set b = -1"); + List<String> r = runStatementOnDriver("select * from fourbuckets order by a, b"); + int[][] expected = {{0, 
-1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}}; + Assert.assertEquals(stringifyValues(expected), r); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/test/queries/clientpositive/bucket_num_reducers_acid.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/bucket_num_reducers_acid.q b/ql/src/test/queries/clientpositive/bucket_num_reducers_acid.q new file mode 100644 index 0000000..a44668e --- /dev/null +++ b/ql/src/test/queries/clientpositive/bucket_num_reducers_acid.q @@ -0,0 +1,30 @@ +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.exec.mode.local.auto=false; + +set mapred.reduce.tasks = 1; + +-- This test sets number of mapred tasks to 1 for a table with 2 buckets, +-- and uses a post-hook to confirm that 1 task was created + +drop table if exists bucket_nr_acid; +create table bucket_nr_acid (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true'); +set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyNumReducersHook; +set VerifyNumReducersHook.num.reducers=1; + +-- txn X write to b1 +insert into bucket_nr_acid values(1,1); +-- txn X + 1 write to bucket0 + b1 +insert into bucket_nr_acid values(0,0),(3,3); + +update bucket_nr_acid set b = -1; +set hive.exec.post.hooks=; +select * from bucket_nr_acid order by a, b; + +drop table bucket_nr_acid; + + + + + + http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q b/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q new file mode 100644 index 0000000..2e6aa61 --- /dev/null +++ b/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q @@ -0,0 +1,33 @@ +set 
hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.exec.mode.local.auto=false; + +set mapred.reduce.tasks = 2; + +-- This test sets number of mapred tasks to 2 for a table with 4 buckets, +-- and uses a post-hook to confirm that 2 tasks were created + +drop table if exists bucket_nr_acid2; +create table bucket_nr_acid2 (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true'); +set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyNumReducersHook; +set VerifyNumReducersHook.num.reducers=2; + +-- txn X write to b0 + b1 +insert into bucket_nr_acid2 values(0,1),(1,1); +-- txn X + 1 write to b2 + b3 +insert into bucket_nr_acid2 values(2,2),(3,2); +-- txn X + 2 write to b0 + b1 +insert into bucket_nr_acid2 values(0,3),(1,3); +-- txn X + 3 write to b2 + b3 +insert into bucket_nr_acid2 values(2,4),(3,4); + +-- so with 2 FileSinks and 4 buckets, FS1 should see (0,1),(2,2),(0,3),(2,4) since data is sorted by +-- ROW__ID where txnid is the first component; FS2 should see (1,1),(3,2),(1,3),(3,4) + + +update bucket_nr_acid2 set b = -1; +set hive.exec.post.hooks=; +select * from bucket_nr_acid2 order by a, b; + +drop table bucket_nr_acid2; + http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/test/results/clientpositive/bucket_num_reducers_acid.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/bucket_num_reducers_acid.q.out b/ql/src/test/results/clientpositive/bucket_num_reducers_acid.q.out new file mode 100644 index 0000000..8ea23d7 --- /dev/null +++ b/ql/src/test/results/clientpositive/bucket_num_reducers_acid.q.out @@ -0,0 +1,33 @@ +PREHOOK: query: drop table if exists bucket_nr_acid +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists bucket_nr_acid +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table bucket_nr_acid (a int, b int) clustered by (a) into 2 buckets 
stored as orc TBLPROPERTIES ('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@bucket_nr_acid +POSTHOOK: query: create table bucket_nr_acid (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@bucket_nr_acid +PREHOOK: query: insert into bucket_nr_acid values(1,1) +PREHOOK: type: QUERY +PREHOOK: Output: default@bucket_nr_acid +PREHOOK: query: insert into bucket_nr_acid values(0,0),(3,3) +PREHOOK: type: QUERY +PREHOOK: Output: default@bucket_nr_acid +PREHOOK: query: update bucket_nr_acid set b = -1 +PREHOOK: type: QUERY +PREHOOK: Input: default@bucket_nr_acid +PREHOOK: Output: default@bucket_nr_acid +PREHOOK: query: select * from bucket_nr_acid order by a, b +PREHOOK: type: QUERY +PREHOOK: Input: default@bucket_nr_acid +#### A masked pattern was here #### +0 -1 +1 -1 +3 -1 +PREHOOK: query: drop table bucket_nr_acid +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@bucket_nr_acid +PREHOOK: Output: default@bucket_nr_acid http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/test/results/clientpositive/bucket_num_reducers_acid2.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/bucket_num_reducers_acid2.q.out b/ql/src/test/results/clientpositive/bucket_num_reducers_acid2.q.out new file mode 100644 index 0000000..cabb4f7 --- /dev/null +++ b/ql/src/test/results/clientpositive/bucket_num_reducers_acid2.q.out @@ -0,0 +1,44 @@ +PREHOOK: query: drop table if exists bucket_nr_acid2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists bucket_nr_acid2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table bucket_nr_acid2 (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default 
+PREHOOK: Output: default@bucket_nr_acid2 +POSTHOOK: query: create table bucket_nr_acid2 (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@bucket_nr_acid2 +PREHOOK: query: insert into bucket_nr_acid2 values(0,1),(1,1) +PREHOOK: type: QUERY +PREHOOK: Output: default@bucket_nr_acid2 +PREHOOK: query: insert into bucket_nr_acid2 values(2,2),(3,2) +PREHOOK: type: QUERY +PREHOOK: Output: default@bucket_nr_acid2 +PREHOOK: query: insert into bucket_nr_acid2 values(0,3),(1,3) +PREHOOK: type: QUERY +PREHOOK: Output: default@bucket_nr_acid2 +PREHOOK: query: insert into bucket_nr_acid2 values(2,4),(3,4) +PREHOOK: type: QUERY +PREHOOK: Output: default@bucket_nr_acid2 +PREHOOK: query: update bucket_nr_acid2 set b = -1 +PREHOOK: type: QUERY +PREHOOK: Input: default@bucket_nr_acid2 +PREHOOK: Output: default@bucket_nr_acid2 +PREHOOK: query: select * from bucket_nr_acid2 order by a, b +PREHOOK: type: QUERY +PREHOOK: Input: default@bucket_nr_acid2 +#### A masked pattern was here #### +0 -1 +0 -1 +1 -1 +1 -1 +2 -1 +2 -1 +3 -1 +3 -1 +PREHOOK: query: drop table bucket_nr_acid2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@bucket_nr_acid2 +PREHOOK: Output: default@bucket_nr_acid2