Repository: hive Updated Branches: refs/heads/master dfdc6700c -> 6ec72de79
HIVE-11229 Mutation API: Coordinator communication with meta store should be optional (Elliot West via gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6ec72de7 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6ec72de7 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6ec72de7 Branch: refs/heads/master Commit: 6ec72de79ebb898f699402e8a2d7681c4e39ecd2 Parents: dfdc670 Author: Alan Gates <ga...@hortonworks.com> Authored: Tue Jul 21 11:08:59 2015 -0700 Committer: Alan Gates <ga...@hortonworks.com> Committed: Tue Jul 21 11:08:59 2015 -0700 ---------------------------------------------------------------------- .../hive/hcatalog/streaming/mutate/package.html | 31 ++++- .../mutate/worker/CreatePartitionHelper.java | 83 -------------- .../mutate/worker/MetaStorePartitionHelper.java | 102 +++++++++++++++++ .../mutate/worker/MutatorCoordinator.java | 21 ++-- .../worker/MutatorCoordinatorBuilder.java | 41 +++++-- .../mutate/worker/PartitionHelper.java | 17 +++ .../mutate/worker/WarehousePartitionHelper.java | 69 ++++++++++++ .../worker/TestMetaStorePartitionHelper.java | 112 +++++++++++++++++++ .../mutate/worker/TestMutatorCoordinator.java | 40 ++++--- .../worker/TestWarehousePartitionHelper.java | 57 ++++++++++ 10 files changed, 452 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html index 09a55b6..72ce6b1 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html +++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html @@ -416,17 +416,39 @@ data, is the responsibility of the client using the API. </p> <h3>Dynamic Partition Creation:</h3> +<p> It is very likely to be desirable to have new partitions created automatically (say on a hourly basis). In such cases requiring the Hive -admin to pre-create the necessary partitions may not be reasonable. -Consequently the API allows coordinators to create partitions as needed -(see: +admin to pre-create the necessary partitions may not be reasonable. The +API allows coordinators to create partitions as needed (see: <code>MutatorClientBuilder.addSinkTable(String, String, boolean)</code> ). Partition creation being an atomic action, multiple coordinators can race to create the partition, but only one would succeed, so coordinators clients need not synchronize when creating a partition. The user of the coordinator process needs to be given write permissions on the Hive table in order to create partitions. +</p> + +<p>Care must be taken when using this option as it requires that the +coordinators maintain a connection with the meta store database. When +coordinators are running in a distributed environment (as is likely the +case) it is possible for them to overwhelm the meta store. In such cases it +may be better to disable partition creation and collect a set of +affected partitions as part of your ETL merge process. These can then be +created with a single meta store connection in your client code, once +the cluster side merge process is complete.</p> +<p> +Finally, note that when partition creation is disabled the coordinators +must synthesize the partition URI as they cannot retrieve it from the +meta store. This may cause problems if the layout of your partitions in +HDFS does not follow the Hive standard (as implemented in +<code> +org.apache.hadoop.hive.metastore.Warehouse.getPartitionPath(Path, +LinkedHashMap +<String, String>). 
+</code> +) +</p> <h2>Reading data</h2> @@ -473,6 +495,7 @@ table. The <code>AcidTableSerializer</code> can help you transport the <code>Aci when your workers are in a distributed environment. </li> <li>Compute your mutation set (this is your ETL merge process).</li> +<li>Optionally: collect the set of affected partitions.</li> <li>Append bucket ids to insertion records. A <code>BucketIdResolver</code> can help here. </li> @@ -481,6 +504,8 @@ can help here. <li>Close your coordinators.</li> <li>Abort or commit the transaction.</li> <li>Close your mutation client.</li> +<li>Optionally: create any affected partitions that do not exist in +the meta store.</li> </ol> <p> See http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java deleted file mode 100644 index 9aab346..0000000 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java +++ /dev/null @@ -1,83 +0,0 @@ -package org.apache.hive.hcatalog.streaming.mutate.worker; - -import java.util.List; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Utility class that can 
create new table partitions within the {@link IMetaStoreClient meta store}. */ -class CreatePartitionHelper { - - private static final Logger LOG = LoggerFactory.getLogger(CreatePartitionHelper.class); - - private final IMetaStoreClient metaStoreClient; - private final String databaseName; - private final String tableName; - - CreatePartitionHelper(IMetaStoreClient metaStoreClient, String databaseName, String tableName) { - this.metaStoreClient = metaStoreClient; - this.databaseName = databaseName; - this.tableName = tableName; - } - - /** Returns the expected {@link Path} for a given partition value. */ - Path getPathForPartition(List<String> newPartitionValues) throws WorkerException { - try { - String location; - if (newPartitionValues.isEmpty()) { - location = metaStoreClient.getTable(databaseName, tableName).getSd().getLocation(); - } else { - location = metaStoreClient.getPartition(databaseName, tableName, newPartitionValues).getSd().getLocation(); - } - LOG.debug("Found path {} for partition {}", location, newPartitionValues); - return new Path(location); - } catch (NoSuchObjectException e) { - throw new WorkerException("Table not found '" + databaseName + "." + tableName + "'.", e); - } catch (TException e) { - throw new WorkerException("Failed to get path for partitions '" + newPartitionValues + "' on table '" - + databaseName + "." + tableName + "' with meta store: " + metaStoreClient, e); - } - } - - /** Creates the specified partition if it does not already exist. Does nothing if the table is unpartitioned. 
*/ - void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException { - if (newPartitionValues.isEmpty()) { - return; - } - - try { - LOG.debug("Attempting to create partition (if not exists) {}.{}:{}", databaseName, tableName, newPartitionValues); - Table table = metaStoreClient.getTable(databaseName, tableName); - - Partition partition = new Partition(); - partition.setDbName(table.getDbName()); - partition.setTableName(table.getTableName()); - StorageDescriptor partitionSd = new StorageDescriptor(table.getSd()); - partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR - + Warehouse.makePartName(table.getPartitionKeys(), newPartitionValues)); - partition.setSd(partitionSd); - partition.setValues(newPartitionValues); - - metaStoreClient.add_partition(partition); - } catch (AlreadyExistsException e) { - LOG.debug("Partition already exisits: {}.{}:{}", databaseName, tableName, newPartitionValues); - } catch (NoSuchObjectException e) { - LOG.error("Failed to create partition : " + newPartitionValues, e); - throw new PartitionCreationException("Table not found '" + databaseName + "." + tableName + "'.", e); - } catch (TException e) { - LOG.error("Failed to create partition : " + newPartitionValues, e); - throw new PartitionCreationException("Failed to create partition '" + newPartitionValues + "' on table '" - + databaseName + "." 
+ tableName + "'", e); - } - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java new file mode 100644 index 0000000..7e2e006 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java @@ -0,0 +1,102 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PartitionHelper} implementation that uses the {@link IMetaStoreClient meta store} to both create partitions + * and obtain information concerning partitions. Exercise care when using this from within workers that are running in a + * cluster as it may overwhelm the meta store database instance. As an alternative, consider using the + * {@link WarehousePartitionHelper}, collecting the affected partitions as an output of your merge job, and then + * retrospectively adding partitions in your client. 
+ */ +class MetaStorePartitionHelper implements PartitionHelper { + + private static final Logger LOG = LoggerFactory.getLogger(MetaStorePartitionHelper.class); + + private final IMetaStoreClient metaStoreClient; + private final String databaseName; + private final String tableName; + private final Path tablePath; + + MetaStorePartitionHelper(IMetaStoreClient metaStoreClient, String databaseName, String tableName, Path tablePath) { + this.metaStoreClient = metaStoreClient; + this.tablePath = tablePath; + this.databaseName = databaseName; + this.tableName = tableName; + } + + /** Returns the expected {@link Path} for a given partition value. */ + @Override + public Path getPathForPartition(List<String> newPartitionValues) throws WorkerException { + if (newPartitionValues.isEmpty()) { + LOG.debug("Using path {} for unpartitioned table {}.{}", tablePath, databaseName, tableName); + return tablePath; + } else { + try { + String location = metaStoreClient + .getPartition(databaseName, tableName, newPartitionValues) + .getSd() + .getLocation(); + LOG.debug("Found path {} for partition {}", location, newPartitionValues); + return new Path(location); + } catch (NoSuchObjectException e) { + throw new WorkerException("Table not found '" + databaseName + "." + tableName + "'.", e); + } catch (TException e) { + throw new WorkerException("Failed to get path for partitions '" + newPartitionValues + "' on table '" + + databaseName + "." + tableName + "' with meta store: " + metaStoreClient, e); + } + } + } + + /** Creates the specified partition if it does not already exist. Does nothing if the table is unpartitioned. 
*/ + @Override + public void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException { + if (newPartitionValues.isEmpty()) { + return; + } + + try { + LOG.debug("Attempting to create partition (if not exists) {}.{}:{}", databaseName, tableName, newPartitionValues); + Table table = metaStoreClient.getTable(databaseName, tableName); + + Partition partition = new Partition(); + partition.setDbName(table.getDbName()); + partition.setTableName(table.getTableName()); + StorageDescriptor partitionSd = new StorageDescriptor(table.getSd()); + partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR + + Warehouse.makePartName(table.getPartitionKeys(), newPartitionValues)); + partition.setSd(partitionSd); + partition.setValues(newPartitionValues); + + metaStoreClient.add_partition(partition); + } catch (AlreadyExistsException e) { + LOG.debug("Partition already exisits: {}.{}:{}", databaseName, tableName, newPartitionValues); + } catch (NoSuchObjectException e) { + LOG.error("Failed to create partition : " + newPartitionValues, e); + throw new PartitionCreationException("Table not found '" + databaseName + "." + tableName + "'.", e); + } catch (TException e) { + LOG.error("Failed to create partition : " + newPartitionValues, e); + throw new PartitionCreationException("Failed to create partition '" + newPartitionValues + "' on table '" + + databaseName + "." 
+ tableName + "'", e); + } + } + + @Override + public void close() throws IOException { + metaStoreClient.close(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java index 96f05e5..eaed09e 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java @@ -11,7 +11,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RecordIdentifier; @@ -40,13 +39,12 @@ public class MutatorCoordinator implements Closeable, Flushable { private static final Logger LOG = LoggerFactory.getLogger(MutatorCoordinator.class); - private final IMetaStoreClient metaStoreClient; private final MutatorFactory mutatorFactory; private final GroupingValidator groupingValidator; private final SequenceValidator sequenceValidator; private final AcidTable table; private final RecordInspector recordInspector; - private final CreatePartitionHelper partitionHelper; + private final PartitionHelper partitionHelper; private final AcidOutputFormat<?, ?> outputFormat; private final BucketIdResolver bucketIdResolver; private final HiveConf configuration; @@ -57,18 +55,16 @@ public class MutatorCoordinator implements Closeable, Flushable { private 
Path partitionPath; private Mutator mutator; - MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory, + MutatorCoordinator(HiveConf configuration, MutatorFactory mutatorFactory, PartitionHelper partitionHelper, AcidTable table, boolean deleteDeltaIfExists) throws WorkerException { - this(metaStoreClient, configuration, mutatorFactory, new CreatePartitionHelper(metaStoreClient, - table.getDatabaseName(), table.getTableName()), new GroupingValidator(), new SequenceValidator(), table, + this(configuration, mutatorFactory, partitionHelper, new GroupingValidator(), new SequenceValidator(), table, deleteDeltaIfExists); } /** Visible for testing only. */ - MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory, - CreatePartitionHelper partitionHelper, GroupingValidator groupingValidator, SequenceValidator sequenceValidator, - AcidTable table, boolean deleteDeltaIfExists) throws WorkerException { - this.metaStoreClient = metaStoreClient; + MutatorCoordinator(HiveConf configuration, MutatorFactory mutatorFactory, PartitionHelper partitionHelper, + GroupingValidator groupingValidator, SequenceValidator sequenceValidator, AcidTable table, + boolean deleteDeltaIfExists) throws WorkerException { this.configuration = configuration; this.mutatorFactory = mutatorFactory; this.partitionHelper = partitionHelper; @@ -156,7 +152,7 @@ public class MutatorCoordinator implements Closeable, Flushable { mutator.close(); } } finally { - metaStoreClient.close(); + partitionHelper.close(); } } @@ -178,7 +174,7 @@ public class MutatorCoordinator implements Closeable, Flushable { try { if (partitionHasChanged(newPartitionValues)) { - if (table.createPartitions()) { + if (table.createPartitions() && operationType == OperationType.INSERT) { partitionHelper.createPartitionIfNotExists(newPartitionValues); } Path newPartitionPath = partitionHelper.getPathForPartition(newPartitionValues); @@ 
-265,6 +261,7 @@ public class MutatorCoordinator implements Closeable, Flushable { } } + /* A delta may be present from a previous failed task attempt. */ private void deleteDeltaIfExists(Path partitionPath, long transactionId, int bucketId) throws IOException { Path deltaPath = AcidUtils.createFilename(partitionPath, new AcidOutputFormat.Options(configuration) http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java index 8851ea6..cd28e02 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java @@ -1,9 +1,13 @@ package org.apache.hive.hcatalog.streaming.mutate.worker; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.hcatalog.common.HCatUtil; @@ -57,20 +61,41 @@ public class MutatorCoordinatorBuilder { } public MutatorCoordinator build() throws WorkerException, MetaException { - String user = authenticatedUser == null ? System.getProperty("user.name") : authenticatedUser.getShortUserName(); - boolean secureMode = authenticatedUser == null ? 
false : authenticatedUser.hasKerberosCredentials(); - configuration = HiveConfFactory.newInstance(configuration, this.getClass(), metaStoreUri); - IMetaStoreClient metaStoreClient; + PartitionHelper partitionHelper; + if (table.createPartitions()) { + partitionHelper = newMetaStorePartitionHelper(); + } else { + partitionHelper = newWarehousePartitionHelper(); + } + + return new MutatorCoordinator(configuration, mutatorFactory, partitionHelper, table, deleteDeltaIfExists); + } + + private PartitionHelper newWarehousePartitionHelper() throws MetaException, WorkerException { + String location = table.getTable().getSd().getLocation(); + Path tablePath = new Path(location); + List<FieldSchema> partitionFields = table.getTable().getPartitionKeys(); + List<String> partitionColumns = new ArrayList<>(partitionFields.size()); + for (FieldSchema field : partitionFields) { + partitionColumns.add(field.getName()); + } + return new WarehousePartitionHelper(configuration, tablePath, partitionColumns); + } + + private PartitionHelper newMetaStorePartitionHelper() throws MetaException, WorkerException { + String user = authenticatedUser == null ? System.getProperty("user.name") : authenticatedUser.getShortUserName(); + boolean secureMode = authenticatedUser == null ? 
false : authenticatedUser.hasKerberosCredentials(); try { - metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, configuration, authenticatedUser, user, secureMode) - .newInstance(HCatUtil.getHiveMetastoreClient(configuration)); + IMetaStoreClient metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, configuration, authenticatedUser, + user, secureMode).newInstance(HCatUtil.getHiveMetastoreClient(configuration)); + String tableLocation = table.getTable().getSd().getLocation(); + Path tablePath = new Path(tableLocation); + return new MetaStorePartitionHelper(metaStoreClient, table.getDatabaseName(), table.getTableName(), tablePath); } catch (IOException e) { throw new WorkerException("Could not create meta store client.", e); } - - return new MutatorCoordinator(metaStoreClient, configuration, mutatorFactory, table, deleteDeltaIfExists); } } http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java new file mode 100644 index 0000000..d70207a --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java @@ -0,0 +1,17 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.io.Closeable; +import java.util.List; + +import org.apache.hadoop.fs.Path; + +/** Implementations are responsible for creating and obtaining path information about partitions. */ +interface PartitionHelper extends Closeable { + + /** Return the location of the partition described by the provided values. 
*/ + Path getPathForPartition(List<String> newPartitionValues) throws WorkerException; + + /** Create the partition described by the provided values if it does not exist already. */ + void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java new file mode 100644 index 0000000..c2edee3 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java @@ -0,0 +1,69 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.MetaException; + +/** + * A {@link PartitionHelper} implementation that uses the {@link Warehouse} class to obtain partition path information. + * As this does not require a connection to the meta store database it is safe to use in workers that are distributed on + * a cluster. However, it does not support the creation of new partitions so you will need to provide a mechanism to + * collect affected partitions in your merge job and create them from your client. 
+ */ +class WarehousePartitionHelper implements PartitionHelper { + + private final Warehouse warehouse; + private final Path tablePath; + private final LinkedHashMap<String, String> partitions; + private final List<String> partitionColumns; + + WarehousePartitionHelper(Configuration configuration, Path tablePath, List<String> partitionColumns) + throws MetaException { + this.tablePath = tablePath; + this.partitionColumns = partitionColumns; + this.partitions = new LinkedHashMap<>(partitionColumns.size()); + for (String partitionColumn : partitionColumns) { + partitions.put(partitionColumn, null); + } + warehouse = new Warehouse(configuration); + } + + @Override + public Path getPathForPartition(List<String> partitionValues) throws WorkerException { + if (partitionValues.size() != partitionColumns.size()) { + throw new IllegalArgumentException("Incorrect number of partition values. columns=" + partitionColumns + + ",values=" + partitionValues); + } + if (partitionColumns.isEmpty()) { + return tablePath; + } + for (int columnIndex = 0; columnIndex < partitionValues.size(); columnIndex++) { + String partitionColumn = partitionColumns.get(columnIndex); + String partitionValue = partitionValues.get(columnIndex); + partitions.put(partitionColumn, partitionValue); + } + try { + return warehouse.getPartitionPath(tablePath, partitions); + } catch (MetaException e) { + throw new WorkerException("Unable to determine partition path. tablePath=" + tablePath + ",partition=" + + partitionValues, e); + } + } + + /** Throws {@link UnsupportedOperationException}. */ + @Override + public void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException { + throw new UnsupportedOperationException("You require a connection to the meta store to do this."); + } + + @Override + public void close() throws IOException { + // Nothing to close here. 
+ } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java new file mode 100644 index 0000000..cc4173e --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java @@ -0,0 +1,112 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestMetaStorePartitionHelper { + + private static final Path TABLE_PATH = new Path("table"); + private static final String TABLE_LOCATION = TABLE_PATH.toString(); + + private static final FieldSchema PARTITION_KEY_A = new FieldSchema("A", "string", null); + private static final FieldSchema PARTITION_KEY_B = new 
FieldSchema("B", "string", null); + private static final List<FieldSchema> PARTITION_KEYS = Arrays.asList(PARTITION_KEY_A, PARTITION_KEY_B); + private static final Path PARTITION_PATH = new Path(TABLE_PATH, "a=1/b=2"); + private static final String PARTITION_LOCATION = PARTITION_PATH.toString(); + + private static final String DATABASE_NAME = "db"; + private static final String TABLE_NAME = "one"; + + private static final List<String> UNPARTITIONED_VALUES = Collections.emptyList(); + private static final List<String> PARTITIONED_VALUES = Arrays.asList("1", "2"); + + @Mock + private IMetaStoreClient mockClient; + @Mock + private Table mockTable; + private StorageDescriptor tableStorageDescriptor = new StorageDescriptor(); + + @Mock + private Partition mockPartition; + @Mock + private StorageDescriptor mockPartitionStorageDescriptor; + @Captor + private ArgumentCaptor<Partition> partitionCaptor; + + private PartitionHelper helper; + + @Before + public void injectMocks() throws Exception { + when(mockClient.getTable(DATABASE_NAME, TABLE_NAME)).thenReturn(mockTable); + when(mockTable.getDbName()).thenReturn(DATABASE_NAME); + when(mockTable.getTableName()).thenReturn(TABLE_NAME); + when(mockTable.getPartitionKeys()).thenReturn(PARTITION_KEYS); + when(mockTable.getSd()).thenReturn(tableStorageDescriptor); + tableStorageDescriptor.setLocation(TABLE_LOCATION); + + when(mockClient.getPartition(DATABASE_NAME, TABLE_NAME, PARTITIONED_VALUES)).thenReturn(mockPartition); + when(mockPartition.getSd()).thenReturn(mockPartitionStorageDescriptor); + when(mockPartitionStorageDescriptor.getLocation()).thenReturn(PARTITION_LOCATION); + + helper = new MetaStorePartitionHelper(mockClient, DATABASE_NAME, TABLE_NAME, TABLE_PATH); + } + + @Test + public void getPathForUnpartitionedTable() throws Exception { + Path path = helper.getPathForPartition(UNPARTITIONED_VALUES); + assertThat(path, is(TABLE_PATH)); + verifyZeroInteractions(mockClient); + } + + @Test + public void 
getPathForPartitionedTable() throws Exception { + Path path = helper.getPathForPartition(PARTITIONED_VALUES); + assertThat(path, is(PARTITION_PATH)); + } + + @Test + public void createOnUnpartitionTableDoesNothing() throws Exception { + helper.createPartitionIfNotExists(UNPARTITIONED_VALUES); + verifyZeroInteractions(mockClient); + } + + @Test + public void createOnPartitionTable() throws Exception { + helper.createPartitionIfNotExists(PARTITIONED_VALUES); + + verify(mockClient).add_partition(partitionCaptor.capture()); + Partition actual = partitionCaptor.getValue(); + assertThat(actual.getSd().getLocation(), is(PARTITION_LOCATION)); + assertThat(actual.getValues(), is(PARTITIONED_VALUES)); + } + + @Test + public void closeSucceeds() throws IOException { + helper.close(); + verify(mockClient).close(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java index 6e9ffa2..2983d12 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java @@ -2,8 +2,10 @@ package org.apache.hive.hcatalog.streaming.mutate.worker; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static 
org.mockito.Mockito.verifyZeroInteractions; @@ -15,7 +17,6 @@ import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable; @@ -42,11 +43,9 @@ public class TestMutatorCoordinator { private static final RecordIdentifier ROW__ID_INSERT = new RecordIdentifier(-1L, BUCKET_ID, -1L); @Mock - private IMetaStoreClient mockMetaStoreClient; - @Mock private MutatorFactory mockMutatorFactory; @Mock - private CreatePartitionHelper mockPartitionHelper; + private PartitionHelper mockPartitionHelper; @Mock private GroupingValidator mockGroupingValidator; @Mock @@ -79,8 +78,8 @@ public class TestMutatorCoordinator { when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(true); when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(true); - coordinator = new MutatorCoordinator(mockMetaStoreClient, configuration, mockMutatorFactory, mockPartitionHelper, - mockGroupingValidator, mockSequenceValidator, mockAcidTable, false); + coordinator = new MutatorCoordinator(configuration, mockMutatorFactory, mockPartitionHelper, mockGroupingValidator, + mockSequenceValidator, mockAcidTable, false); } @Test @@ -127,7 +126,6 @@ public class TestMutatorCoordinator { coordinator.update(UNPARTITIONED, RECORD); coordinator.delete(UNPARTITIONED, RECORD); - verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED); verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); verify(mockMutatorFactory) .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID + 1)); @@ -145,12 +143,11 @@ public class TestMutatorCoordinator { 
when(mockPartitionHelper.getPathForPartition(PARTITION_A)).thenReturn(PATH_A); when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B); - coordinator.update(PARTITION_A, RECORD); - coordinator.delete(PARTITION_B, RECORD); - coordinator.update(PARTITION_B, RECORD); - coordinator.insert(PARTITION_B, RECORD); + coordinator.update(PARTITION_A, RECORD); /* PaB0 */ + coordinator.insert(PARTITION_B, RECORD); /* PbB0 */ + coordinator.delete(PARTITION_B, RECORD); /* PbB0 */ + coordinator.update(PARTITION_B, RECORD); /* PbB1 */ - verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A); verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B); verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), @@ -163,6 +160,18 @@ public class TestMutatorCoordinator { verify(mockSequenceValidator, times(4)).reset(); } + @Test + public void partitionThenBucketChangesNoCreateAsPartitionEstablished() throws Exception { + when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0, ROW__ID_INSERT); + when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(0, 0); + when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B); + + coordinator.delete(PARTITION_B, RECORD); /* PbB0 */ + coordinator.insert(PARTITION_B, RECORD); /* PbB0 */ + + verify(mockPartitionHelper, never()).createPartitionIfNotExists(anyList()); + } + @Test(expected = RecordSequenceException.class) public void outOfSequence() throws Exception { when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(false); @@ -175,14 +184,14 @@ public class TestMutatorCoordinator { verify(mockMutator).update(RECORD); verify(mockMutator).delete(RECORD); } - + @Test(expected = GroupRevisitedException.class) public void revisitGroup() throws Exception { 
when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(false); - + coordinator.update(UNPARTITIONED, RECORD); coordinator.delete(UNPARTITIONED, RECORD); - + verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED); verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); verify(mockMutator).update(RECORD); @@ -230,5 +239,6 @@ public class TestMutatorCoordinator { coordinator.close(); verify(mockMutator).close(); + verify(mockPartitionHelper).close(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java new file mode 100644 index 0000000..e779771 --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java @@ -0,0 +1,57 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +public class TestWarehousePartitionHelper { + + private static final Configuration CONFIGURATION = new Configuration(); + private static final Path TABLE_PATH = new Path("table"); + + private static final List<String> UNPARTITIONED_COLUMNS = Collections.emptyList(); + private static final List<String> UNPARTITIONED_VALUES = Collections.emptyList(); + + private static final List<String> 
PARTITIONED_COLUMNS = Arrays.asList("A", "B"); + private static final List<String> PARTITIONED_VALUES = Arrays.asList("1", "2"); + + private final PartitionHelper unpartitionedHelper; + private final PartitionHelper partitionedHelper; + + public TestWarehousePartitionHelper() throws Exception { + unpartitionedHelper = new WarehousePartitionHelper(CONFIGURATION, TABLE_PATH, UNPARTITIONED_COLUMNS); + partitionedHelper = new WarehousePartitionHelper(CONFIGURATION, TABLE_PATH, PARTITIONED_COLUMNS); + } + + @Test(expected = UnsupportedOperationException.class) + public void createNotSupported() throws Exception { + unpartitionedHelper.createPartitionIfNotExists(UNPARTITIONED_VALUES); + } + + @Test + public void getPathForUnpartitionedTable() throws Exception { + Path path = unpartitionedHelper.getPathForPartition(UNPARTITIONED_VALUES); + assertThat(path, is(TABLE_PATH)); + } + + @Test + public void getPathForPartitionedTable() throws Exception { + Path path = partitionedHelper.getPathForPartition(PARTITIONED_VALUES); + assertThat(path, is(new Path(TABLE_PATH, "A=1/B=2"))); + } + + @Test + public void closeSucceeds() throws IOException { + partitionedHelper.close(); + unpartitionedHelper.close(); + } + +}