Repository: hive
Updated Branches:
  refs/heads/master d8c500b89 -> 706bf724e


HIVE-20891: Call alter_partition in batch when dynamically loading partitions (Laszlo Pinter reviewed by Denys Kuzmenko and Peter Vary)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/706bf724
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/706bf724
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/706bf724

Branch: refs/heads/master
Commit: 706bf724ee24c4e88561b677793eb98468abaec1
Parents: d8c500b
Author: Laszlo Pinter <lpin...@cloudera.com>
Authored: Mon Dec 10 09:59:43 2018 +0100
Committer: Peter Vary <pv...@cloudera.com>
Committed: Mon Dec 10 09:59:43 2018 +0100

----------------------------------------------------------------------
 .../metastore/SynchronizedMetaStoreClient.java  |  7 +++
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 51 +++++++++++++++-----
 2 files changed, 45 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/706bf724/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index e8f3623..0372064 100644
--- a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -87,6 +87,13 @@ public final class SynchronizedMetaStoreClient {
     client.alter_partition(catName, dbName, tblName, newPart, environmentContext, writeIdList);
   }
 
+  public synchronized void alter_partitions(String catName, String dbName, String tblName,
+                               List<Partition> partitions, EnvironmentContext environmentContext,
+                               String writeIdList, long writeId) throws TException {
+    client.alter_partitions(catName, dbName, tblName, partitions, environmentContext, writeIdList,
+            writeId);
+  }
+
   public synchronized LockResponse checkLock(long lockid) throws TException {
     return client.checkLock(lockid);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/706bf724/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e185bf4..768a73c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2140,9 +2140,7 @@ public class Hive {
   private void addPartitionToMetastore(Partition newTPart, boolean hasFollowingStatsTask,
                                        Table tbl, TableSnapshot tableSnapshot) throws HiveException{
     try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding new partition " + newTPart.getSpec());
-      }
+      LOG.debug("Adding new partition {}", newTPart.getSpec());
       getSynchronizedMSC().add_partition(newTPart.getTPartition());
     } catch (AlreadyExistsException aee) {
       // With multiple users concurrently issuing insert statements on the same partition has
@@ -2182,12 +2180,15 @@ public class Hive {
                                         List<AcidUtils.TableSnapshot> tableSnapshots)
                                         throws HiveException {
     try {
+      if (partitions.isEmpty() || tableSnapshots.isEmpty()) {
+        return;
+      }
       if (LOG.isDebugEnabled()) {
         StringBuffer debugMsg = new StringBuffer("Adding new partitions ");
         partitions.forEach(partition -> debugMsg.append(partition.getSpec() + " "));
         LOG.debug(debugMsg.toString());
       }
-      getSynchronizedMSC().add_partitions(partitions.parallelStream().map(Partition::getTPartition)
+      getSynchronizedMSC().add_partitions(partitions.stream().map(Partition::getTPartition)
               .collect(Collectors.toList()));
     } catch(AlreadyExistsException aee) {
       // With multiple users concurrently issuing insert statements on the same partition has
@@ -2308,7 +2309,7 @@ public class Hive {
   }
 
   private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl,
-      Partition newTPart, TableSnapshot tableSnapshot) throws MetaException, TException {
+      Partition newTPart, TableSnapshot tableSnapshot) throws TException {
     EnvironmentContext ec = new EnvironmentContext();
     if (hasFollowingStatsTask) {
       ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
@@ -2319,6 +2320,27 @@ public class Hive {
         tableSnapshot == null ? null : tableSnapshot.getValidWriteIdList());
   }
 
+  private void setStatsPropAndAlterPartitions(boolean hasFollowingStatsTask, Table tbl,
+                                             List<Partition> partitions,
+                                             long writeId) throws TException {
+    if (partitions.isEmpty()) {
+      return;
+    }
+    EnvironmentContext ec = new EnvironmentContext();
+    if (hasFollowingStatsTask) {
+      ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+    }
+    if (LOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("Altering existing partitions ");
+      partitions.forEach(p -> sb.append(p.getSpec()).append(' '));
+      LOG.debug(sb.toString());
+    }
+
+    getSynchronizedMSC().alter_partitions(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(),
+            partitions.stream().map(Partition::getTPartition).collect(Collectors.toList()),
+            ec, null, writeId);
+  }
+
   /**
  * Walk through sub-directory tree to construct list bucketing location map.
  *
@@ -2553,7 +2575,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     // Match valid partition path to partitions
     while (iterator.hasNext()) {
       Partition partition = iterator.next();
-      partitionDetailsMap.entrySet().parallelStream()
+      partitionDetailsMap.entrySet().stream()
               .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec()))
               .findAny().ifPresent(entry -> {
                 entry.getValue().partition = partition;
@@ -2633,14 +2655,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
       addPartitionsToMetastore(
               partitionDetailsMap.entrySet()
-                      .parallelStream()
+                      .stream()
                       .filter(entry -> !entry.getValue().hasOldPartition)
                       .map(entry -> entry.getValue().partition)
                       .collect(Collectors.toList()),
               hasFollowingStatsTask,
               tbl,
               partitionDetailsMap.entrySet()
-                      .parallelStream()
+                      .stream()
                       .filter(entry -> !entry.getValue().hasOldPartition)
                       .map(entry -> entry.getValue().tableSnapshot)
                       .collect(Collectors.toList()));
@@ -2652,11 +2674,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
         if (isTxnTable && partitionDetails.newFiles != null) {
           addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles, writeId);
         }
-        if (partitionDetails.hasOldPartition) {
-          setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, partitionDetails.partition,
-                  partitionDetails.tableSnapshot);
-        }
       }
+
+      setStatsPropAndAlterPartitions(hasFollowingStatsTask, tbl,
+              partitionDetailsMap.entrySet().stream()
+                      .filter(entry -> entry.getValue().hasOldPartition)
+                      .map(entry -> entry.getValue().partition)
+                      .collect(Collectors.toList()), writeId);
+
     } catch (InterruptedException | ExecutionException e) {
       throw new HiveException("Exception when loading " + validPartitions.size()
               + " in table " + tbl.getTableName()
@@ -2687,7 +2712,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     try {
       if (isAcid) {
         List<String> partNames =
-                result.values().parallelStream().map(Partition::getName).collect(Collectors.toList());
+                result.values().stream().map(Partition::getName).collect(Collectors.toList());
         getMSC().addDynamicPartitions(parentSession.getTxnMgr().getCurrentTxnId(), writeId,
                 tbl.getDbName(), tbl.getTableName(), partNames,
                 AcidUtils.toDataOperationType(operation));

Reply via email to