HIVE-10165 Improve hive-hcatalog-streaming extensibility and support updates 
and deletes (Eliot West via gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/994d98c0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/994d98c0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/994d98c0

Branch: refs/heads/master
Commit: 994d98c0963ee48c2abbfee6f389d75c0223c8f1
Parents: 3991dba
Author: Alan Gates <ga...@hortonworks.com>
Authored: Tue Jun 30 14:59:55 2015 -0700
Committer: Alan Gates <ga...@hortonworks.com>
Committed: Tue Jun 30 14:59:55 2015 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 hcatalog/streaming/pom.xml                      |   6 +
 .../streaming/mutate/HiveConfFactory.java       |  63 +++
 .../mutate/UgiMetaStoreClientFactory.java       | 102 ++++
 .../streaming/mutate/client/AcidTable.java      | 112 ++++
 .../mutate/client/AcidTableSerializer.java      | 100 ++++
 .../mutate/client/ClientException.java          |  15 +
 .../mutate/client/ConnectionException.java      |  15 +
 .../streaming/mutate/client/MutatorClient.java  | 140 +++++
 .../mutate/client/MutatorClientBuilder.java     | 115 ++++
 .../streaming/mutate/client/TableType.java      |  37 ++
 .../streaming/mutate/client/Transaction.java    | 114 ++++
 .../mutate/client/TransactionException.java     |  15 +
 .../mutate/client/lock/HeartbeatFactory.java    |  30 +
 .../mutate/client/lock/HeartbeatTimerTask.java  |  66 +++
 .../streaming/mutate/client/lock/Lock.java      | 282 ++++++++++
 .../mutate/client/lock/LockException.java       |  15 +
 .../mutate/client/lock/LockFailureListener.java |  26 +
 .../mutate/doc-files/system-overview.dot        |  27 +
 .../hive/hcatalog/streaming/mutate/package.html | 495 +++++++++++++++++
 .../mutate/worker/BucketIdException.java        |  11 +
 .../mutate/worker/BucketIdResolver.java         |  11 +
 .../mutate/worker/BucketIdResolverImpl.java     |  76 +++
 .../mutate/worker/CreatePartitionHelper.java    |  83 +++
 .../mutate/worker/GroupRevisitedException.java  |  11 +
 .../mutate/worker/GroupingValidator.java        |  74 +++
 .../streaming/mutate/worker/Mutator.java        |  21 +
 .../mutate/worker/MutatorCoordinator.java       | 281 ++++++++++
 .../worker/MutatorCoordinatorBuilder.java       |  76 +++
 .../streaming/mutate/worker/MutatorFactory.java |  16 +
 .../streaming/mutate/worker/MutatorImpl.java    |  84 +++
 .../streaming/mutate/worker/OperationType.java  |   7 +
 .../worker/PartitionCreationException.java      |  15 +
 .../mutate/worker/RecordInspector.java          |  11 +
 .../mutate/worker/RecordInspectorImpl.java      |  45 ++
 .../mutate/worker/RecordSequenceException.java  |  11 +
 .../mutate/worker/SequenceValidator.java        |  49 ++
 .../mutate/worker/WorkerException.java          |  15 +
 .../streaming/mutate/ExampleUseCase.java        |  82 +++
 .../streaming/mutate/MutableRecord.java         |  50 ++
 .../mutate/ReflectiveMutatorFactory.java        |  51 ++
 .../streaming/mutate/StreamingAssert.java       | 191 +++++++
 .../streaming/mutate/StreamingTestUtils.java    | 261 +++++++++
 .../streaming/mutate/TestMutations.java         | 544 +++++++++++++++++++
 .../mutate/client/TestAcidTableSerializer.java  |  66 +++
 .../mutate/client/TestMutatorClient.java        | 176 ++++++
 .../mutate/client/TestTransaction.java          |  95 ++++
 .../client/lock/TestHeartbeatTimerTask.java     | 100 ++++
 .../streaming/mutate/client/lock/TestLock.java  | 283 ++++++++++
 .../mutate/worker/TestBucketIdResolverImpl.java |  38 ++
 .../mutate/worker/TestGroupingValidator.java    |  70 +++
 .../mutate/worker/TestMutatorCoordinator.java   | 234 ++++++++
 .../mutate/worker/TestMutatorImpl.java          |  99 ++++
 .../mutate/worker/TestRecordInspectorImpl.java  |  31 ++
 .../mutate/worker/TestSequenceValidator.java    |  91 ++++
 55 files changed, 5135 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index c5decaf..4d341a0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ hcatalog/webhcat/java-client/target
 hcatalog/storage-handlers/hbase/target
 hcatalog/webhcat/svr/target
 conf/hive-default.xml.template
+.DS_Store

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/pom.xml b/hcatalog/streaming/pom.xml
index 2135e89..6d03ce1 100644
--- a/hcatalog/streaming/pom.xml
+++ b/hcatalog/streaming/pom.xml
@@ -89,6 +89,12 @@
       <optional>true</optional>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <optional>true</optional>
+      <version>3.3.2</version>
+    </dependency>
 
     <!-- test -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java
new file mode 100644
index 0000000..fcf446c
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java
@@ -0,0 +1,63 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Creates/configures {@link HiveConf} instances with required ACID 
attributes. */
+public class HiveConfFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveConfFactory.class);
+  private static final String TRANSACTION_MANAGER = 
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+
+  public static HiveConf newInstance(Configuration configuration, Class<?> 
clazz, String metaStoreUri) {
+    HiveConf hiveConf = null;
+    if (configuration != null) {
+      if (!HiveConf.class.isAssignableFrom(configuration.getClass())) {
+        hiveConf = new HiveConf(configuration, clazz);
+      } else {
+        hiveConf = (HiveConf) configuration;
+      }
+    }
+
+    if (hiveConf == null) {
+      hiveConf = HiveConfFactory.newInstance(clazz, metaStoreUri);
+    } else {
+      HiveConfFactory.overrideSettings(hiveConf);
+    }
+    return hiveConf;
+  }
+
+  public static HiveConf newInstance(Class<?> clazz, String metaStoreUri) {
+    HiveConf conf = new HiveConf(clazz);
+    if (metaStoreUri != null) {
+      setHiveConf(conf, HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+    }
+    overrideSettings(conf);
+    return conf;
+  }
+
+  public static void overrideSettings(HiveConf conf) {
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER, TRANSACTION_MANAGER);
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    setHiveConf(conf, HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+    // Avoids creating Tez Client sessions internally as it takes much longer 
currently
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+  }
+
+  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String 
value) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Overriding HiveConf setting : {} = {}", var, value);
+    }
+    conf.setVar(var, value);
+  }
+
+  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, 
boolean value) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Overriding HiveConf setting : {} = {}", var, value);
+    }
+    conf.setBoolVar(var, value);
+  }
+
+}
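
For illustration, a minimal sketch of how a caller might obtain an ACID-ready HiveConf from this factory. The metastore URI and class name below are placeholders, not part of the commit.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.hcatalog.streaming.mutate.HiveConfFactory;

    public class HiveConfFactoryExample {
      public static void main(String[] args) {
        // Placeholder URI; substitute the address of a real Thrift metastore.
        HiveConf conf = HiveConfFactory.newInstance(HiveConfFactoryExample.class, "thrift://metastore-host:9083");
        // The factory forces DbTxnManager, enables concurrency support and selects
        // the "mr" execution engine, as required for ACID mutations.
        System.out.println(conf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER));
      }
    }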

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java
new file mode 100644
index 0000000..2a4ddbe
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java
@@ -0,0 +1,102 @@
+package org.apache.hive.hcatalog.streaming.mutate;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+
+import com.google.common.reflect.AbstractInvocationHandler;
+
+/**
+ * Creates a proxied {@link IMetaStoreClient client} that wraps calls in a 
{@link PrivilegedExceptionAction} if the
+ * {@link UserGroupInformation} is specified. Invokes directly otherwise.
+ */
+public class UgiMetaStoreClientFactory {
+
+  private static Set<Method> I_META_STORE_CLIENT_METHODS = 
getIMetaStoreClientMethods();
+
+  private final String metaStoreUri;
+  private final HiveConf conf;
+  private final boolean secureMode;
+  private final UserGroupInformation authenticatedUser;
+  private final String user;
+
+  public UgiMetaStoreClientFactory(String metaStoreUri, HiveConf conf, 
UserGroupInformation authenticatedUser,
+      String user, boolean secureMode) {
+    this.metaStoreUri = metaStoreUri;
+    this.conf = conf;
+    this.authenticatedUser = authenticatedUser;
+    this.user = user;
+    this.secureMode = secureMode;
+    if (metaStoreUri != null) {
+      conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+    }
+    if (secureMode) {
+      conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
+    }
+  }
+
+  public IMetaStoreClient newInstance() throws MetaException {
+    return newInstance(new HiveMetaStoreClient(conf));
+  }
+
+  public IMetaStoreClient newInstance(IMetaStoreClient delegate) throws 
MetaException {
+    return createProxy(delegate, user, authenticatedUser);
+  }
+
+  @Override
+  public String toString() {
+    return "UgiMetaStoreClientFactory [metaStoreUri=" + metaStoreUri + ", 
secureMode=" + secureMode
+        + ", authenticatedUser=" + authenticatedUser + ", user=" + user + "]";
+  }
+
+  private IMetaStoreClient createProxy(final IMetaStoreClient delegate, final 
String user,
+      final UserGroupInformation authenticatedUser) {
+    InvocationHandler handler = new AbstractInvocationHandler() {
+
+      @Override
+      protected Object handleInvocation(Object proxy, final Method method, 
final Object[] args) throws Throwable {
+        try {
+          if (!I_META_STORE_CLIENT_METHODS.contains(method) || 
authenticatedUser == null) {
+            return method.invoke(delegate, args);
+          }
+          try {
+            return authenticatedUser.doAs(new 
PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                return method.invoke(delegate, args);
+              }
+            });
+          } catch (IOException | InterruptedException e) {
+            throw new TException("PrivilegedExceptionAction failed as user '" 
+ user + "'.", e);
+          }
+        } catch (UndeclaredThrowableException | InvocationTargetException e) {
+          throw e.getCause();
+        }
+      }
+    };
+
+    ClassLoader classLoader = IMetaStoreClient.class.getClassLoader();
+    Class<?>[] interfaces = new Class<?>[] { IMetaStoreClient.class };
+    Object proxy = Proxy.newProxyInstance(classLoader, interfaces, handler);
+    return IMetaStoreClient.class.cast(proxy);
+  }
+
+  private static Set<Method> getIMetaStoreClientMethods() {
+    return new 
HashSet<>(Arrays.asList(IMetaStoreClient.class.getDeclaredMethods()));
+  }
+
+}
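
A sketch, assuming a placeholder metastore URI, of how this factory might be used to obtain a proxied client whose metastore calls run as the current UserGroupInformation:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hive.hcatalog.streaming.mutate.HiveConfFactory;
    import org.apache.hive.hcatalog.streaming.mutate.UgiMetaStoreClientFactory;

    public class UgiClientExample {
      public static void main(String[] args) throws Exception {
        String metaStoreUri = "thrift://metastore-host:9083"; // placeholder
        HiveConf conf = HiveConfFactory.newInstance(UgiClientExample.class, metaStoreUri);
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        // Calls to methods declared on IMetaStoreClient are wrapped in ugi.doAs(...)
        // so that they execute as the authenticated user.
        IMetaStoreClient client = new UgiMetaStoreClientFactory(
            metaStoreUri, conf, ugi, ugi.getShortUserName(), false).newInstance();
        System.out.println(client.getAllDatabases());
        client.close();
      }
    }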

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
new file mode 100644
index 0000000..20747db
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
@@ -0,0 +1,112 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+/**
+ * Describes an ACID table that can receive mutation events. Used to encode the information required by workers to
+ * write ACID events without requiring them to retrieve the data from the meta store database again.
+ */
+public class AcidTable implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final String databaseName;
+  private final String tableName;
+  private final boolean createPartitions;
+  private final TableType tableType;
+  private long transactionId;
+
+  private Table table;
+
+  AcidTable(String databaseName, String tableName, boolean createPartitions, 
TableType tableType) {
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.createPartitions = createPartitions;
+    this.tableType = tableType;
+  }
+
+  /**
+   * Returns {@code 0} until such a time that a {@link Transaction} has been 
acquired (when
+   * {@link MutatorClient#newTransaction()} exits), at which point this will 
return the
+   * {@link Transaction#getTransactionId() transaction id}.
+   */
+  public long getTransactionId() {
+    return transactionId;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public boolean createPartitions() {
+    return createPartitions;
+  }
+
+  /**
+   * Returns {@code null} until such a time that the table described by the 
{@link #getDatabaseName() database_name}
+   * {@code .}{@link #getTableName() table_name} has been resolved with the 
meta store database (when
+   * {@link MutatorClient#connect()} exits), at which point this will then 
return the corresponding
+   * {@link StorageDescriptor#getOutputFormat() OutputFormat}.
+   */
+  public String getOutputFormatName() {
+    return table != null ? table.getSd().getOutputFormat() : null;
+  }
+
+  /**
+   * Returns {@code 0} until such a time that the table described by the 
{@link #getDatabaseName() database_name}
+   * {@code .}{@link #getTableName() table_name} has been resolved with the 
meta store database (when
+   * {@link MutatorClient#connect()} exits), at which point this will then 
return the corresponding
+   * {@link StorageDescriptor#getNumBuckets() total bucket count}.
+   */
+  public int getTotalBuckets() {
+    return table != null ? table.getSd().getNumBuckets() : 0;
+  }
+
+  public TableType getTableType() {
+    return tableType;
+  }
+
+  public String getQualifiedName() {
+    return (databaseName + "." + tableName).toUpperCase();
+  }
+
+  /**
+   * Returns {@code null} until such a time that the table described by the 
{@link #getDatabaseName() database_name}
+   * {@code .}{@link #getTableName() table_name} has been resolved with the 
meta store database (when
+   * {@link MutatorClient#connect()} exits), at which point this will then 
return the corresponding {@link Table}.
+   * Provided as a convenience to API users who may wish to gather further 
meta data regarding the table without
+   * connecting with the meta store once more.
+   */
+  public Table getTable() {
+    return table;
+  }
+
+  void setTransactionId(long transactionId) {
+    this.transactionId = transactionId;
+  }
+
+  void setTable(Table table) {
+    if (!databaseName.equalsIgnoreCase(table.getDbName())) {
+      throw new IllegalArgumentException("Incorrect database name.");
+    }
+    if (!tableName.equalsIgnoreCase(table.getTableName())) {
+      throw new IllegalArgumentException("Incorrect table name.");
+    }
+    this.table = table;
+  }
+
+  @Override
+  public String toString() {
+    return "AcidTable [databaseName=" + databaseName + ", tableName=" + 
tableName + ", createPartitions="
+        + createPartitions + ", tableType=" + tableType + ", 
outputFormatName=" + getOutputFormatName()
+        + ", totalBuckets=" + getTotalBuckets() + ", transactionId=" + 
transactionId + "]";
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
new file mode 100644
index 0000000..5d8a2bf
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
@@ -0,0 +1,100 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility to serialize/deserialize {@link AcidTable AcidTables} into strings 
so that they can be easily transported as
+ * {@link Configuration} properties.
+ */
+public class AcidTableSerializer {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AcidTableSerializer.class);
+
+  /* Allow for improved schemes. */
+  private static final String PROLOG_V1 = "AcidTableV1:";
+
+  /** Returns a base 64 encoded representation of the supplied {@link 
AcidTable}. */
+  public static String encode(AcidTable table) throws IOException {
+    DataOutputStream data = null;
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    try {
+      data = new DataOutputStream(bytes);
+      data.writeUTF(table.getDatabaseName());
+      data.writeUTF(table.getTableName());
+      data.writeBoolean(table.createPartitions());
+      if (table.getTransactionId() <= 0) {
+        LOG.warn("Transaction ID <= 0. The recipient is probably expecting a 
transaction ID.");
+      }
+      data.writeLong(table.getTransactionId());
+      data.writeByte(table.getTableType().getId());
+
+      Table metaTable = table.getTable();
+      if (metaTable != null) {
+        byte[] thrift = new TSerializer(new 
TCompactProtocol.Factory()).serialize(metaTable);
+        data.writeInt(thrift.length);
+        data.write(thrift);
+      } else {
+        LOG.warn("Meta store table is null. The recipient is probably 
expecting an instance.");
+        data.writeInt(0);
+      }
+    } catch (TException e) {
+      throw new IOException("Error serializing meta store table.", e);
+    } finally {
+      data.close();
+    }
+
+    return PROLOG_V1 + new String(Base64.encodeBase64(bytes.toByteArray()), 
Charset.forName("UTF-8"));
+  }
+
+  /** Returns the {@link AcidTable} instance decoded from a base 64 
representation. */
+  public static AcidTable decode(String encoded) throws IOException {
+    if (!encoded.startsWith(PROLOG_V1)) {
+      throw new IllegalStateException("Unsupported version.");
+    }
+    encoded = encoded.substring(PROLOG_V1.length());
+
+    byte[] decoded = Base64.decodeBase64(encoded);
+    AcidTable table = null;
+    try (DataInputStream in = new DataInputStream(new 
ByteArrayInputStream(decoded))) {
+      String databaseName = in.readUTF();
+      String tableName = in.readUTF();
+      boolean createPartitions = in.readBoolean();
+      long transactionId = in.readLong();
+      TableType tableType = TableType.valueOf(in.readByte());
+      int thriftLength = in.readInt();
+
+      table = new AcidTable(databaseName, tableName, createPartitions, 
tableType);
+      table.setTransactionId(transactionId);
+
+      Table metaTable = null;
+      if (thriftLength > 0) {
+        metaTable = new Table();
+        try {
+          byte[] thriftEncoded = new byte[thriftLength];
+          in.readFully(thriftEncoded, 0, thriftLength);
+          new TDeserializer(new 
TCompactProtocol.Factory()).deserialize(metaTable, thriftEncoded);
+          table.setTable(metaTable);
+        } catch (TException e) {
+          throw new IOException("Error deserializing meta store table.", e);
+        }
+      }
+    }
+    return table;
+  }
+
+}
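
A sketch of the intended round trip: the client encodes an AcidTable into a Configuration property and a worker decodes it later without a further metastore call. The property name is illustrative only.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
    import org.apache.hive.hcatalog.streaming.mutate.client.AcidTableSerializer;

    public class AcidTableTransport {
      // Illustrative property name, not defined by this commit.
      private static final String CONF_KEY = "example.acid.table";

      /** Client side: store the table descriptor in the job configuration. */
      static void store(Configuration conf, AcidTable table) throws IOException {
        conf.set(CONF_KEY, AcidTableSerializer.encode(table));
      }

      /** Worker side: recover the descriptor from the configuration. */
      static AcidTable load(Configuration conf) throws IOException {
        return AcidTableSerializer.decode(conf.get(CONF_KEY));
      }
    }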

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java
new file mode 100644
index 0000000..988dc38
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+public class ClientException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  ClientException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  ClientException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java
new file mode 100644
index 0000000..b54455a
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+public class ConnectionException extends ClientException {
+
+  private static final long serialVersionUID = 1L;
+
+  ConnectionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  ConnectionException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
new file mode 100644
index 0000000..2724525
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
@@ -0,0 +1,140 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import 
org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for orchestrating {@link Transaction Transactions} within which 
ACID table mutation events can occur.
+ * Typically this will be a large batch of delta operations.
+ */
+public class MutatorClient implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MutatorClient.class);
+  private static final String TRANSACTIONAL_PARAM_KEY = "transactional";
+
+  private final IMetaStoreClient metaStoreClient;
+  private final Lock.Options lockOptions;
+  private final List<AcidTable> tables;
+  private boolean connected;
+
+  MutatorClient(IMetaStoreClient metaStoreClient, HiveConf configuration, 
LockFailureListener lockFailureListener,
+      String user, Collection<AcidTable> tables) {
+    this.metaStoreClient = metaStoreClient;
+    this.tables = Collections.unmodifiableList(new ArrayList<>(tables));
+
+    lockOptions = new Lock.Options()
+        .configuration(configuration)
+        .lockFailureListener(lockFailureListener == null ? 
LockFailureListener.NULL_LISTENER : lockFailureListener)
+        .user(user);
+    for (AcidTable table : tables) {
+      lockOptions.addTable(table.getDatabaseName(), table.getTableName());
+    }
+  }
+
+  /**
+   * Connects to the {@link IMetaStoreClient meta store} that will be used to 
manage {@link Transaction} life-cycles.
+   * Also checks that the tables destined to receive mutation events are able 
to do so. The client should only hold one
+   * open transaction at any given time (TODO: enforce this).
+   */
+  public void connect() throws ConnectionException {
+    if (connected) {
+      throw new ConnectionException("Already connected.");
+    }
+    for (AcidTable table : tables) {
+      checkTable(metaStoreClient, table);
+    }
+    LOG.debug("Connected to end point {}", metaStoreClient);
+    connected = true;
+  }
+
+  /** Creates a new {@link Transaction} by opening a transaction with the 
{@link IMetaStoreClient meta store}. */
+  public Transaction newTransaction() throws TransactionException {
+    if (!connected) {
+      throw new TransactionException("Not connected - cannot create 
transaction.");
+    }
+    Transaction transaction = new Transaction(metaStoreClient, lockOptions);
+    for (AcidTable table : tables) {
+      table.setTransactionId(transaction.getTransactionId());
+    }
+    LOG.debug("Created transaction {}", transaction);
+    return transaction;
+  }
+
+  /** Returns whether the client connected successfully. Note that the client may have since become disconnected. */
+  public boolean isConnected() {
+    return connected;
+  }
+
+  /**
+   * Closes the client releasing any {@link IMetaStoreClient meta store} 
connections held. Does not notify any open
+   * transactions (TODO: perhaps it should?)
+   */
+  @Override
+  public void close() throws IOException {
+    metaStoreClient.close();
+    LOG.debug("Closed client.");
+    connected = false;
+  }
+
+  /**
+   * Returns the list of managed {@link AcidTable AcidTables} that can receive 
mutation events under the control of this
+   * client.
+   */
+  public List<AcidTable> getTables() throws ConnectionException {
+    if (!connected) {
+      throw new ConnectionException("Not connected - cannot interrogate 
tables.");
+    }
+    return Collections.<AcidTable> unmodifiableList(tables);
+  }
+
+  @Override
+  public String toString() {
+    return "MutatorClient [metaStoreClient=" + metaStoreClient + ", 
connected=" + connected + "]";
+  }
+
+  private void checkTable(IMetaStoreClient metaStoreClient, AcidTable 
acidTable) throws ConnectionException {
+    try {
+      LOG.debug("Checking table {}.", acidTable.getQualifiedName());
+      Table metaStoreTable = 
metaStoreClient.getTable(acidTable.getDatabaseName(), acidTable.getTableName());
+
+      if (acidTable.getTableType() == TableType.SINK) {
+        Map<String, String> parameters = metaStoreTable.getParameters();
+        if (!Boolean.parseBoolean(parameters.get(TRANSACTIONAL_PARAM_KEY))) {
+          throw new ConnectionException("Cannot stream to table that is not 
transactional: '"
+              + acidTable.getQualifiedName() + "'.");
+        }
+        int totalBuckets = metaStoreTable.getSd().getNumBuckets();
+        LOG.debug("Table {} has {} buckets.", acidTable.getQualifiedName(), 
totalBuckets);
+        if (totalBuckets <= 0) {
+          throw new ConnectionException("Cannot stream to table that has not 
been bucketed: '"
+              + acidTable.getQualifiedName() + "'.");
+        }
+
+        String outputFormat = metaStoreTable.getSd().getOutputFormat();
+        LOG.debug("Table {} has {} OutputFormat.", 
acidTable.getQualifiedName(), outputFormat);
+        acidTable.setTable(metaStoreTable);
+      }
+    } catch (NoSuchObjectException e) {
+      throw new ConnectionException("Invalid table '" + 
acidTable.getQualifiedName() + "'", e);
+    } catch (TException e) {
+      throw new ConnectionException("Error communicating with the meta store", 
e);
+    }
+    LOG.debug("Table {} OK.", acidTable.getQualifiedName());
+  }
+
+}
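
A sketch of the client-side flow, assuming a MutatorClient built elsewhere (see MutatorClientBuilder below); the actual writing of mutation events is performed by the worker-side classes and is only indicated by a comment:

    import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
    import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient;
    import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;

    public class ClientFlowSketch {
      static void runBatch(MutatorClient client) throws Exception {
        client.connect();                 // verifies the target tables are transactional and bucketed
        Transaction transaction = client.newTransaction();
        for (AcidTable table : client.getTables()) {
          // Each AcidTable now carries the transaction id (and, for sink tables, the
          // resolved metastore Table) and can be serialized out to the workers.
          System.out.println(table);
        }
        transaction.begin();              // acquires locks and starts the heartbeat
        // ... workers apply insert/update/delete events here ...
        transaction.commit();             // releases locks and commits the transaction
        client.close();
      }
    }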

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java
new file mode 100644
index 0000000..6c21c59
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java
@@ -0,0 +1,115 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.streaming.mutate.HiveConfFactory;
+import org.apache.hive.hcatalog.streaming.mutate.UgiMetaStoreClientFactory;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import 
org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener;
+
+/** Convenience class for building {@link MutatorClient} instances. */
+public class MutatorClientBuilder {
+
+  private final Map<String, AcidTable> tables = new HashMap<>();
+  private HiveConf configuration;
+  private UserGroupInformation authenticatedUser;
+  private String metaStoreUri;
+  private LockFailureListener lockFailureListener;
+
+  public MutatorClientBuilder configuration(HiveConf conf) {
+    this.configuration = conf;
+    return this;
+  }
+
+  public MutatorClientBuilder authenticatedUser(UserGroupInformation 
authenticatedUser) {
+    this.authenticatedUser = authenticatedUser;
+    return this;
+  }
+
+  public MutatorClientBuilder metaStoreUri(String metaStoreUri) {
+    this.metaStoreUri = metaStoreUri;
+    return this;
+  }
+
+  /** Set a listener to handle {@link Lock} failure events - highly 
recommended. */
+  public MutatorClientBuilder lockFailureListener(LockFailureListener 
lockFailureListener) {
+    this.lockFailureListener = lockFailureListener;
+    return this;
+  }
+
+  /**
+   * Adds a mutation event source (an ACID table) to be managed by this client. The table is either unpartitioned or
+   * is not to have partitions created automatically.
+   */
+  public MutatorClientBuilder addSourceTable(String databaseName, String 
tableName) {
+    addTable(databaseName, tableName, false, TableType.SOURCE);
+    return this;
+  }
+
+  /**
+   * Adds a mutation event destination (an ACID table) to be managed by this client. The table is either
+   * unpartitioned or is not to have partitions created automatically.
+   */
+  public MutatorClientBuilder addSinkTable(String databaseName, String 
tableName) {
+    return addSinkTable(databaseName, tableName, false);
+  }
+
+  /**
+   * Adds a partitioned mutation event destination (an ACID table) to be 
managed by this client, where new partitions
+   * will be created as needed.
+   */
+  public MutatorClientBuilder addSinkTable(String databaseName, String 
tableName, boolean createPartitions) {
+    addTable(databaseName, tableName, createPartitions, TableType.SINK);
+    return this;
+  }
+
+  private void addTable(String databaseName, String tableName, boolean 
createPartitions, TableType tableType) {
+    if (databaseName == null) {
+      throw new IllegalArgumentException("Database cannot be null");
+    }
+    if (tableName == null) {
+      throw new IllegalArgumentException("Table cannot be null");
+    }
+    String key = (databaseName + "." + tableName).toUpperCase();
+    AcidTable previous = tables.get(key);
+    if (previous != null) {
+      if (tableType == TableType.SINK && previous.getTableType() != 
TableType.SINK) {
+        tables.remove(key);
+      } else {
+        throw new IllegalArgumentException("Table has already been added: " + 
databaseName + "." + tableName);
+      }
+    }
+
+    Table table = new Table();
+    table.setDbName(databaseName);
+    table.setTableName(tableName);
+    tables.put(key, new AcidTable(databaseName, tableName, createPartitions, 
tableType));
+  }
+
+  /** Builds the client. */
+  public MutatorClient build() throws ClientException, MetaException {
+    String user = authenticatedUser == null ? System.getProperty("user.name") 
: authenticatedUser.getShortUserName();
+    boolean secureMode = authenticatedUser == null ? false : 
authenticatedUser.hasKerberosCredentials();
+
+    configuration = HiveConfFactory.newInstance(configuration, 
this.getClass(), metaStoreUri);
+
+    IMetaStoreClient metaStoreClient;
+    try {
+      metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, 
configuration, authenticatedUser, user, secureMode)
+          .newInstance(HCatUtil.getHiveMetastoreClient(configuration));
+    } catch (IOException e) {
+      throw new ClientException("Could not create meta store client.", e);
+    }
+
+    return new MutatorClient(metaStoreClient, configuration, 
lockFailureListener, user, tables.values());
+  }
+
+}
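
A sketch of building a client for a single partitioned sink table; the metastore URI, database, and table names are placeholders:

    import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient;
    import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClientBuilder;

    public class BuilderSketch {
      static MutatorClient newClient() throws Exception {
        // Registering a LockFailureListener via lockFailureListener(...) is also recommended.
        return new MutatorClientBuilder()
            .metaStoreUri("thrift://metastore-host:9083")
            .addSinkTable("example_db", "example_table", true)
            .build();
      }
    }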

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java
new file mode 100644
index 0000000..aa6d239
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java
@@ -0,0 +1,37 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+public enum TableType {
+  SOURCE((byte) 0),
+  SINK((byte) 1);
+
+  private static final TableType[] INDEX = buildIndex();
+
+  private static TableType[] buildIndex() {
+    TableType[] index = new TableType[TableType.values().length];
+    for (TableType type : values()) {
+      byte position = type.getId();
+      if (index[position] != null) {
+        throw new IllegalStateException("Overloaded index: " + position);
+      }
+      index[position] = type;
+    }
+    return index;
+  }
+
+  private byte id;
+
+  private TableType(byte id) {
+    this.id = id;
+  }
+
+  public byte getId() {
+    return id;
+  }
+
+  public static TableType valueOf(byte id) {
+    if (id < 0 || id >= INDEX.length) {
+      throw new IllegalArgumentException("Invalid id: " + id);
+    }
+    return INDEX[id];
+  }
+}
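
The byte id allows the type to travel inside a serialized AcidTable; a round trip might look like:

    import org.apache.hive.hcatalog.streaming.mutate.client.TableType;

    public class TableTypeRoundTrip {
      public static void main(String[] args) {
        byte id = TableType.SINK.getId();          // written by AcidTableSerializer.encode
        TableType decoded = TableType.valueOf(id); // read back by AcidTableSerializer.decode
        System.out.println(decoded);               // SINK
      }
    }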

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java
new file mode 100644
index 0000000..6532900
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java
@@ -0,0 +1,114 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
+import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Transaction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Transaction.class);
+
+  private final Lock lock;
+  private final IMetaStoreClient metaStoreClient;
+  private final long transactionId;
+
+  private TxnState state;
+
+  Transaction(IMetaStoreClient metaStoreClient, Lock.Options lockOptions) 
throws TransactionException {
+    this(metaStoreClient, new Lock(metaStoreClient, lockOptions));
+  }
+
+  /** Visible for testing only. */
+  Transaction(IMetaStoreClient metaStoreClient, Lock lock) throws 
TransactionException {
+    this.metaStoreClient = metaStoreClient;
+    this.lock = lock;
+    transactionId = open(lock.getUser());
+  }
+
+  public long getTransactionId() {
+    return transactionId;
+  }
+
+  public TxnState getState() {
+    return state;
+  }
+
+  /**
+   * Begin the transaction. Acquires a {@link Lock} for the transaction and 
{@link AcidTable AcidTables}.
+   */
+  public void begin() throws TransactionException {
+    try {
+      lock.acquire(transactionId);
+    } catch (LockException e) {
+      throw new TransactionException("Unable to acquire lock for transaction: 
" + transactionId, e);
+    }
+    state = TxnState.OPEN;
+    LOG.debug("Begin. Transaction id: {}", transactionId);
+  }
+
+  /** Commits the transaction. Releases the {@link Lock}. */
+  public void commit() throws TransactionException {
+    try {
+      lock.release();
+    } catch (LockException e) {
+      // This appears to leave the remote transaction in an inconsistent state but the heartbeat is now
+      // cancelled and it will eventually time out.
+      throw new TransactionException("Unable to release lock: " + lock + " for 
transaction: " + transactionId, e);
+    }
+    try {
+      metaStoreClient.commitTxn(transactionId);
+      state = TxnState.COMMITTED;
+    } catch (NoSuchTxnException e) {
+      throw new TransactionException("Invalid transaction id: " + 
transactionId, e);
+    } catch (TxnAbortedException e) {
+      throw new TransactionException("Aborted transaction cannot be committed: 
" + transactionId, e);
+    } catch (TException e) {
+      throw new TransactionException("Unable to commit transaction: " + 
transactionId, e);
+    }
+    LOG.debug("Committed. Transaction id: {}", transactionId);
+  }
+
+  /** Aborts the transaction. Releases the {@link Lock}. */
+  public void abort() throws TransactionException {
+    try {
+      lock.release();
+    } catch (LockException e) {
+      // This appears to leave the remote transaction in an inconsistent state but the heartbeat is now
+      // cancelled and it will eventually time out.
+      throw new TransactionException("Unable to release lock: " + lock + " for 
transaction: " + transactionId, e);
+    }
+    try {
+      metaStoreClient.rollbackTxn(transactionId);
+      state = TxnState.ABORTED;
+    } catch (NoSuchTxnException e) {
+      throw new TransactionException("Unable to abort invalid transaction id : 
" + transactionId, e);
+    } catch (TException e) {
+      throw new TransactionException("Unable to abort transaction id : " + 
transactionId, e);
+    }
+    LOG.debug("Aborted. Transaction id: {}", transactionId);
+  }
+
+  @Override
+  public String toString() {
+    return "Transaction [transactionId=" + transactionId + ", state=" + state 
+ "]";
+  }
+
+  private long open(String user) throws TransactionException {
+    long transactionId = -1;
+    try {
+      transactionId = metaStoreClient.openTxn(user);
+      state = TxnState.INACTIVE;
+    } catch (TException e) {
+      throw new TransactionException("Unable to open transaction for user: " + 
user, e);
+    }
+    LOG.debug("Opened transaction with id: {}", transactionId);
+    return transactionId;
+  }
+
+}
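
A sketch of the abort-on-failure pattern the transaction supports; the mutation work itself is indicated only by a comment:

    import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;

    public class TransactionSketch {
      static void applyMutations(Transaction transaction) throws Exception {
        transaction.begin();            // acquires the lock and starts the heartbeat
        try {
          // ... write insert/update/delete events against the transaction id ...
          transaction.commit();         // releases the lock and commits in the metastore
        } catch (Exception e) {
          transaction.abort();          // releases the lock and rolls back in the metastore
          throw e;
        }
      }
    }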

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java
new file mode 100644
index 0000000..48fb1cf
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.client;
+
+public class TransactionException extends ClientException {
+
+  private static final long serialVersionUID = 1L;
+
+  TransactionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  TransactionException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java
new file mode 100644
index 0000000..5814d37
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java
@@ -0,0 +1,30 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import java.util.Collection;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Creates a default {@link HeartbeatTimerTask} for {@link Lock Locks}. */
+class HeartbeatFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HeartbeatFactory.class);
+
+  /** Creates a new {@link HeartbeatTimerTask} instance for the {@link Lock} 
and schedules it. */
+  Timer newInstance(IMetaStoreClient metaStoreClient, LockFailureListener 
listener, Long transactionId,
+      Collection<Table> tableDescriptors, long lockId, int heartbeatPeriod) {
+    Timer heartbeatTimer = new Timer("hive-lock-heartbeat[lockId=" + lockId + 
", transactionId=" + transactionId + "]",
+        true);
+    HeartbeatTimerTask task = new HeartbeatTimerTask(metaStoreClient, 
listener, transactionId, tableDescriptors, lockId);
+    heartbeatTimer.schedule(task, TimeUnit.SECONDS.toMillis(heartbeatPeriod),
+        TimeUnit.SECONDS.toMillis(heartbeatPeriod));
+
+    LOG.debug("Scheduled heartbeat timer task: {}", heartbeatTimer);
+    return heartbeatTimer;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java
new file mode 100644
index 0000000..2446c10
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java
@@ -0,0 +1,66 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import java.util.Collection;
+import java.util.TimerTask;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TimerTask} that sends {@link IMetaStoreClient#heartbeat(long, long) heartbeat} events to the
+ * {@link IMetaStoreClient meta store} to keep the {@link Lock} and {@link Transaction} alive. Notifies the registered
+ * {@link LockFailureListener} should the lock fail.
+ */
+class HeartbeatTimerTask extends TimerTask {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HeartbeatTimerTask.class);
+
+  private final IMetaStoreClient metaStoreClient;
+  private final long lockId;
+  private final Long transactionId;
+  private final LockFailureListener listener;
+  private final Collection<Table> tableDescriptors;
+
+  HeartbeatTimerTask(IMetaStoreClient metaStoreClient, LockFailureListener 
listener, Long transactionId,
+      Collection<Table> tableDescriptors, long lockId) {
+    this.metaStoreClient = metaStoreClient;
+    this.listener = listener;
+    this.transactionId = transactionId;
+    this.tableDescriptors = tableDescriptors;
+    this.lockId = lockId;
+    LOG.debug("Reporting to listener {}", listener);
+  }
+
+  @Override
+  public void run() {
+    try {
+      // I'm assuming that there is no transaction ID for a read lock.
+      metaStoreClient.heartbeat(transactionId == null ? 0 : transactionId, 
lockId);
+      LOG.debug("Sent heartbeat for lock={}, transactionId={}", lockId, 
transactionId);
+    } catch (NoSuchLockException | NoSuchTxnException | TxnAbortedException e) 
{
+      failLock(e);
+    } catch (TException e) {
+      LOG.warn("Failed to send heartbeat to meta store.", e);
+    }
+  }
+
+  private void failLock(Exception e) {
+    LOG.debug("Lock " + lockId + " failed, cancelling heartbeat and notifying listener: " + listener, e);
+    // Cancel the heartbeat
+    cancel();
+    listener.lockFailed(lockId, transactionId, 
Lock.asStrings(tableDescriptors), e);
+  }
+
+  @Override
+  public String toString() {
+    return "HeartbeatTimerTask [lockId=" + lockId + ", transactionId=" + 
transactionId + "]";
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
new file mode 100644
index 0000000..21604df
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java
@@ -0,0 +1,282 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the state required to safely read/write from/to an ACID table.
+ */
+public class Lock {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Lock.class);
+
+  private static final double HEARTBEAT_FACTOR = 0.75;
+  private static final int DEFAULT_HEARTBEAT_PERIOD = 275;
+
+  private final IMetaStoreClient metaStoreClient;
+  private final HeartbeatFactory heartbeatFactory;
+  private final LockFailureListener listener;
+  private final Collection<Table> tableDescriptors;
+  private final int lockRetries;
+  private final int retryWaitSeconds;
+  private final String user;
+  private final HiveConf hiveConf;
+
+  private Timer heartbeat;
+  private Long lockId;
+  private Long transactionId;
+
+  public Lock(IMetaStoreClient metaStoreClient, Options options) {
+    this(metaStoreClient, new HeartbeatFactory(), options.hiveConf, 
options.listener, options.user,
+        options.descriptors, options.lockRetries, options.retryWaitSeconds);
+  }
+
+  /** Visible for testing only. */
+  Lock(IMetaStoreClient metaStoreClient, HeartbeatFactory heartbeatFactory, 
HiveConf hiveConf,
+      LockFailureListener listener, String user, Collection<Table> 
tableDescriptors, int lockRetries,
+      int retryWaitSeconds) {
+    this.metaStoreClient = metaStoreClient;
+    this.heartbeatFactory = heartbeatFactory;
+    this.hiveConf = hiveConf;
+    this.user = user;
+    this.tableDescriptors = tableDescriptors;
+    this.listener = listener;
+    this.lockRetries = lockRetries;
+    this.retryWaitSeconds = retryWaitSeconds;
+
+    if (LockFailureListener.NULL_LISTENER.equals(listener)) {
+      LOG.warn("No {} supplied. Data quality and availability cannot be 
assured.",
+          LockFailureListener.class.getSimpleName());
+    }
+  }
+
+  /** Attempts to acquire a read lock on the table, returns if successful, 
throws exception otherwise. */
+  public void acquire() throws LockException {
+    lockId = internalAcquire(null);
+    initiateHeartbeat();
+  }
+
+  /** Attempts to acquire a read lock on the table, returns if successful, 
throws exception otherwise. */
+  public void acquire(long transactionId) throws LockException {
+    lockId = internalAcquire(transactionId);
+    this.transactionId = transactionId;
+    initiateHeartbeat();
+  }
+
+  /** Attempts to release the read lock on the table. Throws an exception if 
the lock failed at any point. */
+  public void release() throws LockException {
+    if (heartbeat != null) {
+      heartbeat.cancel();
+    }
+    internalRelease();
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public String toString() {
+    return "Lock [metaStoreClient=" + metaStoreClient + ", lockId=" + lockId + 
", transactionId=" + transactionId
+        + "]";
+  }
+
+  private long internalAcquire(Long transactionId) throws LockException {
+    int attempts = 0;
+    LockRequest request = buildSharedLockRequest(transactionId);
+    do {
+      LockResponse response = null;
+      try {
+        response = metaStoreClient.lock(request);
+      } catch (TException e) {
+        throw new LockException("Unable to acquire lock for tables: [" + 
join(tableDescriptors) + "]", e);
+      }
+      if (response != null) {
+        LockState state = response.getState();
+        if (state == LockState.NOT_ACQUIRED || state == LockState.ABORT) {
+          // I expect we'll only see NOT_ACQUIRED here?
+          break;
+        }
+        if (state == LockState.ACQUIRED) {
+          LOG.debug("Acquired lock {}", response.getLockid());
+          return response.getLockid();
+        }
+        if (state == LockState.WAITING) {
+          try {
+            Thread.sleep(TimeUnit.SECONDS.toMillis(retryWaitSeconds));
+          } catch (InterruptedException e) {
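+            // Interrupt is swallowed here; the loop simply moves on to the next retry attempt.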
+          }
+        }
+      }
+      attempts++;
+    } while (attempts < lockRetries);
+    throw new LockException("Could not acquire lock on tables: [" + 
join(tableDescriptors) + "]");
+  }
+
+  private void internalRelease() {
+    try {
+      // if there is a transaction then this lock will be released on 
commit/abort/rollback instead.
+      if (lockId != null && transactionId == null) {
+        metaStoreClient.unlock(lockId);
+        LOG.debug("Released lock {}", lockId);
+        lockId = null;
+      }
+    } catch (TException e) {
+      LOG.error("Lock " + lockId + " failed.", e);
+      listener.lockFailed(lockId, transactionId, asStrings(tableDescriptors), 
e);
+    }
+  }
+
+  private LockRequest buildSharedLockRequest(Long transactionId) {
+    LockRequestBuilder requestBuilder = new LockRequestBuilder();
+    for (Table descriptor : tableDescriptors) {
+      LockComponent component = new LockComponentBuilder()
+          .setDbName(descriptor.getDbName())
+          .setTableName(descriptor.getTableName())
+          .setShared()
+          .build();
+      requestBuilder.addLockComponent(component);
+    }
+    if (transactionId != null) {
+      requestBuilder.setTransactionId(transactionId);
+    }
+    LockRequest request = requestBuilder.setUser(user).build();
+    return request;
+  }
+
+  private void initiateHeartbeat() {
+    int heartbeatPeriod = getHeartbeatPeriod();
+    LOG.debug("Heartbeat period {}s", heartbeatPeriod);
+    heartbeat = heartbeatFactory.newInstance(metaStoreClient, listener, 
transactionId, tableDescriptors, lockId,
+        heartbeatPeriod);
+  }
+
+  private int getHeartbeatPeriod() {
+    int heartbeatPeriod = DEFAULT_HEARTBEAT_PERIOD;
+    if (hiveConf != null) {
+      // This value is always in seconds and includes an 's' suffix.
+      String txTimeoutSeconds = 
hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT);
+      if (txTimeoutSeconds != null) {
+        // We want to send the heartbeat at an interval that is less than the 
timeout.
+        heartbeatPeriod = Math.max(1,
+            (int) (Integer.parseInt(txTimeoutSeconds.substring(0, 
txTimeoutSeconds.length() - 1)) * HEARTBEAT_FACTOR));
+      }
+    }
+    return heartbeatPeriod;
+  }
+
+  /** Visible for testing only. */
+  Long getLockId() {
+    return lockId;
+  }
+
+  /** Visible for testing only. */
+  Long getTransactionId() {
+    return transactionId;
+  }
+
+  /** Visible for testing only. */
+  static String join(Iterable<? extends Object> values) {
+    return StringUtils.join(values, ",");
+  }
+
+  /** Visible for testing only. */
+  static List<String> asStrings(Collection<Table> tables) {
+    List<String> strings = new ArrayList<>(tables.size());
+    for (Table descriptor : tables) {
+      strings.add(descriptor.getDbName() + "." + descriptor.getTableName());
+    }
+    return strings;
+  }
+
+  /** Constructs lock options for a set of Hive ACID tables from which we wish to read. */
+  public static final class Options {
+    Set<Table> descriptors = new LinkedHashSet<>();
+    LockFailureListener listener = LockFailureListener.NULL_LISTENER;
+    int lockRetries = 5;
+    int retryWaitSeconds = 30;
+    String user;
+    HiveConf hiveConf;
+
+    /** Adds a table for which a shared read lock will be requested. */
+    public Options addTable(String databaseName, String tableName) {
+      checkNotNullOrEmpty(databaseName);
+      checkNotNullOrEmpty(tableName);
+      Table table = new Table();
+      table.setDbName(databaseName);
+      table.setTableName(tableName);
+      descriptors.add(table);
+      return this;
+    }
+
+    public Options user(String user) {
+      checkNotNullOrEmpty(user);
+      this.user = user;
+      return this;
+    }
+
+    public Options configuration(HiveConf hiveConf) {
+      checkNotNull(hiveConf);
+      this.hiveConf = hiveConf;
+      return this;
+    }
+
+    /** Sets a listener to handle failures of locks that were previously acquired. */
+    public Options lockFailureListener(LockFailureListener listener) {
+      checkNotNull(listener);
+      this.listener = listener;
+      return this;
+    }
+
+    public Options lockRetries(int lockRetries) {
+      checkArgument(lockRetries > 0);
+      this.lockRetries = lockRetries;
+      return this;
+    }
+
+    public Options retryWaitSeconds(int retryWaitSeconds) {
+      checkArgument(retryWaitSeconds > 0);
+      this.retryWaitSeconds = retryWaitSeconds;
+      return this;
+    }
+
+    private static void checkArgument(boolean value) {
+      if (!value) {
+        throw new IllegalArgumentException();
+      }
+    }
+
+    private static void checkNotNull(Object value) {
+      if (value == null) {
+        throw new IllegalArgumentException();
+      }
+    }
+
+    private static void checkNotNullOrEmpty(String value) {
+      if (StringUtils.isBlank(value)) {
+        throw new IllegalArgumentException();
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java
new file mode 100644
index 0000000..67ed601
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java
@@ -0,0 +1,15 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+public class LockException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  public LockException(String message) {
+    super(message);
+  }
+
+  public LockException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java
new file mode 100644
index 0000000..2b6a12a
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java
@@ -0,0 +1,26 @@
+package org.apache.hive.hcatalog.streaming.mutate.client.lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Provides a means to handle the situation when a held lock fails. */
+public interface LockFailureListener {
+
+  static final Logger LOG = LoggerFactory.getLogger(LockFailureListener.class);
+
+  static final LockFailureListener NULL_LISTENER = new LockFailureListener() {
+    @Override
+    public void lockFailed(long lockId, Long transactionId, Iterable<String> tableNames, Throwable t) {
+      LOG.warn(
+          "Ignored lock failure: lockId=" + lockId + ", transactionId=" + transactionId + ", tables=" + tableNames, t);
+    }
+    
+    public String toString() {
+      return LockFailureListener.class.getName() + ".NULL_LISTENER";
+    }
+  };
+
+  /** Called when the specified lock has failed. You should probably abort your job in this case. */
+  void lockFailed(long lockId, Long transactionId, Iterable<String> tableNames, Throwable t);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot
new file mode 100644
index 0000000..79c30e7
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot
@@ -0,0 +1,27 @@
+digraph "API Usage" {
+  nodesep=1.2;
+
+  DATA [label="ACID\ndataset",shape=oval,style=filled,color="gray"];
+  CHANGES [label="Changed\ndata",shape=oval,style=filled,color="gray"];
+  
+  META_STORE [label="Hive\nMetaStore",shape=box,style=filled,color="darkseagreen3"];
+  HIVE_CLI [label="Hive\nCLI",shape=box,style=filled,color="darkseagreen3"];
+
+  MERGE1 [label="Compute\nmutations\n(your code)",shape=box,style=filled,color="khaki1"];
+  SORT [label="Group\n& sort\n(your code)",shape=box,style=filled,color="khaki1"];
+  CLIENT [label="Mutator\nclient",shape=box,style=filled,color="lightblue"];
+  BUCKET [label="Bucket ID\nappender",shape=box,style=filled,color="lightblue"];
+  COORD [label="Mutator\ncoordinator",shape=box,style=filled,color="lightblue"];
+  CLIENT -> COORD [label="Provides\nconf to"];
+  CLIENT -> BUCKET [label="Provides\nconf to"];
+  
+  CLIENT -> META_STORE [label="Manages\ntxns using"];
+  CHANGES -> MERGE1 [label="Reads ∆s\nfrom"];
+  DATA -> MERGE1 [label="Reads\nROW__IDs\nfrom"];
+  BUCKET -> MERGE1 [label="Appends ids\nto inserts"];
+  MERGE1 -> SORT;
+  SORT -> COORD [label="Issues\nmutations to"];
+  COORD -> DATA [label="Writes to"];
+  DATA -> HIVE_CLI [label="Read by"];
+  META_STORE -> DATA [label="Compacts"]; 
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
new file mode 100644
index 0000000..9fc10b6
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
@@ -0,0 +1,495 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
+        "http://www.w3.org/TR/html4/loose.dtd";>
+
+<html lang="en">
+
+<head>
+<meta name=Title content="HCatalog Streaming Mutation API">
+<meta name=Keywords content="HCatalog Streaming Mutation ACID">
+<meta http-equiv=Content-Type content="text/html; charset=utf-8">
+<title>HCatalog Streaming Mutation API</title>
+</head>
+
+<body>
+
+<h1>HCatalog Streaming Mutation API -- high level description</h1>
+
+<h2>Background</h2>
+<p>
+In certain data processing use cases it is necessary to modify existing
+data when new facts arrive. An example of this is the classic ETL merge
+where a copy of a data set is kept in sync with a master by the frequent
+application of deltas. The deltas describe the mutations (inserts,
+updates, deletes) that have occurred to the master since the previous
+sync. To implement such a case using Hadoop traditionally demands that
+the partitions containing records targeted by the mutations be
+rewritten. This is a coarse approach; a partition containing millions of
+records might be rebuilt because of a single record change. Additionally
+these partitions cannot be restated atomically; at some point the old
+partition data must be swapped with the new partition data. When this
+swap occurs, usually by issuing an HDFS
+<code>rm</code>
+followed by a
+<code>mv</code>
+, the possibility exists that the data appears to be unavailable and
+hence any downstream jobs consuming the data might unexpectedly fail.
+Therefore data processing patterns that restate raw data on HDFS cannot
+operate robustly without some external mechanism to orchestrate
+concurrent access to changing data.
+</p>
+
+<p>
+The availability of ACID tables in Hive provides a mechanism that both
+enables concurrent access to data stored in HDFS (so long as it's in the
+ORC+ACID format), and also permits row-level mutations of records within
+a table, without the need to rewrite the existing data. But while Hive
+itself supports
+<code>INSERT</code>
+,
+<code>UPDATE</code>
+and
+<code>DELETE</code>
+commands, and the ORC format can support large batches of mutations in a
+transaction, Hive's execution engine currently submits each individual
+mutation operation in a separate transaction and issues table scans (M/R
+jobs) to execute them. It does not currently scale to the demands of
+processing large deltas in an atomic manner. Furthermore it would be
+advantageous to extend atomic batch mutation capabilities beyond Hive by
+making them available to other data processing frameworks. The Streaming
+Mutation API does just this.
+</p>
+
+<p>The Streaming Mutation API, although similar to the Streaming
+API, has a number of differences and is built to enable very different
+use cases. Superficially, the Streaming API can only write new data
+whereas the mutation API can also modify existing data. However the two
+APIs are also based on very different transaction models. The Streaming API
+focuses on surfacing a continuous stream of new data into a Hive table
+and does so by batching small sets of writes into multiple short-lived
+transactions. Conversely the mutation API is designed to infrequently
+apply large sets of mutations to a data set in an atomic fashion; all
+mutations will either be applied or they will not. This instead mandates
+the use of a single long-lived transaction. This table summarises the
+attributes of each API:</p>
+
+<table border="1">
+<thead>
+<tr>
+<th>Attribute</th>
+<th>Streaming API</th>
+<th>Mutation API</th>
+</tr>
+<tr>
+<td>Ingest type</td>
+<td>Data arrives continuously</td>
+<td>Ingests are performed periodically and the mutations are
+applied in a single batch</td>
+</tr>
+<tr>
+<td>Transaction scope</td>
+<td>Transactions are created for small batches of writes</td>
+<td>The entire set of mutations should be applied within a single
+transaction</td>
+</tr>
+<tr>
+<td>Data availability</td>
+<td>Surfaces new data to users frequently and quickly</td>
+<td>Change sets should be applied atomically, either the effect of
+the delta is visible or it is not</td>
+</tr>
+<tr>
+<td>Sensitive to record order</td>
+<td>No, records do not have pre-existing lastTxnIds or bucketIds.
+Records are likely being written into a single partition (today's date
+for example)</td>
+<td>Yes, all mutated records have existing <code>RecordIdentifiers</code>
+and must be grouped by (partitionValues, bucketId) and sorted by
+lastTxnId. These record coordinates initially arrive in an order that is
+effectively random.
+</td>
+</tr>
+<tr>
+<td>Impact of a write failure</td>
+<td>Transaction can be aborted and producer can choose to resubmit
+failed records as ordering is not important.</td>
+<td>Ingest for the respective transaction must be halted and failed records
+resubmitted to preserve sequence.</td>
+</tr>
+<tr>
+<td>User perception of missing data</td>
+<td>Data has not arrived yet → "latency?"</td>
+<td>"This data is inconsistent, some records have been updated, but
+other related records have not" - consider here the classic transfer
+between bank accounts scenario</td>
+</tr>
+<tr>
+<td>API end point scope</td>
+<td>A given <code>HiveEndPoint</code> instance submits many
+transactions to a specific bucket, in a specific partition, of a
+specific table
+</td>
+<td>A set of <code>MutatorCoordinators</code> writes changes to an
+unknown set of buckets, of an unknown set of partitions, of specific
+tables (there can be more than one), within a single transaction.
+</td>
+</tr>
+</thead>
+</table>
+
+<h2>Structure</h2>
+<p>The API comprises two main concerns: transaction management, and
+the writing of mutation operations to the data set. The two concerns
+have a minimal coupling as it is expected that transactions will be
+initiated from a single job launcher type process while the writing of
+mutations will be scaled out across any number of worker nodes. In the
+context of Hadoop M/R these can be more concretely defined as the Tool
+and Map/Reduce task components. However, use of this architecture is not
+mandated and in fact both concerns could be handled within a single
+simple process depending on the requirements.</p>
+
+<p>Note that a suitably configured Hive instance is required to
+operate this system even if you do not intend to access the data from
+within Hive. Internally, transactions are managed by the Hive MetaStore.
+Mutations are performed to HDFS via ORC APIs that bypass the MetaStore.
+Additionally you may wish to configure your MetaStore instance to
+perform periodic data compactions.</p>
+
+<p>
+<b>Note on packaging</b>: The APIs are defined in the <b>org.apache.hive.hcatalog.streaming.mutate</b>
+Java package and are included in the hive-hcatalog-streaming jar.
+</p>
+
+<h2>Data requirements</h2>
+<p>
+Generally speaking, to apply a mutation to a record one must have some
+unique key that identifies the record. However, primary keys are not a
+construct provided by Hive. Internally Hive uses
+<code>RecordIdentifiers</code>
+stored in a virtual
+<code>ROW__ID</code>
+column to uniquely identify records within an ACID table. Therefore,
+any process that wishes to issue mutations to a table via this API must
+have available the corresponding row ids for the target records. What
+this means in practice is that the process issuing mutations must first
+read in a current snapshot of the data and then join the mutations on some
+domain specific primary key to obtain the corresponding Hive
+<code>ROW__ID</code>
+. This is effectively what occurs within Hive's table scan process when
+an
+<code>UPDATE</code>
+or
+<code>DELETE</code>
+statement is executed. The
+<code>AcidInputFormat</code>
+provides access to this data via
+<code>AcidRecordReader.getRecordIdentifier()</code>
+.
+</p>
+
+<p>
+The implementation of the ACID format places some constraints on the
+order in which records are written and it is important that this
+ordering is enforced. Additionally, data must be grouped appropriately
+to adhere to the constraints imposed by the
+<code>OrcRecordUpdater</code>
+. Grouping also makes it possible to parallelise the writing of mutations
+for the purposes of scaling. Finally, to correctly bucket new records
+(inserts) there is a slightly unintuitive trick that must be applied.
+</p>
+
+<p>All of these data sequencing concerns are the responsibility of
+the client process calling the API which is assumed to have first class
+grouping and sorting capabilities (Hadoop Map/Reduce etc.). The mutation
+API provides nothing more than validators that fail fast when they
+encounter groups and records that are out of sequence.</p>
+
+<p>In short, API client processes should prepare data for the mutate
+API like so:</p>
+<ul>
+<li><b>MUST:</b> Order records by <code>ROW__ID.originalTxn</code>,
+then <code>ROW__ID.rowId</code>.</li>
+<li><b>MUST:</b> Assign a <code>ROW__ID</code> containing a
+computed <code>bucketId</code> to records to be inserted.</li>
+<li><b>SHOULD:</b> Group/partition by table partition value, then <code>ROW__ID.bucketId</code>.</li>
+</ul>
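+
+<p>For illustration only, the fragment below sketches the required intra-group
+ordering using the <code>RecordIdentifier</code> coordinates carried by each
+mutated record. The <code>MyRecord</code> type and its <code>rowId</code> field
+are hypothetical placeholders for whatever record structure your ETL process
+uses; only <code>RecordIdentifier</code> and its getters come from Hive
+(<code>org.apache.hadoop.hive.ql.io</code>).</p>
+<pre>
+// Order records by (originalTxn, rowId) within each (partition values, bucketId) group.
+Comparator&lt;MyRecord&gt; mutationOrder = new Comparator&lt;MyRecord&gt;() {
+  @Override
+  public int compare(MyRecord a, MyRecord b) {
+    RecordIdentifier x = a.rowId; // hypothetical field holding the record's ROW__ID
+    RecordIdentifier y = b.rowId;
+    int byTxn = Long.compare(x.getTransactionId(), y.getTransactionId());
+    return byTxn != 0 ? byTxn : Long.compare(x.getRowId(), y.getRowId());
+  }
+};
+</pre>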
+
+<p>
+The addition of bucket ids to insert records prior to grouping and
+sorting seems unintuitive. However, it is required both to ensure
+adequate partitioning of new data and bucket allocation consistent with
+that provided by Hive. In a typical ETL the majority of mutation events
+are inserts, often targeting a single partition (new data for the
+previous day, hour, etc.). If more than one worker is writing said
+events and the bucket id were left empty, then all inserts would go
+to a single worker (e.g. a reducer) and the workload could be heavily
+skewed. The assignment of a computed bucket allows inserts to be more
+usefully distributed across workers. Additionally, when Hive is working
+with the data it may expect records to have been bucketed in a way that
+is consistent with its own internal scheme. A convenience type and
+implementation are provided to more easily compute and append bucket ids:
+<code>BucketIdResolver</code>
+and
+<code>BucketIdResolverImpl</code>
+.
+</p>
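+
+<p>A minimal sketch of this follows. The reflective
+<code>ObjectInspector</code>, the record-identifier column index, the bucket
+count and the bucket column indexes are illustrative assumptions and must be
+replaced with values matching your own record structure and table
+definition.</p>
+<pre>
+// Hypothetical record layout: column 0 holds the RecordIdentifier, column 1 is the single bucket column.
+ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
+    MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+BucketIdResolver bucketIdResolver = new BucketIdResolverImpl(inspector, 0, 12, new int[] { 1 });
+
+// Attach a computed bucket id before grouping and sorting; transactionId and
+// rowId are left unset for inserts and will be assigned at write time.
+Object insertRecord = bucketIdResolver.attachBucketIdToRecord(newRecord);
+</pre>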
+
+<p>Update operations should not attempt to modify values of
+partition or bucketing columns. The API does not prevent this and such
+attempts could lead to data corruption.</p>
+
+<h2>Streaming requirements</h2>
+<p>A few things are currently required to use streaming.</p>
+
+<p>
+<ol>
+<li>Currently, only ORC storage format is supported. So '<b>stored
+as orc</b>' must be specified during table creation.
+</li>
+<li>The hive table must be bucketed, but not sorted. So something
+like '<b>clustered by (<i>colName</i>) into <i>10</i> buckets
+</b>' must be specified during table creation.
+</li>
+<li>The user of the client streaming process must have the necessary
+permissions to write to the table or partition and create partitions in
+the table.</li>
+<li>Settings required in hive-site.xml for Metastore:
+<ol>
+<li><b>hive.txn.manager =
+org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</b></li>
+<li><b>hive.support.concurrency = true </b></li>
+<li><b>hive.compactor.initiator.on = true</b></li>
+<li><b>hive.compactor.worker.threads > 0 </b></li>
+</ol>
+</li>
+</ol>
+</p>
+
+<p>
+<b>Note:</b> Streaming mutations to <b>unpartitioned</b> tables is also
+supported.
+</p>
+
+<h2>Record layout</h2>
+<p>
+The structure, layout, and encoding of records is the exclusive concern
+of the client ETL mutation process and may be quite different from the
+target Hive ACID table. The mutation API requires concrete
+implementations of the
+<code>MutatorFactory</code>
+and
+<code>Mutator</code>
+classes to extract pertinent data from records and serialize data into
+the ACID files. Fortunately base classes are provided (
+<code>AbstractMutator</code>
+,
+<code>RecordInspectorImpl</code>
+) to simplify this effort and usually all that is required is the
+specification of a suitable
+<code>ObjectInspector</code>
+and the provision of the indexes of the
+<code>ROW__ID</code>
+and bucketed columns within the record structure. Note that all column
+indexes in these classes are with respect to your record structure, not
+the Hive table structure.
+</p>
+<p>
+You will likely also want to use a
+<code>BucketIdResolver</code>
+to append bucket ids to new records for insertion. Fortunately the core
+implementation is provided in
+<code>BucketIdResolverImpl</code>
+but note that bucket column indexes must be presented in the same order
+as they are in the Hive table definition to ensure consistent bucketing.
+Note that you cannot move records between buckets and an exception will
+be thrown if you attempt to do so. In real terms this means that you
+should not attempt to modify the values in bucket columns with an
+<code>UPDATE</code>
+.
+</p>
+
+<h2>Connection and Transaction management</h2>
+<p>
+The
+<code>MutatorClient</code>
+class is used to create and manage transactions in which mutations can
+be performed. The scope of a transaction can extend across multiple ACID
+tables. When a client connects it communicates with the meta store to
+verify and acquire meta data for the target tables. An invocation of
+<code>newTransaction</code>
+then opens a transaction with the meta store, finalizes a collection of
+<code>AcidTables</code>
+and returns a new
+<code>Transaction</code>
+instance. The acid tables are light-weight, serializable objects that
+are used by the mutation writing components of the API to target
+specific ACID file locations. Usually your
+<code>MutatorClient</code>
+will be running on some master node and your coordinators on worker
+nodes. In this event the
+<code>AcidTableSerializer</code>
+can be used to encode the tables in a more transportable form, for use
+as a
+<code>Configuration</code>
+property for example.
+</p>
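+
+<p>For example, assuming static <code>encode</code> and <code>decode</code>
+methods on <code>AcidTableSerializer</code> (treat the exact method names as an
+assumption), a table might travel to the workers like so:</p>
+<pre>
+// On the master, after newTransaction() has finalized the AcidTables:
+conf.set("my.app.target.acid.table", AcidTableSerializer.encode(acidTable));
+
+// On a worker, before constructing a MutatorCoordinator:
+AcidTable acidTable = AcidTableSerializer.decode(conf.get("my.app.target.acid.table"));
+</pre>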
+<p>
+As you would expect, a
+<code>Transaction</code>
+must be initiated with a call to
+<code>begin</code>
+before any mutations can be applied. This invocation acquires a lock on
+the targeted tables using the meta store, and initiates a heartbeat to
+prevent transaction timeouts. It is highly recommended that you register
+a
+<code>LockFailureListener</code>
+with the client so that your process can handle any lock or transaction
+failures. Typically you may wish to abort the job in the event of such
+an error. With the transaction in place you can now start streaming
+mutations with one or more
+<code>MutatorCoordinator</code>
+instances (more on this later), and can finally
+<code>commit</code>
+or
+<code>abort</code>
+the transaction when the change set has been applied, which will release
+the lock with the meta store client. Finally you should
+<code>close</code>
+the mutation client to release any held resources.
+</p>
+<p>
+The
+<code>MutatorClientBuilder</code>
+is provided to simplify the construction of clients.
+</p>
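+
+<p>A condensed sketch of this lifecycle follows. Apart from
+<code>addTable(String, String, boolean)</code>, the builder and accessor names
+shown (<code>configuration</code>, <code>lockFailureListener</code>,
+<code>connect</code>, <code>getTables</code>) are assumptions inferred from the
+description above; see <code>ExampleUseCase</code> for an authoritative
+usage.</p>
+<pre>
+MutatorClient client = new MutatorClientBuilder()
+    .configuration(hiveConf)
+    .lockFailureListener(myListener)           // strongly recommended
+    .addTable("my_db", "my_acid_table", true)  // true: create partitions as needed
+    .build();
+client.connect();
+
+Transaction transaction = client.newTransaction();
+List&lt;AcidTable&gt; tables = client.getTables(); // serialize these for your workers
+
+transaction.begin();
+try {
+  // ... issue mutations via one or more MutatorCoordinators (see below) ...
+  transaction.commit();
+} catch (Exception e) {
+  transaction.abort();
+  throw e;
+} finally {
+  client.close();
+}
+</pre>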
+
+<p>
+<b>WARNING:</b> Hive doesn't currently have a deadlock detector (it is
+being worked on as part of <a
+href="https://issues.apache.org/jira/browse/HIVE-9675";>HIVE-9675</a>).
+This API could potentially deadlock with other stream writers or with
+SQL users.
+</p>
+<h2>Writing data</h2>
+
+<p>
+The
+<code>MutatorCoordinator</code>
+class is used to issue mutations to an ACID table. You will require at
+least one instance per table participating in the transaction. The
+target of a given instance is defined by the respective
+<code>AcidTable</code>
+used to construct the coordinator. It is recommended that a
+<code>MutatorCoordinatorBuilder</code>
+is used to simplify the construction process.
+</p>
+
+<p>
+Mutations can be applied by invoking the respective
+<code>insert</code>
+,
+<code>update</code>
+, and
+<code>delete</code>
+methods on the coordinator. These methods each take as parameters the
+target partition of the record and the mutated record. In the case of an
+unpartitioned table you should simply pass an empty list as the
+partition value. For inserts specifically, only the bucket id will be
+extracted from the
+<code>RecordIdentifier</code>
+, the transactionId and rowId will be ignored and replaced by
+appropriate values in the
+<code>RecordUpdater</code>
+. Additionally, in the case of deletes, everything but the
+<code>RecordIdentifier</code>
+in the record will be ignored and therefore it is often easier to simply
+submit the original record.
+</p>
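+
+<p>Sketched below is the write path for a single coordinator. Only the
+<code>insert</code>, <code>update</code>, <code>delete</code> and
+<code>close</code> calls are taken from the description above; the
+<code>MutatorCoordinatorBuilder</code> method names are assumptions.</p>
+<pre>
+MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
+    .configuration(hiveConf)
+    .mutatorFactory(myMutatorFactory)   // your MutatorFactory implementation
+    .table(acidTable)                   // an AcidTable obtained from the client
+    .build();
+
+List&lt;String&gt; partition = Arrays.asList("2015-06-30"); // use an empty list for unpartitioned tables
+coordinator.insert(partition, newRecord);     // bucket id must already be attached
+coordinator.update(partition, changedRecord); // carries its original ROW__ID
+coordinator.delete(partition, deletedRecord); // only the ROW__ID is used
+coordinator.close();
+</pre>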
+
+<p>
+<b>Caution:</b> As mentioned previously, mutations must arrive in
+a specific order for the resultant table data to be consistent.
+Coordinators will verify a naturally ordered sequence of
+(lastTransactionId, rowId) and will throw an exception if this sequence
+is broken. This exception should almost certainly be escalated so that
+the transaction is aborted. This, along with the correct ordering of the
+data, is the responsibility of the client using the API.
+</p>
+
+<h3>Dynamic Partition Creation:</h3>
+It is very likely to be desirable to have new partitions created
+automatically (say on an hourly basis). In such cases requiring the Hive
+admin to pre-create the necessary partitions may not be reasonable.
+Consequently the API allows coordinators to create partitions as needed
+(see:
+<code>MutatorClientBuilder.addTable(String, String, boolean)</code>
+). Partition creation being an atomic action, multiple coordinators can
+race to create the partition, but only one would succeed, so
+coordinator clients need not synchronize when creating a partition. The
+user of the coordinator process needs to be given write permissions on
+the Hive table in order to create partitions.
+
+<h2>Reading data</h2>
+
+<p>
+Although this API is concerned with writing changes to data, as
+previously stated we'll almost certainly have to read the existing data
+first to obtain the relevant
+<code>ROW__IDs</code>
+. Therefore it is worth noting that reading ACID data in a robust and
+consistent manner requires the following:
+<ol>
+<li>Obtaining a valid transaction list from the meta store (<code>ValidTxnList</code>).
+</li>
+<li>Acquiring a read-lock with the meta store and issuing
+heartbeats (<code>Lock</code> can help with this).
+</li>
+<li>Configuring the <code>OrcInputFormat</code> and then reading
+the data. Make sure that you also pull in the <code>ROW__ID</code>
+values. See: <code>AcidRecordReader.getRecordIdentifier</code>.
+</li>
+<li>Releasing the read-lock.</li>
+</ol>
+</p>
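+
+<p>A hedged sketch of the locking step follows, using the
+<code>Lock.Options</code> builder introduced by this change; the
+<code>Lock</code> constructor and the <code>acquire</code>/<code>release</code>
+method names are assumptions here.</p>
+<pre>
+Lock.Options options = new Lock.Options()
+    .configuration(hiveConf)
+    .user("etl-user")
+    .addTable("my_db", "my_acid_table")
+    .lockFailureListener(myListener);
+Lock readLock = new Lock(metaStoreClient, options); // constructor shape assumed
+
+readLock.acquire(); // also starts the heartbeat that prevents lock/txn timeouts
+try {
+  // ... read the current snapshot, pulling in the ROW__ID values ...
+} finally {
+  readLock.release();
+}
+</pre>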
+
+<h2>Example</h2>
+<p>
+<img src="doc-files/system-overview.png" />
+</p>
+<p>So to recap, the sequence of events required to apply mutations
+to a dataset using the API is:</p>
+<ol>
+<li>Create a <code>MutatorClient</code> to manage a transaction for
+the targeted ACID tables. This set of tables should include any
+transactional destinations or sources. Don't forget to register a <code>LockFailureListener</code>
+so that you can handle transaction failures.
+</li>
+<li>Open a new <code>Transaction</code> with the client.
+</li>
+<li>Get the <code>AcidTables</code> from the client.
+</li>
+<li>Begin the transaction.</li>
+<li>Create at least one <code>MutatorCoordinator</code> for each
+table. The <code>AcidTableSerializer</code> can help you transport the <code>AcidTables</code>
+when your workers are in a distributed environment.
+</li>
+<li>Compute your mutation set (this is your ETL merge process).</li>
+<li>Append bucket ids to insertion records. A <code>BucketIdResolver</code>
+can help here.
+</li>
+<li>Group and sort your data appropriately.</li>
+<li>Issue mutation events to your coordinators.</li>
+<li>Close your coordinators.</li>
+<li>Abort or commit the transaction.</li>
+<li>Close your mutation client.</li>
+</ol>
+<p>
+See
+<code>ExampleUseCase</code>
+and
+<code>TestMutations.testUpdatesAndDeletes()</code>
+for some very simple usages.
+</p>
+
+</body>
+
+</html>

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java
new file mode 100644
index 0000000..656324c
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java
@@ -0,0 +1,11 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+public class BucketIdException extends WorkerException {
+
+  private static final long serialVersionUID = 1L;
+
+  BucketIdException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java
new file mode 100644
index 0000000..dab2072
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java
@@ -0,0 +1,11 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+/** Computes and appends bucket ids to records that are due to be inserted. */
+public interface BucketIdResolver {
+
+  Object attachBucketIdToRecord(Object record);
+
+  /** See: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}. */
+  int computeBucketId(Object record);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
new file mode 100644
index 0000000..dbed9e1
--- /dev/null
+++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
@@ -0,0 +1,76 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+
+/**
+ * Implementation of a {@link BucketIdResolver} that includes the logic required to calculate a bucket id from a record
+ * that is consistent with Hive's own internal computation scheme.
+ */
+public class BucketIdResolverImpl implements BucketIdResolver {
+
+  private static final long INVALID_TRANSACTION_ID = -1L;
+  private static final long INVALID_ROW_ID = -1L;
+
+  private final SettableStructObjectInspector structObjectInspector;
+  private final StructField[] bucketFields;
+  private final int totalBuckets;
+  private final StructField recordIdentifierField;
+
+  /**
+   * Note that all column indexes are with respect to your record structure, not the Hive table structure. Bucket column
+   * indexes must be presented in the same order as they are in the Hive table definition.
+   */
+  public BucketIdResolverImpl(ObjectInspector objectInspector, int recordIdColumn, int totalBuckets, int[] bucketColumns) {
+    this.totalBuckets = totalBuckets;
+    if (!(objectInspector instanceof SettableStructObjectInspector)) {
+      throw new IllegalArgumentException("Serious problem, expected a StructObjectInspector, " + "but got a "
+          + objectInspector.getClass().getName());
+    }
+
+    if (bucketColumns.length < 1) {
+      throw new IllegalArgumentException("No bucket column indexes set.");
+    }
+    structObjectInspector = (SettableStructObjectInspector) objectInspector;
+    List<? extends StructField> structFields = structObjectInspector.getAllStructFieldRefs();
+
+    recordIdentifierField = structFields.get(recordIdColumn);
+
+    bucketFields = new StructField[bucketColumns.length];
+    for (int i = 0; i < bucketColumns.length; i++) {
+      int bucketColumnsIndex = bucketColumns[i];
+      bucketFields[i] = structFields.get(bucketColumnsIndex);
+    }
+  }
+
+  @Override
+  public Object attachBucketIdToRecord(Object record) {
+    int bucketId = computeBucketId(record);
+    RecordIdentifier recordIdentifier = new RecordIdentifier(INVALID_TRANSACTION_ID, bucketId, INVALID_ROW_ID);
+    structObjectInspector.setStructFieldData(record, recordIdentifierField, recordIdentifier);
+    return record;
+  }
+
+  /** Based on: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}. */
+  @Override
+  public int computeBucketId(Object record) {
+    int bucketId = 1;
+
+    for (int columnIndex = 0; columnIndex < bucketFields.length; columnIndex++) {
+      Object columnValue = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
+      bucketId = bucketId * 31 + ObjectInspectorUtils.hashCode(columnValue, bucketFields[columnIndex].getFieldObjectInspector());
+    }
+
+    if (bucketId < 0) {
+      bucketId = -1 * bucketId;
+    }
+
+    return bucketId % totalBuckets;
+  }
+
+}
