HIVE-16832 duplicate ROW__ID possible in multi insert into transactional table 
(Eugene Koifman, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6af30bf2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6af30bf2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6af30bf2

Branch: refs/heads/master
Commit: 6af30bf2b515b1cedce435c5b3e0acdd7eb9891d
Parents: 353781c
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Wed Jul 12 16:02:16 2017 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Wed Jul 12 16:02:16 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../mutate/worker/BucketIdResolverImpl.java     |   6 +-
 .../mutate/worker/MutatorCoordinator.java       |   7 +-
 .../streaming/mutate/worker/MutatorImpl.java    |  16 +-
 .../streaming/mutate/StreamingAssert.java       |   3 +-
 .../streaming/mutate/TestMutations.java         |  49 ++++--
 .../mutate/worker/TestBucketIdResolverImpl.java |   6 +-
 .../mutate/worker/TestMutatorImpl.java          |   2 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |   6 +-
 .../hadoop/hive/ql/io/AcidOutputFormat.java     |  12 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |   4 +-
 .../apache/hadoop/hive/ql/io/BucketCodec.java   | 111 ++++++++++++
 .../hadoop/hive/ql/io/RecordIdentifier.java     |  21 ++-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   8 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |  17 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |  87 +++++-----
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 174 ++++++++++++++-----
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java  |  10 +-
 .../apache/hadoop/hive/ql/udf/UDFToInteger.java |   5 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  58 +------
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 132 ++++++++++----
 .../ql/TestTxnCommands2WithSplitUpdate.java     |  41 -----
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |   8 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |   9 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  | 109 ++++++------
 .../hive/ql/io/orc/TestOrcRecordUpdater.java    |  15 +-
 .../TestVectorizedOrcAcidRowBatchReader.java    |   2 +
 .../clientpositive/acid_bucket_pruning.q        |   8 +-
 .../clientpositive/acid_table_stats.q.out       |  14 +-
 .../clientpositive/autoColumnStats_4.q.out      |   4 +-
 .../insert_values_orig_table_use_metadata.q.out |  24 +--
 .../llap/acid_bucket_pruning.q.out              |  30 +++-
 .../test/results/clientpositive/row__id.q.out   |  18 +-
 33 files changed, 637 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 231dc9f..d31d5a0 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1321,6 +1321,9 @@ public class HiveConf extends Configuration {
     HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only. 
 Will mark every ACID transaction aborted", false),
     HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For 
testing only.  Will cause CompactorMR to fail.", false),
     HIVETESTMODEFAILHEARTBEATER("hive.test.fail.heartbeater", false, "For 
testing only.  Will cause Heartbeater to fail.", false),
+    TESTMODE_BUCKET_CODEC_VERSION("hive.test.bucketcodec.version", 1,
+      "For testing only.  Will make ACID subsystem write 
RecordIdentifier.bucketId in specified\n" +
+        "format", false),
 
     HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
         "Merge small files at the end of a map-only job"),

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
index 571e076..7c2cade 100644
--- 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
@@ -19,6 +19,8 @@ package org.apache.hive.hcatalog.streaming.mutate.worker;
 
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -68,7 +70,9 @@ public class BucketIdResolverImpl implements BucketIdResolver 
{
   @Override
   public Object attachBucketIdToRecord(Object record) {
     int bucketId = computeBucketId(record);
-    RecordIdentifier recordIdentifier = new 
RecordIdentifier(INVALID_TRANSACTION_ID, bucketId, INVALID_ROW_ID);
+    int bucketProperty =
+      BucketCodec.V1.encode(new 
AcidOutputFormat.Options(null).bucket(bucketId));
+    RecordIdentifier recordIdentifier = new 
RecordIdentifier(INVALID_TRANSACTION_ID, bucketProperty, INVALID_ROW_ID);
     structObjectInspector.setStructFieldData(record, recordIdentifierField, 
recordIdentifier);
     return record;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
index 1ad0842..ae23153 100644
--- 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -183,7 +184,7 @@ public class MutatorCoordinator implements Closeable, 
Flushable {
   private void reconfigureState(OperationType operationType, List<String> 
newPartitionValues, Object record)
     throws WorkerException {
     RecordIdentifier newRecordIdentifier = 
extractRecordIdentifier(operationType, newPartitionValues, record);
-    int newBucketId = newRecordIdentifier.getBucketId();
+    int newBucketId = newRecordIdentifier.getBucketProperty();
 
     if (newPartitionValues == null) {
       newPartitionValues = Collections.emptyList();
@@ -209,8 +210,10 @@ public class MutatorCoordinator implements Closeable, 
Flushable {
   private RecordIdentifier extractRecordIdentifier(OperationType 
operationType, List<String> newPartitionValues,
       Object record) throws BucketIdException {
     RecordIdentifier recordIdentifier = 
recordInspector.extractRecordIdentifier(record);
+    int bucketIdFromRecord = BucketCodec.determineVersion(
+      
recordIdentifier.getBucketProperty()).decodeWriterId(recordIdentifier.getBucketProperty());
     int computedBucketId = bucketIdResolver.computeBucketId(record);
-    if (operationType != OperationType.DELETE && 
recordIdentifier.getBucketId() != computedBucketId) {
+    if (operationType != OperationType.DELETE && bucketIdFromRecord != 
computedBucketId) {
       throw new BucketIdException("RecordIdentifier.bucketId != computed 
bucketId (" + computedBucketId
           + ") for record " + recordIdentifier + " in partition " + 
newPartitionValues + ".");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
index 8998de9..05cf8b7 100644
--- 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -31,20 +33,24 @@ public class MutatorImpl implements Mutator {
 
   private final long transactionId;
   private final Path partitionPath;
-  private final int bucketId;
+  private final int bucketProperty;
   private final Configuration configuration;
   private final int recordIdColumn;
   private final ObjectInspector objectInspector;
   private RecordUpdater updater;
 
+  /**
+   * @param bucketProperty - from existing {@link 
RecordIdentifier#getBucketProperty()}
+   * @throws IOException
+   */
   public MutatorImpl(Configuration configuration, int recordIdColumn, 
ObjectInspector objectInspector,
-      AcidOutputFormat<?, ?> outputFormat, long transactionId, Path 
partitionPath, int bucketId) throws IOException {
+      AcidOutputFormat<?, ?> outputFormat, long transactionId, Path 
partitionPath, int bucketProperty) throws IOException {
     this.configuration = configuration;
     this.recordIdColumn = recordIdColumn;
     this.objectInspector = objectInspector;
     this.transactionId = transactionId;
     this.partitionPath = partitionPath;
-    this.bucketId = bucketId;
+    this.bucketProperty = bucketProperty;
 
     updater = createRecordUpdater(outputFormat);
   }
@@ -84,10 +90,12 @@ public class MutatorImpl implements Mutator {
   @Override
   public String toString() {
     return "ObjectInspectorMutator [transactionId=" + transactionId + ", 
partitionPath=" + partitionPath
-        + ", bucketId=" + bucketId + "]";
+        + ", bucketId=" + bucketProperty + "]";
   }
 
   protected RecordUpdater createRecordUpdater(AcidOutputFormat<?, ?> 
outputFormat) throws IOException {
+    int bucketId = BucketCodec
+      .determineVersion(bucketProperty).decodeWriterId(bucketProperty); 
     return outputFormat.getRecordUpdater(
         partitionPath,
         new AcidOutputFormat.Options(configuration)

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
 
b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index 6867679..de41d34 100644
--- 
a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ 
b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -148,7 +147,7 @@ public class StreamingAssert {
     while (recordReader.next(key, value)) {
       RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
       Record record = new Record(new 
RecordIdentifier(recordIdentifier.getTransactionId(),
-          recordIdentifier.getBucketId(), recordIdentifier.getRowId()), 
value.toString());
+          recordIdentifier.getBucketProperty(), recordIdentifier.getRowId()), 
value.toString());
       System.out.println(record);
       records.add(record);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
 
b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
index f1de1df..ab9f313 100644
--- 
a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
+++ 
b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hive.hcatalog.streaming.TestStreaming;
 import org.apache.hive.hcatalog.streaming.mutate.StreamingAssert.Factory;
@@ -101,6 +103,10 @@ public class TestMutations {
         .addColumn("msg", "string")
         .bucketCols(Collections.singletonList("string"));
   }
+  private static int encodeBucket(int bucketId) {
+    return BucketCodec.V1.encode(
+      new AcidOutputFormat.Options(null).bucket(bucketId));
+  }
 
   @Test
   public void testTransactionBatchEmptyCommitPartitioned() throws Exception {
@@ -242,7 +248,8 @@ public class TestMutations {
     List<Record> readRecords = streamingAssertions.readRecords();
     assertThat(readRecords.size(), is(1));
     assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
-    assertThat(readRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
 
     assertThat(transaction.getState(), is(COMMITTED));
     client.close();
@@ -299,7 +306,8 @@ public class TestMutations {
     List<Record> readRecords = streamingAssertions.readRecords();
     assertThat(readRecords.size(), is(1));
     assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
-    assertThat(readRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
 
     // EUROPE_UK
     streamingAssertions = assertionFactory.newStreamingAssert(table, 
EUROPE_UK);
@@ -310,7 +318,8 @@ public class TestMutations {
     readRecords = streamingAssertions.readRecords();
     assertThat(readRecords.size(), is(1));
     assertThat(readRecords.get(0).getRow(), is("{2, Hello streaming}"));
-    assertThat(readRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
 
     // EUROPE_FRANCE
     streamingAssertions = assertionFactory.newStreamingAssert(table, 
EUROPE_FRANCE);
@@ -321,9 +330,11 @@ public class TestMutations {
     readRecords = streamingAssertions.readRecords();
     assertThat(readRecords.size(), is(2));
     assertThat(readRecords.get(0).getRow(), is("{3, Hello streaming}"));
-    assertThat(readRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
     assertThat(readRecords.get(1).getRow(), is("{4, Bonjour streaming}"));
-    assertThat(readRecords.get(1).getRecordIdentifier(), is(new 
RecordIdentifier(1L, 0, 1L)));
+    assertThat(readRecords.get(1).getRecordIdentifier(), is(new 
RecordIdentifier(1L,
+      encodeBucket(0), 1L)));
 
     client.close();
   }
@@ -369,7 +380,8 @@ public class TestMutations {
     List<Record> readRecords = streamingAssertions.readRecords();
     assertThat(readRecords.size(), is(1));
     assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
-    assertThat(readRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
 
     assertThat(transaction.getState(), is(COMMITTED));
     client.close();
@@ -499,13 +511,15 @@ public class TestMutations {
         "Namaste streaming 3"));
 
     mutateCoordinator.update(ASIA_INDIA, new MutableRecord(2, "UPDATED: 
Namaste streaming 2", new RecordIdentifier(1L,
-        0, 1L)));
+      encodeBucket(0), 1L)));
     mutateCoordinator.insert(ASIA_INDIA, asiaIndiaRecord3);
-    mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 
1", new RecordIdentifier(1L, 0, 0L)));
+    mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 
1", new RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
     mutateCoordinator.delete(EUROPE_FRANCE,
-        new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L, 
0, 0L)));
+        new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L,
+          encodeBucket(0), 0L)));
     mutateCoordinator.update(EUROPE_FRANCE, new MutableRecord(6, "UPDATED: 
Bonjour streaming 2", new RecordIdentifier(
-        1L, 0, 1L)));
+        1L, encodeBucket(0), 1L)));
     mutateCoordinator.close();
 
     mutateTransaction.commit();
@@ -518,11 +532,14 @@ public class TestMutations {
     List<Record> indiaRecords = indiaAssertions.readRecords();
     assertThat(indiaRecords.size(), is(3));
     assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}"));
-    assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L, 0, 0L)));
+    assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
     assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste 
streaming 2}"));
-    assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new 
RecordIdentifier(1L, 0, 1L)));
+    assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new 
RecordIdentifier(1L,
+      encodeBucket(0), 1L)));
     assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}"));
-    assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new 
RecordIdentifier(2L, 0, 0L)));
+    assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new 
RecordIdentifier(2L,
+      encodeBucket(0), 0L)));
 
     StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, 
EUROPE_UK);
     ukAssertions.assertMinTransactionId(1L);
@@ -530,7 +547,8 @@ public class TestMutations {
     List<Record> ukRecords = ukAssertions.readRecords();
     assertThat(ukRecords.size(), is(1));
     assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}"));
-    assertThat(ukRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L, 0, 1L)));
+    assertThat(ukRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L,
+      encodeBucket(0), 1L)));
 
     StreamingAssert franceAssertions = 
assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
     franceAssertions.assertMinTransactionId(1L);
@@ -538,7 +556,8 @@ public class TestMutations {
     List<Record> franceRecords = franceAssertions.readRecords();
     assertThat(franceRecords.size(), is(1));
     assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour 
streaming 2}"));
-    assertThat(franceRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L, 0, 1L)));
+    assertThat(franceRecords.get(0).getRecordIdentifier(), is(new 
RecordIdentifier(1L,
+      encodeBucket(0), 1L)));
 
     client.close();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
 
b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
index 437946b..03c28a3 100644
--- 
a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
+++ 
b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
@@ -20,6 +20,8 @@ package org.apache.hive.hcatalog.streaming.mutate.worker;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hive.hcatalog.streaming.mutate.MutableRecord;
@@ -40,7 +42,9 @@ public class TestBucketIdResolverImpl {
   public void testAttachBucketIdToRecord() {
     MutableRecord record = new MutableRecord(1, "hello");
     capturingBucketIdResolver.attachBucketIdToRecord(record);
-    assertThat(record.rowId, is(new RecordIdentifier(-1L, 1, -1L)));
+    assertThat(record.rowId, is(new RecordIdentifier(-1L, 
+      BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(1)),
+      -1L)));
     assertThat(record.id, is(1));
     assertThat(record.msg.toString(), is("hello"));
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
 
b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
index 9aeeb31..2273e06 100644
--- 
a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
+++ 
b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
@@ -75,7 +75,7 @@ public class TestMutatorImpl {
   public void testCreatesRecordReader() throws IOException {
     verify(mockOutputFormat).getRecordUpdater(eq(PATH), 
captureOptions.capture());
     Options options = captureOptions.getValue();
-    assertThat(options.getBucket(), is(BUCKET_ID));
+    assertThat(options.getBucketId(), is(BUCKET_ID));
     assertThat(options.getConfiguration(), is((Configuration) configuration));
     assertThat(options.getInspector(), is(mockObjectInspector));
     assertThat(options.getRecordIdColumn(), is(RECORD_ID_COLUMN));

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 3e09432..4d46d65 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -75,7 +76,6 @@ import org.slf4j.LoggerFactory;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -768,8 +768,10 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
         // Find the bucket id, and switch buckets if need to
         ObjectInspector rowInspector = bDynParts ? subSetOI : 
outputObjInspector;
         Object recId = 
((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField);
-        int bucketNum =
+        int bucketProperty =
             bucketInspector.get(recIdInspector.getStructFieldData(recId, 
bucketField));
+        int bucketNum = 
+          
BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
         if (fpaths.acidLastBucket != bucketNum) {
           fpaths.acidLastBucket = bucketNum;
           // Switch files

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index 405cfde..a614bde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -51,7 +51,7 @@ public interface AcidOutputFormat<K extends 
WritableComparable, V> extends HiveO
     private Reporter reporter;
     private long minimumTransactionId;
     private long maximumTransactionId;
-    private int bucket;
+    private int bucketId;
     /**
      * Based on {@link 
org.apache.hadoop.hive.ql.metadata.Hive#mvFile(HiveConf, FileSystem, Path, 
FileSystem, Path, boolean, boolean)}
      * _copy_N starts with 1.
@@ -176,12 +176,12 @@ public interface AcidOutputFormat<K extends 
WritableComparable, V> extends HiveO
     }
 
     /**
-     * The bucket that is included in this file.
-     * @param bucket the bucket number
+     * The bucketId that is included in this file.
+     * @param bucket the bucketId number
      * @return this
      */
     public Options bucket(int bucket) {
-      this.bucket = bucket;
+      this.bucketId = bucket;
       return this;
     }
 
@@ -293,8 +293,8 @@ public interface AcidOutputFormat<K extends 
WritableComparable, V> extends HiveO
       return writingDeleteDelta;
     }
 
-    public int getBucket() {
-      return bucket;
+    public int getBucketId() {
+      return bucketId;
     }
 
     public int getRecordIdColumn() {

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 1c03736..1e33424 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -196,7 +196,7 @@ public class AcidUtils {
     String subdir;
     if (options.getOldStyle()) {
       return new Path(directory, String.format(LEGACY_FILE_BUCKET_DIGITS,
-          options.getBucket()) + "_0");
+          options.getBucketId()) + "_0");
     } else if (options.isWritingBase()) {
       subdir = BASE_PREFIX + String.format(DELTA_DIGITS,
           options.getMaximumTransactionId());
@@ -217,7 +217,7 @@ public class AcidUtils {
                         options.getMaximumTransactionId(),
                         options.getStatementId());
     }
-    return createBucketFile(new Path(directory, subdir), options.getBucket());
+    return createBucketFile(new Path(directory, subdir), 
options.getBucketId());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java
new file mode 100644
index 0000000..d1c2898
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java
@@ -0,0 +1,111 @@
+package org.apache.hadoop.hive.ql.io;
+
+/**
+ * This class makes sense of {@link RecordIdentifier#getBucketProperty()}.  Up until ASF Hive 3.0
+ * this field was simply the bucket ID.  Since 3.0 it is bit packed to store several things:
+ * top 3 bits - the version describing the format (so we can only have 8 versions).
+ * The rest is version specific - see below.
+ */
+public enum BucketCodec {
+  /**
+   * This is the "legacy" version.  The whole {@code bucket} value just has 
the bucket ID in it.
+   * The numeric code for this version is 0. (Assumes bucket ID takes less 
than 29 bits... which
+   * implies top 3 bits are 000 so data written before Hive 3.0 is readable 
with this scheme).
+   */
+  V0(0) {
+    @Override
+    public int decodeWriterId(int bucketProperty) {
+      return bucketProperty;
+    }
+    @Override
+    public int decodeStatementId(int bucketProperty) {
+      return 0;
+    }
+    @Override
+    public int encode(AcidOutputFormat.Options options) {
+      return options.getBucketId();
+    }
+  },
+  /**
+   * Represents the format of the "bucket" property in Hive 3.0.
+   * top 3 bits - version code
+   * next 1 bit - reserved for future use
+   * next 12 bits - the bucket ID
+   * next 4 bits - reserved for future use
+   * remaining 12 bits - the statement ID - 0-based numbering of all statements within a
+   * transaction.  Each leg of a multi-insert statement gets a separate statement ID.
+   * The reserved bits align it so that it is easier to interpret in hex.
+   * 
+   * Constructs like Merge and Multi-Insert may have multiple tasks writing 
data that belongs to
+   * the same physical bucket file.  For example, consider a Merge statement with update and
+   * insert clauses (and split update enabled, which should be the default in 3.0).  A task on
+   * behalf of the insert may be writing a row into bucket 0, and another task in the update
+   * branch may be writing an insert event into bucket 0.  Each of these tasks writes to a
+   * different delta directory, distinguished by statement ID.  By including both the bucket ID
+   * and the statement ID in {@link RecordIdentifier} we ensure that {@link RecordIdentifier} is
+   * unique.
+   * 
+   * The intent is that sorting rows by {@link RecordIdentifier} groups rows 
in the same physical
+   * bucket next to each other.
+   * For any row created by a given version of Hive, the top 3 bits are constant.  The next
+   * most significant bits are the bucket ID, then the statement ID.  This ensures that
+   * {@link org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer} works: it is
+   * designed so that each task only needs to keep one writer open at a time.  It could be
+   * configured such that a single writer sees data for multiple buckets, so it must "group"
+   * data by bucket ID (and then sort within each bucket as required), which is achieved by
+   * sorting on {@link RecordIdentifier}, which includes {@link RecordIdentifier#getBucketProperty()}
+   * with the actual bucket ID in its high order bits.  This scheme also ensures that
+   * {@link org.apache.hadoop.hive.ql.exec.FileSinkOperator#process(Object, int)} works in case
+   * numBuckets > numReducers.  (The latter could be fixed by changing how writers are
+   * initialized in "if (fpaths.acidLastBucket != bucketNum) {".)
+   */
+  V1(1) {
+    @Override
+    public int decodeWriterId(int bucketProperty) {
+      return (bucketProperty & 0b0000_1111_1111_1111_0000_0000_0000_0000) >>> 
16;
+    }
+    @Override
+    public int decodeStatementId(int bucketProperty) {
+      return (bucketProperty & 0b0000_0000_0000_0000_0000_1111_1111_1111);
+    }
+    @Override
+    public int encode(AcidOutputFormat.Options options) {
+      return this.version << 29 | options.getBucketId() << 16 |
+        (options.getStatementId() >= 0 ? options.getStatementId() : 0);
+    }
+  };
+  private static int TOP3BITS_MASK = 0b1110_0000_0000_0000_0000_0000_0000_0000;
+  public static BucketCodec determineVersion(int bucket) {
+    assert 7 << 29 == BucketCodec.TOP3BITS_MASK;
+    //look at top 3 bits and return appropriate enum
+    try {
+      return getCodec((BucketCodec.TOP3BITS_MASK & bucket) >>> 29);
+    }
+    catch(IllegalArgumentException ex) {
+      throw new IllegalArgumentException(ex.getMessage() + " Cannot decode 
version from " + bucket);
+    }
+  }
+  public static BucketCodec getCodec(int version) {
+    switch (version) {
+      case 0:
+        return BucketCodec.V0;
+      case 1:
+        return BucketCodec.V1;
+      default:
+        throw new IllegalArgumentException("Illegal 'bucket' format. Version=" 
+ version);
+    }
+  }
+  final int version;
+  BucketCodec(int version) {
+    this.version = version;
+  }
+
+  /**
+   * For bucketed tables this is the bucketId, otherwise the writerId.
+   */
+  public abstract int decodeWriterId(int bucketProperty);
+  public abstract int decodeStatementId(int bucketProperty);
+  public abstract int encode(AcidOutputFormat.Options options);
+  public int getVersion() {
+    return version;
+  }
+}
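
For reference, a minimal, illustrative round trip with the new codec; the demo class name and the
bucket/statement values below are made up and the sketch only uses the API added in this file:

import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.BucketCodec;

public class BucketCodecSketch {
  public static void main(String[] args) {
    // Encode bucket 7 of statement 2 the way a Hive 3.0 writer would.
    int bucketProperty = BucketCodec.V1.encode(
        new AcidOutputFormat.Options(null).bucket(7).statementId(2));

    // Readers look at the top 3 bits to pick the codec, then decode the parts.
    BucketCodec codec = BucketCodec.determineVersion(bucketProperty);
    System.out.println(codec.getVersion());                 // 1
    System.out.println(codec.decodeWriterId(bucketProperty));    // 7
    System.out.println(codec.decodeStatementId(bucketProperty)); // 2

    // A pre-3.0 value has 000 in the top 3 bits and decodes with V0 unchanged.
    System.out.println(BucketCodec.determineVersion(7).decodeWriterId(7)); // 7
  }
}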

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
index 7f2c169..87635c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
@@ -89,7 +89,7 @@ public class RecordIdentifier implements 
WritableComparable<RecordIdentifier> {
         return;
       }
       struct[Field.transactionId.ordinal()] = ri.getTransactionId();
-      struct[Field.bucketId.ordinal()] = ri.getBucketId();
+      struct[Field.bucketId.ordinal()] = ri.getBucketProperty();
       struct[Field.rowId.ordinal()] = ri.getRowId();
     }
   }
@@ -142,10 +142,10 @@ public class RecordIdentifier implements 
WritableComparable<RecordIdentifier> {
   }
 
   /**
-   * What was the original bucket id for the last row?
-   * @return the bucket id
+   * See {@link BucketCodec} for details.
+   * @return the bucket property value
    */
-  public int getBucketId() {
+  public int getBucketProperty() {
     return bucketId;
   }
 
@@ -219,7 +219,16 @@ public class RecordIdentifier implements 
WritableComparable<RecordIdentifier> {
 
   @Override
   public String toString() {
-    return "{originalTxn: " + transactionId + ", bucket: " +
-        bucketId + ", row: " + getRowId() + "}";
+    BucketCodec codec = 
+      BucketCodec.determineVersion(bucketId);
+    String s = "(" + codec.getVersion() + "." + codec.decodeWriterId(bucketId) 
+
+      "." + codec.decodeStatementId(bucketId) + ")";
+    return "{originalTxn: " + transactionId + ", " + bucketToString() + ", 
row: " + getRowId() +"}";
+  }
+  protected String bucketToString() {
+    BucketCodec codec =
+      BucketCodec.determineVersion(bucketId);
+    return  "bucket: " + bucketId + "(" + codec.getVersion() + "." +
+      codec.decodeWriterId(bucketId) + "." + codec.decodeStatementId(bucketId) 
+ ")";
   }
 }
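
A small illustrative sketch of the renamed accessor and the new toString(); the demo class name
and values are made up, and it only relies on the constructor and methods shown in this patch:

import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;

public class RecordIdentifierSketch {
  public static void main(String[] args) {
    int bucketProperty = BucketCodec.V1.encode(
        new AcidOutputFormat.Options(null).bucket(0).statementId(1));
    RecordIdentifier id = new RecordIdentifier(5L, bucketProperty, 0L);
    // getBucketProperty() returns the encoded value, not the raw bucket id.
    System.out.println(id.getBucketProperty() == bucketProperty); // true
    // toString() decodes it as "bucket: <encoded>(<version>.<bucketId>.<statementId>)".
    System.out.println(id);
  }
}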

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index f9e17a9..de49fc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1953,10 +1953,10 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     if (split.hasBase()) {
       AcidOutputFormat.Options acidIOOptions =
         AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf);
-      if(acidIOOptions.getBucket() < 0) {
+      if(acidIOOptions.getBucketId() < 0) {
         LOG.warn("Can't determine bucket ID for " + split.getPath() + "; 
ignoring");
       }
-      bucket = acidIOOptions.getBucket();
+      bucket = acidIOOptions.getBucketId();
       if(split.isOriginal()) {
         
mergerOptions.copyIndex(acidIOOptions.getCopyNumber()).bucketPath(split.getPath());
       }
@@ -2033,7 +2033,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
       }
       AcidOutputFormat.Options bucketInfo =
         AcidUtils.parseBaseOrDeltaBucketFilename(stat.getPath(), fs.getConf());
-      if(bucketInfo.getBucket() == bucket) {
+      if(bucketInfo.getBucketId() == bucket) {
         return stat.getPath();
       }
     }
@@ -2211,7 +2211,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
         AcidOutputFormat.Options opts = 
AcidUtils.parseBaseOrDeltaBucketFilename
             (child.getFileStatus().getPath(), context.conf);
         opts.writingBase(true);
-        int b = opts.getBucket();
+        int b = opts.getBucketId();
         // If the bucket is in the valid range, mark it as covered.
         // I wish Hive actually enforced bucketing all of the time.
         if (b >= 0 && b < covered.length) {

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index ffcdf6a..650f2af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -80,6 +80,13 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
   @VisibleForTesting
   public final static class ReaderKey extends RecordIdentifier{
     private long currentTransactionId;
+    /**
+     * This is the value from the delta file name, which may differ from the value encoded in
+     * {@link RecordIdentifier#getBucketProperty()} in case of Update/Delete.
+     * So for Acid 1.0 + multi-stmt txn, if {@code isSameRow() == true}, then it must be an update
+     * or delete event.  For Acid 2.0 + multi-stmt txn, it must be a delete event.
+     * No two insert events can ever agree on {@link RecordIdentifier}.
+     */
     private int statementId;//sort on this descending, like 
currentTransactionId
 
     public ReaderKey() {
@@ -174,8 +181,8 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
 
     @Override
     public String toString() {
-      return "{originalTxn: " + getTransactionId() + ", bucket: " +
-          getBucketId() + ", row: " + getRowId() + ", currentTxn: " +
+      return "{originalTxn: " + getTransactionId() + ", " +
+          bucketToString() + ", row: " + getRowId() + ", currentTxn: " +
           currentTransactionId + ", statementId: "+ statementId + "}";
     }
   }
@@ -375,7 +382,7 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
           for (HadoopShims.HdfsFileStatusWithId f : 
directoryState.getOriginalFiles()) {
             AcidOutputFormat.Options bucketOptions =
               
AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-            if (bucketOptions.getBucket() != bucket) {
+            if (bucketOptions.getBucketId() != bucket) {
               continue;
             }
             if(haveSeenCurrentFile) {
@@ -426,7 +433,7 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
           for (HadoopShims.HdfsFileStatusWithId f : 
directoryState.getOriginalFiles()) {
             AcidOutputFormat.Options bucketOptions =
               
AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-            if (bucketOptions.getBucket() == bucket) {
+            if (bucketOptions.getBucketId() == bucket) {
               numFilesInBucket++;
               if(numFilesInBucket > 1) {
                 isLastFileForThisBucket = false;
@@ -540,7 +547,7 @@ public class OrcRawRecordMerger implements 
AcidInputFormat.RawReader<OrcStruct>{
     private Reader advanceToNextFile() throws IOException {
       while(nextFileIndex < originalFiles.size()) {
         AcidOutputFormat.Options bucketOptions = 
AcidUtils.parseBaseOrDeltaBucketFilename(originalFiles.get(nextFileIndex).getFileStatus().getPath(),
 conf);
-        if (bucketOptions.getBucket() == bucket) {
+        if (bucketOptions.getBucketId() == bucket) {
           break;
         }
         nextFileIndex++;

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 65f4a24..d40b89a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -29,10 +29,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -48,10 +51,12 @@ import org.apache.orc.impl.OrcAcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * A RecordUpdater where the files are stored as ORC.
+ * A note on the various record structures: the {@code row} coming in (as in {@link #insert(long, Object)},
+ * for example) is a struct like <RecordIdentifier, f1, ... fn>, but what is written to the file
+ * is <op, otid, writerId, rowid, ctid, <f1, ... fn>> (see {@link #createEventSchema(ObjectInspector)}),
+ * so there are ObjectInspectors here to make the translation.
  */
 public class OrcRecordUpdater implements RecordUpdater {
 
@@ -96,7 +101,6 @@ public class OrcRecordUpdater implements RecordUpdater {
   private final IntWritable bucket = new IntWritable();
   private final LongWritable rowId = new LongWritable();
   private long insertedRows = 0;
-  private long rowIdOffset = 0;
   // This records how many rows have been inserted or deleted.  It is separate 
from insertedRows
   // because that is monotonically increasing to give new unique row ids.
   private long rowCountDelta = 0;
@@ -111,6 +115,7 @@ public class OrcRecordUpdater implements RecordUpdater {
   private LongObjectInspector rowIdInspector; // OI for the long row id inside 
the recordIdentifier
   private LongObjectInspector origTxnInspector; // OI for the original txn 
inside the record
   // identifer
+  private IntObjectInspector bucketInspector;
 
   static int getOperation(OrcStruct struct) {
     return ((IntWritable) struct.getFieldValue(OPERATION)).get();
@@ -200,7 +205,18 @@ public class OrcRecordUpdater implements RecordUpdater {
       this.acidOperationalProperties =
           AcidUtils.getAcidOperationalProperties(options.getConfiguration());
     }
-    this.bucket.set(options.getBucket());
+    BucketCodec bucketCodec = BucketCodec.V1;
+    if(options.getConfiguration() != null) {
+      //so that we can test "old" files
+      Configuration hc = options.getConfiguration();
+      if(hc.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST.name(), false) ||
+        hc.getBoolean(HiveConf.ConfVars.HIVE_IN_TEZ_TEST.name(), false)) {
+        bucketCodec = BucketCodec.getCodec(
+          hc.getInt(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION.name(),
+            BucketCodec.V1.getVersion()));
+      }
+    }
+    this.bucket.set(bucketCodec.encode(options));
     this.path = AcidUtils.createFilename(path, options);
     this.deleteEventWriter = null;
     this.deleteEventPath = null;
@@ -283,41 +299,6 @@ public class OrcRecordUpdater implements RecordUpdater {
   public String toString() {
     return getClass().getName() + "[" + path +"]";
   }
-  /**
-   * To handle multiple INSERT... statements in a single transaction, we want 
to make sure
-   * to generate unique {@code rowId} for all inserted rows of the transaction.
-   * @return largest rowId created by previous statements (maybe 0)
-   * @throws IOException
-   */
-  private long findRowIdOffsetForInsert() throws IOException {
-    /*
-    * 1. need to know bucket we are writing to
-    * 2. need to know which delta dir it's in
-    * Then,
-    * 1. find the same bucket file in previous (insert) delta dir for this txn
-    *    (Note: in case of split_update, we can ignore the delete_delta dirs)
-    * 2. read the footer and get AcidStats which has insert count
-     * 2.1 if AcidStats.inserts>0 add to the insert count.
-     *  else go to previous delta file
-     *  For example, consider insert/update/insert case...*/
-    if(options.getStatementId() <= 0) {
-      return 0;//there is only 1 statement in this transaction (so far)
-    }
-    long totalInserts = 0;
-    for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; 
pastStmt--) {
-      Path matchingBucket = 
AcidUtils.createFilename(options.getFinalDestination(), 
options.clone().statementId(pastStmt));
-      if(!fs.exists(matchingBucket)) {
-        continue;
-      }
-      Reader reader = OrcFile.createReader(matchingBucket, 
OrcFile.readerOptions(options.getConfiguration()));
-      //no close() on Reader?!
-      AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader);
-      if(acidStats.inserts > 0) {
-        totalInserts += acidStats.inserts;
-      }
-    }
-    return totalInserts;
-  }
   // Find the record identifier column (if there) and return a possibly new 
ObjectInspector that
   // will strain out the record id for the underlying writer.
   private ObjectInspector findRecId(ObjectInspector inspector, int 
rowIdColNum) {
@@ -338,6 +319,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       originalTxnField = fields.get(0);
       origTxnInspector = 
(LongObjectInspector)originalTxnField.getFieldObjectInspector();
       bucketField = fields.get(1);
+      bucketInspector = (IntObjectInspector) 
bucketField.getFieldObjectInspector();
       rowIdField = fields.get(2);
       rowIdInspector = 
(LongObjectInspector)rowIdField.getFieldObjectInspector();
 
@@ -346,11 +328,11 @@ public class OrcRecordUpdater implements RecordUpdater {
       return newInspector;
     }
   }
-
   private void addSimpleEvent(int operation, long currentTransaction, long 
rowId, Object row)
       throws IOException {
     this.operation.set(operation);
     this.currentTransaction.set(currentTransaction);
+    Integer currentBucket = null;
     // If this is an insert, originalTransaction should be set to this 
transaction.  If not,
     // it will be reset by the following if anyway.
     long originalTransaction = currentTransaction;
@@ -359,9 +341,8 @@ public class OrcRecordUpdater implements RecordUpdater {
       originalTransaction = origTxnInspector.get(
           recIdInspector.getStructFieldData(rowIdValue, originalTxnField));
       rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, 
rowIdField));
-    }
-    else if(operation == INSERT_OPERATION) {
-      rowId += rowIdOffset;
+      currentBucket = setBucket(bucketInspector.get(
+        recIdInspector.getStructFieldData(rowIdValue, bucketField)), 
operation);
     }
     this.rowId.set(rowId);
     this.originalTransaction.set(originalTransaction);
@@ -372,6 +353,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       writer = OrcFile.createWriter(path, writerOptions);
     }
     writer.addRow(item);
+    restoreBucket(currentBucket, operation);
   }
 
   private void addSplitUpdateEvent(int operation, long currentTransaction, 
long rowId, Object row)
@@ -388,8 +370,11 @@ public class OrcRecordUpdater implements RecordUpdater {
             recIdInspector.getStructFieldData(rowValue, originalTxnField));
     rowId = rowIdInspector.get(
             recIdInspector.getStructFieldData(rowValue, rowIdField));
+    Integer currentBucket = null;
 
     if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+      currentBucket = setBucket(bucketInspector.get(
+        recIdInspector.getStructFieldData(rowValue, bucketField)), operation);
       // Initialize a deleteEventWriter if not yet done. (Lazy initialization)
       if (deleteEventWriter == null) {
         // Initialize an indexBuilder for deleteEvents.
@@ -414,6 +399,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for 
delete events.
       deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, 
bucket.get(), rowId);
       deleteEventWriter.addRow(item);
+      restoreBucket(currentBucket, operation);
     }
 
     if (operation == UPDATE_OPERATION) {
@@ -426,9 +412,6 @@ public class OrcRecordUpdater implements RecordUpdater {
   public void insert(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
-      //this method is almost no-op in hcatalog.streaming case since 
statementId == 0 is
-      //always true in that case
-      rowIdOffset = findRowIdOffsetForInsert();
     }
     if (acidOperationalProperties.isSplitUpdate()) {
       addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, 
insertedRows++, row);
@@ -442,7 +425,6 @@ public class OrcRecordUpdater implements RecordUpdater {
   public void update(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
-      rowIdOffset = findRowIdOffsetForInsert();
     }
     if (acidOperationalProperties.isSplitUpdate()) {
       addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
@@ -664,4 +646,15 @@ public class OrcRecordUpdater implements RecordUpdater {
       return recId;
     }
   }
+  private void restoreBucket(Integer currentBucket, int operation) {
+    if(currentBucket != null) {
+      setBucket(currentBucket, operation);
+    }
+  }
+  private int setBucket(int bucketProperty, int operation) {
+    assert operation == UPDATE_OPERATION || operation == DELETE_OPERATION;
+    int currentBucketProperty = bucket.get();
+    bucket.set(bucketProperty);
+    return currentBucketProperty;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 29f5a8e..8f80710 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -36,6 +36,7 @@ import 
org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputSplit;
@@ -353,7 +354,7 @@ public class VectorizedOrcAcidRowBatchReader
       throws IOException {
         final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
         if (deleteDeltas.length > 0) {
-          int bucket = 
AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+          int bucket = 
AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), 
conf).getBucketId();
           String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
           this.validTxnList = (txnString == null) ? new ValidReadTxnList() : 
new ValidReadTxnList(txnString);
           OrcRawRecordMerger.Options mergerOptions = new 
OrcRawRecordMerger.Options().isCompacting(false);
@@ -470,9 +471,9 @@ public class VectorizedOrcAcidRowBatchReader
    * An implementation for DeleteEventRegistry that optimizes for performance 
by loading
    * all the delete events into memory at once from all the delete delta files.
    * It starts by reading all the delete events through a regular sort merge 
logic
-   * into two vectors- one for original transaction id (otid), and the other 
for row id.
-   * (In the current version, since the bucket id should be same for all the 
delete deltas,
-   * it is not stored). The otids are likely to be repeated very often, as a 
single transaction
+   * into three vectors: one for the original transaction id (otid), one for the bucket property,
+   * and one for the row id.  See {@link BucketCodec} for more about the bucket property.
+   * The otids are likely to be repeated very often, as a single transaction
    * often deletes thousands of rows. Hence, the otid vector is compressed to 
only store the
    * toIndex and fromIndex ranges in the larger row id vector. Now, querying 
whether a
    * record id is deleted or not, is done by performing a binary search on the
@@ -483,21 +484,22 @@ public class VectorizedOrcAcidRowBatchReader
    */
    static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry {
     /**
-     * A simple wrapper class to hold the (otid, rowId) pair.
+     * A simple wrapper class to hold the (otid, bucketProperty, rowId) pair.
      */
     static class DeleteRecordKey implements Comparable<DeleteRecordKey> {
       private long originalTransactionId;
+      /**
+       * see {@link BucketCodec}
+       */
+      private int bucketProperty; 
       private long rowId;
       public DeleteRecordKey() {
         this.originalTransactionId = -1;
         this.rowId = -1;
       }
-      public DeleteRecordKey(long otid, long rowId) {
-        this.originalTransactionId = otid;
-        this.rowId = rowId;
-      }
-      public void set(long otid, long rowId) {
+      public void set(long otid, int bucketProperty, long rowId) {
         this.originalTransactionId = otid;
+        this.bucketProperty = bucketProperty;
         this.rowId = rowId;
       }
 
@@ -509,11 +511,18 @@ public class VectorizedOrcAcidRowBatchReader
         if (originalTransactionId != other.originalTransactionId) {
           return originalTransactionId < other.originalTransactionId ? -1 : 1;
         }
+        if(bucketProperty != other.bucketProperty) {
+          return bucketProperty < other.bucketProperty ? -1 : 1;
+        }
         if (rowId != other.rowId) {
           return rowId < other.rowId ? -1 : 1;
         }
         return 0;
       }
+      @Override
+      public String toString() {
+        return "otid: " + originalTransactionId + " bucketP:" + bucketProperty 
+ " rowid: " + rowId;
+      }
     }
 
     /**
@@ -528,6 +537,7 @@ public class VectorizedOrcAcidRowBatchReader
       private int indexPtrInBatch;
       private final int bucketForSplit; // The bucket value should be same for 
all the records.
       private final ValidTxnList validTxnList;
+      private boolean isBucketPropertyRepeating;
 
       public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options 
readerOptions, int bucket,
           ValidTxnList validTxnList) throws IOException {
@@ -539,6 +549,7 @@ public class VectorizedOrcAcidRowBatchReader
         }
         this.indexPtrInBatch = 0;
         this.validTxnList = validTxnList;
+        checkBucketId();//check 1st batch
       }
 
       public boolean next(DeleteRecordKey deleteRecordKey) throws IOException {
@@ -550,37 +561,19 @@ public class VectorizedOrcAcidRowBatchReader
           if (indexPtrInBatch >= batch.size) {
             // We have exhausted our current batch, read the next batch.
             if (recordReader.nextBatch(batch)) {
-              // Whenever we are reading a batch, we must ensure that all the 
records in the batch
-              // have the same bucket id as the bucket id of the split. If 
not, throw exception.
-              // NOTE: this assertion might not hold, once virtual bucketing 
is in place. However,
-              // it should be simple to fix that case. Just replace check for 
bucket equality with
-              // a check for valid bucket mapping. Until virtual bucketing is 
added, it means
-              // either the split computation got messed up or we found some 
corrupted records.
-              long bucketForRecord = ((LongColumnVector) 
batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
-              if ((batch.size > 1 && 
!batch.cols[OrcRecordUpdater.BUCKET].isRepeating)
-                  || (bucketForRecord != bucketForSplit)){
-                throw new IOException("Corrupted records with different bucket 
ids "
-                    + "from the containing bucket file found! Expected bucket 
id "
-                    + bucketForSplit + ", however found the bucket id " + 
bucketForRecord);
-              }
+              checkBucketId();
               indexPtrInBatch = 0; // After reading the batch, reset the 
pointer to beginning.
             } else {
               return false; // no more batches to read, exhausted the reader.
             }
           }
-          int originalTransactionIndex =
-              batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 
0 : indexPtrInBatch;
-          long originalTransaction =
-              ((LongColumnVector) 
batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex];
-          long rowId = ((LongColumnVector) 
batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch];
-          int currentTransactionIndex =
-              batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 
: indexPtrInBatch;
-          long currentTransaction =
-              ((LongColumnVector) 
batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex];
+          long currentTransaction = setCurrentDeleteKey(deleteRecordKey);
+          if(!isBucketPropertyRepeating) {
+            checkBucketId(deleteRecordKey.bucketProperty);
+          }
           ++indexPtrInBatch;
           if (validTxnList.isTxnValid(currentTransaction)) {
             isValidNext = true;
-            deleteRecordKey.set(originalTransaction, rowId);
           }
         }
         return true;
@@ -589,8 +582,51 @@ public class VectorizedOrcAcidRowBatchReader
       public void close() throws IOException {
         this.recordReader.close();
       }
+      private long setCurrentDeleteKey(DeleteRecordKey deleteRecordKey) {
+        int originalTransactionIndex =
+          batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 0 : 
indexPtrInBatch;
+        long originalTransaction =
+          ((LongColumnVector) 
batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex];
+        int bucketPropertyIndex =
+          batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? 0 : 
indexPtrInBatch;
+        int bucketProperty = 
(int)((LongColumnVector)batch.cols[OrcRecordUpdater.BUCKET]).vector[bucketPropertyIndex];
+        long rowId = ((LongColumnVector) 
batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch];
+        int currentTransactionIndex =
+          batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 : 
indexPtrInBatch;
+        long currentTransaction =
+          ((LongColumnVector) 
batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex];
+        deleteRecordKey.set(originalTransaction, bucketProperty, rowId);
+        return currentTransaction;
+      }
+      private void checkBucketId() throws IOException {
+        isBucketPropertyRepeating = 
batch.cols[OrcRecordUpdater.BUCKET].isRepeating;
+        if(isBucketPropertyRepeating) {
+          int bucketPropertyFromRecord = (int)((LongColumnVector)
+            batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
+          checkBucketId(bucketPropertyFromRecord);
+        }
+      }
+      /**
+       * Whenever we are reading a batch, we must ensure that all the records 
in the batch
+       * have the same bucket id as the bucket id of the split. If not, throw 
exception.
+       * NOTE: this assertion might not hold, once virtual bucketing is in 
place. However,
+       * it should be simple to fix that case. Just replace check for bucket 
equality with
+       * a check for valid bucket mapping. Until virtual bucketing is added, 
it means
+       * either the split computation got messed up or we found some corrupted 
records.
+       */
+      private void checkBucketId(int bucketPropertyFromRecord) throws 
IOException {
+        int bucketIdFromRecord = 
BucketCodec.determineVersion(bucketPropertyFromRecord)
+          .decodeWriterId(bucketPropertyFromRecord);
+        if(bucketIdFromRecord != bucketForSplit) {
+          DeleteRecordKey dummy = new DeleteRecordKey();
+          long curTxnId = setCurrentDeleteKey(dummy);
+          throw new IOException("Corrupted records with different bucket ids "
+            + "from the containing bucket file found! Expected bucket id "
+            + bucketForSplit + ", however found the bucket id " + 
bucketIdFromRecord +
+            " from " + dummy + " curTxnId: " + curTxnId);
+        }
+      }
     }
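
For illustration, why the old inline check was replaced: the BUCKET column of a delete event now
holds an encoded bucket property rather than the raw bucket id, so it has to be decoded before it
can be compared to the split's bucket. A minimal sketch using the same BucketCodec calls the patch
uses; the wrapper class and method names are hypothetical:

import java.io.IOException;

import org.apache.hadoop.hive.ql.io.BucketCodec;

final class BucketCheckSketch {
  static void verifyBucket(int bucketPropertyFromRecord, int bucketForSplit) throws IOException {
    // Decode the writer (bucket) id out of the encoded bucket property.
    int bucketIdFromRecord = BucketCodec.determineVersion(bucketPropertyFromRecord)
        .decodeWriterId(bucketPropertyFromRecord);
    if (bucketIdFromRecord != bucketForSplit) {
      // Until virtual bucketing exists, a mismatch means either bad split
      // computation or corrupted records (same reasoning as the javadoc above).
      throw new IOException("Expected bucket id " + bucketForSplit
          + " but found " + bucketIdFromRecord);
    }
  }
}
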
-
     /**
      * A CompressedOtid class stores a compressed representation of the 
original
      * transaction ids (otids) read from the delete delta files. Since the 
record ids
@@ -599,13 +635,15 @@ public class VectorizedOrcAcidRowBatchReader
      * the toIndex. These fromIndex and toIndex reference the larger vector 
formed by
      * concatenating the correspondingly ordered rowIds.
      */
-    private class CompressedOtid implements Comparable<CompressedOtid> {
-      long originalTransactionId;
-      int fromIndex; // inclusive
-      int toIndex; // exclusive
+    private final class CompressedOtid implements Comparable<CompressedOtid> {
+      final long originalTransactionId;
+      final int bucketProperty;
+      final int fromIndex; // inclusive
+      final int toIndex; // exclusive
 
-      public CompressedOtid(long otid, int fromIndex, int toIndex) {
+      CompressedOtid(long otid, int bucketProperty, int fromIndex, int 
toIndex) {
         this.originalTransactionId = otid;
+        this.bucketProperty = bucketProperty;
         this.fromIndex = fromIndex;
         this.toIndex = toIndex;
       }
@@ -616,10 +654,24 @@ public class VectorizedOrcAcidRowBatchReader
         if (originalTransactionId != other.originalTransactionId) {
           return originalTransactionId < other.originalTransactionId ? -1 : 1;
         }
+        if(bucketProperty != other.bucketProperty) {
+          return bucketProperty < other.bucketProperty ? -1 : 1;
+        }
         return 0;
       }
     }
 
+    /**
+     * Food for thought:
+     * this is a bit problematic - in order to load ColumnizedDeleteEventRegistry we still open
+     * all delete deltas at once, possibly causing OOM just like {@link SortMergedDeleteEventRegistry},
+     * which uses {@link OrcRawRecordMerger}.  Why not load the delete_delta files sequentially?  Each
+     * delete_delta is sorted by {@link RecordIdentifier}, so we could build a BTree-like structure where
+     * the 1st level is an array of originalTransactionId, each entry pointing at an array of bucketIds,
+     * and each of those pointing at an array of rowIds.  ArrayList (or LinkedList?) could manage
+     * insertion as the structure is built.  This should reduce the memory footprint (and the number of
+     * OrcReaders open at a time to one) - probably bad for LLAP IO though.  A rough sketch of the
+     * proposed structure follows this hunk.
+     */
     private TreeMap<DeleteRecordKey, DeleteReaderValue> sortMerger;
     private long rowIds[];
     private CompressedOtid compressedOtids[];
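
A rough, purely hypothetical sketch of the structure the "food for thought" comment above proposes;
nothing like this exists in the patch, and the map/list types are only one possible choice:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

// Hypothetical only: models an otid -> bucketProperty -> sorted rowIds structure that could be
// filled one delete_delta at a time instead of sort-merging all delete deltas at once.
final class NestedDeleteIndexSketch {
  private final TreeMap<Long, TreeMap<Integer, List<Long>>> deletes = new TreeMap<>();

  void add(long otid, int bucketProperty, long rowId) {
    // Each delete_delta is already sorted, so appends keep per-file order; if several
    // delete_deltas feed the same (otid, bucket), the list would need one final merge/sort.
    deletes.computeIfAbsent(otid, k -> new TreeMap<>())
           .computeIfAbsent(bucketProperty, k -> new ArrayList<>())
           .add(rowId);
  }

  boolean isDeleted(long otid, int bucketProperty, long rowId) {
    TreeMap<Integer, List<Long>> byBucket = deletes.get(otid);
    if (byBucket == null) return false;
    List<Long> rowIds = byBucket.get(bucketProperty);
    return rowIds != null && Collections.binarySearch(rowIds, rowId) >= 0;
  }
}
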
@@ -627,7 +679,7 @@ public class VectorizedOrcAcidRowBatchReader
 
     public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
         Reader.Options readerOptions) throws IOException, 
DeleteEventsOverflowMemoryException {
-      int bucket = 
AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+      int bucket = 
AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), 
conf).getBucketId();
       String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
       this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new 
ValidReadTxnList(txnString);
       this.sortMerger = new TreeMap<DeleteRecordKey, DeleteReaderValue>();
@@ -690,11 +742,22 @@ public class VectorizedOrcAcidRowBatchReader
       }
     }
 
+    /**
+     * This is not done quite right.  The intent of {@link CompressedOtid} is a hedge against
+     * "delete from T" generating a huge number of delete events, possibly even 2G - the max array
+     * size (assuming no single txn inserts > 2G rows into a bucket).  As implemented, the algorithm
+     * first loads all data into single otid[] and rowIds[] arrays, which defeats the purpose.
+     * In practice we should be filtering delete events by the min/max ROW_ID of the split.  The latter
+     * is also not yet implemented: HIVE-16812.
+     */
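
Purely illustrative of the filtering the comment above suggests; HIVE-16812 is not part of this
patch, so the helper and its inputs are hypothetical:

// Hypothetical helper - HIVE-16812 would have to supply the split's ROW_ID range; with it,
// delete events outside [minRowId, maxRowId] could be skipped while loading instead of buffered.
final class RowIdRangeFilterSketch {
  static boolean mayApplyToSplit(long deleteEventRowId, long minRowId, long maxRowId) {
    return minRowId <= deleteEventRowId && deleteEventRowId <= maxRowId;
  }
}
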
     private void readAllDeleteEventsFromDeleteDeltas() throws IOException {
       if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, 
nothing to read.
       int distinctOtids = 0;
       long lastSeenOtid = -1;
+      int lastSeenBucketProperty = -1;
       long otids[] = new long[rowIds.length];
+      int[] bucketProperties = new int [rowIds.length];
+      
       int index = 0;
       while (!sortMerger.isEmpty()) {
         // The sortMerger is a heap data structure that stores a pair of
@@ -710,11 +773,14 @@ public class VectorizedOrcAcidRowBatchReader
         DeleteRecordKey deleteRecordKey = entry.getKey();
         DeleteReaderValue deleteReaderValue = entry.getValue();
         otids[index] = deleteRecordKey.originalTransactionId;
+        bucketProperties[index] = deleteRecordKey.bucketProperty;
         rowIds[index] = deleteRecordKey.rowId;
         ++index;
-        if (lastSeenOtid != deleteRecordKey.originalTransactionId) {
+        if (lastSeenOtid != deleteRecordKey.originalTransactionId ||
+          lastSeenBucketProperty != deleteRecordKey.bucketProperty) {
           ++distinctOtids;
           lastSeenOtid = deleteRecordKey.originalTransactionId;
+          lastSeenBucketProperty = deleteRecordKey.bucketProperty;
         }
         if (deleteReaderValue.next(deleteRecordKey)) {
           sortMerger.put(deleteRecordKey, deleteReaderValue);
@@ -728,20 +794,24 @@ public class VectorizedOrcAcidRowBatchReader
       // the fromIndex(inclusive) and toIndex(exclusive) for each unique otid.
       this.compressedOtids = new CompressedOtid[distinctOtids];
       lastSeenOtid = otids[0];
+      lastSeenBucketProperty = bucketProperties[0];
       int fromIndex = 0, pos = 0;
       for (int i = 1; i < otids.length; ++i) {
-        if (otids[i] != lastSeenOtid) {
-          compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, 
i);
+        if (otids[i] != lastSeenOtid || lastSeenBucketProperty != 
bucketProperties[i]) {
+          compressedOtids[pos] = 
+            new CompressedOtid(lastSeenOtid, lastSeenBucketProperty, 
fromIndex, i);
           lastSeenOtid = otids[i];
+          lastSeenBucketProperty = bucketProperties[i];
           fromIndex = i;
           ++pos;
         }
       }
       // account for the last distinct otid
-      compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, 
otids.length);
+      compressedOtids[pos] =
+        new CompressedOtid(lastSeenOtid, lastSeenBucketProperty, fromIndex, 
otids.length);
     }
 
-    private boolean isDeleted(long otid, long rowId) {
+    private boolean isDeleted(long otid, int bucketProperty, long rowId) {
       if (compressedOtids == null || rowIds == null) {
         return false;
       }
@@ -755,8 +825,8 @@ public class VectorizedOrcAcidRowBatchReader
           || otid > compressedOtids[compressedOtids.length - 
1].originalTransactionId) {
         return false;
       }
-      // Create a dummy key for searching the otid in the compressed otid 
ranges.
-      CompressedOtid key = new CompressedOtid(otid, -1, -1);
+      // Create a dummy key for searching the otid/bucket in the compressed 
otid ranges.
+      CompressedOtid key = new CompressedOtid(otid, bucketProperty, -1, -1);
       int pos = Arrays.binarySearch(compressedOtids, key);
       if (pos >= 0) {
         // Otid with the given value found! Searching now for rowId...
@@ -788,6 +858,12 @@ public class VectorizedOrcAcidRowBatchReader
       long repeatedOriginalTransaction = (originalTransactionVector != null) ? 
-1
           : ((LongColumnVector) 
batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
 
+      long[] bucketProperties =
+        batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null
+          : ((LongColumnVector)batch.cols[OrcRecordUpdater.BUCKET]).vector;
+      int repeatedBucketProperty = (bucketProperties != null) ? -1
+        : (int)((LongColumnVector) 
batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
+
       long[] rowIdVector =
           ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector;
 
@@ -796,8 +872,10 @@ public class VectorizedOrcAcidRowBatchReader
           setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) {
         long otid = originalTransactionVector != null ? 
originalTransactionVector[setBitIndex]
                                                     : 
repeatedOriginalTransaction ;
+        int bucketProperty = bucketProperties != null ? 
(int)bucketProperties[setBitIndex]
+          : repeatedBucketProperty;
         long rowId = rowIdVector[setBitIndex];
-        if (isDeleted(otid, rowId)) {
+        if (isDeleted(otid, bucketProperty, rowId)) {
           selectedBitSet.clear(setBitIndex);
         }
      }
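
For illustration, the access pattern the hunk above relies on when reading the ORIGINAL_TRANSACTION
and BUCKET columns: when a LongColumnVector is marked isRepeating, only vector[0] is populated for
the whole batch. A tiny sketch (the helper name is hypothetical):

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

final class ColumnAccessSketch {
  // If isRepeating is set, every row of the batch carries the value stored at index 0,
  // so only vector[0] is meaningful; otherwise read the row's own slot.
  static long valueAt(LongColumnVector col, int row) {
    return col.vector[col.isRepeating ? 0 : row];
  }
}
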

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 0541a40..78c511b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -696,15 +696,7 @@ public class UpdateDeleteSemanticAnalyzer extends 
SemanticAnalyzer {
       if(numWhenMatchedUpdateClauses > 1) {
         throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, 
ctx.getCmd());
       }
-      assert numInsertClauses < 2;
-      if(numInsertClauses == 1 && numWhenMatchedUpdateClauses == 1) {
-        
if(AcidUtils.getAcidOperationalProperties(targetTable).isSplitUpdate()) {
-          throw new IllegalStateException("Tables with " +
-            hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "=" +
-            TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY + " 
currently do not " +
-            "support MERGE with both Insert and Update clauses.");
-        }
-      }
+      assert numInsertClauses < 2: "too many Insert clauses";
     }
     if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && 
extraPredicate == null) {
       throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, 
ctx.getCmd());

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
index 461ef86..1de7604 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
@@ -24,6 +24,7 @@ import 
org.apache.hadoop.hive.ql.exec.vector.expressions.CastDecimalToLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.CastDoubleToLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.CastStringToLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.CastTimestampToLong;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -220,7 +221,9 @@ public class UDFToInteger extends UDF {
     if (i == null) {
       return null;
     } else {
-      intWritable.set(i.getBucketId());
+      BucketCodec decoder =
+        BucketCodec.determineVersion(i.getBucketProperty());
+      intWritable.set(decoder.decodeWriterId(i.getBucketProperty()));
       return intWritable;
     }
   }
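
For illustration, why the ROW__ID expectations in the test diff below change from bucketid 1 to
536936448: ROW__ID.bucketid now surfaces the encoded bucket property, and UDFToInteger decodes it
back to the writer id. A small sketch, assuming the V1 bit layout noted in the comments:

import org.apache.hadoop.hive.ql.io.BucketCodec;

public class DecodeBucketPropertySketch {
  public static void main(String[] args) {
    int bucketProperty = 536936448;  // value that appears in the updated assertions below
    int writerId = BucketCodec.determineVersion(bucketProperty)
        .decodeWriterId(bucketProperty);
    // For this value the decoded writer id should be 1 - matching the bucket_00001 file the
    // assertions point at.  With the V1 codec the writer id appears to sit in bits 16..27:
    // (1 << 29) | (1 << 16) == 536936448 (version marker | bucket 1).
    System.out.println("writerId = " + writerId);
  }
}
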

http://git-wip-us.apache.org/repos/asf/hive/blob/6af30bf2/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 5fb89d0..c531aeb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -493,53 +493,10 @@ public class TestTxnCommands {
    * sorts rows in dictionary order
    */
   private List<String> stringifyValues(int[][] rowsIn) {
-    assert rowsIn.length > 0;
-    int[][] rows = rowsIn.clone();
-    Arrays.sort(rows, new RowComp());
-    List<String> rs = new ArrayList<String>();
-    for(int[] row : rows) {
-      assert row.length > 0;
-      StringBuilder sb = new StringBuilder();
-      for(int value : row) {
-        sb.append(value).append("\t");
-      }
-      sb.setLength(sb.length() - 1);
-      rs.add(sb.toString());
-    }
-    return rs;
-  }
-  private static final class RowComp implements Comparator<int[]> {
-    @Override
-    public int compare(int[] row1, int[] row2) {
-      assert row1 != null && row2 != null && row1.length == row2.length;
-      for(int i = 0; i < row1.length; i++) {
-        int comp = Integer.compare(row1[i], row2[i]);
-        if(comp != 0) {
-          return comp;
-        }
-      }
-      return 0;
-    }
+    return TestTxnCommands2.stringifyValues(rowsIn);
   }
   private String makeValuesClause(int[][] rows) {
-    assert rows.length > 0;
-    StringBuilder sb = new StringBuilder("values");
-    for(int[] row : rows) {
-      assert row.length > 0;
-      if(row.length > 1) {
-        sb.append("(");
-      }
-      for(int value : row) {
-        sb.append(value).append(",");
-      }
-      sb.setLength(sb.length() - 1);//remove trailing comma
-      if(row.length > 1) {
-        sb.append(")");
-      }
-      sb.append(",");
-    }
-    sb.setLength(sb.length() - 1);//remove trailing comma
-    return sb.toString();
+    return TestTxnCommands2.makeValuesClause(rows);
   }
 
   private List<String> runStatementOnDriver(String stmt) throws Exception {
@@ -559,7 +516,6 @@ public class TestTxnCommands {
     throw new RuntimeException("Didn't get expected failure!");
   }
 
-//  @Ignore
   @Test
   public void exchangePartition() throws Exception {
     runStatementOnDriver("create database ex1");
@@ -757,9 +713,9 @@ public class TestTxnCommands {
     runStatementOnDriver("insert into " + Table.ACIDTBL + " " + 
makeValuesClause(vals));
     String query = "merge into " + Table.ACIDTBL +
       " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
-      "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
-      "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " +
-      "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+      "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + //updates (2,1) -> 
(2,0)
+      "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " +//deletes (4,3)
+      "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";//inserts (11,11)
     runStatementOnDriver(query);
 
     List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + 
" order by a,b");
@@ -910,7 +866,7 @@ public class TestTxnCommands {
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
     Assert.assertTrue(rs.get(2), 
rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
     Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
-    Assert.assertTrue(rs.get(3), 
rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":1,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), 
rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
     Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("nonacidorctbl/000001_0_copy_1"));
     //run Compaction
     runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL 
+" compact 'major'");
@@ -927,7 +883,7 @@ public class TestTxnCommands {
     Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
     Assert.assertTrue(rs.get(2), 
rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
     Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
-    Assert.assertTrue(rs.get(3), 
rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":1,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), 
rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
     Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
 
     //make sure they are the same before and after compaction
