HIVE-10165 Improve hive-hcatalog-streaming extensibility and support updates and deletes (Elliot West via gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/994d98c0 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/994d98c0 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/994d98c0 Branch: refs/heads/master Commit: 994d98c0963ee48c2abbfee6f389d75c0223c8f1 Parents: 3991dba Author: Alan Gates <ga...@hortonworks.com> Authored: Tue Jun 30 14:59:55 2015 -0700 Committer: Alan Gates <ga...@hortonworks.com> Committed: Tue Jun 30 14:59:55 2015 -0700 ---------------------------------------------------------------------- .gitignore | 1 + hcatalog/streaming/pom.xml | 6 + .../streaming/mutate/HiveConfFactory.java | 63 +++ .../mutate/UgiMetaStoreClientFactory.java | 102 ++++ .../streaming/mutate/client/AcidTable.java | 112 ++++ .../mutate/client/AcidTableSerializer.java | 100 ++++ .../mutate/client/ClientException.java | 15 + .../mutate/client/ConnectionException.java | 15 + .../streaming/mutate/client/MutatorClient.java | 140 +++++ .../mutate/client/MutatorClientBuilder.java | 115 ++++ .../streaming/mutate/client/TableType.java | 37 ++ .../streaming/mutate/client/Transaction.java | 114 ++++ .../mutate/client/TransactionException.java | 15 + .../mutate/client/lock/HeartbeatFactory.java | 30 + .../mutate/client/lock/HeartbeatTimerTask.java | 66 +++ .../streaming/mutate/client/lock/Lock.java | 282 ++++++++++ .../mutate/client/lock/LockException.java | 15 + .../mutate/client/lock/LockFailureListener.java | 26 + .../mutate/doc-files/system-overview.dot | 27 + .../hive/hcatalog/streaming/mutate/package.html | 495 +++++++++++++++++ .../mutate/worker/BucketIdException.java | 11 + .../mutate/worker/BucketIdResolver.java | 11 + .../mutate/worker/BucketIdResolverImpl.java | 76 +++ .../mutate/worker/CreatePartitionHelper.java | 83 +++ .../mutate/worker/GroupRevisitedException.java | 11 + .../mutate/worker/GroupingValidator.java | 74 +++ .../streaming/mutate/worker/Mutator.java | 21 + .../mutate/worker/MutatorCoordinator.java | 281 ++++++++++ .../worker/MutatorCoordinatorBuilder.java | 76 +++ .../streaming/mutate/worker/MutatorFactory.java | 16 + .../streaming/mutate/worker/MutatorImpl.java | 84 +++ .../streaming/mutate/worker/OperationType.java | 7 + .../worker/PartitionCreationException.java | 15 + .../mutate/worker/RecordInspector.java | 11 + .../mutate/worker/RecordInspectorImpl.java | 45 ++ .../mutate/worker/RecordSequenceException.java | 11 + .../mutate/worker/SequenceValidator.java | 49 ++ .../mutate/worker/WorkerException.java | 15 + .../streaming/mutate/ExampleUseCase.java | 82 +++ .../streaming/mutate/MutableRecord.java | 50 ++ .../mutate/ReflectiveMutatorFactory.java | 51 ++ .../streaming/mutate/StreamingAssert.java | 191 +++++++ .../streaming/mutate/StreamingTestUtils.java | 261 +++++++++ .../streaming/mutate/TestMutations.java | 544 +++++++++++++++++++ .../mutate/client/TestAcidTableSerializer.java | 66 +++ .../mutate/client/TestMutatorClient.java | 176 ++++++ .../mutate/client/TestTransaction.java | 95 ++++ .../client/lock/TestHeartbeatTimerTask.java | 100 ++++ .../streaming/mutate/client/lock/TestLock.java | 283 ++++++++++ .../mutate/worker/TestBucketIdResolverImpl.java | 38 ++ .../mutate/worker/TestGroupingValidator.java | 70 +++ .../mutate/worker/TestMutatorCoordinator.java | 234 ++++++++ .../mutate/worker/TestMutatorImpl.java | 99 ++++ .../mutate/worker/TestRecordInspectorImpl.java | 31 ++ .../mutate/worker/TestSequenceValidator.java | 91 ++++ 55 files changed, 5135 insertions(+) 
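For orientation, here is a minimal client-side sketch of the mutation API introduced by this patch, based only on the classes shown in the diffs below (the meta store URI and table names are illustrative, and exception handling is omitted):

    // Build and connect a client that manages transactions for one sink table.
    MutatorClient client = new MutatorClientBuilder()
        .metaStoreUri("thrift://metastore.example.com:9083")  // illustrative URI
        .addSinkTable("example_db", "example_table", true)    // create partitions as needed
        .build();
    client.connect();

    // Open a transaction; the managed AcidTables now carry the transaction id.
    Transaction transaction = client.newTransaction();
    List<AcidTable> tables = client.getTables();

    transaction.begin();   // acquires shared locks and starts the heartbeat
    // ... apply mutations to the ACID tables (see the worker package) ...
    transaction.commit();  // or transaction.abort() on failure

    client.close();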
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index c5decaf..4d341a0 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ hcatalog/webhcat/java-client/target hcatalog/storage-handlers/hbase/target hcatalog/webhcat/svr/target conf/hive-default.xml.template +.DS_Store http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/pom.xml ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/pom.xml b/hcatalog/streaming/pom.xml index 2135e89..6d03ce1 100644 --- a/hcatalog/streaming/pom.xml +++ b/hcatalog/streaming/pom.xml @@ -89,6 +89,12 @@ <optional>true</optional> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <optional>true</optional> + <version>3.3.2</version> + </dependency> <!-- test --> <dependency> http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java new file mode 100644 index 0000000..fcf446c --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/HiveConfFactory.java @@ -0,0 +1,63 @@ +package org.apache.hive.hcatalog.streaming.mutate; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Creates/configures {@link HiveConf} instances with required ACID attributes. 
*/ +public class HiveConfFactory { + + private static final Logger LOG = LoggerFactory.getLogger(HiveConfFactory.class); + private static final String TRANSACTION_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; + + public static HiveConf newInstance(Configuration configuration, Class<?> clazz, String metaStoreUri) { + HiveConf hiveConf = null; + if (configuration != null) { + if (!HiveConf.class.isAssignableFrom(configuration.getClass())) { + hiveConf = new HiveConf(configuration, clazz); + } else { + hiveConf = (HiveConf) configuration; + } + } + + if (hiveConf == null) { + hiveConf = HiveConfFactory.newInstance(clazz, metaStoreUri); + } else { + HiveConfFactory.overrideSettings(hiveConf); + } + return hiveConf; + } + + public static HiveConf newInstance(Class<?> clazz, String metaStoreUri) { + HiveConf conf = new HiveConf(clazz); + if (metaStoreUri != null) { + setHiveConf(conf, HiveConf.ConfVars.METASTOREURIS, metaStoreUri); + } + overrideSettings(conf); + return conf; + } + + public static void overrideSettings(HiveConf conf) { + setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER, TRANSACTION_MANAGER); + setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); + setHiveConf(conf, HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true); + // Avoid creating Tez client sessions internally as they currently take much longer to create + setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr"); + } + + private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) { + if (LOG.isDebugEnabled()) { + LOG.debug("Overriding HiveConf setting: {} = {}", var, value); + } + conf.setVar(var, value); + } + + private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) { + if (LOG.isDebugEnabled()) { + LOG.debug("Overriding HiveConf setting: {} = {}", var, value); + } + conf.setBoolVar(var, value); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java new file mode 100644 index 0000000..2a4ddbe --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/UgiMetaStoreClientFactory.java @@ -0,0 +1,102 @@ +package org.apache.hive.hcatalog.streaming.mutate; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TException; + +import com.google.common.reflect.AbstractInvocationHandler; + +/** + * Creates a proxied {@link IMetaStoreClient client} that wraps calls in a {@link PrivilegedExceptionAction} if the + * {@link UserGroupInformation} is specified. Invokes directly otherwise.
+ */ +public class UgiMetaStoreClientFactory { + + private static Set<Method> I_META_STORE_CLIENT_METHODS = getIMetaStoreClientMethods(); + + private final String metaStoreUri; + private final HiveConf conf; + private final boolean secureMode; + private final UserGroupInformation authenticatedUser; + private final String user; + + public UgiMetaStoreClientFactory(String metaStoreUri, HiveConf conf, UserGroupInformation authenticatedUser, + String user, boolean secureMode) { + this.metaStoreUri = metaStoreUri; + this.conf = conf; + this.authenticatedUser = authenticatedUser; + this.user = user; + this.secureMode = secureMode; + if (metaStoreUri != null) { + conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri); + } + if (secureMode) { + conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true); + } + } + + public IMetaStoreClient newInstance() throws MetaException { + return newInstance(new HiveMetaStoreClient(conf)); + } + + public IMetaStoreClient newInstance(IMetaStoreClient delegate) throws MetaException { + return createProxy(delegate, user, authenticatedUser); + } + + @Override + public String toString() { + return "UgiMetaStoreClientFactory [metaStoreUri=" + metaStoreUri + ", secureMode=" + secureMode + + ", authenticatedUser=" + authenticatedUser + ", user=" + user + "]"; + } + + private IMetaStoreClient createProxy(final IMetaStoreClient delegate, final String user, + final UserGroupInformation authenticatedUser) { + InvocationHandler handler = new AbstractInvocationHandler() { + + @Override + protected Object handleInvocation(Object proxy, final Method method, final Object[] args) throws Throwable { + try { + if (!I_META_STORE_CLIENT_METHODS.contains(method) || authenticatedUser == null) { + return method.invoke(delegate, args); + } + try { + return authenticatedUser.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + return method.invoke(delegate, args); + } + }); + } catch (IOException | InterruptedException e) { + throw new TException("PrivilegedExceptionAction failed as user '" + user + "'.", e); + } + } catch (UndeclaredThrowableException | InvocationTargetException e) { + throw e.getCause(); + } + } + }; + + ClassLoader classLoader = IMetaStoreClient.class.getClassLoader(); + Class<?>[] interfaces = new Class<?>[] { IMetaStoreClient.class }; + Object proxy = Proxy.newProxyInstance(classLoader, interfaces, handler); + return IMetaStoreClient.class.cast(proxy); + } + + private static Set<Method> getIMetaStoreClientMethods() { + return new HashSet<>(Arrays.asList(IMetaStoreClient.class.getDeclaredMethods())); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java new file mode 100644 index 0000000..20747db --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java @@ -0,0 +1,112 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +import java.io.Serializable; + +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; + +/** + * Describes an ACID table that can receive mutation events. 
Used to encode the information required by workers to write + * ACID events, so that they need not retrieve the data from the meta store database again. + */ +public class AcidTable implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String databaseName; + private final String tableName; + private final boolean createPartitions; + private final TableType tableType; + private long transactionId; + + private Table table; + + AcidTable(String databaseName, String tableName, boolean createPartitions, TableType tableType) { + this.databaseName = databaseName; + this.tableName = tableName; + this.createPartitions = createPartitions; + this.tableType = tableType; + } + + /** + * Returns {@code 0} until such a time that a {@link Transaction} has been acquired (when + * {@link MutatorClient#newTransaction()} exits), at which point this will return the + * {@link Transaction#getTransactionId() transaction id}. + */ + public long getTransactionId() { + return transactionId; + } + + public String getDatabaseName() { + return databaseName; + } + + public String getTableName() { + return tableName; + } + + public boolean createPartitions() { + return createPartitions; + } + + /** + * Returns {@code null} until such a time that the table described by the {@link #getDatabaseName() database_name} + * {@code .}{@link #getTableName() table_name} has been resolved with the meta store database (when + * {@link MutatorClient#connect()} exits), at which point this will return the corresponding + * {@link StorageDescriptor#getOutputFormat() OutputFormat}. + */ + public String getOutputFormatName() { + return table != null ? table.getSd().getOutputFormat() : null; + } + + /** + * Returns {@code 0} until such a time that the table described by the {@link #getDatabaseName() database_name} + * {@code .}{@link #getTableName() table_name} has been resolved with the meta store database (when + * {@link MutatorClient#connect()} exits), at which point this will return the corresponding + * {@link StorageDescriptor#getNumBuckets() total bucket count}. + */ + public int getTotalBuckets() { + return table != null ? table.getSd().getNumBuckets() : 0; + } + + public TableType getTableType() { + return tableType; + } + + public String getQualifiedName() { + return (databaseName + "." + tableName).toUpperCase(); + } + + /** + * Returns {@code null} until such a time that the table described by the {@link #getDatabaseName() database_name} + * {@code .}{@link #getTableName() table_name} has been resolved with the meta store database (when + * {@link MutatorClient#connect()} exits), at which point this will return the corresponding {@link Table}. + * Provided as a convenience to API users who may wish to gather further meta data regarding the table without + * connecting to the meta store again.
+ */ + public Table getTable() { + return table; + } + + void setTransactionId(long transactionId) { + this.transactionId = transactionId; + } + + void setTable(Table table) { + if (!databaseName.equalsIgnoreCase(table.getDbName())) { + throw new IllegalArgumentException("Incorrect database name."); + } + if (!tableName.equalsIgnoreCase(table.getTableName())) { + throw new IllegalArgumentException("Incorrect table name."); + } + this.table = table; + } + + @Override + public String toString() { + return "AcidTable [databaseName=" + databaseName + ", tableName=" + tableName + ", createPartitions=" + + createPartitions + ", tableType=" + tableType + ", outputFormatName=" + getOutputFormatName() + + ", totalBuckets=" + getTotalBuckets() + ", transactionId=" + transactionId + "]"; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java new file mode 100644 index 0000000..5d8a2bf --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java @@ -0,0 +1,100 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TCompactProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility to serialize/deserialize {@link AcidTable AcidTables} into strings so that they can be easily transported as + * {@link Configuration} properties. + */ +public class AcidTableSerializer { + + private static final Logger LOG = LoggerFactory.getLogger(AcidTableSerializer.class); + + /* Allow for improved schemes. */ + private static final String PROLOG_V1 = "AcidTableV1:"; + + /** Returns a base 64 encoded representation of the supplied {@link AcidTable}. */ + public static String encode(AcidTable table) throws IOException { + DataOutputStream data = null; + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try { + data = new DataOutputStream(bytes); + data.writeUTF(table.getDatabaseName()); + data.writeUTF(table.getTableName()); + data.writeBoolean(table.createPartitions()); + if (table.getTransactionId() <= 0) { + LOG.warn("Transaction ID <= 0. The recipient is probably expecting a transaction ID."); + } + data.writeLong(table.getTransactionId()); + data.writeByte(table.getTableType().getId()); + + Table metaTable = table.getTable(); + if (metaTable != null) { + byte[] thrift = new TSerializer(new TCompactProtocol.Factory()).serialize(metaTable); + data.writeInt(thrift.length); + data.write(thrift); + } else { + LOG.warn("Meta store table is null. 
The recipient is probably expecting an instance."); + data.writeInt(0); + } + } catch (TException e) { + throw new IOException("Error serializing meta store table.", e); + } finally { + data.close(); + } + + return PROLOG_V1 + new String(Base64.encodeBase64(bytes.toByteArray()), Charset.forName("UTF-8")); + } + + /** Returns the {@link AcidTable} instance decoded from a base 64 representation. */ + public static AcidTable decode(String encoded) throws IOException { + if (!encoded.startsWith(PROLOG_V1)) { + throw new IllegalStateException("Unsupported version."); + } + encoded = encoded.substring(PROLOG_V1.length()); + + byte[] decoded = Base64.decodeBase64(encoded); + AcidTable table = null; + try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(decoded))) { + String databaseName = in.readUTF(); + String tableName = in.readUTF(); + boolean createPartitions = in.readBoolean(); + long transactionId = in.readLong(); + TableType tableType = TableType.valueOf(in.readByte()); + int thriftLength = in.readInt(); + + table = new AcidTable(databaseName, tableName, createPartitions, tableType); + table.setTransactionId(transactionId); + + Table metaTable = null; + if (thriftLength > 0) { + metaTable = new Table(); + try { + byte[] thriftEncoded = new byte[thriftLength]; + in.readFully(thriftEncoded, 0, thriftLength); + new TDeserializer(new TCompactProtocol.Factory()).deserialize(metaTable, thriftEncoded); + table.setTable(metaTable); + } catch (TException e) { + throw new IOException("Error deserializing meta store table.", e); + } + } + } + return table; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java new file mode 100644 index 0000000..988dc38 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ClientException.java @@ -0,0 +1,15 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +public class ClientException extends Exception { + + private static final long serialVersionUID = 1L; + + ClientException(String message, Throwable cause) { + super(message, cause); + } + + ClientException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java new file mode 100644 index 0000000..b54455a --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/ConnectionException.java @@ -0,0 +1,15 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +public class ConnectionException extends ClientException { + + private static final long serialVersionUID = 1L; + + ConnectionException(String message, Throwable cause) { + super(message, cause); + } + + ConnectionException(String message) { + super(message); + } + +} 
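As a usage sketch, the AcidTableSerializer above lets a client ship table metadata to remote workers through a Configuration; the property name here is illustrative and exception handling is omitted:

    // Client side: encode the AcidTable into a Configuration property.
    conf.set("mutation.acid.table", AcidTableSerializer.encode(acidTable));

    // Worker side: decode it back into an AcidTable.
    AcidTable acidTable = AcidTableSerializer.decode(conf.get("mutation.acid.table"));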
http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java new file mode 100644 index 0000000..2724525 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java @@ -0,0 +1,140 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock; +import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Responsible for orchestrating {@link Transaction Transactions} within which ACID table mutation events can occur. + * Typically this will be a large batch of delta operations. + */ +public class MutatorClient implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(MutatorClient.class); + private static final String TRANSACTIONAL_PARAM_KEY = "transactional"; + + private final IMetaStoreClient metaStoreClient; + private final Lock.Options lockOptions; + private final List<AcidTable> tables; + private boolean connected; + + MutatorClient(IMetaStoreClient metaStoreClient, HiveConf configuration, LockFailureListener lockFailureListener, + String user, Collection<AcidTable> tables) { + this.metaStoreClient = metaStoreClient; + this.tables = Collections.unmodifiableList(new ArrayList<>(tables)); + + lockOptions = new Lock.Options() + .configuration(configuration) + .lockFailureListener(lockFailureListener == null ? LockFailureListener.NULL_LISTENER : lockFailureListener) + .user(user); + for (AcidTable table : tables) { + lockOptions.addTable(table.getDatabaseName(), table.getTableName()); + } + } + + /** + * Connects to the {@link IMetaStoreClient meta store} that will be used to manage {@link Transaction} life-cycles. + * Also checks that the tables destined to receive mutation events are able to do so. The client should only hold one + * open transaction at any given time (TODO: enforce this). + */ + public void connect() throws ConnectionException { + if (connected) { + throw new ConnectionException("Already connected."); + } + for (AcidTable table : tables) { + checkTable(metaStoreClient, table); + } + LOG.debug("Connected to end point {}", metaStoreClient); + connected = true; + } + + /** Creates a new {@link Transaction} by opening a transaction with the {@link IMetaStoreClient meta store}. 
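The returned transaction must subsequently be started with {@link Transaction#begin()} and finished with either {@link Transaction#commit()} or {@link Transaction#abort()}.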
*/ + public Transaction newTransaction() throws TransactionException { + if (!connected) { + throw new TransactionException("Not connected - cannot create transaction."); + } + Transaction transaction = new Transaction(metaStoreClient, lockOptions); + for (AcidTable table : tables) { + table.setTransactionId(transaction.getTransactionId()); + } + LOG.debug("Created transaction {}", transaction); + return transaction; + } + + /** Did the client connect successfully? Note that the client may have since become disconnected. */ + public boolean isConnected() { + return connected; + } + + /** + * Closes the client releasing any {@link IMetaStoreClient meta store} connections held. Does not notify any open + * transactions (TODO: perhaps it should?) + */ + @Override + public void close() throws IOException { + metaStoreClient.close(); + LOG.debug("Closed client."); + connected = false; + } + + /** + * Returns the list of managed {@link AcidTable AcidTables} that can receive mutation events under the control of this + * client. + */ + public List<AcidTable> getTables() throws ConnectionException { + if (!connected) { + throw new ConnectionException("Not connected - cannot interrogate tables."); + } + return Collections.<AcidTable> unmodifiableList(tables); + } + + @Override + public String toString() { + return "MutatorClient [metaStoreClient=" + metaStoreClient + ", connected=" + connected + "]"; + } + + private void checkTable(IMetaStoreClient metaStoreClient, AcidTable acidTable) throws ConnectionException { + try { + LOG.debug("Checking table {}.", acidTable.getQualifiedName()); + Table metaStoreTable = metaStoreClient.getTable(acidTable.getDatabaseName(), acidTable.getTableName()); + + if (acidTable.getTableType() == TableType.SINK) { + Map<String, String> parameters = metaStoreTable.getParameters(); + if (!Boolean.parseBoolean(parameters.get(TRANSACTIONAL_PARAM_KEY))) { + throw new ConnectionException("Cannot stream to table that is not transactional: '" + + acidTable.getQualifiedName() + "'."); + } + int totalBuckets = metaStoreTable.getSd().getNumBuckets(); + LOG.debug("Table {} has {} buckets.", acidTable.getQualifiedName(), totalBuckets); + if (totalBuckets <= 0) { + throw new ConnectionException("Cannot stream to table that has not been bucketed: '" + + acidTable.getQualifiedName() + "'."); + } + + String outputFormat = metaStoreTable.getSd().getOutputFormat(); + LOG.debug("Table {} has {} OutputFormat.", acidTable.getQualifiedName(), outputFormat); + acidTable.setTable(metaStoreTable); + } + } catch (NoSuchObjectException e) { + throw new ConnectionException("Invalid table '" + acidTable.getQualifiedName() + "'", e); + } catch (TException e) { + throw new ConnectionException("Error communicating with the meta store", e); + } + LOG.debug("Table {} OK.", acidTable.getQualifiedName()); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java new file mode 100644 index 0000000..6c21c59 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClientBuilder.java @@ -0,0 +1,115 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +import
java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.streaming.mutate.HiveConfFactory; +import org.apache.hive.hcatalog.streaming.mutate.UgiMetaStoreClientFactory; +import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock; +import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener; + +/** Convenience class for building {@link MutatorClient} instances. */ +public class MutatorClientBuilder { + + private final Map<String, AcidTable> tables = new HashMap<>(); + private HiveConf configuration; + private UserGroupInformation authenticatedUser; + private String metaStoreUri; + private LockFailureListener lockFailureListener; + + public MutatorClientBuilder configuration(HiveConf conf) { + this.configuration = conf; + return this; + } + + public MutatorClientBuilder authenticatedUser(UserGroupInformation authenticatedUser) { + this.authenticatedUser = authenticatedUser; + return this; + } + + public MutatorClientBuilder metaStoreUri(String metaStoreUri) { + this.metaStoreUri = metaStoreUri; + return this; + } + + /** Set a listener to handle {@link Lock} failure events - highly recommended. */ + public MutatorClientBuilder lockFailureListener(LockFailureListener lockFailureListener) { + this.lockFailureListener = lockFailureListener; + return this; + } + + /** + * Adds a mutation event source (an ACID table) to be managed by this client, which either is unpartitioned or + * is not to have partitions created automatically. + */ + public MutatorClientBuilder addSourceTable(String databaseName, String tableName) { + addTable(databaseName, tableName, false, TableType.SOURCE); + return this; + } + + /** + * Adds a mutation event destination (an ACID table) to be managed by this client, which either is unpartitioned or + * is not to have partitions created automatically. + */ + public MutatorClientBuilder addSinkTable(String databaseName, String tableName) { + return addSinkTable(databaseName, tableName, false); + } + + /** + * Adds a partitioned mutation event destination (an ACID table) to be managed by this client, where new partitions + * will be created as needed. + */ + public MutatorClientBuilder addSinkTable(String databaseName, String tableName, boolean createPartitions) { + addTable(databaseName, tableName, createPartitions, TableType.SINK); + return this; + } + + private void addTable(String databaseName, String tableName, boolean createPartitions, TableType tableType) { + if (databaseName == null) { + throw new IllegalArgumentException("Database cannot be null"); + } + if (tableName == null) { + throw new IllegalArgumentException("Table cannot be null"); + } + String key = (databaseName + "." + tableName).toUpperCase(); + AcidTable previous = tables.get(key); + if (previous != null) { + if (tableType == TableType.SINK && previous.getTableType() != TableType.SINK) { + tables.remove(key); + } else { + throw new IllegalArgumentException("Table has already been added: " + databaseName + "."
+ tableName); + } + } + + tables.put(key, new AcidTable(databaseName, tableName, createPartitions, tableType)); + } + + /** Builds the client. */ + public MutatorClient build() throws ClientException, MetaException { + String user = authenticatedUser == null ? System.getProperty("user.name") : authenticatedUser.getShortUserName(); + boolean secureMode = authenticatedUser == null ? false : authenticatedUser.hasKerberosCredentials(); + + configuration = HiveConfFactory.newInstance(configuration, this.getClass(), metaStoreUri); + + IMetaStoreClient metaStoreClient; + try { + metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, configuration, authenticatedUser, user, secureMode) + .newInstance(HCatUtil.getHiveMetastoreClient(configuration)); + } catch (IOException e) { + throw new ClientException("Could not create meta store client.", e); + } + + return new MutatorClient(metaStoreClient, configuration, lockFailureListener, user, tables.values()); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java new file mode 100644 index 0000000..aa6d239 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TableType.java @@ -0,0 +1,37 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +public enum TableType { + SOURCE((byte) 0), + SINK((byte) 1); + + private static final TableType[] INDEX = buildIndex(); + + private static TableType[] buildIndex() { + TableType[] index = new TableType[TableType.values().length]; + for (TableType type : values()) { + byte position = type.getId(); + if (index[position] != null) { + throw new IllegalStateException("Overloaded index: " + position); + } + index[position] = type; + } + return index; + } + + private byte id; + + private TableType(byte id) { + this.id = id; + } + + public byte getId() { + return id; + } + + public static TableType valueOf(byte id) { + if (id < 0 || id >= INDEX.length) { + throw new IllegalArgumentException("Invalid id: " + id); + } + return INDEX[id]; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java new file mode 100644 index 0000000..6532900 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/Transaction.java @@ -0,0 +1,114 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState; +import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock; +import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockException; +import
org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Transaction { + + private static final Logger LOG = LoggerFactory.getLogger(Transaction.class); + + private final Lock lock; + private final IMetaStoreClient metaStoreClient; + private final long transactionId; + + private TxnState state; + + Transaction(IMetaStoreClient metaStoreClient, Lock.Options lockOptions) throws TransactionException { + this(metaStoreClient, new Lock(metaStoreClient, lockOptions)); + } + + /** Visible for testing only. */ + Transaction(IMetaStoreClient metaStoreClient, Lock lock) throws TransactionException { + this.metaStoreClient = metaStoreClient; + this.lock = lock; + transactionId = open(lock.getUser()); + } + + public long getTransactionId() { + return transactionId; + } + + public TxnState getState() { + return state; + } + + /** + * Begin the transaction. Acquires a {@link Lock} for the transaction and {@link AcidTable AcidTables}. + */ + public void begin() throws TransactionException { + try { + lock.acquire(transactionId); + } catch (LockException e) { + throw new TransactionException("Unable to acquire lock for transaction: " + transactionId, e); + } + state = TxnState.OPEN; + LOG.debug("Begin. Transaction id: {}", transactionId); + } + + /** Commits the transaction. Releases the {@link Lock}. */ + public void commit() throws TransactionException { + try { + lock.release(); + } catch (LockException e) { + // This appears to leave the remote transaction in an inconsistent state but the heartbeat is now + // cancelled and it will eventually time out + throw new TransactionException("Unable to release lock: " + lock + " for transaction: " + transactionId, e); + } + try { + metaStoreClient.commitTxn(transactionId); + state = TxnState.COMMITTED; + } catch (NoSuchTxnException e) { + throw new TransactionException("Invalid transaction id: " + transactionId, e); + } catch (TxnAbortedException e) { + throw new TransactionException("Aborted transaction cannot be committed: " + transactionId, e); + } catch (TException e) { + throw new TransactionException("Unable to commit transaction: " + transactionId, e); + } + LOG.debug("Committed. Transaction id: {}", transactionId); + } + + /** Aborts the transaction. Releases the {@link Lock}. */ + public void abort() throws TransactionException { + try { + lock.release(); + } catch (LockException e) { + // This appears to leave the remote transaction in an inconsistent state but the heartbeat is now + // cancelled and it will eventually time out + throw new TransactionException("Unable to release lock: " + lock + " for transaction: " + transactionId, e); + } + try { + metaStoreClient.rollbackTxn(transactionId); + state = TxnState.ABORTED; + } catch (NoSuchTxnException e) { + throw new TransactionException("Unable to abort invalid transaction id: " + transactionId, e); + } catch (TException e) { + throw new TransactionException("Unable to abort transaction id: " + transactionId, e); + } + LOG.debug("Aborted.
Transaction id: {}", transactionId); + } + + @Override + public String toString() { + return "Transaction [transactionId=" + transactionId + ", state=" + state + "]"; + } + + private long open(String user) throws TransactionException { + long transactionId = -1; + try { + transactionId = metaStoreClient.openTxn(user); + state = TxnState.INACTIVE; + } catch (TException e) { + throw new TransactionException("Unable to open transaction for user: " + user, e); + } + LOG.debug("Opened transaction with id: {}", transactionId); + return transactionId; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java new file mode 100644 index 0000000..48fb1cf --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/TransactionException.java @@ -0,0 +1,15 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +public class TransactionException extends ClientException { + + private static final long serialVersionUID = 1L; + + TransactionException(String message, Throwable cause) { + super(message, cause); + } + + TransactionException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java new file mode 100644 index 0000000..5814d37 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatFactory.java @@ -0,0 +1,30 @@ +package org.apache.hive.hcatalog.streaming.mutate.client.lock; + +import java.util.Collection; +import java.util.Timer; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Creates a default {@link HeartbeatTimerTask} for {@link Lock Locks}. */ +class HeartbeatFactory { + + private static final Logger LOG = LoggerFactory.getLogger(HeartbeatFactory.class); + + /** Creates a new {@link HeartbeatTimerTask} instance for the {@link Lock} and schedules it. 
*/ + Timer newInstance(IMetaStoreClient metaStoreClient, LockFailureListener listener, Long transactionId, + Collection<Table> tableDescriptors, long lockId, int heartbeatPeriod) { + Timer heartbeatTimer = new Timer("hive-lock-heartbeat[lockId=" + lockId + ", transactionId=" + transactionId + "]", + true); + HeartbeatTimerTask task = new HeartbeatTimerTask(metaStoreClient, listener, transactionId, tableDescriptors, lockId); + heartbeatTimer.schedule(task, TimeUnit.SECONDS.toMillis(heartbeatPeriod), + TimeUnit.SECONDS.toMillis(heartbeatPeriod)); + + LOG.debug("Scheduled heartbeat timer task: {}", heartbeatTimer); + return heartbeatTimer; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java new file mode 100644 index 0000000..2446c10 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/HeartbeatTimerTask.java @@ -0,0 +1,66 @@ +package org.apache.hive.hcatalog.streaming.mutate.client.lock; + +import java.util.Collection; +import java.util.TimerTask; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hive.hcatalog.streaming.mutate.client.Transaction; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link TimerTask} that sends {@link IMetaStoreClient#heartbeat(long, long) heartbeat} events to the + * {@link IMetaStoreClient meta store} to keep the {@link Lock} and {@link Transaction} alive. Notifies the registered + * {@link LockFailureListener} should the lock fail. + */ +class HeartbeatTimerTask extends TimerTask { + + private static final Logger LOG = LoggerFactory.getLogger(HeartbeatTimerTask.class); + + private final IMetaStoreClient metaStoreClient; + private final long lockId; + private final Long transactionId; + private final LockFailureListener listener; + private final Collection<Table> tableDescriptors; + + HeartbeatTimerTask(IMetaStoreClient metaStoreClient, LockFailureListener listener, Long transactionId, + Collection<Table> tableDescriptors, long lockId) { + this.metaStoreClient = metaStoreClient; + this.listener = listener; + this.transactionId = transactionId; + this.tableDescriptors = tableDescriptors; + this.lockId = lockId; + LOG.debug("Reporting to listener {}", listener); + } + + @Override + public void run() { + try { + // I'm assuming that there is no transaction ID for a read lock. + metaStoreClient.heartbeat(transactionId == null ?
0 : transactionId, lockId); + LOG.debug("Sent heartbeat for lock={}, transactionId={}", lockId, transactionId); + } catch (NoSuchLockException | NoSuchTxnException | TxnAbortedException e) { + failLock(e); + } catch (TException e) { + LOG.warn("Failed to send heartbeat to meta store.", e); + } + } + + private void failLock(Exception e) { + LOG.debug("Lock " + lockId + " failed, cancelling heartbeat and notifying listener: " + listener, e); + // Cancel the heartbeat + cancel(); + listener.lockFailed(lockId, transactionId, Lock.asStrings(tableDescriptors), e); + } + + @Override + public String toString() { + return "HeartbeatTimerTask [lockId=" + lockId + ", transactionId=" + transactionId + "]"; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java new file mode 100644 index 0000000..21604df --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java @@ -0,0 +1,282 @@ +package org.apache.hive.hcatalog.streaming.mutate.client.lock; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.Timer; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.LockComponentBuilder; +import org.apache.hadoop.hive.metastore.LockRequestBuilder; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages the state required to safely read/write from/to an ACID table. + */ +public class Lock { + + private static final Logger LOG = LoggerFactory.getLogger(Lock.class); + + private static final double HEARTBEAT_FACTOR = 0.75; + private static final int DEFAULT_HEARTBEAT_PERIOD = 275; + + private final IMetaStoreClient metaStoreClient; + private final HeartbeatFactory heartbeatFactory; + private final LockFailureListener listener; + private final Collection<Table> tableDescriptors; + private final int lockRetries; + private final int retryWaitSeconds; + private final String user; + private final HiveConf hiveConf; + + private Timer heartbeat; + private Long lockId; + private Long transactionId; + + public Lock(IMetaStoreClient metaStoreClient, Options options) { + this(metaStoreClient, new HeartbeatFactory(), options.hiveConf, options.listener, options.user, + options.descriptors, options.lockRetries, options.retryWaitSeconds); + } + + /** Visible for testing only.
*/ + Lock(IMetaStoreClient metaStoreClient, HeartbeatFactory heartbeatFactory, HiveConf hiveConf, + LockFailureListener listener, String user, Collection<Table> tableDescriptors, int lockRetries, + int retryWaitSeconds) { + this.metaStoreClient = metaStoreClient; + this.heartbeatFactory = heartbeatFactory; + this.hiveConf = hiveConf; + this.user = user; + this.tableDescriptors = tableDescriptors; + this.listener = listener; + this.lockRetries = lockRetries; + this.retryWaitSeconds = retryWaitSeconds; + + if (LockFailureListener.NULL_LISTENER.equals(listener)) { + LOG.warn("No {} supplied. Data quality and availability cannot be assured.", + LockFailureListener.class.getSimpleName()); + } + } + + /** Attempts to acquire a read lock on the table, returns if successful, throws exception otherwise. */ + public void acquire() throws LockException { + lockId = internalAcquire(null); + initiateHeartbeat(); + } + + /** Attempts to acquire a read lock on the table, returns if successful, throws exception otherwise. */ + public void acquire(long transactionId) throws LockException { + lockId = internalAcquire(transactionId); + this.transactionId = transactionId; + initiateHeartbeat(); + } + + /** Attempts to release the read lock on the table. Throws an exception if the lock failed at any point. */ + public void release() throws LockException { + if (heartbeat != null) { + heartbeat.cancel(); + } + internalRelease(); + } + + public String getUser() { + return user; + } + + @Override + public String toString() { + return "Lock [metaStoreClient=" + metaStoreClient + ", lockId=" + lockId + ", transactionId=" + transactionId + + "]"; + } + + private long internalAcquire(Long transactionId) throws LockException { + int attempts = 0; + LockRequest request = buildSharedLockRequest(transactionId); + do { + LockResponse response = null; + try { + response = metaStoreClient.lock(request); + } catch (TException e) { + throw new LockException("Unable to acquire lock for tables: [" + join(tableDescriptors) + "]", e); + } + if (response != null) { + LockState state = response.getState(); + if (state == LockState.NOT_ACQUIRED || state == LockState.ABORT) { + // I expect we'll only see NOT_ACQUIRED here? + break; + } + if (state == LockState.ACQUIRED) { + LOG.debug("Acquired lock {}", response.getLockid()); + return response.getLockid(); + } + if (state == LockState.WAITING) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(retryWaitSeconds)); + } catch (InterruptedException e) { + } + } + } + attempts++; + } while (attempts < lockRetries); + throw new LockException("Could not acquire lock on tables: [" + join(tableDescriptors) + "]"); + } + + private void internalRelease() { + try { + // if there is a transaction then this lock will be released on commit/abort/rollback instead. 
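+ // (the meta store associates such locks with the transaction and releases them itself)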
+ if (lockId != null && transactionId == null) { + metaStoreClient.unlock(lockId); + LOG.debug("Released lock {}", lockId); + lockId = null; + } + } catch (TException e) { + LOG.error("Lock " + lockId + " failed.", e); + listener.lockFailed(lockId, transactionId, asStrings(tableDescriptors), e); + } + } + + private LockRequest buildSharedLockRequest(Long transactionId) { + LockRequestBuilder requestBuilder = new LockRequestBuilder(); + for (Table descriptor : tableDescriptors) { + LockComponent component = new LockComponentBuilder() + .setDbName(descriptor.getDbName()) + .setTableName(descriptor.getTableName()) + .setShared() + .build(); + requestBuilder.addLockComponent(component); + } + if (transactionId != null) { + requestBuilder.setTransactionId(transactionId); + } + LockRequest request = requestBuilder.setUser(user).build(); + return request; + } + + private void initiateHeartbeat() { + int heartbeatPeriod = getHeartbeatPeriod(); + LOG.debug("Heartbeat period {}s", heartbeatPeriod); + heartbeat = heartbeatFactory.newInstance(metaStoreClient, listener, transactionId, tableDescriptors, lockId, + heartbeatPeriod); + } + + private int getHeartbeatPeriod() { + int heartbeatPeriod = DEFAULT_HEARTBEAT_PERIOD; + if (hiveConf != null) { + // This value is always in seconds and includes an 's' suffix. + String txTimeoutSeconds = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT); + if (txTimeoutSeconds != null) { + // We want to send the heartbeat at an interval that is less than the timeout. + heartbeatPeriod = Math.max(1, + (int) (Integer.parseInt(txTimeoutSeconds.substring(0, txTimeoutSeconds.length() - 1)) * HEARTBEAT_FACTOR)); + } + } + return heartbeatPeriod; + } + + /** Visible for testing only. */ + Long getLockId() { + return lockId; + } + + /** Visible for testing only. */ + Long getTransactionId() { + return transactionId; + } + + /** Visible for testing only. */ + static String join(Iterable<? extends Object> values) { + return StringUtils.join(values, ","); + } + + /** Visible for testing only. */ + static List<String> asStrings(Collection<Table> tables) { + List<String> strings = new ArrayList<>(tables.size()); + for (Table descriptor : tables) { + strings.add(descriptor.getDbName() + "." + descriptor.getTableName()); + } + return strings; + } + + /** Constructs a lock options for a set of Hive ACID tables from which we wish to read. */ + public static final class Options { + Set<Table> descriptors = new LinkedHashSet<>(); + LockFailureListener listener = LockFailureListener.NULL_LISTENER; + int lockRetries = 5; + int retryWaitSeconds = 30; + String user; + HiveConf hiveConf; + + /** Adds a table for which a shared read lock will be requested. */ + public Options addTable(String databaseName, String tableName) { + checkNotNullOrEmpty(databaseName); + checkNotNullOrEmpty(tableName); + Table table = new Table(); + table.setDbName(databaseName); + table.setTableName(tableName); + descriptors.add(table); + return this; + } + + public Options user(String user) { + checkNotNullOrEmpty(user); + this.user = user; + return this; + } + + public Options configuration(HiveConf hiveConf) { + checkNotNull(hiveConf); + this.hiveConf = hiveConf; + return this; + } + + /** Sets a listener to handle failures of locks that were previously acquired. 
*/ + public Options lockFailureListener(LockFailureListener listener) { + checkNotNull(listener); + this.listener = listener; + return this; + } + + public Options lockRetries(int lockRetries) { + checkArgument(lockRetries > 0); + this.lockRetries = lockRetries; + return this; + } + + public Options retryWaitSeconds(int retryWaitSeconds) { + checkArgument(retryWaitSeconds > 0); + this.retryWaitSeconds = retryWaitSeconds; + return this; + } + + private static void checkArgument(boolean value) { + if (!value) { + throw new IllegalArgumentException(); + } + } + + private static void checkNotNull(Object value) { + if (value == null) { + throw new IllegalArgumentException(); + } + } + + private static void checkNotNullOrEmpty(String value) { + if (StringUtils.isBlank(value)) { + throw new IllegalArgumentException(); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java new file mode 100644 index 0000000..67ed601 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockException.java @@ -0,0 +1,15 @@ +package org.apache.hive.hcatalog.streaming.mutate.client.lock; + +public class LockException extends Exception { + + private static final long serialVersionUID = 1L; + + public LockException(String message) { + super(message); + } + + public LockException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java new file mode 100644 index 0000000..2b6a12a --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/LockFailureListener.java @@ -0,0 +1,26 @@ +package org.apache.hive.hcatalog.streaming.mutate.client.lock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Provides a means to handle the situation when a held lock fails. */ +public interface LockFailureListener { + + static final Logger LOG = LoggerFactory.getLogger(LockFailureListener.class); + + static final LockFailureListener NULL_LISTENER = new LockFailureListener() { + @Override + public void lockFailed(long lockId, Long transactionId, Iterable<String> tableNames, Throwable t) { + LOG.warn( + "Ignored lock failure: lockId=" + lockId + ", transactionId=" + transactionId + ", tables=" + tableNames, t); + } + + public String toString() { + return LockFailureListener.class.getName() + ".NULL_LISTENER"; + } + }; + + /** Called when the specified lock has failed. You should probably abort your job in this case. 
http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot new file mode 100644 index 0000000..79c30e7 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/doc-files/system-overview.dot @@ -0,0 +1,27 @@ +digraph "API Usage" { + nodesep=1.2; + + DATA [label="ACID\ndataset",shape=oval,style=filled,color="gray"]; + CHANGES [label="Changed\ndata",shape=oval,style=filled,color="gray"]; + + META_STORE [label="Hive\nMetaStore",shape=box,style=filled,color="darkseagreen3"]; + HIVE_CLI [label="Hive\nCLI",shape=box,style=filled,color="darkseagreen3"]; + + MERGE1 [label="Compute\nmutations\n(your code)",shape=box,style=filled,color="khaki1"]; + SORT [label="Group\n& sort\n(your code)",shape=box,style=filled,color="khaki1"]; + CLIENT [label="Mutator\nclient",shape=box,style=filled,color="lightblue"]; + BUCKET [label="Bucket ID\nappender",shape=box,style=filled,color="lightblue"]; + COORD [label="Mutator\ncoordinator",shape=box,style=filled,color="lightblue"]; + CLIENT -> COORD [label="Provides\nconf to"]; + CLIENT -> BUCKET [label="Provides\nconf to"]; + + CLIENT -> META_STORE [label="Manages\ntxns using"]; + CHANGES -> MERGE1 [label="Reads ∆s\nfrom"]; + DATA -> MERGE1 [label="Reads\nROW__IDs\nfrom"]; + BUCKET -> MERGE1 [label="Appends ids\nto inserts"]; + MERGE1 -> SORT; + SORT -> COORD [label="Issues\nmutations to"]; + COORD -> DATA [label="Writes to"]; + DATA -> HIVE_CLI [label="Read by"]; + META_STORE -> DATA [label="Compacts"]; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html new file mode 100644 index 0000000..9fc10b6 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html @@ -0,0 +1,495 @@ +<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" + "http://www.w3.org/TR/html4/loose.dtd"> + +<html lang="en"> + +<head> +<meta name=Title content="HCatalog Streaming Mutation API"> +<meta name=Keywords content="HCatalog Streaming Mutation ACID"> +<meta http-equiv=Content-Type content="text/html; charset=utf-8"> +<title>HCatalog Streaming Mutation API</title> +</head> + +<body> + +<h1>HCatalog Streaming Mutation API -- high level description</h1> + +<h2>Background</h2> +<p> +In certain data processing use cases it is necessary to modify existing +data when new facts arrive. An example of this is the classic ETL merge +where a copy of a data set is kept in sync with a master by the frequent +application of deltas. The deltas describe the mutations (inserts, +updates, deletes) that have occurred to the master since the previous +sync.
Implementing such a case using Hadoop traditionally demands that +the partitions containing records targeted by the mutations be +rewritten. This is a coarse approach; a partition containing millions of +records might be rebuilt because of a single record change. Additionally +these partitions cannot be restated atomically; at some point the old +partition data must be swapped with the new partition data. When this +swap occurs, usually by issuing an HDFS +<code>rm</code> +followed by a +<code>mv</code> +, there is a window in which the data appears to be unavailable and +hence any downstream jobs consuming the data might unexpectedly fail. +Therefore data processing patterns that restate raw data on HDFS cannot +operate robustly without some external mechanism to orchestrate +concurrent access to changing data. +</p> + +<p> +The availability of ACID tables in Hive provides a mechanism that both +enables concurrent access to data stored in HDFS (so long as it's in the +ORC+ACID format), and also permits row-level mutations of records within +a table, without the need to rewrite the existing data. But while Hive +itself supports +<code>INSERT</code> +, +<code>UPDATE</code> +and +<code>DELETE</code> +commands, and the ORC format can support large batches of mutations in a +transaction, Hive's execution engine currently submits each individual +mutation operation in a separate transaction and issues table scans (M/R +jobs) to execute them. It does not currently scale to the demands of +processing large deltas in an atomic manner. Furthermore it would be +advantageous to extend atomic batch mutation capabilities beyond Hive by +making them available to other data processing frameworks. The Streaming +Mutation API does just this. +</p> + +<p>The Streaming Mutation API, although similar to the Streaming +API, has a number of differences and is built to enable very different +use cases. Superficially, the Streaming API can only write new data +whereas the mutation API can also modify existing data. However the two +APIs are also based on very different transaction models. The Streaming API +focuses on surfacing a continuous stream of new data into a Hive table +and does so by batching small sets of writes into multiple short-lived +transactions. Conversely, the mutation API is designed to infrequently +apply large sets of mutations to a data set in an atomic fashion; all +mutations will either be applied or they will not. This instead mandates +the use of a single long-lived transaction. This table summarises the +attributes of each API:</p> + +<table border="1"> +<thead> +<tr> +<th>Attribute</th> +<th>Streaming API</th> +<th>Mutation API</th> +</tr> +<tr> +<td>Ingest type</td> +<td>Data arrives continuously</td> +<td>Ingests are performed periodically and the mutations are +applied in a single batch</td> +</tr> +<tr> +<td>Transaction scope</td> +<td>Transactions are created for small batches of writes</td> +<td>The entire set of mutations should be applied within a single +transaction</td> +</tr> +<tr> +<td>Data availability</td> +<td>Surfaces new data to users frequently and quickly</td> +<td>Change sets should be applied atomically, either the effect of +the delta is visible or it is not</td> +</tr> +<tr> +<td>Sensitive to record order</td> +<td>No, records do not have pre-existing lastTxnIds or bucketIds.
+Records are likely being written into a single partition (today's date +for example)</td> +<td>Yes, all mutated records have existing <code>RecordIdentifiers</code> +and must be grouped by (partitionValues, bucketId) and sorted by +lastTxnId. These record coordinates initially arrive in an order that is +effectively random. +</td> +</tr> +<tr> +<td>Impact of a write failure</td> +<td>Transaction can be aborted and producer can choose to resubmit +failed records as ordering is not important.</td> +<td>Ingest for the respective transaction must be halted and failed records +resubmitted to preserve sequence.</td> +</tr> +<tr> +<td>User perception of missing data</td> +<td>Data has not arrived yet - "latency?"</td> +<td>"This data is inconsistent, some records have been updated, but +other related records have not" - consider here the classic transfer +between bank accounts scenario</td> +</tr> +<tr> +<td>API end point scope</td> +<td>A given <code>HiveEndPoint</code> instance submits many +transactions to a specific bucket, in a specific partition, of a +specific table +</td> +<td>A set of <code>MutatorCoordinators</code> writes changes to an +unknown set of buckets, of an unknown set of partitions, of specific +tables (can be more than one), within a single transaction. +</td> +</tr> +</thead> +</table> + +<h2>Structure</h2> +<p>The API comprises two main concerns: transaction management, and +the writing of mutation operations to the data set. The two concerns +have a minimal coupling as it is expected that transactions will be +initiated from a single job-launcher-type process while the writing of +mutations will be scaled out across any number of worker nodes. In the +context of Hadoop M/R these can be more concretely defined as the Tool +and Map/Reduce task components. However, use of this architecture is not +mandated and in fact both concerns could be handled within a single +simple process depending on the requirements.</p> + +<p>Note that a suitably configured Hive instance is required to +operate this system even if you do not intend to access the data from +within Hive. Internally, transactions are managed by the Hive MetaStore. +Mutations are applied to HDFS via ORC APIs that bypass the MetaStore. +Additionally you may wish to configure your MetaStore instance to +perform periodic data compactions.</p> + +<p> +<b>Note on packaging</b>: The APIs are defined in the <b>org.apache.hive.hcatalog.streaming.mutate</b> +Java package and included in the hive-hcatalog-streaming jar. +</p> + +<h2>Data requirements</h2> +<p> +Generally speaking, to apply a mutation to a record one must have some +unique key that identifies the record. However, primary keys are not a +construct provided by Hive. Internally Hive uses +<code>RecordIdentifiers</code> +stored in a virtual +<code>ROW__ID</code> +column to uniquely identify records within an ACID table. Therefore, +any process that wishes to issue mutations to a table via this API must +have available the corresponding row ids for the target records. What +this means in practice is that the process issuing mutations must first +read in a current snapshot of the data and then join the mutations on some +domain-specific primary key to obtain the corresponding Hive +<code>ROW__ID</code> +. This is effectively what occurs within Hive's table scan process when +an +<code>UPDATE</code> +or +<code>DELETE</code> +statement is executed. The +<code>AcidInputFormat</code> +provides access to this data via +<code>AcidRecordReader.getRecordIdentifier()</code> +. +</p>
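To make the snapshot join just described concrete, here is a minimal sketch. Everything except RecordIdentifier is hypothetical glue code: the Delta type, the business key, and the snapshotIndex (assumed to have been built by reading the table and capturing the value of AcidRecordReader.getRecordIdentifier() per key).

import java.util.Map;

import org.apache.hadoop.hive.ql.io.RecordIdentifier;

/** Hypothetical join of incoming change events to the snapshot's ROW__IDs. */
final class RowIdJoin {

  /** A change event keyed by a domain-specific primary key (illustrative type). */
  static final class Delta {
    final String businessKey;
    RecordIdentifier rowId; // populated by the join below

    Delta(String businessKey) {
      this.businessKey = businessKey;
    }
  }

  /** snapshotIndex: businessKey -> ROW__ID previously read from the ACID table. */
  static void attachRowIds(Iterable<Delta> deltas, Map<String, RecordIdentifier> snapshotIndex) {
    for (Delta delta : deltas) {
      // Updates and deletes must reference an existing row; inserts will
      // instead have a RecordIdentifier attached later by a BucketIdResolver.
      delta.rowId = snapshotIndex.get(delta.businessKey);
    }
  }
}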
+ +<p> +The implementation of the ACID format places some constraints on the +order in which records are written and it is important that this +ordering is enforced. Additionally, data must be grouped appropriately +to adhere to the constraints imposed by the +<code>OrcRecordUpdater</code> +. Grouping also makes it possible to parallelise the writing of mutations +for the purposes of scaling. Finally, to correctly bucket new records +(inserts) there is a slightly unintuitive trick that must be applied. +</p> + +<p>All of these data sequencing concerns are the responsibility of +the client process calling the API, which is assumed to have first-class +grouping and sorting capabilities (Hadoop Map/Reduce etc.). The streaming +API provides nothing more than validators that fail fast when they +encounter groups and records that are out of sequence.</p> + +<p>In short, API client processes should prepare data for the mutate +API like so (a code sketch follows the streaming requirements below):</p> +<ul> +<li><b>MUST:</b> Order records by <code>ROW__ID.originalTxn</code>, +then <code>ROW__ID.rowId</code>.</li> +<li><b>MUST:</b> Assign a <code>ROW__ID</code> containing a +computed <code>bucketId</code> to records to be inserted.</li> +<li><b>SHOULD:</b> Group/partition by table partition value, then <code>ROW__ID.bucketId</code>.</li> +</ul> + +<p> +The addition of bucket ids to insert records prior to grouping and +sorting seems unintuitive. However, it is required both to ensure +adequate partitioning of new data and bucket allocation consistent with +that provided by Hive. In a typical ETL the majority of mutation events +are inserts, often targeting a single partition (new data for the +previous day, hour, etc.). If more than one worker is writing said +events and we were to leave the bucket id empty, then all inserts would go +to a single worker (e.g. a reducer) and the workload could be heavily +skewed. The assignment of a computed bucket allows inserts to be more +usefully distributed across workers. Additionally, when Hive is working +with the data it may expect records to have been bucketed in a way that +is consistent with its own internal scheme. A convenience type and +implementation are provided to more easily compute and append bucket ids: +<code>BucketIdResolver</code> +and +<code>BucketIdResolverImpl</code> +. +</p> + +<p>Update operations should not attempt to modify values of +partition or bucketing columns. The API does not prevent this and such +attempts could lead to data corruption.</p> + +<h2>Streaming requirements</h2> +<p>A few things are currently required to use streaming.</p> + +<p> +<ol> +<li>Currently, only the ORC storage format is supported. So '<b>stored +as orc</b>' must be specified during table creation. +</li> +<li>The Hive table must be bucketed, but not sorted. So something +like '<b>clustered by (<i>colName</i>) into <i>10</i> buckets +</b>' must be specified during table creation. +</li> +<li>The user of the client streaming process must have the necessary +permissions to write to the table or partition and create partitions in +the table.</li> +<li>Settings required in hive-site.xml for Metastore: +<ol> +<li><b>hive.txn.manager = +org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</b></li> +<li><b>hive.support.concurrency = true </b></li> +<li><b>hive.compactor.initiator.on = true</b></li> +<li><b>hive.compactor.worker.threads > 0 </b></li> +</ol> +</li> +</ol> +</p> + +<p> +<b>Note:</b> Streaming mutations to <b>unpartitioned</b> tables is also +supported. +</p>
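Tying the MUST/SHOULD rules above to code, the following sketch orders records by (originalTxn, rowId) ahead of submission. It assumes RecordIdentifier exposes getTransactionId() and getRowId() getters, and that your record type can surface its RecordIdentifier; the RowIdSource interface is hypothetical glue, not part of the API.

import java.util.Comparator;

import org.apache.hadoop.hive.ql.io.RecordIdentifier;

/** Hypothetical accessor: any record that can surface its ROW__ID. */
interface RowIdSource {
  RecordIdentifier getRecordIdentifier();
}

final class MutationOrdering {

  /** Sort key required by the mutate API: originalTxn, then rowId. */
  static final Comparator<RowIdSource> SORT_ORDER = new Comparator<RowIdSource>() {
    @Override
    public int compare(RowIdSource a, RowIdSource b) {
      RecordIdentifier x = a.getRecordIdentifier();
      RecordIdentifier y = b.getRecordIdentifier();
      int byTxn = Long.compare(x.getTransactionId(), y.getTransactionId());
      return byTxn != 0 ? byTxn : Long.compare(x.getRowId(), y.getRowId());
    }
  };

  // Grouping (the SHOULD rule) is by (partitionValues, ROW__ID.bucketId); in
  // M/R this is typically the shuffle key while SORT_ORDER drives the
  // secondary sort within each group.
}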
+ +<h2>Record layout</h2> +<p> +The structure, layout, and encoding of records is the exclusive concern +of the client ETL mutation process and may be quite different from the +target Hive ACID table. The mutation API requires concrete +implementations of the +<code>MutatorFactory</code> +and +<code>Mutator</code> +classes to extract pertinent data from records and serialize data into +the ACID files. Fortunately base classes are provided ( +<code>MutatorImpl</code> +, +<code>RecordInspectorImpl</code> +) to simplify this effort and usually all that is required is the +specification of a suitable +<code>ObjectInspector</code> +and the provision of the indexes of the +<code>ROW__ID</code> +and bucketed columns within the record structure. Note that all column +indexes in these classes are with respect to your record structure, not +the Hive table structure. +</p> +<p> +You will likely also want to use a +<code>BucketIdResolver</code> +to append bucket ids to new records for insertion. Fortunately the core +implementation is provided in +<code>BucketIdResolverImpl</code> +but note that bucket column indexes must be presented in the same order +as they are in the Hive table definition to ensure consistent bucketing. +Note that you cannot move records between buckets and an exception will +be thrown if you attempt to do so. In real terms this means that you +should not attempt to modify the values in bucket columns with an +<code>UPDATE</code> +. +</p> + +<h2>Connection and Transaction management</h2> +<p> +The +<code>MutatorClient</code> +class is used to create and manage transactions in which mutations can +be performed. The scope of a transaction can extend across multiple ACID +tables. When a client connects it communicates with the meta store to +verify and acquire meta data for the target tables. An invocation of +<code>newTransaction</code> +then opens a transaction with the meta store, finalizes a collection of +<code>AcidTables</code> +and returns a new +<code>Transaction</code> +instance. The acid tables are light-weight, serializable objects that +are used by the mutation writing components of the API to target +specific ACID file locations. Usually your +<code>MutatorClient</code> +will be running on some master node and your coordinators on worker +nodes. In this event the +<code>AcidTableSerializer</code> +can be used to encode the tables in a more transportable form, for use +as a +<code>Configuration</code> +property for example. +</p> +<p> +As you would expect, a +<code>Transaction</code> +must be initiated with a call to +<code>begin</code> +before any mutations can be applied. This invocation acquires a lock on +the targeted tables using the meta store, and initiates a heartbeat to +prevent transaction timeouts. It is highly recommended that you register +a +<code>LockFailureListener</code> +with the client so that your process can handle any lock or transaction +failures. Typically you may wish to abort the job in the event of such +an error. With the transaction in place you can now start streaming +mutations with one or more +<code>MutatorCoordinator</code> +instances (more on this later), and can finally +<code>commit</code> +or +<code>abort</code> +the transaction when the change set has been applied, which will release +the lock with the meta store. Finally you should +<code>close</code> +the mutation client to release any held resources. +</p> +<p> +The +<code>MutatorClientBuilder</code> +is provided to simplify the construction of clients. +</p>
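The transaction lifecycle above, condensed into one sketch. The builder call addTable(String, String, boolean) is referenced in the partition-creation notes below; treat the other method names here (configuration, lockFailureListener, connect, getTables) and the database/table names as assumptions rather than a verified contract.

import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient;
import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClientBuilder;
import org.apache.hive.hcatalog.streaming.mutate.client.Transaction;

/** Sketch of the master-side lifecycle: connect, begin, mutate, commit/abort, close. */
public class MergeJobDriver {

  public void run(HiveConf conf) throws Exception {
    MutatorClient client = new MutatorClientBuilder()
        .configuration(conf)                                    // assumed builder method
        .lockFailureListener(new AbortingLockFailureListener()) // listener from the earlier sketch
        .addTable("example_db", "example_table", true)          // placeholder db/table; true => create partitions
        .build();
    try {
      client.connect();                                  // verifies and acquires table meta data
      Transaction transaction = client.newTransaction(); // opens a meta store transaction
      List<AcidTable> tables = client.getTables();       // assumed accessor for the finalized AcidTables
      transaction.begin();                               // locks tables, starts the heartbeat
      try {
        // ... ship `tables` (via AcidTableSerializer) to workers and apply the change set ...
        transaction.commit();
      } catch (Exception e) {
        transaction.abort();
        throw e;
      }
    } finally {
      client.close();
    }
  }
}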
+ +<p> +<b>WARNING:</b> Hive doesn't currently have a deadlock detector (it is +being worked on as part of <a +href="https://issues.apache.org/jira/browse/HIVE-9675">HIVE-9675</a>). +This API could potentially deadlock with other stream writers or with +SQL users. +</p> +<h2>Writing data</h2> + +<p> +The +<code>MutatorCoordinator</code> +class is used to issue mutations to an ACID table. You will require at +least one instance per table participating in the transaction. The +target of a given instance is defined by the respective +<code>AcidTable</code> +used to construct the coordinator. It is recommended that a +<code>MutatorCoordinatorBuilder</code> +is used to simplify the construction process. +</p> + +<p> +Mutations can be applied by invoking the respective +<code>insert</code> +, +<code>update</code> +, and +<code>delete</code> +methods on the coordinator. These methods each take as parameters the +target partition of the record and the mutated record. In the case of an +unpartitioned table you should simply pass an empty list as the +partition value. For inserts specifically, only the bucket id will be +extracted from the +<code>RecordIdentifier</code> +; the transactionId and rowId will be ignored and replaced by +appropriate values in the +<code>RecordUpdater</code> +. Additionally, in the case of deletes, everything but the +<code>RecordIdentifier</code> +in the record will be ignored and therefore it is often easier to simply +submit the original record. A short usage sketch appears below, after +the notes on reading data. +</p> + +<p> +<b>Caution:</b> As mentioned previously, mutations must arrive in a +specific order for the resultant table data to be consistent. +Coordinators will verify a naturally ordered sequence of +(lastTransactionId, rowId) and will throw an exception if this sequence +is broken. This exception should almost certainly be escalated so that +the transaction is aborted. This, along with the correct ordering of the +data, is the responsibility of the client using the API. +</p> + +<h3>Dynamic Partition Creation:</h3> +It is often desirable to have new partitions created +automatically (say on an hourly basis). In such cases requiring the Hive +admin to pre-create the necessary partitions may not be reasonable. +Consequently the API allows coordinators to create partitions as needed +(see: +<code>MutatorClientBuilder.addTable(String, String, boolean)</code> +). Partition creation being an atomic action, multiple coordinators can +race to create the partition, but only one would succeed, so +coordinator clients need not synchronize when creating a partition. The +user of the coordinator process needs to be given write permissions on +the Hive table in order to create partitions. + +<h2>Reading data</h2> + +<p> +Although this API is concerned with writing changes to data, as +previously stated we'll almost certainly have to read the existing data +first to obtain the relevant +<code>ROW__ID</code>s +. Therefore it is worth noting that reading ACID data in a robust and +consistent manner requires the following: +<ol> +<li>Obtaining a valid transaction list from the meta store (<code>ValidTxnList</code>). +</li> +<li>Acquiring a read-lock with the meta store and issuing +heartbeats (<code>Lock</code> can help with this). +</li> +<li>Configuring the <code>OrcInputFormat</code> and then reading +the data. Make sure that you also pull in the <code>ROW__ID</code> +values. See: <code>AcidRecordReader.getRecordIdentifier</code>. +</li> +<li>Releasing the read-lock.</li> +</ol> +</p>
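Returning to the Writing data section above, here is a worker-side sketch of issuing mutations. The record objects and the partition value are illustrative placeholders; the insert/update/delete shapes (target partition plus mutated record) follow the prose, with an empty list standing in for the partition of an unpartitioned table.

import java.util.Collections;
import java.util.List;

import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator;

/** Sketch of applying an ordered group of mutations on a worker. */
public class MutationWriter {

  public void apply(MutatorCoordinator coordinator, Object inserted, Object updated, Object deleted)
      throws Exception {
    List<String> partition = Collections.singletonList("2015-06-30"); // placeholder partition value
    coordinator.insert(partition, inserted); // bucket id is read from the record's RecordIdentifier
    coordinator.update(partition, updated);  // must arrive in (lastTransactionId, rowId) order
    coordinator.delete(partition, deleted);  // only the RecordIdentifier of the record is consulted
    coordinator.close();                     // release held writers once the group is applied
  }
}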
+ +<h2>Example</h2> +<p> +<img src="doc-files/system-overview.png" /> +</p> +<p>So to recap, the sequence of events required to apply mutations +to a dataset using the API is:</p> +<ol> +<li>Create a <code>MutatorClient</code> to manage a transaction for +the targeted ACID tables. This set of tables should include any +transactional destinations or sources. Don't forget to register a <code>LockFailureListener</code> +so that you can handle transaction failures. +</li> +<li>Open a new <code>Transaction</code> with the client. +</li> +<li>Get the <code>AcidTables</code> from the client. +</li> +<li>Begin the transaction.</li> +<li>Create at least one <code>MutatorCoordinator</code> for each +table. The <code>AcidTableSerializer</code> can help you transport the <code>AcidTables</code> +when your workers are in a distributed environment. +</li> +<li>Compute your mutation set (this is your ETL merge process).</li> +<li>Append bucket ids to insertion records. A <code>BucketIdResolver</code> +can help here. +</li> +<li>Group and sort your data appropriately.</li> +<li>Issue mutation events to your coordinators.</li> +<li>Close your coordinators.</li> +<li>Abort or commit the transaction.</li> +<li>Close your mutation client.</li> +</ol> +<p> +See +<code>ExampleUseCase</code> +and +<code>TestMutations.testUpdatesAndDeletes()</code> +for some very simple usages. +</p> + +</body> + +</html> http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java new file mode 100644 index 0000000..656324c --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdException.java @@ -0,0 +1,11 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +public class BucketIdException extends WorkerException { + + private static final long serialVersionUID = 1L; + + BucketIdException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java new file mode 100644 index 0000000..dab2072 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolver.java @@ -0,0 +1,11 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +/** Computes and appends bucket ids to records that are due to be inserted. */ +public interface BucketIdResolver { + + Object attachBucketIdToRecord(Object record); + + /** See: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}.
*/ + int computeBucketId(Object record); + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java new file mode 100644 index 0000000..dbed9e1 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java @@ -0,0 +1,76 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.util.List; + +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; + +/** + * Implementation of a {@link BucketIdResolver} that includes the logic required to calculate a bucket id from a record + * that is consistent with Hive's own internal computation scheme. + */ +public class BucketIdResolverImpl implements BucketIdResolver { + + private static final long INVALID_TRANSACTION_ID = -1L; + private static final long INVALID_ROW_ID = -1L; + + private final SettableStructObjectInspector structObjectInspector; + private final StructField[] bucketFields; + private final int totalBuckets; + private final StructField recordIdentifierField; + + /** + * Note that all column indexes are with respect to your record structure, not the Hive table structure. Bucket column + * indexes must be presented in the same order as they are in the Hive table definition. + */ + public BucketIdResolverImpl(ObjectInspector objectInspector, int recordIdColumn, int totalBuckets, int[] bucketColumns) { + this.totalBuckets = totalBuckets; + if (!(objectInspector instanceof SettableStructObjectInspector)) { + throw new IllegalArgumentException("Serious problem, expected a StructObjectInspector, " + "but got a " + + objectInspector.getClass().getName()); + } + + if (bucketColumns.length < 1) { + throw new IllegalArgumentException("No bucket column indexes set."); + } + structObjectInspector = (SettableStructObjectInspector) objectInspector; + List<? extends StructField> structFields = structObjectInspector.getAllStructFieldRefs(); + + recordIdentifierField = structFields.get(recordIdColumn); + + bucketFields = new StructField[bucketColumns.length]; + for (int i = 0; i < bucketColumns.length; i++) { + int bucketColumnsIndex = bucketColumns[i]; + bucketFields[i] = structFields.get(bucketColumnsIndex); + } + } + + @Override + public Object attachBucketIdToRecord(Object record) { + int bucketId = computeBucketId(record); + RecordIdentifier recordIdentifier = new RecordIdentifier(INVALID_TRANSACTION_ID, bucketId, INVALID_ROW_ID); + structObjectInspector.setStructFieldData(record, recordIdentifierField, recordIdentifier); + return record; + } + + /** Based on: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}. 
*/ + @Override + public int computeBucketId(Object record) { + int bucketId = 1; + + for (int columnIndex = 0; columnIndex < bucketFields.length; columnIndex++) { + Object columnValue = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]); + bucketId = bucketId * 31 + ObjectInspectorUtils.hashCode(columnValue, bucketFields[columnIndex].getFieldObjectInspector()); + } + + if (bucketId < 0) { + bucketId = -1 * bucketId; + } + + return bucketId % totalBuckets; + } + +}
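Given the implementation above, a short usage sketch follows. The column indexes and bucket count are illustrative placeholders and must instead describe your record structure and table definition; note that the inspector must be a SettableStructObjectInspector, as enforced by the constructor.

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver;
import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolverImpl;

/** Sketch: attach a computed bucket id to a record destined for insertion. */
public class InsertPreparer {

  private final BucketIdResolver resolver;

  public InsertPreparer(ObjectInspector recordInspector) {
    int recordIdColumn = 0;    // index of the ROW__ID field within the record (placeholder)
    int totalBuckets = 10;     // must match 'clustered by ... into 10 buckets' (placeholder)
    int[] bucketColumns = {1}; // bucketed column indexes, in table-definition order (placeholder)
    this.resolver = new BucketIdResolverImpl(recordInspector, recordIdColumn, totalBuckets, bucketColumns);
  }

  public Object prepare(Object newRecord) {
    // Sets a RecordIdentifier carrying the computed bucket id; the transaction
    // id and row id are left invalid and later replaced during the write.
    return resolver.attachBucketIdToRecord(newRecord);
  }
}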