HIVE-16077 UPDATE/DELETE fails with numBuckets > numReducers (Eugene Koifman, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f53d07b8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f53d07b8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f53d07b8

Branch: refs/heads/master
Commit: f53d07b86e98f6a4f85b90a78701b8cad60244af
Parents: 5fc4900
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Fri Jul 28 17:56:03 2017 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Fri Jul 28 17:56:03 2017 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |    2 +
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |   51 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |    1 +
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |   46 +
 .../clientpositive/bucket_num_reducers_acid.q   |   30 +
 .../clientpositive/bucket_num_reducers_acid2.q  |   33 +
 .../bucket_num_reducers_acid.q.out              |   33 +
 .../bucket_num_reducers_acid2.q.out             |   44 +
 .../dynpart_sort_optimization_acid.q.out        | 1475 ------------------
 9 files changed, 225 insertions(+), 1490 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 6e88a4e..ba6c3e4 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -12,6 +12,8 @@ minimr.query.files=infer_bucket_sort_map_operators.q,\
   index_bitmap_auto.q,\
   scriptfile1.q,\
   bucket_num_reducers2.q,\
+  bucket_num_reducers_acid.q,\
+  bucket_num_reducers_acid2.q,\
   scriptfile1_win.q
 
 # These tests are disabled for minimr

http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 4d46d65..8999f6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -250,6 +250,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     }
 
    public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
+      //should this close updaters[]?
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
@@ -750,16 +751,15 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         LOG.info(toString() + ": records written - " + numRows);
       }
 
-      // This should always be 0 for the final result file
-      int writerOffset = findWriterOffset(row);
+      int writerOffset;
      // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same
      // for a given operator branch prediction should work quite nicely on it.
      // RecordUpdater expects to get the actual row, not a serialized version of it.  Thus we
      // pass the row rather than recordValue.
      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
-        rowOutWriters[writerOffset].write(recordValue);
+        rowOutWriters[findWriterOffset(row)].write(recordValue);
      } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
-        fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row);
+        fpaths.updaters[findWriterOffset(row)].insert(conf.getTransactionId(), row);
      } else {
        // TODO I suspect we could skip much of the stuff above this in the function in the case
        // of update and delete.  But I don't understand all of the side effects of the above
@@ -772,22 +772,39 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
            bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
        int bucketNum =
          BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
-        if (fpaths.acidLastBucket != bucketNum) {
-          fpaths.acidLastBucket = bucketNum;
-          // Switch files
-          fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 : ++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
-              jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset],
-              rowInspector, reporter, 0);
+        writerOffset = 0;
+        if (multiFileSpray) {
+          //bucket_num_reducers_acid.q, TestTxnCommands.testMoreBucketsThanReducers()
+          if (!bucketMap.containsKey(bucketNum)) {
+            String extraMsg = "  (no path info/)" + recId;
+            if (fpaths != null && fpaths.finalPaths != null && fpaths.finalPaths.length > 0) {
+              extraMsg = "  (finalPaths[0]=" + fpaths.finalPaths[0] + ")/" + recId;
+            }
+            throw new IllegalStateException("Found bucketNum=" + bucketNum +
+              " from data but no mapping in 'bucketMap'." + extraMsg);
+          }
+          writerOffset = bucketMap.get(bucketNum);
+        }
+        if (fpaths.updaters[writerOffset] == null) {
+          /*data for delete commands always has ROW__ID which implies that the bucket ID
+          * for each row is known.  RecordUpdater creates the bucket_N file based on 'bucketNum' thus
+          * delete events always land in the proper bucket_N file.  This could even handle
+          * cases where multiple writers are writing bucket_N file for the same N in which case
+          * Hive.copyFiles() will make one of them bucket_N_copy_M in the final location.  The
+          * rest of acid (read path) doesn't know how to handle copy_N files except for 'original'
+          * files (HIVE-16177)*/
+          fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
+            jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[writerOffset],
+            rowInspector, reporter, 0);
           if (LOG.isDebugEnabled()) {
             LOG.debug("Created updater for bucket number " + bucketNum + " 
using file " +
-                
fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)
 ? 0 :fpaths.acidFileOffset]);
+              fpaths.outPaths[writerOffset]);
           }
         }
-
         if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
-          fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].update(conf.getTransactionId(), row);
+            fpaths.updaters[writerOffset].update(conf.getTransactionId(), row);
         } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
-          fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
+            fpaths.updaters[writerOffset].delete(conf.getTransactionId(), row);
         } else {
          throw new HiveException("Unknown write type " + conf.getWriteType().toString());
         }
@@ -817,6 +834,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     if (!multiFileSpray) {
       return 0;
     } else {
+      assert getConf().getWriteType() != AcidUtils.Operation.DELETE &&
+        getConf().getWriteType() != AcidUtils.Operation.UPDATE :
+        "Unexpected operation type: " + getConf().getWriteType();
+      //this is not used for DELETE commands (partitionEval is not set up correctly,
+      // or needed, for that)
       Object[] bucketFieldValues = new Object[partitionEval.length];
       for(int i = 0; i < partitionEval.length; i++) {
         bucketFieldValues[i] = partitionEval[i].evaluate(row);
@@ -826,7 +848,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       int bucketNum = prtner.getBucket(key, null, totalFiles);
       return bucketMap.get(bucketNum);
     }
-
   }
 
   /**
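
A note on the core of the fix, for readers of the hunk above: every ACID
UPDATE/DELETE row carries a ROW__ID whose bucketProperty encodes the bucket
that originally wrote it. Rather than assuming one updater per FileSink
(which breaks once numBuckets > numReducers), the new code decodes bucketNum,
maps it to a writer offset through bucketMap when multiFileSpray is on, and
creates each RecordUpdater lazily on first use. Below is a minimal,
self-contained model of that routing in plain Java -- the class name, the
String stand-in for RecordUpdater, and the hard-coded bucketMap contents are
illustrative assumptions, not Hive API:

import java.util.HashMap;
import java.util.Map;

public class BucketRoutingSketch {
  // stand-in for fpaths.updaters[]; the real array holds RecordUpdater instances
  static final String[] updaters = new String[2];   // 2 reducers -> 2 writers
  static final Map<Integer, Integer> bucketMap = new HashMap<>();

  static int route(int bucketNum, boolean multiFileSpray) {
    int writerOffset = 0;
    if (multiFileSpray) {
      Integer offset = bucketMap.get(bucketNum);
      if (offset == null) {
        // mirrors the new IllegalStateException in FileSinkOperator
        throw new IllegalStateException("Found bucketNum=" + bucketNum
            + " from data but no mapping in 'bucketMap'.");
      }
      writerOffset = offset;
    }
    if (updaters[writerOffset] == null) {
      // lazy creation on first use, as in the patch
      updaters[writerOffset] = "updater created for bucket_" + bucketNum;
    }
    return writerOffset;
  }

  public static void main(String[] args) {
    // 4 buckets spread over 2 writers, as in testMoreBucketsThanReducers2()
    bucketMap.put(0, 0); bucketMap.put(2, 0);
    bucketMap.put(1, 1); bucketMap.put(3, 1);
    for (int bucket : new int[] {0, 2, 1, 3}) {
      System.out.println("bucket " + bucket + " -> writer " + route(bucket, true));
    }
  }
}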

http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index d40b89a..c30e8fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -479,6 +479,7 @@ public class OrcRecordUpdater implements RecordUpdater {
         }
       } else {
         if (writer == null) {
+          //so that we create empty bucket files when needed (but see HIVE-17138)
           writer = OrcFile.createWriter(path, writerOptions);
         }
         writer.close(); // normal close.
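
The one-line addition above makes close() materialize a writer even when no
row ever reached this updater, so an empty bucket file still appears where
readers expect one. A sketch of that create-on-close pattern, using a
hypothetical Writer interface as a stand-in for the real ORC writer (none of
these names are Hive or ORC API):

import java.util.function.Supplier;

interface Writer extends AutoCloseable {
  void write(String row) throws Exception;
}

public class LazyCloseSketch {
  private Writer writer;                  // stays null until first write (or close)
  private final Supplier<Writer> factory;

  LazyCloseSketch(Supplier<Writer> factory) { this.factory = factory; }

  void write(String row) throws Exception {
    if (writer == null) writer = factory.get();   // normal lazy creation
    writer.write(row);
  }

  void close() throws Exception {
    if (writer == null) {
      // as in the patched OrcRecordUpdater.close(): create the writer now so
      // an (empty) file exists for this bucket
      writer = factory.get();
    }
    writer.close();
  }

  public static void main(String[] args) throws Exception {
    LazyCloseSketch bucket = new LazyCloseSketch(() -> new Writer() {
      public void write(String row) { System.out.println("wrote: " + row); }
      public void close()           { System.out.println("closed; file now exists"); }
    });
    bucket.close();   // no writes happened, yet a file is still created
  }
}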

http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index c531aeb..c50c1a8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -888,4 +888,50 @@ public class TestTxnCommands {
 
     //make sure they are the same before and after compaction
   }
+  //@Ignore("see bucket_num_reducers_acid.q")
+  @Test
+  public void testMoreBucketsThanReducers() throws Exception {
+    //see bucket_num_reducers.q bucket_num_reducers2.q
+    // todo: try using set VerifyNumReducersHook.num.reducers=10;
+    d.destroy();
+    HiveConf hc = new HiveConf(hiveConf);
+    hc.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 1);
+    //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others
+    hc.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 1);
+    hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
+    d = new Driver(hc);
+    d.setMaxRows(10000);
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " 
values(1,1)");//txn X write to bucket1
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " 
values(0,0),(3,3)");// txn X + 1 write to bucket0 + bucket1
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = -1");
+    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBL + " 
order by a, b");
+    int[][] expected = {{0, -1}, {1, -1}, {3, -1}};
+    Assert.assertEquals(stringifyValues(expected), r);
+  }
+  //@Ignore("see bucket_num_reducers_acid2.q")
+  @Test
+  public void testMoreBucketsThanReducers2() throws Exception {
+    //todo: try using set VerifyNumReducersHook.num.reducers=10;
+    //see bucket_num_reducers.q bucket_num_reducers2.q
+    d.destroy();
+    HiveConf hc = new HiveConf(hiveConf);
+    hc.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2);
+    //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others
+    hc.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 2);
+    d = new Driver(hc);
+    d.setMaxRows(10000);
+    runStatementOnDriver("create table fourbuckets (a int, b int) clustered by 
(a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    //below value for a is bucket id, for b - txn id (logically)
+    runStatementOnDriver("insert into fourbuckets values(0,1),(1,1)");//txn X 
write to b0 + b1
+    runStatementOnDriver("insert into fourbuckets values(2,2),(3,2)");// txn X 
+ 1 write to b2 + b3
+    runStatementOnDriver("insert into fourbuckets values(0,3),(1,3)");//txn X 
+ 2 write to b0 + b1
+    runStatementOnDriver("insert into fourbuckets values(2,4),(3,4)");//txn X 
+ 3 write to b2 + b3
+    //so with 2 FileSinks and 4 buckets, FS1 should see (0,1),(2,2),(0,3)(2,4) 
since data is sorted by ROW__ID where tnxid is the first component
+    //FS2 should see (1,1),(3,2),(1,3),(3,4)
+
+    runStatementOnDriver("update fourbuckets set b = -1");
+    List<String> r = runStatementOnDriver("select * from fourbuckets order by 
a, b");
+    int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, 
{3, -1}, {3, -1}};
+    Assert.assertEquals(stringifyValues(expected), r);
+  }
 }
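
The bucket-to-FileSink distribution the two tests rely on can be checked by
hand. Assuming rows are sprayed to sinks by bucketNum % numReducers (an
illustrative assumption; the real assignment comes from the plan's
partitionEval machinery), the eight rows of testMoreBucketsThanReducers2()
split exactly as the comment predicts:

public class SprayDistribution {
  public static void main(String[] args) {
    int numReducers = 2;
    // {bucket id (column a), logical txn id (column b)} from the four inserts
    int[][] rows = {{0,1},{1,1},{2,2},{3,2},{0,3},{1,3},{2,4},{3,4}};
    for (int[] row : rows) {
      int fileSink = row[0] % numReducers;   // assumed spray function
      System.out.println("FS" + (fileSink + 1) + " sees (" + row[0] + "," + row[1] + ")");
    }
    // FS1 sees (0,1),(2,2),(0,3),(2,4); FS2 sees (1,1),(3,2),(1,3),(3,4)
  }
}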

http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/test/queries/clientpositive/bucket_num_reducers_acid.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_num_reducers_acid.q b/ql/src/test/queries/clientpositive/bucket_num_reducers_acid.q
new file mode 100644
index 0000000..a44668e
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/bucket_num_reducers_acid.q
@@ -0,0 +1,30 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.exec.mode.local.auto=false;
+
+set mapred.reduce.tasks = 1;
+
+-- This test sets the number of reduce tasks to 1 for a table with 2 buckets,
+-- and uses a post-hook to confirm that 1 task was created
+
+drop table if exists bucket_nr_acid;
+create table bucket_nr_acid (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyNumReducersHook;
+set VerifyNumReducersHook.num.reducers=1;
+
+-- txn X write to b1
+insert into bucket_nr_acid values(1,1);
+-- txn X + 1 write to b0 + b1
+insert into bucket_nr_acid values(0,0),(3,3);
+
+update bucket_nr_acid set b = -1;
+set hive.exec.post.hooks=;
+select * from bucket_nr_acid order by a, b;
+
+drop table bucket_nr_acid;
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q b/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q
new file mode 100644
index 0000000..2e6aa61
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q
@@ -0,0 +1,33 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.exec.mode.local.auto=false;
+
+set mapred.reduce.tasks = 2;
+
+-- This test sets the number of reduce tasks to 2 for a table with 4 buckets,
+-- and uses a post-hook to confirm that 2 tasks were created
+
+drop table if exists bucket_nr_acid2;
+create table bucket_nr_acid2 (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyNumReducersHook;
+set VerifyNumReducersHook.num.reducers=2;
+
+-- txn X write to b0 + b1
+insert into bucket_nr_acid2 values(0,1),(1,1);
+-- txn X + 1 write to b2 + b3
+insert into bucket_nr_acid2 values(2,2),(3,2);
+-- txn X + 2 write to b0 + b1
+insert into bucket_nr_acid2 values(0,3),(1,3);
+-- txn X + 3 write to b2 + b3
+insert into bucket_nr_acid2 values(2,4),(3,4);
+
+-- so with 2 FileSinks and 4 buckets, FS1 should see (0,1),(2,2),(0,3),(2,4) since data is sorted by
+-- ROW__ID where txnid is the first component. FS2 should see (1,1),(3,2),(1,3),(3,4)
+
+
+update bucket_nr_acid2 set b = -1;
+set hive.exec.post.hooks=;
+select * from bucket_nr_acid2 order by a, b;
+
+drop table bucket_nr_acid2;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/test/results/clientpositive/bucket_num_reducers_acid.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/bucket_num_reducers_acid.q.out b/ql/src/test/results/clientpositive/bucket_num_reducers_acid.q.out
new file mode 100644
index 0000000..8ea23d7
--- /dev/null
+++ b/ql/src/test/results/clientpositive/bucket_num_reducers_acid.q.out
@@ -0,0 +1,33 @@
+PREHOOK: query: drop table if exists bucket_nr_acid
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists bucket_nr_acid
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table bucket_nr_acid (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucket_nr_acid
+POSTHOOK: query: create table bucket_nr_acid (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucket_nr_acid
+PREHOOK: query: insert into bucket_nr_acid values(1,1)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid
+PREHOOK: query: insert into bucket_nr_acid values(0,0),(3,3)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid
+PREHOOK: query: update bucket_nr_acid set b = -1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_nr_acid
+PREHOOK: Output: default@bucket_nr_acid
+PREHOOK: query: select * from bucket_nr_acid order by a, b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_nr_acid
+#### A masked pattern was here ####
+0      -1
+1      -1
+3      -1
+PREHOOK: query: drop table bucket_nr_acid
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@bucket_nr_acid
+PREHOOK: Output: default@bucket_nr_acid

http://git-wip-us.apache.org/repos/asf/hive/blob/f53d07b8/ql/src/test/results/clientpositive/bucket_num_reducers_acid2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/bucket_num_reducers_acid2.q.out b/ql/src/test/results/clientpositive/bucket_num_reducers_acid2.q.out
new file mode 100644
index 0000000..cabb4f7
--- /dev/null
+++ b/ql/src/test/results/clientpositive/bucket_num_reducers_acid2.q.out
@@ -0,0 +1,44 @@
+PREHOOK: query: drop table if exists bucket_nr_acid2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists bucket_nr_acid2
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table bucket_nr_acid2 (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucket_nr_acid2
+POSTHOOK: query: create table bucket_nr_acid2 (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: insert into bucket_nr_acid2 values(0,1),(1,1)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: insert into bucket_nr_acid2 values(2,2),(3,2)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: insert into bucket_nr_acid2 values(0,3),(1,3)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: insert into bucket_nr_acid2 values(2,4),(3,4)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: update bucket_nr_acid2 set b = -1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_nr_acid2
+PREHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: select * from bucket_nr_acid2 order by a, b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_nr_acid2
+#### A masked pattern was here ####
+0      -1
+0      -1
+1      -1
+1      -1
+2      -1
+2      -1
+3      -1
+3      -1
+PREHOOK: query: drop table bucket_nr_acid2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@bucket_nr_acid2
+PREHOOK: Output: default@bucket_nr_acid2
