Repository: hive
Updated Branches:
  refs/heads/master 3991dba30 -> 994d98c09


http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
new file mode 100644
index 0000000..703cef6
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import static org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState.ABORTED;
+import static org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState.COMMITTED;
+import static org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.databaseBuilder;
+import static org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.tableBuilder;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hive.hcatalog.streaming.TestStreaming;
+import org.apache.hive.hcatalog.streaming.mutate.StreamingAssert.Factory;
+import org.apache.hive.hcatalog.streaming.mutate.StreamingAssert.Record;
+import org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.TableBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient;
+import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClientBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
+import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;
+import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinatorBuilder;
+import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This test is based on {@link TestStreaming} and has a similar core set of tests to ensure that basic transactional
+ * behaviour is as expected in the {@link RecordMutator} line. This is complemented with a set of tests related to the
+ * use of update and delete operations.
+ */
+public class TestMutations {
+
+  private static final List<String> EUROPE_FRANCE = Arrays.asList("Europe", "France");
+  private static final List<String> EUROPE_UK = Arrays.asList("Europe", "UK");
+  private static final List<String> ASIA_INDIA = Arrays.asList("Asia", "India");
+  // Bucket on the 'id' column (field index 0).
+  private static final int[] BUCKET_COLUMN_INDEXES = new int[] { 0 };
+  private static final int RECORD_ID_COLUMN = 2;
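+  // Note (assumption): MutableRecord carries its ACID ROW__ID (a RecordIdentifier) in field index 2,
+  // which the ReflectiveMutatorFactory uses to address existing rows on update and delete.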
+
+  @Rule
+  public TemporaryFolder warehouseFolder = new TemporaryFolder();
+
+  private StreamingTestUtils testUtils = new StreamingTestUtils();
+  private HiveConf conf;
+  private IMetaStoreClient metaStoreClient;
+  private String metaStoreUri;
+  private Database database;
+  private TableBuilder partitionedTableBuilder;
+  private TableBuilder unpartitionedTableBuilder;
+  private Factory assertionFactory;
+
+  public TestMutations() throws Exception {
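+    // metaStoreUri is never assigned and is therefore null here; a null URI presumably selects an
+    // embedded local metastore for the test.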
+    conf = testUtils.newHiveConf(metaStoreUri);
+    testUtils.prepareTransactionDatabase(conf);
+    metaStoreClient = testUtils.newMetaStoreClient(conf);
+    assertionFactory = new StreamingAssert.Factory(metaStoreClient, conf);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    database = databaseBuilder(warehouseFolder.getRoot()).name("testing").dropAndCreate(metaStoreClient);
+
+    partitionedTableBuilder = tableBuilder(database)
+        .name("partitioned")
+        .addColumn("id", "int")
+        .addColumn("msg", "string")
+        .partitionKeys("continent", "country");
+
+    unpartitionedTableBuilder = tableBuilder(database)
+        .name("unpartitioned")
+        .addColumn("id", "int")
+        .addColumn("msg", "string");
+  }
+
+  @Test
+  public void testTransactionBatchEmptyCommitPartitioned() throws Exception {
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    transaction.begin();
+
+    transaction.commit();
+    assertThat(transaction.getState(), is(COMMITTED));
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchEmptyCommitUnpartitioned() throws Exception {
+    Table table = unpartitionedTableBuilder.create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), false)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    transaction.begin();
+
+    transaction.commit();
+    assertThat(transaction.getState(), is(COMMITTED));
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchEmptyAbortPartitioned() throws Exception {
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    coordinator.close();
+
+    transaction.abort();
+    assertThat(transaction.getState(), is(ABORTED));
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchEmptyAbortUnpartitioned() throws Exception {
+    Table table = unpartitionedTableBuilder.create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), false)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    coordinator.close();
+
+    transaction.abort();
+    assertThat(transaction.getState(), is(ABORTED));
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchCommitPartitioned() throws Exception {
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    BucketIdResolver bucketIdAppender = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord record = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(1,
+        "Hello streaming"));
+    coordinator.insert(ASIA_INDIA, record);
+    coordinator.close();
+
+    transaction.commit();
+
+    StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+    streamingAssertions.assertMinTransactionId(1L);
+    streamingAssertions.assertMaxTransactionId(1L);
+    streamingAssertions.assertExpectedFileCount(1);
+
+    List<Record> readRecords = streamingAssertions.readRecords();
+    assertThat(readRecords.size(), is(1));
+    assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+    assertThat(transaction.getState(), is(COMMITTED));
+    client.close();
+  }
+
+  @Test
+  public void testMulti() throws Exception {
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord asiaIndiaRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1,
+        "Hello streaming"));
+    MutableRecord europeUkRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(2,
+        "Hello streaming"));
+    MutableRecord europeFranceRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(3,
+        "Hello streaming"));
+    MutableRecord europeFranceRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(4,
+        "Bonjour streaming"));
+
+    coordinator.insert(ASIA_INDIA, asiaIndiaRecord1);
+    coordinator.insert(EUROPE_UK, europeUkRecord1);
+    coordinator.insert(EUROPE_FRANCE, europeFranceRecord1);
+    coordinator.insert(EUROPE_FRANCE, europeFranceRecord2);
+    coordinator.close();
+
+    transaction.commit();
+
+    // ASIA_INDIA
+    StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+    streamingAssertions.assertMinTransactionId(1L);
+    streamingAssertions.assertMaxTransactionId(1L);
+    streamingAssertions.assertExpectedFileCount(1);
+
+    List<Record> readRecords = streamingAssertions.readRecords();
+    assertThat(readRecords.size(), is(1));
+    assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+    // EUROPE_UK
+    streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
+    streamingAssertions.assertMinTransactionId(1L);
+    streamingAssertions.assertMaxTransactionId(1L);
+    streamingAssertions.assertExpectedFileCount(1);
+
+    readRecords = streamingAssertions.readRecords();
+    assertThat(readRecords.size(), is(1));
+    assertThat(readRecords.get(0).getRow(), is("{2, Hello streaming}"));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+    // EUROPE_FRANCE
+    streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
+    streamingAssertions.assertMinTransactionId(1L);
+    streamingAssertions.assertMaxTransactionId(1L);
+    streamingAssertions.assertExpectedFileCount(1);
+
+    readRecords = streamingAssertions.readRecords();
+    assertThat(readRecords.size(), is(2));
+    assertThat(readRecords.get(0).getRow(), is("{3, Hello streaming}"));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(1).getRow(), is("{4, Bonjour streaming}"));
+    assertThat(readRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchCommitUnpartitioned() throws Exception {
+    Table table = unpartitionedTableBuilder.create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), false)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord record = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1,
+        "Hello streaming"));
+
+    coordinator.insert(Collections.<String> emptyList(), record);
+    coordinator.close();
+
+    transaction.commit();
+
+    StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table);
+    streamingAssertions.assertMinTransactionId(1L);
+    streamingAssertions.assertMaxTransactionId(1L);
+    streamingAssertions.assertExpectedFileCount(1);
+
+    List<Record> readRecords = streamingAssertions.readRecords();
+    assertThat(readRecords.size(), is(1));
+    assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+
+    assertThat(transaction.getState(), is(COMMITTED));
+    client.close();
+  }
+
+  @Test
+  public void testTransactionBatchAbort() throws Exception {
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction transaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    transaction.begin();
+
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+    MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord record1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1,
+        "Hello streaming"));
+    MutableRecord record2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(2,
+        "Welcome to streaming"));
+
+    coordinator.insert(ASIA_INDIA, record1);
+    coordinator.insert(ASIA_INDIA, record2);
+    coordinator.close();
+
+    transaction.abort();
+
+    assertThat(transaction.getState(), is(ABORTED));
+
+    client.close();
+
+    StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+    streamingAssertions.assertNothingWritten();
+  }
+
+  @Test
+  public void testUpdatesAndDeletes() throws Exception {
+    // Set up some base data then stream some inserts/updates/deletes to a number of partitions
+    MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN,
+        BUCKET_COLUMN_INDEXES);
+
+    // INSERT DATA
+    //
+    Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).addPartition(EUROPE_FRANCE).create(metaStoreClient);
+
+    MutatorClient client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction insertTransaction = client.newTransaction();
+
+    List<AcidTable> destinations = client.getTables();
+
+    insertTransaction.begin();
+
+    MutatorCoordinator insertCoordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord asiaIndiaRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1,
+        "Namaste streaming 1"));
+    MutableRecord asiaIndiaRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(2,
+        "Namaste streaming 2"));
+    MutableRecord europeUkRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(3,
+        "Hello streaming 1"));
+    MutableRecord europeUkRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(4,
+        "Hello streaming 2"));
+    MutableRecord europeFranceRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(5,
+        "Bonjour streaming 1"));
+    MutableRecord europeFranceRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(6,
+        "Bonjour streaming 2"));
+
+    insertCoordinator.insert(ASIA_INDIA, asiaIndiaRecord1);
+    insertCoordinator.insert(ASIA_INDIA, asiaIndiaRecord2);
+    insertCoordinator.insert(EUROPE_UK, europeUkRecord1);
+    insertCoordinator.insert(EUROPE_UK, europeUkRecord2);
+    insertCoordinator.insert(EUROPE_FRANCE, europeFranceRecord1);
+    insertCoordinator.insert(EUROPE_FRANCE, europeFranceRecord2);
+    insertCoordinator.close();
+
+    insertTransaction.commit();
+
+    assertThat(insertTransaction.getState(), is(COMMITTED));
+    client.close();
+
+    // MUTATE DATA
+    //
+    client = new MutatorClientBuilder()
+        .addSinkTable(table.getDbName(), table.getTableName(), true)
+        .metaStoreUri(metaStoreUri)
+        .build();
+    client.connect();
+
+    Transaction mutateTransaction = client.newTransaction();
+
+    destinations = client.getTables();
+
+    mutateTransaction.begin();
+
+    MutatorCoordinator mutateCoordinator = new MutatorCoordinatorBuilder()
+        .metaStoreUri(metaStoreUri)
+        .table(destinations.get(0))
+        .mutatorFactory(mutatorFactory)
+        .build();
+
+    bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets());
+    MutableRecord asiaIndiaRecord3 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(20,
+        "Namaste streaming 3"));
+
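+    // The RecordIdentifier arguments below address rows written by the insert transaction above
+    // as (transactionId=1, bucketId=0, rowId=n); the same ROW__IDs are asserted at the end of the test.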
+    mutateCoordinator.update(ASIA_INDIA, new MutableRecord(2, "UPDATED: Namaste streaming 2", new RecordIdentifier(1L,
+        0, 1L)));
+    mutateCoordinator.insert(ASIA_INDIA, asiaIndiaRecord3);
+    mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 1", new RecordIdentifier(1L, 0, 0L)));
+    mutateCoordinator.delete(EUROPE_FRANCE,
+        new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L, 0, 0L)));
+    mutateCoordinator.update(EUROPE_FRANCE, new MutableRecord(6, "UPDATED: Bonjour streaming 2", new RecordIdentifier(
+        1L, 0, 1L)));
+    mutateCoordinator.close();
+
+    mutateTransaction.commit();
+
+    assertThat(mutateTransaction.getState(), is(COMMITTED));
+
+    StreamingAssert indiaAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
+    indiaAssertions.assertMinTransactionId(1L);
+    indiaAssertions.assertMaxTransactionId(2L);
+    List<Record> indiaRecords = indiaAssertions.readRecords();
+    assertThat(indiaRecords.size(), is(3));
+    assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}"));
+    assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+    assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste streaming 2}"));
+    assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+    assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}"));
+    assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L, 0, 0L)));
+
+    StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
+    ukAssertions.assertMinTransactionId(1L);
+    ukAssertions.assertMaxTransactionId(2L);
+    List<Record> ukRecords = ukAssertions.readRecords();
+    assertThat(ukRecords.size(), is(1));
+    assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}"));
+    assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+
+    StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
+    franceAssertions.assertMinTransactionId(1L);
+    franceAssertions.assertMaxTransactionId(2L);
+    List<Record> franceRecords = franceAssertions.readRecords();
+    assertThat(franceRecords.size(), is(1));
+    assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}"));
+    assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+
+    client.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
new file mode 100644
index 0000000..706697a
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
@@ -0,0 +1,66 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils;
+import org.junit.Test;
+
+public class TestAcidTableSerializer {
+
+  @Test
+  public void testSerializeDeserialize() throws Exception {
+    Database database = StreamingTestUtils.databaseBuilder(new File("/tmp")).name("db_1").build();
+    Table table = StreamingTestUtils
+        .tableBuilder(database)
+        .name("table_1")
+        .addColumn("one", "string")
+        .addColumn("two", "integer")
+        .partitionKeys("partition")
+        .addPartition("p1")
+        .buckets(10)
+        .build();
+
+    AcidTable acidTable = new AcidTable("db_1", "table_1", true, TableType.SINK);
+    acidTable.setTable(table);
+    acidTable.setTransactionId(42L);
+
+    String encoded = AcidTableSerializer.encode(acidTable);
+    System.out.println(encoded);
+    AcidTable decoded = AcidTableSerializer.decode(encoded);
+
+    assertThat(decoded.getDatabaseName(), is("db_1"));
+    assertThat(decoded.getTableName(), is("table_1"));
+    assertThat(decoded.createPartitions(), is(true));
+    assertThat(decoded.getOutputFormatName(), is("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"));
+    assertThat(decoded.getTotalBuckets(), is(10));
+    assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1"));
+    assertThat(decoded.getTransactionId(), is(42L));
+    assertThat(decoded.getTableType(), is(TableType.SINK));
+    assertThat(decoded.getTable(), is(table));
+  }
+
+  @Test
+  public void testSerializeDeserializeNoTableNoTransaction() throws Exception {
+    AcidTable acidTable = new AcidTable("db_1", "table_1", true, TableType.SINK);
+
+    String encoded = AcidTableSerializer.encode(acidTable);
+    AcidTable decoded = AcidTableSerializer.decode(encoded);
+
+    assertThat(decoded.getDatabaseName(), is("db_1"));
+    assertThat(decoded.getTableName(), is("table_1"));
+    assertThat(decoded.createPartitions(), is(true));
+    assertThat(decoded.getOutputFormatName(), is(nullValue()));
+    assertThat(decoded.getTotalBuckets(), is(0));
+    assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1"));
+    assertThat(decoded.getTransactionId(), is(0L));
+    assertThat(decoded.getTableType(), is(TableType.SINK));
+    assertThat(decoded.getTable(), is(nullValue()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
new file mode 100644
index 0000000..ca3f7b2
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
@@ -0,0 +1,176 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMutatorClient {
+
+  private static final long TRANSACTION_ID = 42L;
+  private static final String TABLE_NAME_1 = "TABLE_1";
+  private static final String TABLE_NAME_2 = "TABLE_2";
+  private static final String DB_NAME = "DB_1";
+  private static final String USER = "user";
+  private static final AcidTable TABLE_1 = new AcidTable(DB_NAME, TABLE_NAME_1, true, TableType.SINK);
+  private static final AcidTable TABLE_2 = new AcidTable(DB_NAME, TABLE_NAME_2, true, TableType.SINK);
+
+  @Mock
+  private IMetaStoreClient mockMetaStoreClient;
+  @Mock
+  private Lock mockLock;
+  @Mock
+  private Table mockTable1, mockTable2;
+  @Mock
+  private StorageDescriptor mockSd;
+  @Mock
+  private Map<String, String> mockParameters;
+  @Mock
+  private HiveConf mockConfiguration;
+  @Mock
+  private LockFailureListener mockLockFailureListener;
+
+  private MutatorClient client;
+
+  @Before
+  public void configureMocks() throws Exception {
+    when(mockMetaStoreClient.getTable(DB_NAME, TABLE_NAME_1)).thenReturn(mockTable1);
+    when(mockTable1.getDbName()).thenReturn(DB_NAME);
+    when(mockTable1.getTableName()).thenReturn(TABLE_NAME_1);
+    when(mockTable1.getSd()).thenReturn(mockSd);
+    when(mockTable1.getParameters()).thenReturn(mockParameters);
+    when(mockMetaStoreClient.getTable(DB_NAME, TABLE_NAME_2)).thenReturn(mockTable2);
+    when(mockTable2.getDbName()).thenReturn(DB_NAME);
+    when(mockTable2.getTableName()).thenReturn(TABLE_NAME_2);
+    when(mockTable2.getSd()).thenReturn(mockSd);
+    when(mockTable2.getParameters()).thenReturn(mockParameters);
+    when(mockSd.getNumBuckets()).thenReturn(1, 2);
+    when(mockSd.getOutputFormat()).thenReturn(OrcOutputFormat.class.getName());
+    when(mockParameters.get("transactional")).thenReturn(Boolean.TRUE.toString());
+
+    when(mockMetaStoreClient.openTxn(USER)).thenReturn(TRANSACTION_ID);
+
+    client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER,
+        Collections.singletonList(TABLE_1));
+  }
+
+  @Test
+  public void testCheckValidTableConnect() throws Exception {
+    List<AcidTable> inTables = new ArrayList<>();
+    inTables.add(TABLE_1);
+    inTables.add(TABLE_2);
+    client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER, inTables);
+
+    client.connect();
+    List<AcidTable> outTables = client.getTables();
+
+    assertThat(client.isConnected(), is(true));
+    assertThat(outTables.size(), is(2));
+    assertThat(outTables.get(0).getDatabaseName(), is(DB_NAME));
+    assertThat(outTables.get(0).getTableName(), is(TABLE_NAME_1));
+    assertThat(outTables.get(0).getTotalBuckets(), is(2));
+    assertThat(outTables.get(0).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
+    assertThat(outTables.get(0).getTransactionId(), is(0L));
+    assertThat(outTables.get(0).getTable(), is(mockTable1));
+    assertThat(outTables.get(1).getDatabaseName(), is(DB_NAME));
+    assertThat(outTables.get(1).getTableName(), is(TABLE_NAME_2));
+    assertThat(outTables.get(1).getTotalBuckets(), is(2));
+    assertThat(outTables.get(1).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
+    assertThat(outTables.get(1).getTransactionId(), is(0L));
+    assertThat(outTables.get(1).getTable(), is(mockTable2));
+  }
+
+  @Test
+  public void testCheckNonTransactionalTableConnect() throws Exception {
+    when(mockParameters.get("transactional")).thenReturn(Boolean.FALSE.toString());
+
+    try {
+      client.connect();
+      fail();
+    } catch (ConnectionException e) {
+    }
+
+    assertThat(client.isConnected(), is(false));
+  }
+
+  @Test
+  public void testCheckUnBucketedTableConnect() throws Exception {
+    when(mockSd.getNumBuckets()).thenReturn(0);
+
+    try {
+      client.connect();
+      fail();
+    } catch (ConnectionException e) {
+    }
+
+    assertThat(client.isConnected(), is(false));
+  }
+
+  @Test
+  public void testMetaStoreFailsOnConnect() throws Exception {
+    when(mockMetaStoreClient.getTable(anyString(), anyString())).thenThrow(new TException());
+
+    try {
+      client.connect();
+      fail();
+    } catch (ConnectionException e) {
+    }
+
+    assertThat(client.isConnected(), is(false));
+  }
+
+  @Test(expected = ConnectionException.class)
+  public void testGetDestinationsFailsIfNotConnected() throws Exception {
+    client.getTables();
+  }
+
+  @Test
+  public void testNewTransaction() throws Exception {
+    List<AcidTable> inTables = new ArrayList<>();
+    inTables.add(TABLE_1);
+    inTables.add(TABLE_2);
+    client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER, inTables);
+
+    client.connect();
+    Transaction transaction = client.newTransaction();
+    List<AcidTable> outTables = client.getTables();
+
+    assertThat(client.isConnected(), is(true));
+
+    assertThat(transaction.getTransactionId(), is(TRANSACTION_ID));
+    assertThat(transaction.getState(), is(TxnState.INACTIVE));
+    assertThat(outTables.get(0).getTransactionId(), is(TRANSACTION_ID));
+    assertThat(outTables.get(1).getTransactionId(), is(TRANSACTION_ID));
+  }
+
+  @Test
+  public void testCloseClosesClient() throws Exception {
+    client.close();
+    assertThat(client.isConnected(), is(false));
+    verify(mockMetaStoreClient).close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java
new file mode 100644
index 0000000..179207a
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java
@@ -0,0 +1,95 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestTransaction {
+
+  private static final String USER = "user";
+  private static final long TRANSACTION_ID = 10L;
+
+  @Mock
+  private Lock mockLock;
+  @Mock
+  private IMetaStoreClient mockMetaStoreClient;
+
+  private Transaction transaction;
+
+  @Before
+  public void createTransaction() throws Exception {
+    when(mockLock.getUser()).thenReturn(USER);
+    when(mockMetaStoreClient.openTxn(USER)).thenReturn(TRANSACTION_ID);
+    transaction = new Transaction(mockMetaStoreClient, mockLock);
+  }
+
+  @Test
+  public void testInitialState() {
+    assertThat(transaction.getState(), is(TransactionBatch.TxnState.INACTIVE));
+    assertThat(transaction.getTransactionId(), is(TRANSACTION_ID));
+  }
+
+  @Test
+  public void testBegin() throws Exception {
+    transaction.begin();
+
+    verify(mockLock).acquire(TRANSACTION_ID);
+    assertThat(transaction.getState(), is(TransactionBatch.TxnState.OPEN));
+  }
+
+  @Test
+  public void testBeginLockFails() throws Exception {
+    doThrow(new LockException("")).when(mockLock).acquire(TRANSACTION_ID);
+
+    try {
+      transaction.begin();
+    } catch (TransactionException ignore) {
+    }
+
+    assertThat(transaction.getState(), is(TransactionBatch.TxnState.INACTIVE));
+  }
+
+  @Test
+  public void testCommit() throws Exception {
+    transaction.commit();
+
+    verify(mockLock).release();
+    verify(mockMetaStoreClient).commitTxn(TRANSACTION_ID);
+    assertThat(transaction.getState(), is(TransactionBatch.TxnState.COMMITTED));
+  }
+
+  @Test(expected = TransactionException.class)
+  public void testCommitLockFails() throws Exception {
+    doThrow(new LockException("")).when(mockLock).release();
+    transaction.commit();
+  }
+
+  @Test
+  public void testAbort() throws Exception {
+    transaction.abort();
+
+    verify(mockLock).release();
+    verify(mockMetaStoreClient).rollbackTxn(TRANSACTION_ID);
+    assertThat(transaction.getState(), is(TransactionBatch.TxnState.ABORTED));
+  }
+
+  @Test(expected = TransactionException.class)
+  public void testAbortLockFails() throws Exception {
+    doThrow(new LockException("")).when(mockLock).release();
+    transaction.abort();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java
new file mode 100644
index 0000000..8e6d06e
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java
@@ -0,0 +1,100 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestHeartbeatTimerTask {
+
+  private static final long TRANSACTION_ID = 10L;
+  private static final long LOCK_ID = 1L;
+  private static final List<Table> TABLES = createTable();
+
+  @Mock
+  private IMetaStoreClient mockMetaStoreClient;
+  @Mock
+  private LockFailureListener mockListener;
+
+  private HeartbeatTimerTask task;
+
+  @Before
+  public void create() throws Exception {
+    task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+  }
+
+  @Test
+  public void testRun() throws Exception {
+    task.run();
+
+    verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+  }
+
+  @Test
+  public void testRunNullTransactionId() throws Exception {
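+    // A null transaction id is expected to be heartbeated as 0, i.e. a lock held outside any open transaction.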
+    task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+
+    task.run();
+
+    verify(mockMetaStoreClient).heartbeat(0, LOCK_ID);
+  }
+
+  @Test
+  public void testRunHeartbeatFailsNoSuchLockException() throws Exception {
+    NoSuchLockException exception = new NoSuchLockException();
+    doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+    task.run();
+
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception);
+  }
+
+  @Test
+  public void testRunHeartbeatFailsNoSuchTxnException() throws Exception {
+    NoSuchTxnException exception = new NoSuchTxnException();
+    doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+    task.run();
+
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception);
+  }
+
+  @Test
+  public void testRunHeartbeatFailsTxnAbortedException() throws Exception {
+    TxnAbortedException exception = new TxnAbortedException();
+    doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+    task.run();
+
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception);
+  }
+
+  @Test
+  public void testRunHeartbeatFailsTException() throws Exception {
+    TException exception = new TException();
+    doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+
+    task.run();
+  }
+
+  private static List<Table> createTable() {
+    Table table = new Table();
+    table.setDbName("DB");
+    table.setTableName("TABLE");
+    return Arrays.asList(table);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
new file mode 100644
index 0000000..ef1e80c
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java
@@ -0,0 +1,283 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import static org.apache.hadoop.hive.metastore.api.LockState.ABORT;
+import static org.apache.hadoop.hive.metastore.api.LockState.ACQUIRED;
+import static org.apache.hadoop.hive.metastore.api.LockState.NOT_ACQUIRED;
+import static org.apache.hadoop.hive.metastore.api.LockState.WAITING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Timer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.google.common.collect.ImmutableList;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestLock {
+
+  private static final Table TABLE_1 = createTable("DB", "ONE");
+  private static final Table TABLE_2 = createTable("DB", "TWO");
+  private static final List<Table> TABLES = ImmutableList.of(TABLE_1, TABLE_2);
+  private static final long LOCK_ID = 42;
+  private static final long TRANSACTION_ID = 109;
+  private static final String USER = "ewest";
+
+  @Mock
+  private IMetaStoreClient mockMetaStoreClient;
+  @Mock
+  private LockFailureListener mockListener;
+  @Mock
+  private LockResponse mockLockResponse;
+  @Mock
+  private HeartbeatFactory mockHeartbeatFactory;
+  @Mock
+  private Timer mockHeartbeat;
+  @Captor
+  private ArgumentCaptor<LockRequest> requestCaptor;
+
+  private Lock lock;
+  private HiveConf configuration = new HiveConf();
+
+  @Before
+  public void injectMocks() throws Exception {
+    when(mockMetaStoreClient.lock(any(LockRequest.class))).thenReturn(mockLockResponse);
+    when(mockLockResponse.getLockid()).thenReturn(LOCK_ID);
+    when(mockLockResponse.getState()).thenReturn(ACQUIRED);
+    when(
+        mockHeartbeatFactory.newInstance(any(IMetaStoreClient.class), any(LockFailureListener.class), any(Long.class),
+            any(Collection.class), anyLong(), anyInt())).thenReturn(mockHeartbeat);
+
+    lock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, TABLES, 3, 0);
+  }
+
+  @Test
+  public void testAcquireReadLockWithNoIssues() throws Exception {
+    lock.acquire();
+    assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+    assertNull(lock.getTransactionId());
+  }
+
+  @Test
+  public void testAcquireTxnLockWithNoIssues() throws Exception {
+    lock.acquire(TRANSACTION_ID);
+    assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+    assertEquals(Long.valueOf(TRANSACTION_ID), lock.getTransactionId());
+  }
+
+  @Test
+  public void testAcquireReadLockCheckHeartbeatCreated() throws Exception {
+    configuration.set("hive.txn.timeout", "100s");
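+    // The heartbeat period of 75 (seconds) verified below is presumably 75% of the 100s txn timeout set above.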
+    lock.acquire();
+
+    verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), any(Long.class), eq(TABLES),
+        eq(LOCK_ID), eq(75));
+  }
+
+  @Test
+  public void testAcquireTxnLockCheckHeartbeatCreated() throws Exception {
+    configuration.set("hive.txn.timeout", "100s");
+    lock.acquire(TRANSACTION_ID);
+
+    verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), eq(TRANSACTION_ID), eq(TABLES),
+        eq(LOCK_ID), eq(75));
+  }
+
+  @Test
+  public void testAcquireLockCheckUser() throws Exception {
+    lock.acquire();
+    verify(mockMetaStoreClient).lock(requestCaptor.capture());
+    LockRequest actualRequest = requestCaptor.getValue();
+    assertEquals(USER, actualRequest.getUser());
+  }
+
+  @Test
+  public void testAcquireReadLockCheckLocks() throws Exception {
+    lock.acquire();
+    verify(mockMetaStoreClient).lock(requestCaptor.capture());
+
+    LockRequest request = requestCaptor.getValue();
+    assertEquals(0, request.getTxnid());
+    assertEquals(USER, request.getUser());
+    assertEquals(InetAddress.getLocalHost().getHostName(), request.getHostname());
+
+    List<LockComponent> components = request.getComponent();
+
+    assertEquals(2, components.size());
+
+    LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+    expected1.setTablename("ONE");
+    assertTrue(components.contains(expected1));
+
+    LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+    expected2.setTablename("TWO");
+    assertTrue(components.contains(expected2));
+  }
+
+  @Test
+  public void testAcquireTxnLockCheckLocks() throws Exception {
+    lock.acquire(TRANSACTION_ID);
+    verify(mockMetaStoreClient).lock(requestCaptor.capture());
+
+    LockRequest request = requestCaptor.getValue();
+    assertEquals(TRANSACTION_ID, request.getTxnid());
+    assertEquals(USER, request.getUser());
+    assertEquals(InetAddress.getLocalHost().getHostName(), request.getHostname());
+
+    List<LockComponent> components = request.getComponent();
+
+    System.out.println(components);
+    assertEquals(2, components.size());
+
+    LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+    expected1.setTablename("ONE");
+    assertTrue(components.contains(expected1));
+
+    LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB");
+    expected2.setTablename("TWO");
+    assertTrue(components.contains(expected2));
+  }
+
+  @Test(expected = LockException.class)
+  public void testAcquireLockNotAcquired() throws Exception {
+    when(mockLockResponse.getState()).thenReturn(NOT_ACQUIRED);
+    lock.acquire();
+  }
+
+  @Test(expected = LockException.class)
+  public void testAcquireLockAborted() throws Exception {
+    when(mockLockResponse.getState()).thenReturn(ABORT);
+    lock.acquire();
+  }
+
+  @Test(expected = LockException.class)
+  public void testAcquireLockWithWaitRetriesExceeded() throws Exception {
+    when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, WAITING);
+    lock.acquire();
+  }
+
+  @Test
+  public void testAcquireLockWithWaitRetries() throws Exception {
+    when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, ACQUIRED);
+    lock.acquire();
+    assertEquals(Long.valueOf(LOCK_ID), lock.getLockId());
+  }
+
+  @Test
+  public void testReleaseLock() throws Exception {
+    lock.acquire();
+    lock.release();
+    verify(mockMetaStoreClient).unlock(LOCK_ID);
+  }
+
+  @Test
+  public void testReleaseLockNoLock() throws Exception {
+    lock.release();
+    verifyNoMoreInteractions(mockMetaStoreClient);
+  }
+
+  @Test
+  public void testReleaseLockCancelsHeartbeat() throws Exception {
+    lock.acquire();
+    lock.release();
+    verify(mockHeartbeat).cancel();
+  }
+
+  @Test
+  public void testReadHeartbeat() throws Exception {
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+    task.run();
+    verify(mockMetaStoreClient).heartbeat(0, LOCK_ID);
+  }
+
+  @Test
+  public void testTxnHeartbeat() throws Exception {
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    task.run();
+    verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+  }
+
+  @Test
+  public void testReadHeartbeatFailsNoSuchLockException() throws Exception {
+    Throwable t = new NoSuchLockException();
+    doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID);
+    task.run();
+    verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(TABLES), t);
+  }
+
+  @Test
+  public void testTxnHeartbeatFailsNoSuchLockException() throws Exception {
+    Throwable t = new NoSuchLockException();
+    doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    task.run();
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+  }
+
+  @Test
+  public void testHeartbeatFailsNoSuchTxnException() throws Exception {
+    Throwable t = new NoSuchTxnException();
+    doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    task.run();
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+  }
+
+  @Test
+  public void testHeartbeatFailsTxnAbortedException() throws Exception {
+    Throwable t = new TxnAbortedException();
+    doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    task.run();
+    verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t);
+  }
+
+  @Test
+  public void testHeartbeatContinuesTException() throws Exception {
+    Throwable t = new TException();
+    // Stub the heartbeat for TRANSACTION_ID (not 0) so the task under test actually hits the failing call.
+    doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID);
+    task.run();
+    verifyZeroInteractions(mockListener);
+  }
+
+  private static Table createTable(String databaseName, String tableName) {
+    Table table = new Table();
+    table.setDbName(databaseName);
+    table.setTableName(tableName);
+    return table;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
new file mode 100644
index 0000000..f81373e
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
@@ -0,0 +1,38 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.MutableRecord;
+import org.junit.Test;
+
+public class TestBucketIdResolverImpl {
+
+  private static final int TOTAL_BUCKETS = 12;
+  private static final int RECORD_ID_COLUMN = 2;
+  // id - TODO: use a non-zero index to check for offset errors.
+  private static final int[] BUCKET_COLUMN_INDEXES = new int[] { 0 };
+
+  private BucketIdResolver capturingBucketIdResolver = new BucketIdResolverImpl(
+      ObjectInspectorFactory.getReflectionObjectInspector(MutableRecord.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA), RECORD_ID_COLUMN, TOTAL_BUCKETS, BUCKET_COLUMN_INDEXES);
+
+  @Test
+  public void testAttachBucketIdToRecord() {
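+    // Only the bucket id is computed up front (8, presumably derived from hashing the id column across
+    // the 12 buckets); the transaction id and row id remain -1 until the record is actually written.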
+    MutableRecord record = new MutableRecord(1, "hello");
+    capturingBucketIdResolver.attachBucketIdToRecord(record);
+    assertThat(record.rowId, is(new RecordIdentifier(-1L, 8, -1L)));
+    assertThat(record.id, is(1));
+    assertThat(record.msg.toString(), is("hello"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoBucketColumns() {
+    new BucketIdResolverImpl(ObjectInspectorFactory.getReflectionObjectInspector(MutableRecord.class,
+        ObjectInspectorFactory.ObjectInspectorOptions.JAVA), RECORD_ID_COLUMN, TOTAL_BUCKETS, new int[0]);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java
new file mode 100644
index 0000000..74fa59b
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java
@@ -0,0 +1,70 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.junit.Test;
+
+public class TestGroupingValidator {
+
+  private GroupingValidator validator = new GroupingValidator();
+
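+  // Each (partition values, bucket id) pair appears to form a group; isInSequence returns false once a
+  // previously completed group is revisited, as the revisitedGroup tests below demonstrate.
+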
+  @Test
+  public void uniqueGroups() {
+    assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1));
+    assertTrue(validator.isInSequence(Arrays.asList("c", "C"), 3));
+    assertTrue(validator.isInSequence(Arrays.asList("b", "B"), 2));
+  }
+
+  @Test
+  public void sameGroup() {
+    assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1));
+    assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1));
+    assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1));
+  }
+
+  @Test
+  public void revisitedGroup() {
+    assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1));
+    assertTrue(validator.isInSequence(Arrays.asList("c", "C"), 3));
+    assertFalse(validator.isInSequence(Arrays.asList("a", "A"), 1));
+  }
+
+  @Test
+  public void samePartitionDifferentBucket() {
+    assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1));
+    assertTrue(validator.isInSequence(Arrays.asList("c", "C"), 3));
+    assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 2));
+  }
+
+  @Test
+  public void sameBucketDifferentPartition() {
+    assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1));
+    assertTrue(validator.isInSequence(Arrays.asList("c", "C"), 3));
+    assertTrue(validator.isInSequence(Arrays.asList("b", "B"), 1));
+  }
+
+  @Test
+  public void uniqueGroupsNoPartition() {
+    assertTrue(validator.isInSequence(Collections.<String> emptyList(), 1));
+    assertTrue(validator.isInSequence(Collections.<String> emptyList(), 3));
+    assertTrue(validator.isInSequence(Collections.<String> emptyList(), 2));
+  }
+
+  @Test
+  public void sameGroupNoPartition() {
+    assertTrue(validator.isInSequence(Collections.<String> emptyList(), 1));
+    assertTrue(validator.isInSequence(Collections.<String> emptyList(), 1));
+    assertTrue(validator.isInSequence(Collections.<String> emptyList(), 1));
+  }
+
+  @Test
+  public void revisitedGroupNoPartition() {
+    assertTrue(validator.isInSequence(Collections.<String> emptyList(), 1));
+    assertTrue(validator.isInSequence(Collections.<String> emptyList(), 3));
+    assertFalse(validator.isInSequence(Collections.<String> emptyList(), 1));
+  }
+}
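
The tests above pin down GroupingValidator's contract: records must arrive
clustered by (partition, bucket) group, repeated events for the current group
are fine, and returning to a previously seen group is an error. A minimal
sketch of that rule, assuming the committed class does something similar with
a seen-set and a current-group marker (names here are hypothetical):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class GroupingValidatorSketch {
      private final Set<String> seenGroups = new HashSet<>();
      private String currentGroup;

      boolean isInSequence(List<String> partitionValues, int bucketId) {
        String group = partitionValues + "/" + bucketId; // composite group key
        if (group.equals(currentGroup)) {
          return true; // still streaming the same group
        }
        currentGroup = group;
        return seenGroups.add(group); // false when a group is revisited
      }
    }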

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
new file mode 100644
index 0000000..6e9ffa2
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
@@ -0,0 +1,234 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMutatorCoordinator {
+
+  private static final List<String> UNPARTITIONED = Collections.<String> emptyList();
+  private static final List<String> PARTITION_B = Arrays.asList("B");
+  private static final List<String> PARTITION_A = Arrays.asList("A");
+  private static final long TRANSACTION_ID = 2L;
+  private static final int BUCKET_ID = 0;
+  private static final Path PATH_A = new Path("X");
+  private static final Path PATH_B = new Path("B");
+  private static final Object RECORD = "RECORD";
+  private static final RecordIdentifier ROW__ID_B0_R0 = new RecordIdentifier(10L, BUCKET_ID, 0L);
+  private static final RecordIdentifier ROW__ID_B0_R1 = new RecordIdentifier(10L, BUCKET_ID, 1L);
+  private static final RecordIdentifier ROW__ID_B1_R0 = new RecordIdentifier(10L, BUCKET_ID + 1, 0L);
+  private static final RecordIdentifier ROW__ID_INSERT = new RecordIdentifier(-1L, BUCKET_ID, -1L);
+
+  @Mock
+  private IMetaStoreClient mockMetaStoreClient;
+  @Mock
+  private MutatorFactory mockMutatorFactory;
+  @Mock
+  private CreatePartitionHelper mockPartitionHelper;
+  @Mock
+  private GroupingValidator mockGroupingValidator;
+  @Mock
+  private SequenceValidator mockSequenceValidator;
+  @Mock
+  private AcidTable mockAcidTable;
+  @Mock
+  private RecordInspector mockRecordInspector;
+  @Mock
+  private BucketIdResolver mockBucketIdResolver;
+  @Mock
+  private Mutator mockMutator;
+
+  private MutatorCoordinator coordinator;
+
+  private HiveConf configuration = new HiveConf();
+
+  @Before
+  public void createCoordinator() throws Exception {
+    when(mockAcidTable.getOutputFormatName()).thenReturn(OrcOutputFormat.class.getName());
+    when(mockAcidTable.getTotalBuckets()).thenReturn(1);
+    when(mockAcidTable.getTransactionId()).thenReturn(TRANSACTION_ID);
+    when(mockAcidTable.createPartitions()).thenReturn(true);
+    when(mockMutatorFactory.newRecordInspector()).thenReturn(mockRecordInspector);
+    when(mockMutatorFactory.newBucketIdResolver(anyInt())).thenReturn(mockBucketIdResolver);
+    when(mockMutatorFactory.newMutator(any(OrcOutputFormat.class), anyLong(), any(Path.class), anyInt())).thenReturn(
+        mockMutator);
+    when(mockPartitionHelper.getPathForPartition(any(List.class))).thenReturn(PATH_A);
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_INSERT);
+    when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(true);
+    when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(true);
+
+    coordinator = new MutatorCoordinator(mockMetaStoreClient, configuration, mockMutatorFactory, mockPartitionHelper,
+        mockGroupingValidator, mockSequenceValidator, mockAcidTable, false);
+  }
+
+  @Test
+  public void insert() throws Exception {
+    coordinator.insert(UNPARTITIONED, RECORD);
+
+    verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutator).insert(RECORD);
+  }
+
+  @Test
+  public void multipleInserts() throws Exception {
+    coordinator.insert(UNPARTITIONED, RECORD);
+    coordinator.insert(UNPARTITIONED, RECORD);
+    coordinator.insert(UNPARTITIONED, RECORD);
+
+    verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutator, times(3)).insert(RECORD);
+  }
+
+  @Test
+  public void insertPartitionChanges() throws Exception {
+    when(mockPartitionHelper.getPathForPartition(PARTITION_A)).thenReturn(PATH_A);
+    when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B);
+
+    coordinator.insert(PARTITION_A, RECORD);
+    coordinator.insert(PARTITION_B, RECORD);
+
+    verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A);
+    verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID));
+    verify(mockMutator, times(2)).insert(RECORD);
+  }
+
+  @Test
+  public void bucketChanges() throws Exception {
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0, ROW__ID_B1_R0);
+
+    when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(0, 1);
+
+    coordinator.update(UNPARTITIONED, RECORD);
+    coordinator.delete(UNPARTITIONED, RECORD);
+
+    verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory)
+        .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID + 1));
+    verify(mockMutator).update(RECORD);
+    verify(mockMutator).delete(RECORD);
+  }
+
+  @Test
+  public void partitionThenBucketChanges() throws Exception {
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0, ROW__ID_B0_R1, ROW__ID_B1_R0,
+        ROW__ID_INSERT);
+
+    when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(0, 0, 1, 0);
+
+    when(mockPartitionHelper.getPathForPartition(PARTITION_A)).thenReturn(PATH_A);
+    when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B);
+
+    coordinator.update(PARTITION_A, RECORD);
+    coordinator.delete(PARTITION_B, RECORD);
+    coordinator.update(PARTITION_B, RECORD);
+    coordinator.insert(PARTITION_B, RECORD);
+
+    verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A);
+    verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B),
+        eq(BUCKET_ID));
+    verify(mockMutatorFactory)
+        .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID + 1));
+    verify(mockMutator, times(2)).update(RECORD);
+    verify(mockMutator).delete(RECORD);
+    verify(mockMutator).insert(RECORD);
+    verify(mockSequenceValidator, times(4)).reset();
+  }
+
+  @Test(expected = RecordSequenceException.class)
+  public void outOfSequence() throws Exception {
+    when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(false);
+
+    coordinator.update(UNPARTITIONED, RECORD);
+    coordinator.delete(UNPARTITIONED, RECORD);
+
+    verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutator).update(RECORD);
+    verify(mockMutator).delete(RECORD);
+  }
+  
+  @Test(expected = GroupRevisitedException.class)
+  public void revisitGroup() throws Exception {
+    when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(false);
+    
+    coordinator.update(UNPARTITIONED, RECORD);
+    coordinator.delete(UNPARTITIONED, RECORD);
+    
+    verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutator).update(RECORD);
+    verify(mockMutator).delete(RECORD);
+  }
+
+  @Test(expected = BucketIdException.class)
+  public void insertWithBadBucket() throws Exception {
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0);
+
+    when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(1);
+
+    coordinator.insert(UNPARTITIONED, RECORD);
+  }
+
+  @Test(expected = BucketIdException.class)
+  public void updateWithBadBucket() throws Exception {
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0);
+
+    when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(1);
+
+    coordinator.update(UNPARTITIONED, RECORD);
+  }
+
+  @Test
+  public void deleteWithBadBucket() throws Exception {
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0);
+
+    when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(1);
+
+    coordinator.delete(UNPARTITIONED, RECORD);
+  }
+
+  @Test
+  public void closeNoRecords() throws Exception {
+    coordinator.close();
+
+    // No mutator created
+    verifyZeroInteractions(mockMutator);
+  }
+
+  @Test
+  public void closeUsedCoordinator() throws Exception {
+    coordinator.insert(UNPARTITIONED, RECORD);
+    coordinator.close();
+
+    verify(mockMutator).close();
+  }
+}
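
Collectively these tests describe the coordinator's dispatch loop: resolve the
destination (partition path plus bucket id) for each event, validate grouping
and sequence, and only build a new mutator when the destination changes. A
compact sketch of that switching behaviour, with hypothetical single-purpose
interfaces standing in for the factory and mutator collaborators mocked above:

    import java.util.List;
    import java.util.Objects;

    class CoordinatorSketch {
      interface Writer {
        void write(Object record) throws Exception;
        void close() throws Exception;
      }

      interface WriterFactory {
        Writer newWriter(List<String> partition, int bucketId) throws Exception;
      }

      private final WriterFactory factory;
      private List<String> currentPartition;
      private int currentBucket = -1;
      private Writer writer;

      CoordinatorSketch(WriterFactory factory) {
        this.factory = factory;
      }

      void apply(List<String> partition, int bucketId, Object record) throws Exception {
        if (writer == null || !Objects.equals(partition, currentPartition) || bucketId != currentBucket) {
          if (writer != null) {
            writer.close(); // flush the previous destination before switching
          }
          writer = factory.newWriter(partition, bucketId); // one writer per destination
          currentPartition = partition;
          currentBucket = bucketId;
        }
        writer.write(record);
      }

      void close() throws Exception {
        if (writer != null) { // closeNoRecords above: nothing was ever opened
          writer.close();
        }
      }
    }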

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
new file mode 100644
index 0000000..b29c763
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
@@ -0,0 +1,99 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMutatorImpl {
+
+  private static final Object RECORD = new Object();
+  private static final int RECORD_ID_COLUMN = 2;
+  private static final int BUCKET_ID = 0;
+  private static final Path PATH = new Path("X");
+  private static final long TRANSACTION_ID = 1L;
+
+  @Mock
+  private AcidOutputFormat<?, ?> mockOutputFormat;
+  @Mock
+  private ObjectInspector mockObjectInspector;
+  @Mock
+  private RecordUpdater mockRecordUpdater;
+  @Captor
+  private ArgumentCaptor<AcidOutputFormat.Options> captureOptions;
+
+  private final HiveConf configuration = new HiveConf();
+
+  private Mutator mutator;
+
+  @Before
+  public void injectMocks() throws IOException {
+    when(mockOutputFormat.getRecordUpdater(eq(PATH), any(Options.class))).thenReturn(mockRecordUpdater);
+    mutator = new MutatorImpl(configuration, RECORD_ID_COLUMN, mockObjectInspector, mockOutputFormat, TRANSACTION_ID,
+        PATH, BUCKET_ID);
+  }
+
+  @Test
+  public void testCreatesRecordUpdater() throws IOException {
+    verify(mockOutputFormat).getRecordUpdater(eq(PATH), captureOptions.capture());
+    Options options = captureOptions.getValue();
+    assertThat(options.getBucket(), is(BUCKET_ID));
+    assertThat(options.getConfiguration(), is((Configuration) configuration));
+    assertThat(options.getInspector(), is(mockObjectInspector));
+    assertThat(options.getRecordIdColumn(), is(RECORD_ID_COLUMN));
+    assertThat(options.getMinimumTransactionId(), is(TRANSACTION_ID));
+    assertThat(options.getMaximumTransactionId(), is(TRANSACTION_ID));
+  }
+
+  @Test
+  public void testInsertDelegates() throws IOException {
+    mutator.insert(RECORD);
+    verify(mockRecordUpdater).insert(TRANSACTION_ID, RECORD);
+  }
+
+  @Test
+  public void testUpdateDelegates() throws IOException {
+    mutator.update(RECORD);
+    verify(mockRecordUpdater).update(TRANSACTION_ID, RECORD);
+  }
+
+  @Test
+  public void testDeleteDelegates() throws IOException {
+    mutator.delete(RECORD);
+    verify(mockRecordUpdater).delete(TRANSACTION_ID, RECORD);
+  }
+
+  @Test
+  public void testCloseDelegates() throws IOException {
+    mutator.close();
+    verify(mockRecordUpdater).close(false);
+  }
+
+  @Test
+  public void testFlushDoesNothing() throws IOException {
+    mutator.flush();
+    verify(mockRecordUpdater, never()).flush();
+  }
+
+}
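
testCreatesRecordUpdater captures the AcidOutputFormat.Options that the
constructor passes to getRecordUpdater and asserts each field. A plausible
shape of that construction, written as a free-standing helper; this is a
sketch under the assumption that MutatorImpl sets exactly the options
asserted above, and the committed class may set more (for example
writingBase or filesystem details):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.RecordUpdater;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    class RecordUpdaterFactorySketch {
      static RecordUpdater newUpdater(AcidOutputFormat<?, ?> outputFormat, Configuration conf,
          ObjectInspector inspector, Path path, int bucketId, long transactionId, int recordIdColumn)
          throws IOException {
        AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
            .inspector(inspector)                // asserted via getInspector()
            .bucket(bucketId)                    // asserted via getBucket()
            .minimumTransactionId(transactionId) // min == max: all writes belong to one txn
            .maximumTransactionId(transactionId)
            .recordIdColumn(recordIdColumn);     // struct position of ROW__ID
        return outputFormat.getRecordUpdater(path, options);
      }
    }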

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java
new file mode 100644
index 0000000..389ad33
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java
@@ -0,0 +1,31 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hive.hcatalog.streaming.mutate.MutableRecord;
+import org.junit.Test;
+
+public class TestRecordInspectorImpl {
+
+  private static final int ROW_ID_COLUMN = 2;
+
+  private RecordInspectorImpl inspector = new RecordInspectorImpl(ObjectInspectorFactory.getReflectionObjectInspector(
+      MutableRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA), ROW_ID_COLUMN);
+
+  @Test
+  public void testExtractRecordIdentifier() {
+    RecordIdentifier recordIdentifier = new RecordIdentifier(10L, 4, 20L);
+    MutableRecord record = new MutableRecord(1, "hello", recordIdentifier);
+    assertThat(inspector.extractRecordIdentifier(record), is(recordIdentifier));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNotAStructObjectInspector() {
+    new RecordInspectorImpl(PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, 2);
+  }
+
+}
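
The two tests encode RecordInspectorImpl's contract: the supplied inspector
must be a StructObjectInspector, and the ROW__ID is read from a fixed field
index. A sketch of that extraction using the standard ObjectInspector API
(hypothetical class; the committed implementation may differ in details):

    import java.util.List;

    import org.apache.hadoop.hive.ql.io.RecordIdentifier;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

    class RecordIdExtractorSketch {
      private final StructObjectInspector structInspector;
      private final StructField recordIdField;

      RecordIdExtractorSketch(ObjectInspector inspector, int recordIdColumn) {
        if (!(inspector instanceof StructObjectInspector)) {
          // Mirrors the IllegalArgumentException asserted in testNotAStructObjectInspector.
          throw new IllegalArgumentException("Expected a StructObjectInspector: " + inspector);
        }
        structInspector = (StructObjectInspector) inspector;
        List<? extends StructField> fields = structInspector.getAllStructFieldRefs();
        recordIdField = fields.get(recordIdColumn);
      }

      RecordIdentifier extract(Object record) {
        return (RecordIdentifier) structInspector.getStructFieldData(record, recordIdField);
      }
    }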

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java
new file mode 100644
index 0000000..33f9606
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java
@@ -0,0 +1,91 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.junit.Test;
+
+public class TestSequenceValidator {
+
+  private static final int BUCKET_ID = 1;
+
+  private SequenceValidator validator = new SequenceValidator();
+
+  @Test
+  public void testSingleInSequence() {
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+  }
+
+  @Test
+  public void testRowIdInSequence() {
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true));
+  }
+
+  @Test
+  public void testTxIdInSequence() {
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(4L, BUCKET_ID, 0)), is(true));
+  }
+
+  @Test
+  public void testMixedInSequence() {
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 1)), is(true));
+  }
+
+  @Test
+  public void testNegativeTxId() {
+    assertThat(validator.isInSequence(new RecordIdentifier(-1L, BUCKET_ID, 0)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+  }
+
+  @Test
+  public void testNegativeRowId() {
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, -1)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+  }
+
+  @Test
+  public void testRowIdOutOfSequence() {
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(false));
+  }
+
+  @Test
+  public void testReset() {
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true));
+    // New partition for example
+    validator.reset();
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true));
+  }
+
+  @Test
+  public void testTxIdOutOfSequence() {
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(4L, BUCKET_ID, 0)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(false));
+  }
+
+  @Test
+  public void testMixedOutOfSequence() {
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 4)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(false));
+    assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 5)), is(true));
+    assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 6)), is(false));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testNullRecordIdentifier() {
+    validator.isInSequence(null);
+  }
+
+}
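
Taken together, these cases define the ordering rule: within a group, each
ROW__ID must be strictly greater than the last accepted one, comparing the
transaction id first and the row id second, with reset() clearing the
high-water mark. A sketch of that rule (hypothetical; in particular, the
choice to advance the high-water mark only on accepted ids is an assumption
read off testMixedOutOfSequence):

    import org.apache.hadoop.hive.ql.io.RecordIdentifier;

    class SequenceValidatorSketch {
      private Long lastTxId;
      private Long lastRowId;

      boolean isInSequence(RecordIdentifier id) {
        if (id == null) {
          throw new NullPointerException("recordIdentifier"); // testNullRecordIdentifier
        }
        boolean inSequence = lastTxId == null
            || id.getTransactionId() > lastTxId
            || (id.getTransactionId() == lastTxId && id.getRowId() > lastRowId);
        if (inSequence) {
          lastTxId = id.getTransactionId(); // advance the mark only on success
          lastRowId = id.getRowId();
        }
        return inSequence;
      }

      void reset() { // called when moving to a new partition/bucket group
        lastTxId = null;
        lastRowId = null;
      }
    }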
