http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java new file mode 100644 index 0000000..9aab346 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java @@ -0,0 +1,83 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility class that can create new table partitions within the {@link IMetaStoreClient meta store}. */ +class CreatePartitionHelper { + + private static final Logger LOG = LoggerFactory.getLogger(CreatePartitionHelper.class); + + private final IMetaStoreClient metaStoreClient; + private final String databaseName; + private final String tableName; + + CreatePartitionHelper(IMetaStoreClient metaStoreClient, String databaseName, String tableName) { + this.metaStoreClient = metaStoreClient; + this.databaseName = databaseName; + this.tableName = tableName; + } + + /** Returns the expected {@link Path} for a given partition value. */ + Path getPathForPartition(List<String> newPartitionValues) throws WorkerException { + try { + String location; + if (newPartitionValues.isEmpty()) { + location = metaStoreClient.getTable(databaseName, tableName).getSd().getLocation(); + } else { + location = metaStoreClient.getPartition(databaseName, tableName, newPartitionValues).getSd().getLocation(); + } + LOG.debug("Found path {} for partition {}", location, newPartitionValues); + return new Path(location); + } catch (NoSuchObjectException e) { + throw new WorkerException("Table not found '" + databaseName + "." + tableName + "'.", e); + } catch (TException e) { + throw new WorkerException("Failed to get path for partitions '" + newPartitionValues + "' on table '" + + databaseName + "." + tableName + "' with meta store: " + metaStoreClient, e); + } + } + + /** Creates the specified partition if it does not already exist. Does nothing if the table is unpartitioned. 
*/ + void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException { + if (newPartitionValues.isEmpty()) { + return; + } + + try { + LOG.debug("Attempting to create partition (if not exists) {}.{}:{}", databaseName, tableName, newPartitionValues); + Table table = metaStoreClient.getTable(databaseName, tableName); + + Partition partition = new Partition(); + partition.setDbName(table.getDbName()); + partition.setTableName(table.getTableName()); + StorageDescriptor partitionSd = new StorageDescriptor(table.getSd()); + partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR + + Warehouse.makePartName(table.getPartitionKeys(), newPartitionValues)); + partition.setSd(partitionSd); + partition.setValues(newPartitionValues); + + metaStoreClient.add_partition(partition); + } catch (AlreadyExistsException e) { + LOG.debug("Partition already exists: {}.{}:{}", databaseName, tableName, newPartitionValues); + } catch (NoSuchObjectException e) { + LOG.error("Failed to create partition: " + newPartitionValues, e); + throw new PartitionCreationException("Table not found '" + databaseName + "." + tableName + "'.", e); + } catch (TException e) { + LOG.error("Failed to create partition: " + newPartitionValues, e); + throw new PartitionCreationException("Failed to create partition '" + newPartitionValues + "' on table '" + + databaseName + "." + tableName + "'", e); + } + } + +}
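For reference, the partition location composed above follows the standard warehouse layout: the table location with one "key=value" path segment appended per partition key. Below is a minimal, self-contained sketch of that composition using the metastore's Warehouse helper; the table location and key names are hypothetical, not taken from this patch.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;

public class PartitionPathExample {
  public static void main(String[] args) throws MetaException {
    // Partition keys as they would appear in Table.getPartitionKeys().
    List<FieldSchema> partitionKeys = Arrays.asList(
        new FieldSchema("continent", "string", ""),
        new FieldSchema("country", "string", ""));
    List<String> partitionValues = Arrays.asList("Asia", "India");

    // Same composition as createPartitionIfNotExists: <table location>/<partition name>.
    String tableLocation = "/user/hive/warehouse/test_db.db/test_table"; // hypothetical
    System.out.println(tableLocation + "/" + Warehouse.makePartName(partitionKeys, partitionValues));
    // Prints: /user/hive/warehouse/test_db.db/test_table/continent=Asia/country=India
  }
}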
http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java new file mode 100644 index 0000000..f8e46d6 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupRevisitedException.java @@ -0,0 +1,11 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +public class GroupRevisitedException extends WorkerException { + + private static final long serialVersionUID = 1L; + + GroupRevisitedException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java new file mode 100644 index 0000000..8ae3904 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/GroupingValidator.java @@ -0,0 +1,74 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Tracks the (partition, bucket) combinations that have been encountered, checking that a group is not revisited. + * Potentially memory intensive. + */ +class GroupingValidator { + + private final Map<String, Set<Integer>> visited; + private final StringBuffer partitionKeyBuilder; + private long groups; + private String lastPartitionKey; + private int lastBucketId = -1; + + GroupingValidator() { + visited = new HashMap<String, Set<Integer>>(); + partitionKeyBuilder = new StringBuffer(64); + } + + /** + * Checks that this group is either the same as the last or is a new group. + */ + boolean isInSequence(List<String> partitionValues, int bucketId) { + String partitionKey = getPartitionKey(partitionValues); + if (Objects.equals(lastPartitionKey, partitionKey) && lastBucketId == bucketId) { + return true; + } + lastPartitionKey = partitionKey; + lastBucketId = bucketId; + + Set<Integer> bucketIdSet = visited.get(partitionKey); + if (bucketIdSet == null) { + // If the bucket id set component of this data structure proves to be too large there is the + // option of moving it to Trove or HPPC in an effort to reduce size. 
+ bucketIdSet = new HashSet<>(); + visited.put(partitionKey, bucketIdSet); + } + + boolean newGroup = bucketIdSet.add(bucketId); + if (newGroup) { + groups++; + } + return newGroup; + } + + private String getPartitionKey(List<String> partitionValues) { + partitionKeyBuilder.setLength(0); + boolean first = true; + for (String element : partitionValues) { + if (first) { + first = false; + } else { + partitionKeyBuilder.append('/'); + } + partitionKeyBuilder.append(element); + } + String partitionKey = partitionKeyBuilder.toString(); + return partitionKey; + } + + @Override + public String toString() { + return "GroupingValidator [groups=" + groups + ",lastPartitionKey=" + lastPartitionKey + ",lastBucketId=" + + lastBucketId + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java new file mode 100644 index 0000000..96ecce9 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/Mutator.java @@ -0,0 +1,21 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; + +/** + * Interface for submitting mutation events to a given partition and bucket in an ACID table. Requires records to arrive + * in the order defined by the {@link SequenceValidator}. + */ +public interface Mutator extends Closeable, Flushable { + + void insert(Object record) throws IOException; + + void update(Object record) throws IOException; + + void delete(Object record) throws IOException; + + void flush() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java new file mode 100644 index 0000000..96f05e5 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java @@ -0,0 +1,281 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Orchestrates the application of an ordered sequence of mutation events to a given ACID table. 
Events must be grouped + by partition, then bucket, and ordered by origTxnId, then rowId. Ordering is enforced by the {@link SequenceValidator} + and grouping by the {@link GroupingValidator}. An ACID delta file is created for each combination of partition and + bucket id (a single transaction id is implied). Once a delta file has been closed it cannot be reopened. Therefore + care is needed to group the data correctly, otherwise failures will occur if a delta belonging to a group has been + previously closed. The {@link MutatorCoordinator} will seamlessly handle transitions between groups, creating and + closing {@link Mutator Mutators} as needed to write to the appropriate partition and bucket. New partitions will be + created in the meta store if {@link AcidTable#createPartitions()} is set. + <p/> + {@link #insert(List, Object) Insert} events must be artificially assigned appropriate bucket ids in the preceding + grouping phase so that they are grouped correctly. Note that any transaction id or row id assigned to the + {@link RecordIdentifier RecordIdentifier} of such events will be ignored by both the coordinator and the underlying + {@link RecordUpdater}. + */ +public class MutatorCoordinator implements Closeable, Flushable { + + private static final Logger LOG = LoggerFactory.getLogger(MutatorCoordinator.class); + + private final IMetaStoreClient metaStoreClient; + private final MutatorFactory mutatorFactory; + private final GroupingValidator groupingValidator; + private final SequenceValidator sequenceValidator; + private final AcidTable table; + private final RecordInspector recordInspector; + private final CreatePartitionHelper partitionHelper; + private final AcidOutputFormat<?, ?> outputFormat; + private final BucketIdResolver bucketIdResolver; + private final HiveConf configuration; + private final boolean deleteDeltaIfExists; + + private int bucketId; + private List<String> partitionValues; + private Path partitionPath; + private Mutator mutator; + + MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory, + AcidTable table, boolean deleteDeltaIfExists) throws WorkerException { + this(metaStoreClient, configuration, mutatorFactory, new CreatePartitionHelper(metaStoreClient, + table.getDatabaseName(), table.getTableName()), new GroupingValidator(), new SequenceValidator(), table, + deleteDeltaIfExists); + } + + /** Visible for testing only. */ + MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory, + CreatePartitionHelper partitionHelper, GroupingValidator groupingValidator, SequenceValidator sequenceValidator, + AcidTable table, boolean deleteDeltaIfExists) throws WorkerException { + this.metaStoreClient = metaStoreClient; + this.configuration = configuration; + this.mutatorFactory = mutatorFactory; + this.partitionHelper = partitionHelper; + this.groupingValidator = groupingValidator; + this.sequenceValidator = sequenceValidator; + this.table = table; + this.deleteDeltaIfExists = deleteDeltaIfExists; + this.recordInspector = this.mutatorFactory.newRecordInspector(); + bucketIdResolver = this.mutatorFactory.newBucketIdResolver(table.getTotalBuckets()); + + bucketId = -1; + outputFormat = createOutputFormat(table.getOutputFormatName(), configuration); + } + + /** + * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+ * + * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed + * using the values in the record's bucketed columns. + * @throws RecordSequenceException A record was submitted that was not in the correct ascending (origTxnId, rowId) + * sequence. + * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already + * been closed. + * @throws PartitionCreationException Could not create a new partition in the meta store. + * @throws WorkerException + */ + public void insert(List<String> partitionValues, Object record) throws WorkerException { + reconfigureState(OperationType.INSERT, partitionValues, record); + try { + mutator.insert(record); + LOG.debug("Inserted into partition={}, record={}", partitionValues, record); + } catch (IOException e) { + throw new WorkerException("Failed to insert record '" + record + "' using mutator '" + mutator + "'.", e); + } + } + + /** + * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId). + * + * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed + * using the values in the record's bucketed columns. + * @throws RecordSequenceException A record was submitted that was not in the correct ascending (origTxnId, rowId) + * sequence. + * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already + * been closed. + * @throws PartitionCreationException Could not create a new partition in the meta store. + * @throws WorkerException + */ + public void update(List<String> partitionValues, Object record) throws WorkerException { + reconfigureState(OperationType.UPDATE, partitionValues, record); + try { + mutator.update(record); + LOG.debug("Updated in partition={}, record={}", partitionValues, record); + } catch (IOException e) { + throw new WorkerException("Failed to update record '" + record + "' using mutator '" + mutator + "'.", e); + } + } + + /** + * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId). + * + * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed + * using the values in the record's bucketed columns. + * @throws RecordSequenceException A record was submitted that was not in the correct ascending (origTxnId, rowId) + * sequence. + * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already + * been closed. + * @throws PartitionCreationException Could not create a new partition in the meta store.
+ * @throws WorkerException + */ + public void delete(List<String> partitionValues, Object record) throws WorkerException { + reconfigureState(OperationType.DELETE, partitionValues, record); + try { + mutator.delete(record); + LOG.debug("Deleted from partition={}, record={}", partitionValues, record); + } catch (IOException e) { + throw new WorkerException("Failed to delete record '" + record + "' using mutator '" + mutator + "'.", e); + } + } + + @Override + public void close() throws IOException { + try { + if (mutator != null) { + mutator.close(); + } + } finally { + metaStoreClient.close(); + } + } + + @Override + public void flush() throws IOException { + if (mutator != null) { + mutator.flush(); + } + } + + private void reconfigureState(OperationType operationType, List<String> newPartitionValues, Object record) + throws WorkerException { + RecordIdentifier newRecordIdentifier = extractRecordIdentifier(operationType, newPartitionValues, record); + int newBucketId = newRecordIdentifier.getBucketId(); + + if (newPartitionValues == null) { + newPartitionValues = Collections.emptyList(); + } + + try { + if (partitionHasChanged(newPartitionValues)) { + if (table.createPartitions()) { + partitionHelper.createPartitionIfNotExists(newPartitionValues); + } + Path newPartitionPath = partitionHelper.getPathForPartition(newPartitionValues); + resetMutator(newBucketId, newPartitionValues, newPartitionPath); + } else if (bucketIdHasChanged(newBucketId)) { + resetMutator(newBucketId, partitionValues, partitionPath); + } else { + validateRecordSequence(operationType, newRecordIdentifier); + } + } catch (IOException e) { + throw new WorkerException("Failed to reset mutator when performing " + operationType + " of record: " + record, e); + } + } + + private RecordIdentifier extractRecordIdentifier(OperationType operationType, List<String> newPartitionValues, + Object record) throws BucketIdException { + RecordIdentifier recordIdentifier = recordInspector.extractRecordIdentifier(record); + int computedBucketId = bucketIdResolver.computeBucketId(record); + if (operationType != OperationType.DELETE && recordIdentifier.getBucketId() != computedBucketId) { + throw new BucketIdException("RecordIdentifier.bucketId != computed bucketId (" + computedBucketId + + ") for record " + recordIdentifier + " in partition " + newPartitionValues + "."); + } + return recordIdentifier; + } + + private void resetMutator(int newBucketId, List<String> newPartitionValues, Path newPartitionPath) + throws IOException, GroupRevisitedException { + if (mutator != null) { + mutator.close(); + } + validateGrouping(newPartitionValues, newBucketId); + sequenceValidator.reset(); + if (deleteDeltaIfExists) { + // TODO: Should this be the concern of the mutator?
+ deleteDeltaIfExists(newPartitionPath, table.getTransactionId(), newBucketId); + } + mutator = mutatorFactory.newMutator(outputFormat, table.getTransactionId(), newPartitionPath, newBucketId); + bucketId = newBucketId; + partitionValues = newPartitionValues; + partitionPath = newPartitionPath; + LOG.debug("Reset mutator: bucketId={}, partition={}, partitionPath={}", bucketId, partitionValues, partitionPath); + } + + private boolean partitionHasChanged(List<String> newPartitionValues) { + boolean partitionHasChanged = !Objects.equals(this.partitionValues, newPartitionValues); + if (partitionHasChanged) { + LOG.debug("Partition changed from={}, to={}", this.partitionValues, newPartitionValues); + } + return partitionHasChanged; + } + + private boolean bucketIdHasChanged(int newBucketId) { + boolean bucketIdHasChanged = this.bucketId != newBucketId; + if (bucketIdHasChanged) { + LOG.debug("Bucket ID changed from={}, to={}", this.bucketId, newBucketId); + } + return bucketIdHasChanged; + } + + private void validateGrouping(List<String> newPartitionValues, int newBucketId) throws GroupRevisitedException { + if (!groupingValidator.isInSequence(newPartitionValues, newBucketId)) { + throw new GroupRevisitedException("Group out of sequence: state=" + groupingValidator + ", partition=" + + newPartitionValues + ", bucketId=" + newBucketId); + } + } + + private void validateRecordSequence(OperationType operationType, RecordIdentifier newRecordIdentifier) + throws RecordSequenceException { + boolean identifierOutOfSequence = operationType != OperationType.INSERT + && !sequenceValidator.isInSequence(newRecordIdentifier); + if (identifierOutOfSequence) { + throw new RecordSequenceException("Records not in sequence: state=" + sequenceValidator + ", recordIdentifier=" + + newRecordIdentifier); + } + } + + @SuppressWarnings("unchecked") + private AcidOutputFormat<?, ?> createOutputFormat(String outputFormatName, HiveConf configuration) + throws WorkerException { + try { + return (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outputFormatName), configuration); + } catch (ClassNotFoundException e) { + throw new WorkerException("Could not locate class for '" + outputFormatName + "'.", e); + } + } + + private void deleteDeltaIfExists(Path partitionPath, long transactionId, int bucketId) throws IOException { + Path deltaPath = AcidUtils.createFilename(partitionPath, + new AcidOutputFormat.Options(configuration) + .bucket(bucketId) + .minimumTransactionId(transactionId) + .maximumTransactionId(transactionId)); + FileSystem fileSystem = deltaPath.getFileSystem(configuration); + if (fileSystem.exists(deltaPath)) { + LOG.info("Deleting existing delta path: {}", deltaPath); + fileSystem.delete(deltaPath, false); + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java new file mode 100644 index 0000000..8851ea6 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java @@ -0,0 +1,76 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.io.IOException; + +import
org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.streaming.mutate.HiveConfFactory; +import org.apache.hive.hcatalog.streaming.mutate.UgiMetaStoreClientFactory; +import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable; + +/** Convenience class for building {@link MutatorCoordinator} instances. */ +public class MutatorCoordinatorBuilder { + + private HiveConf configuration; + private MutatorFactory mutatorFactory; + private UserGroupInformation authenticatedUser; + private String metaStoreUri; + private AcidTable table; + private boolean deleteDeltaIfExists; + + public MutatorCoordinatorBuilder configuration(HiveConf configuration) { + this.configuration = configuration; + return this; + } + + public MutatorCoordinatorBuilder authenticatedUser(UserGroupInformation authenticatedUser) { + this.authenticatedUser = authenticatedUser; + return this; + } + + public MutatorCoordinatorBuilder metaStoreUri(String metaStoreUri) { + this.metaStoreUri = metaStoreUri; + return this; + } + + /** Set the destination ACID table for this client. */ + public MutatorCoordinatorBuilder table(AcidTable table) { + this.table = table; + return this; + } + + /** + * If the delta file already exists, delete it. This is useful in a MapReduce setting where a number of task retries + * will attempt to write the same delta file. + */ + public MutatorCoordinatorBuilder deleteDeltaIfExists() { + this.deleteDeltaIfExists = true; + return this; + } + + public MutatorCoordinatorBuilder mutatorFactory(MutatorFactory mutatorFactory) { + this.mutatorFactory = mutatorFactory; + return this; + } + + public MutatorCoordinator build() throws WorkerException, MetaException { + String user = authenticatedUser == null ? System.getProperty("user.name") : authenticatedUser.getShortUserName(); + boolean secureMode = authenticatedUser == null ?
false : authenticatedUser.hasKerberosCredentials(); + + configuration = HiveConfFactory.newInstance(configuration, this.getClass(), metaStoreUri); + + IMetaStoreClient metaStoreClient; + try { + metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, configuration, authenticatedUser, user, secureMode) + .newInstance(HCatUtil.getHiveMetastoreClient(configuration)); + } catch (IOException e) { + throw new WorkerException("Could not create meta store client.", e); + } + + return new MutatorCoordinator(metaStoreClient, configuration, mutatorFactory, table, deleteDeltaIfExists); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java new file mode 100644 index 0000000..850054f --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java @@ -0,0 +1,16 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; + +public interface MutatorFactory { + + Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException; + + RecordInspector newRecordInspector(); + + BucketIdResolver newBucketIdResolver(int totalBuckets); + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java new file mode 100644 index 0000000..0fe41d5 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java @@ -0,0 +1,84 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** Base {@link Mutator} implementation. Creates a suitable {@link RecordUpdater} and delegates mutation events. 
*/ +public class MutatorImpl implements Mutator { + + private final long transactionId; + private final Path partitionPath; + private final int bucketId; + private final Configuration configuration; + private final int recordIdColumn; + private final ObjectInspector objectInspector; + private RecordUpdater updater; + + public MutatorImpl(Configuration configuration, int recordIdColumn, ObjectInspector objectInspector, + AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException { + this.configuration = configuration; + this.recordIdColumn = recordIdColumn; + this.objectInspector = objectInspector; + this.transactionId = transactionId; + this.partitionPath = partitionPath; + this.bucketId = bucketId; + + updater = createRecordUpdater(outputFormat); + } + + @Override + public void insert(Object record) throws IOException { + updater.insert(transactionId, record); + } + + @Override + public void update(Object record) throws IOException { + updater.update(transactionId, record); + } + + @Override + public void delete(Object record) throws IOException { + updater.delete(transactionId, record); + } + + /** + * This implementation intentionally does nothing at this time. We only use a single transaction and + * {@link OrcRecordUpdater#flush()} will purposefully throw an exception in this instance. We keep this here in the + * event that we support multiple transactions and to make it clear that the omission of an invocation of + * {@link OrcRecordUpdater#flush()} was not a mistake. + */ + @Override + public void flush() throws IOException { + // Intentionally do nothing + } + + @Override + public void close() throws IOException { + updater.close(false); + updater = null; + } + + @Override + public String toString() { + return "MutatorImpl [transactionId=" + transactionId + ", partitionPath=" + partitionPath + + ", bucketId=" + bucketId + "]"; + } + + protected RecordUpdater createRecordUpdater(AcidOutputFormat<?, ?> outputFormat) throws IOException { + return outputFormat.getRecordUpdater( + partitionPath, + new AcidOutputFormat.Options(configuration) + .inspector(objectInspector) + .bucket(bucketId) + .minimumTransactionId(transactionId) + .maximumTransactionId(transactionId) + .recordIdColumn(recordIdColumn)); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java new file mode 100644 index 0000000..5ecb1bb --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/OperationType.java @@ -0,0 +1,7 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +enum OperationType { + INSERT, + UPDATE, + DELETE; +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java new file mode 100644 index
0000000..5b59e01 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionCreationException.java @@ -0,0 +1,15 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +public class PartitionCreationException extends WorkerException { + + private static final long serialVersionUID = 1L; + + PartitionCreationException(String message, Throwable cause) { + super(message, cause); + } + + PartitionCreationException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java new file mode 100644 index 0000000..11ef0dd --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspector.java @@ -0,0 +1,11 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import org.apache.hadoop.hive.ql.io.RecordIdentifier; + +/** Provide a means to extract {@link RecordIdentifier} from record objects. */ +public interface RecordInspector { + + /** Get the {@link RecordIdentifier} from the record - to be used for updates and deletes only. */ + RecordIdentifier extractRecordIdentifier(Object record); + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java new file mode 100644 index 0000000..18ee458 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordInspectorImpl.java @@ -0,0 +1,45 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.util.List; + +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + +/** + * Standard {@link RecordInspector} implementation that uses the supplied {@link ObjectInspector} and + * {@link AcidOutputFormat.Options#recordIdColumn(int) record id column} to extract {@link RecordIdentifier + * RecordIdentifiers} from records. + */ +public class RecordInspectorImpl implements RecordInspector { + + private final StructObjectInspector structObjectInspector; + private final StructField recordIdentifierField; + + /** + * Note that all column indexes are with respect to your record structure, not the Hive table structure. + */ + public RecordInspectorImpl(ObjectInspector objectInspector, int recordIdColumn) { + if (!(objectInspector instanceof StructObjectInspector)) { + throw new IllegalArgumentException("Serious problem, expected a StructObjectInspector, " + "but got a " + + objectInspector.getClass().getName()); + } + + structObjectInspector = (StructObjectInspector) objectInspector; + List<?
extends StructField> structFields = structObjectInspector.getAllStructFieldRefs(); + recordIdentifierField = structFields.get(recordIdColumn); + } + + public RecordIdentifier extractRecordIdentifier(Object record) { + return (RecordIdentifier) structObjectInspector.getStructFieldData(record, recordIdentifierField); + } + + @Override + public String toString() { + return "RecordInspectorImpl [structObjectInspector=" + structObjectInspector + ", recordIdentifierField=" + + recordIdentifierField + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java new file mode 100644 index 0000000..6b034f1 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/RecordSequenceException.java @@ -0,0 +1,11 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +public class RecordSequenceException extends WorkerException { + + private static final long serialVersionUID = 1L; + + RecordSequenceException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java new file mode 100644 index 0000000..bcff4d6 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java @@ -0,0 +1,49 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Verifies that the sequence of {@link RecordIdentifier RecordIdentifiers} is in a valid order for insertion into an + * ACID delta file in a given partition and bucket. + */ +class SequenceValidator { + + private static final Logger LOG = LoggerFactory.getLogger(SequenceValidator.class); + + private Long lastTxId; + private Long lastRowId; + + SequenceValidator() { + } + + boolean isInSequence(RecordIdentifier recordIdentifier) { + if (lastTxId != null && recordIdentifier.getTransactionId() < lastTxId) { + LOG.debug("Non-sequential transaction ID. Expected >{}, recordIdentifier={}", lastTxId, recordIdentifier); + return false; + } else if (lastTxId != null && recordIdentifier.getTransactionId() == lastTxId && lastRowId != null + && recordIdentifier.getRowId() <= lastRowId) { + LOG.debug("Non-sequential row ID. Expected >{}, recordIdentifier={}", lastRowId, recordIdentifier); + return false; + } + lastTxId = recordIdentifier.getTransactionId(); + lastRowId = recordIdentifier.getRowId(); + return true; + } + + /** + * Validator must be reset for each new partition and/or bucket.
+ */ + void reset() { + lastTxId = null; + lastRowId = null; + LOG.debug("reset"); + } + + @Override + public String toString() { + return "SequenceValidator [lastTxId=" + lastTxId + ", lastRowId=" + lastRowId + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java new file mode 100644 index 0000000..1fa1998 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WorkerException.java @@ -0,0 +1,15 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +public class WorkerException extends Exception { + + private static final long serialVersionUID = 1L; + + WorkerException(String message, Throwable cause) { + super(message, cause); + } + + WorkerException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java new file mode 100644 index 0000000..86d70d4 --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java @@ -0,0 +1,82 @@ +package org.apache.hive.hcatalog.streaming.mutate; + +import java.util.List; + +import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient; +import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClientBuilder; +import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable; +import org.apache.hive.hcatalog.streaming.mutate.client.Transaction; +import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver; +import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator; +import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinatorBuilder; +import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory; + +public class ExampleUseCase { + + private String metaStoreUri; + private String databaseName; + private String tableName; + private boolean createPartitions = true; + private List<String> partitionValues1, partitionValues2, partitionValues3; + private Object record1, record2, record3; + private MutatorFactory mutatorFactory; + + /* This is an illustration, not a functioning example. 
*/ + public void example() throws Exception { + // CLIENT/TOOL END + // + // Singleton instance in the job client + + // Create a client to manage our transaction + MutatorClient client = new MutatorClientBuilder() + .addSinkTable(databaseName, tableName, createPartitions) + .metaStoreUri(metaStoreUri) + .build(); + + // Get the transaction + Transaction transaction = client.newTransaction(); + + // Get serializable details of the destination tables + List<AcidTable> tables = client.getTables(); + + transaction.begin(); + + // CLUSTER / WORKER END + // + // Job submitted to the cluster + // + + BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(tables.get(0).getTotalBuckets()); + record1 = bucketIdResolver.attachBucketIdToRecord(record1); + + // -------------------------------------------------------------- + // DATA SHOULD GET SORTED BY YOUR ETL/MERGE PROCESS HERE + // + // Group the data by (partitionValues, ROW__ID.bucketId) + // Order the groups by (ROW__ID.lastTransactionId, ROW__ID.rowId) + // -------------------------------------------------------------- + + // One of these runs at the output of each reducer + // + MutatorCoordinator coordinator = new MutatorCoordinatorBuilder() + .metaStoreUri(metaStoreUri) + .table(tables.get(0)) + .mutatorFactory(mutatorFactory) + .build(); + + coordinator.insert(partitionValues1, record1); + coordinator.update(partitionValues2, record2); + coordinator.delete(partitionValues3, record3); + + coordinator.close(); + + // CLIENT/TOOL END + // + // The tasks have completed, control is back at the tool + + transaction.commit(); + + client.close(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java new file mode 100644 index 0000000..0d87a31 --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/MutableRecord.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.hcatalog.streaming.mutate; + +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.io.Text; + +public class MutableRecord { + + // Column 0 + public final int id; + // Column 1 + public final Text msg; + // Column 2 + public RecordIdentifier rowId; + + public MutableRecord(int id, String msg, RecordIdentifier rowId) { + this.id = id; + this.msg = new Text(msg); + this.rowId = rowId; + } + + public MutableRecord(int id, String msg) { + this.id = id; + this.msg = new Text(msg); + rowId = null; + } + + @Override + public String toString() { + return "MutableRecord [id=" + id + ", msg=" + msg + ", rowId=" + rowId + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java new file mode 100644 index 0000000..2a851c8 --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java @@ -0,0 +1,51 @@ +package org.apache.hive.hcatalog.streaming.mutate; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver; +import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolverImpl; +import org.apache.hive.hcatalog.streaming.mutate.worker.Mutator; +import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory; +import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorImpl; +import org.apache.hive.hcatalog.streaming.mutate.worker.RecordInspector; +import org.apache.hive.hcatalog.streaming.mutate.worker.RecordInspectorImpl; + +public class ReflectiveMutatorFactory implements MutatorFactory { + + private final int recordIdColumn; + private final ObjectInspector objectInspector; + private final Configuration configuration; + private final int[] bucketColumnIndexes; + + public ReflectiveMutatorFactory(Configuration configuration, Class<?> recordClass, int recordIdColumn, + int[] bucketColumnIndexes) { + this.configuration = configuration; + this.recordIdColumn = recordIdColumn; + this.bucketColumnIndexes = bucketColumnIndexes; + objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(recordClass, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + @Override + public Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) + throws IOException { + return new MutatorImpl(configuration, recordIdColumn, objectInspector, outputFormat, transactionId, partitionPath, + bucketId); + } + + @Override + public RecordInspector newRecordInspector() { + return new RecordInspectorImpl(objectInspector, recordIdColumn); + } + + @Override + public BucketIdResolver newBucketIdResolver(int totalBuckets) { + return new BucketIdResolverImpl(objectInspector, recordIdColumn, totalBuckets, bucketColumnIndexes); + } + +} 
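Tying together the MutableRecord and ReflectiveMutatorFactory test classes above, here is a minimal sketch of how a MutatorFactory might be wired for this record shape: column 2 holds the RecordIdentifier, bucketing is computed from column 0 (the id field), and the bucket count of 4 is an arbitrary assumption.

import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.streaming.mutate.MutableRecord;
import org.apache.hive.hcatalog.streaming.mutate.ReflectiveMutatorFactory;
import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver;
import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory;

public class FactoryWiringExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // MutableRecord layout: column 0 = id, column 1 = msg, column 2 = RecordIdentifier.
    MutatorFactory factory = new ReflectiveMutatorFactory(conf, MutableRecord.class, 2, new int[] { 0 });

    // As in ExampleUseCase, insert records need bucket ids attached before grouping/sorting.
    BucketIdResolver resolver = factory.newBucketIdResolver(4); // 4 total buckets: an assumption
    MutableRecord record = (MutableRecord) resolver.attachBucketIdToRecord(new MutableRecord(1, "hello"));
    System.out.println(record); // the rowId field now carries the computed bucket id
  }
}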
http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java new file mode 100644 index 0000000..477ed8c --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming.mutate; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils.Directory; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.thrift.TException; + +public class StreamingAssert { + + public static class Factory { + private IMetaStoreClient metaStoreClient; + private final HiveConf conf; + + public Factory(IMetaStoreClient metaStoreClient, HiveConf conf) { + this.metaStoreClient = metaStoreClient; + this.conf = conf; + } + + public StreamingAssert newStreamingAssert(Table table) throws Exception { + return newStreamingAssert(table, Collections.<String> emptyList()); + } + + public StreamingAssert newStreamingAssert(Table table, List<String> partition) throws Exception { + return new StreamingAssert(metaStoreClient, conf, table, partition); + } + } + + private Table table; + private List<String> partition; + private IMetaStoreClient metaStoreClient; + private Directory dir; + private ValidTxnList txns; + private List<AcidUtils.ParsedDelta> currentDeltas; + private long min; + private long max; + private Path partitionLocation; + + 
StreamingAssert(IMetaStoreClient metaStoreClient, HiveConf conf, Table table, List<String> partition) + throws Exception { + this.metaStoreClient = metaStoreClient; + this.table = table; + this.partition = partition; + + txns = metaStoreClient.getValidTxns(); + partitionLocation = getPartitionLocation(); + dir = AcidUtils.getAcidState(partitionLocation, conf, txns); + assertEquals(0, dir.getObsolete().size()); + assertEquals(0, dir.getOriginalFiles().size()); + + currentDeltas = dir.getCurrentDirectories(); + min = Long.MAX_VALUE; + max = Long.MIN_VALUE; + System.out.println("Files found: "); + for (AcidUtils.ParsedDelta parsedDelta : currentDeltas) { + System.out.println(parsedDelta.getPath().toString()); + max = Math.max(parsedDelta.getMaxTransaction(), max); + min = Math.min(parsedDelta.getMinTransaction(), min); + } + } + + public void assertExpectedFileCount(int expectedFileCount) { + assertEquals(expectedFileCount, currentDeltas.size()); + } + + public void assertNothingWritten() { + assertExpectedFileCount(0); + } + + public void assertMinTransactionId(long expectedMinTransactionId) { + if (currentDeltas.isEmpty()) { + throw new AssertionError("No data"); + } + assertEquals(expectedMinTransactionId, min); + } + + public void assertMaxTransactionId(long expectedMaxTransactionId) { + if (currentDeltas.isEmpty()) { + throw new AssertionError("No data"); + } + assertEquals(expectedMaxTransactionId, max); + } + + List<Record> readRecords() throws Exception { + if (currentDeltas.isEmpty()) { + throw new AssertionError("No data"); + } + InputFormat<NullWritable, OrcStruct> inputFormat = new OrcInputFormat(); + JobConf job = new JobConf(); + job.set("mapred.input.dir", partitionLocation.toString()); + job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets())); + job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); + InputSplit[] splits = inputFormat.getSplits(job, 1); + assertEquals(1, splits.length); + + final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat + .getRecordReader(splits[0], job, Reporter.NULL); + + NullWritable key = recordReader.createKey(); + OrcStruct value = recordReader.createValue(); + + List<Record> records = new ArrayList<>(); + while (recordReader.next(key, value)) { + RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier(); + Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(), + recordIdentifier.getBucketId(), recordIdentifier.getRowId()), value.toString()); + System.out.println(record); + records.add(record); + } + recordReader.close(); + return records; + } + + private Path getPartitionLocation() throws NoSuchObjectException, MetaException, TException { + Path partitionLocation; + if (partition.isEmpty()) { + partitionLocation = new Path(table.getSd().getLocation()); + } else { + // TODO: calculate this instead. Just because we're writing to the location doesn't mean that it'll + // always be wanted in the meta store right away.
+ List<Partition> partitionEntries = metaStoreClient.listPartitions(table.getDbName(), table.getTableName(), + partition, (short) 1); + partitionLocation = new Path(partitionEntries.get(0).getSd().getLocation()); + } + return partitionLocation; + } + + public static class Record { + private RecordIdentifier recordIdentifier; + private String row; + + Record(RecordIdentifier recordIdentifier, String row) { + this.recordIdentifier = recordIdentifier; + this.row = row; + } + + public RecordIdentifier getRecordIdentifier() { + return recordIdentifier; + } + + public String getRow() { + return row; + } + + @Override + public String toString() { + return "Record [recordIdentifier=" + recordIdentifier + ", row=" + row + "]"; + } + + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java new file mode 100644 index 0000000..f8c8537 --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java @@ -0,0 +1,261 @@ +package org.apache.hive.hcatalog.streaming.mutate; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.thrift.TException; + +public class StreamingTestUtils { + + public HiveConf newHiveConf(String metaStoreUri) { + HiveConf conf = new HiveConf(this.getClass()); + conf.set("fs.raw.impl", RawFileSystem.class.getName()); + if (metaStoreUri != null) { + conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri); + } + conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); + return conf; + } + + public void prepareTransactionDatabase(HiveConf conf) throws Exception { + TxnDbUtil.setConfValues(conf); + TxnDbUtil.cleanDb(); + TxnDbUtil.prepDb(); + } + + public IMetaStoreClient newMetaStoreClient(HiveConf conf) throws Exception { + return new HiveMetaStoreClient(conf); + } +
+ public static class RawFileSystem extends RawLocalFileSystem { + private static final URI NAME; + static { + try { + NAME = new URI("raw:///"); + } catch (URISyntaxException se) { + throw new IllegalArgumentException("bad uri", se); + } + } + + @Override + public URI getUri() { + return NAME; + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + File file = pathToFile(path); + if (!file.exists()) { + throw new FileNotFoundException("Can't find " + path); + } + // get close enough + short mod = 0; + if (file.canRead()) { + mod |= 0444; + } + if (file.canWrite()) { + mod |= 0200; + } + if (file.canExecute()) { + mod |= 0111; + } + return new FileStatus(file.length(), file.isDirectory(), 1, 1024, file.lastModified(), file.lastModified(), + FsPermission.createImmutable(mod), "owen", "users", path); + } + } + + public static DatabaseBuilder databaseBuilder(File warehouseFolder) { + return new DatabaseBuilder(warehouseFolder); + } + + public static class DatabaseBuilder { + + private Database database; + private File warehouseFolder; + + public DatabaseBuilder(File warehouseFolder) { + this.warehouseFolder = warehouseFolder; + database = new Database(); + } + + public DatabaseBuilder name(String name) { + database.setName(name); + File databaseFolder = new File(warehouseFolder, name + ".db"); + String databaseLocation = "raw://" + databaseFolder.toURI().getPath(); + database.setLocationUri(databaseLocation); + return this; + } + + public Database dropAndCreate(IMetaStoreClient metaStoreClient) throws Exception { + if (metaStoreClient == null) { + throw new IllegalArgumentException(); + } + try { + for (String table : metaStoreClient.listTableNamesByFilter(database.getName(), "", (short) -1)) { + metaStoreClient.dropTable(database.getName(), table, true, true); + } + metaStoreClient.dropDatabase(database.getName()); + } catch (TException e) { + } + metaStoreClient.createDatabase(database); + return database; + } + + public Database build() { + return database; + } + + } + + public static TableBuilder tableBuilder(Database database) { + return new TableBuilder(database); + } + + public static class TableBuilder { + + private Table table; + private StorageDescriptor sd; + private SerDeInfo serDeInfo; + private Database database; + private List<List<String>> partitions; + private List<String> columnNames; + private List<String> columnTypes; + private List<String> partitionKeys; + + public TableBuilder(Database database) { + this.database = database; + partitions = new ArrayList<>(); + columnNames = new ArrayList<>(); + columnTypes = new ArrayList<>(); + partitionKeys = Collections.emptyList(); + table = new Table(); + table.setDbName(database.getName()); + table.setTableType(TableType.MANAGED_TABLE.toString()); + Map<String, String> tableParams = new HashMap<String, String>(); + tableParams.put("transactional", Boolean.TRUE.toString()); + table.setParameters(tableParams); + + sd = new StorageDescriptor(); + sd.setInputFormat(HiveInputFormat.class.getName()); + sd.setOutputFormat(OrcOutputFormat.class.getName()); + sd.setNumBuckets(1); + table.setSd(sd); + + serDeInfo = new SerDeInfo(); + serDeInfo.setParameters(new HashMap<String, String>()); + serDeInfo.getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1"); + serDeInfo.setSerializationLib(OrcSerde.class.getName()); + sd.setSerdeInfo(serDeInfo); + } + + public TableBuilder name(String name) { + sd.setLocation(database.getLocationUri() + Path.SEPARATOR + name); + table.setTableName(name); + 
serDeInfo.setName(name); + return this; + } + + public TableBuilder buckets(int buckets) { + sd.setNumBuckets(buckets); + return this; + } + + public TableBuilder addColumn(String columnName, String columnType) { + columnNames.add(columnName); + columnTypes.add(columnType); + return this; + } + + public TableBuilder partitionKeys(String... partitionKeys) { + this.partitionKeys = Arrays.asList(partitionKeys); + return this; + } + + public TableBuilder addPartition(String... partitionValues) { + partitions.add(Arrays.asList(partitionValues)); + return this; + } + + public TableBuilder addPartition(List<String> partitionValues) { + partitions.add(partitionValues); + return this; + } + + public Table create(IMetaStoreClient metaStoreClient) throws Exception { + if (metaStoreClient == null) { + throw new IllegalArgumentException(); + } + return internalCreate(metaStoreClient); + } + + public Table build() throws Exception { + return internalCreate(null); + } + + private Table internalCreate(IMetaStoreClient metaStoreClient) throws Exception { + List<FieldSchema> fields = new ArrayList<FieldSchema>(columnNames.size()); + for (int i = 0; i < columnNames.size(); i++) { + fields.add(new FieldSchema(columnNames.get(i), columnTypes.get(i), "")); + } + sd.setCols(fields); + + if (!partitionKeys.isEmpty()) { + List<FieldSchema> partitionFields = new ArrayList<FieldSchema>(); + for (String partitionKey : partitionKeys) { + partitionFields.add(new FieldSchema(partitionKey, serdeConstants.STRING_TYPE_NAME, "")); + } + table.setPartitionKeys(partitionFields); + } + if (metaStoreClient != null) { + metaStoreClient.createTable(table); + } + + for (List<String> partitionValues : partitions) { + Partition partition = new Partition(); + partition.setDbName(database.getName()); + partition.setTableName(table.getTableName()); + StorageDescriptor partitionSd = new StorageDescriptor(table.getSd()); + partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR + + Warehouse.makePartName(table.getPartitionKeys(), partitionValues)); + partition.setSd(partitionSd); + partition.setValues(partitionValues); + + if (metaStoreClient != null) { + metaStoreClient.add_partition(partition); + } + } + return table; + } + } + +}
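Finally, a sketch of how the StreamingTestUtils builders above might be combined to stand up a partitioned, transactional test table. The warehouse folder and names here are hypothetical, and an embedded metastore is assumed (null metastore URI).

import java.io.File;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils;

public class TestTableSetupExample {
  public static void main(String[] args) throws Exception {
    StreamingTestUtils utils = new StreamingTestUtils();
    HiveConf conf = utils.newHiveConf(null); // null URI: local/embedded metastore assumed
    utils.prepareTransactionDatabase(conf);  // resets the ACID transaction tables
    IMetaStoreClient client = utils.newMetaStoreClient(conf);

    File warehouse = new File("/tmp/warehouse"); // hypothetical warehouse folder
    Database database = StreamingTestUtils.databaseBuilder(warehouse).name("test_db").dropAndCreate(client);

    Table table = StreamingTestUtils.tableBuilder(database)
        .name("test_table")
        .addColumn("id", "int")
        .addColumn("msg", "string")
        .partitionKeys("continent", "country")
        .addPartition("Asia", "India")
        .buckets(1)
        .create(client);
    System.out.println("Created transactional table: " + table.getTableName());
  }
}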