Repository: hive
Updated Branches:
  refs/heads/master d8c500b89 -> 706bf724e
HIVE-20891: Call alter_partition in batch when dynamically loading partitions (Laszlo Pinter reviewed by Denys Kuzmenko and Peter Vary) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/706bf724 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/706bf724 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/706bf724 Branch: refs/heads/master Commit: 706bf724ee24c4e88561b677793eb98468abaec1 Parents: d8c500b Author: Laszlo Pinter <lpin...@cloudera.com> Authored: Mon Dec 10 09:59:43 2018 +0100 Committer: Peter Vary <pv...@cloudera.com> Committed: Mon Dec 10 09:59:43 2018 +0100 ---------------------------------------------------------------------- .../metastore/SynchronizedMetaStoreClient.java | 7 +++ .../apache/hadoop/hive/ql/metadata/Hive.java | 51 +++++++++++++++----- 2 files changed, 45 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/706bf724/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java index e8f3623..0372064 100644 --- a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java +++ b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java @@ -87,6 +87,13 @@ public final class SynchronizedMetaStoreClient { client.alter_partition(catName, dbName, tblName, newPart, environmentContext, writeIdList); } + public void alter_partitions(String catName, String dbName, String tblName, + List<Partition> partitions, EnvironmentContext environmentContext, + String writeIdList, long writeId) throws TException { + client.alter_partitions(catName, dbName, tblName, partitions, environmentContext, 
writeIdList, + writeId); + } + public synchronized LockResponse checkLock(long lockid) throws TException { return client.checkLock(lockid); } http://git-wip-us.apache.org/repos/asf/hive/blob/706bf724/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index e185bf4..768a73c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2140,9 +2140,7 @@ public class Hive { private void addPartitionToMetastore(Partition newTPart, boolean hasFollowingStatsTask, Table tbl, TableSnapshot tableSnapshot) throws HiveException{ try { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding new partition " + newTPart.getSpec()); - } + LOG.debug("Adding new partition " + newTPart.getSpec()); getSynchronizedMSC().add_partition(newTPart.getTPartition()); } catch (AlreadyExistsException aee) { // With multiple users concurrently issuing insert statements on the same partition has @@ -2182,12 +2180,15 @@ public class Hive { List<AcidUtils.TableSnapshot> tableSnapshots) throws HiveException { try { + if (partitions.isEmpty() || tableSnapshots.isEmpty()) { + return; + } if (LOG.isDebugEnabled()) { StringBuffer debugMsg = new StringBuffer("Adding new partitions "); partitions.forEach(partition -> debugMsg.append(partition.getSpec() + " ")); LOG.debug(debugMsg.toString()); } - getSynchronizedMSC().add_partitions(partitions.parallelStream().map(Partition::getTPartition) + getSynchronizedMSC().add_partitions(partitions.stream().map(Partition::getTPartition) .collect(Collectors.toList())); } catch(AlreadyExistsException aee) { // With multiple users concurrently issuing insert statements on the same partition has @@ -2308,7 +2309,7 @@ public class Hive { } private void setStatsPropAndAlterPartition(boolean 
hasFollowingStatsTask, Table tbl, - Partition newTPart, TableSnapshot tableSnapshot) throws MetaException, TException { + Partition newTPart, TableSnapshot tableSnapshot) throws TException { EnvironmentContext ec = new EnvironmentContext(); if (hasFollowingStatsTask) { ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); @@ -2319,6 +2320,27 @@ public class Hive { tableSnapshot == null ? null : tableSnapshot.getValidWriteIdList()); } + private void setStatsPropAndAlterPartitions(boolean hasFollowingStatsTask, Table tbl, + List<Partition> partitions, + long writeId) throws TException { + if (partitions.isEmpty()) { + return; + } + EnvironmentContext ec = new EnvironmentContext(); + if (hasFollowingStatsTask) { + ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); + } + if (LOG.isDebugEnabled()) { + StringBuilder sb = new StringBuilder("Altering existing partitions "); + partitions.forEach(p -> sb.append(p.getSpec())); + LOG.debug(sb.toString()); + } + + getSynchronizedMSC().alter_partitions(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), + partitions.stream().map(Partition::getTPartition).collect(Collectors.toList()), + ec, null, writeId); + } + /** * Walk through sub-directory tree to construct list bucketing location map. 
* @@ -2553,7 +2575,7 @@ private void constructOneLBLocationMap(FileStatus fSta, // Match valid partition path to partitions while (iterator.hasNext()) { Partition partition = iterator.next(); - partitionDetailsMap.entrySet().parallelStream() + partitionDetailsMap.entrySet().stream() .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec())) .findAny().ifPresent(entry -> { entry.getValue().partition = partition; @@ -2633,14 +2655,14 @@ private void constructOneLBLocationMap(FileStatus fSta, addPartitionsToMetastore( partitionDetailsMap.entrySet() - .parallelStream() + .stream() .filter(entry -> !entry.getValue().hasOldPartition) .map(entry -> entry.getValue().partition) .collect(Collectors.toList()), hasFollowingStatsTask, tbl, partitionDetailsMap.entrySet() - .parallelStream() + .stream() .filter(entry -> !entry.getValue().hasOldPartition) .map(entry -> entry.getValue().tableSnapshot) .collect(Collectors.toList())); @@ -2652,11 +2674,14 @@ private void constructOneLBLocationMap(FileStatus fSta, if (isTxnTable && partitionDetails.newFiles != null) { addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles, writeId); } - if (partitionDetails.hasOldPartition) { - setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, partitionDetails.partition, - partitionDetails.tableSnapshot); - } } + + setStatsPropAndAlterPartitions(hasFollowingStatsTask, tbl, + partitionDetailsMap.entrySet().stream() + .filter(entry -> entry.getValue().hasOldPartition) + .map(entry -> entry.getValue().partition) + .collect(Collectors.toList()), writeId); + } catch (InterruptedException | ExecutionException e) { throw new HiveException("Exception when loading " + validPartitions.size() + " in table " + tbl.getTableName() @@ -2687,7 +2712,7 @@ private void constructOneLBLocationMap(FileStatus fSta, try { if (isAcid) { List<String> partNames = - result.values().parallelStream().map(Partition::getName).collect(Collectors.toList()); + 
result.values().stream().map(Partition::getName).collect(Collectors.toList()); getMSC().addDynamicPartitions(parentSession.getTxnMgr().getCurrentTxnId(), writeId, tbl.getDbName(), tbl.getTableName(), partNames, AcidUtils.toDataOperationType(operation));