Repository: hive
Updated Branches:
  refs/heads/master ba210c904 -> be1955f61


HIVE-19965: Make HiveEndPoint use IMetaStoreClient.add_partition (Eugene Koifman, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/be1955f6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/be1955f6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/be1955f6

Branch: refs/heads/master
Commit: be1955f61496925a5e452228b46d8d9a3f08ddbd
Parents: ba210c9
Author: Eugene Koifman <ekoif...@apache.org>
Authored: Sun Jun 24 15:27:12 2018 -0700
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Sun Jun 24 15:27:12 2018 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/HiveEndPoint.java   | 142 +++++++++----------
 .../hive/hcatalog/streaming/TestStreaming.java  |  12 ++
 2 files changed, 78 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
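
For context: the HiveEndPoint.java change below stops creating partitions by
running "ALTER TABLE ... ADD IF NOT EXISTS PARTITION" DDL through a Driver and
instead goes straight to the metastore via IMetaStoreClient.add_partition().
The following is a condensed, self-contained sketch of that new flow; the class
and method names (AddPartitionSketch, addPartitionIfNotExists) are illustrative
only and not part of the patch, while the individual calls mirror the ones the
patch adds to createPartitionIfNotExists().

import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;

public class AddPartitionSketch {
  /**
   * Adds the given partition directly through the metastore client,
   * following the flow this patch introduces in createPartitionIfNotExists().
   */
  static void addPartitionIfNotExists(IMetaStoreClient msClient, HiveConf conf,
      String db, String table, List<String> partVals) throws Exception {
    // Wrap the thrift Table in the ql-layer Table to get the partition keys
    // and the table's data location.
    org.apache.hadoop.hive.ql.metadata.Table tableObject =
        new org.apache.hadoop.hive.ql.metadata.Table(msClient.getTable(db, table));

    // Build the partition spec (partition column -> value).
    Map<String, String> partSpec =
        Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partVals);

    // Describe the partition to add, locating it under the table directory.
    AddPartitionDesc addPartitionDesc = new AddPartitionDesc(db, table, true);
    String partLocation = new Path(tableObject.getDataLocation(),
        Warehouse.makePartPath(partSpec)).toString();
    addPartitionDesc.addPartition(partSpec, partLocation);

    // Convert to a metastore Partition and add it via the client.
    Partition partition = Hive.convertAddSpecToMetaPartition(
        tableObject, addPartitionDesc.getPartition(0), conf);
    try {
      msClient.add_partition(partition);
    } catch (AlreadyExistsException e) {
      // Another client created the same partition concurrently - ignore.
    }
  }
}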


http://git-wip-us.apache.org/repos/asf/hive/blob/be1955f6/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 3ee19dd..3604630 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -19,9 +19,15 @@
 package org.apache.hive.hcatalog.streaming;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.cli.CliSessionState;
@@ -55,6 +61,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Information about the hive end point (i.e. table or partition) to write to.
@@ -294,42 +301,42 @@ public class HiveEndPoint {
     private final String agentInfo;
 
     /**
-     * @param endPoint end point to connect to
-     * @param ugi on behalf of whom streaming is done. cannot be null
-     * @param conf HiveConf object
+     * @param endPoint   end point to connect to
+     * @param ugi        on behalf of whom streaming is done. cannot be null
+     * @param conf       HiveConf object
      * @param createPart create the partition if it does not exist
-     * @throws ConnectionError if there is trouble connecting
-     * @throws InvalidPartition if specified partition does not exist (and createPart=false)
-     * @throws InvalidTable if specified table does not exist
+     * @throws ConnectionError         if there is trouble connecting
+     * @throws InvalidPartition        if specified partition does not exist (and createPart=false)
+     * @throws InvalidTable            if specified table does not exist
      * @throws PartitionCreationFailed if createPart=true and not able to create partition
      */
     private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
-                           HiveConf conf, boolean createPart, String agentInfo)
-            throws ConnectionError, InvalidPartition, InvalidTable
-                   , PartitionCreationFailed {
+        HiveConf conf, boolean createPart, String agentInfo)
+        throws ConnectionError, InvalidPartition, InvalidTable
+        , PartitionCreationFailed {
       this.endPt = endPoint;
       this.ugi = ugi;
       this.agentInfo = agentInfo;
-      this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
-      if (conf==null) {
+      this.username = ugi == null ? System.getProperty("user.name") : ugi.getShortUserName();
+      if (conf == null) {
         conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
+      } else {
+        overrideConfSettings(conf);
       }
-      else {
-          overrideConfSettings(conf);
-      }
-      this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
+      this.secureMode = ugi == null ? false : ugi.hasKerberosCredentials();
       this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
       // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
       // isolated from the other transaction related RPC calls.
       this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode);
       checkEndPoint(endPoint, msClient);
-      if (createPart  &&  !endPoint.partitionVals.isEmpty()) {
+      if (createPart && !endPoint.partitionVals.isEmpty()) {
         createPartitionIfNotExists(endPoint, msClient, conf);
       }
     }
 
     /**
      * Checks the validity of endpoint
+     *
      * @param endPoint the HiveEndPoint to be checked
      * @param msClient the metastore client
      * @throws InvalidTable
@@ -372,13 +379,13 @@ public class HiveEndPoint {
      */
     @Override
     public void close() {
-      if (ugi==null) {
+      if (ugi == null) {
         msClient.close();
         heartbeaterMSClient.close();
         return;
       }
       try {
-        ugi.doAs (
+        ugi.doAs(
             new PrivilegedExceptionAction<Void>() {
               @Override
               public Void run() throws Exception {
@@ -386,7 +393,7 @@ public class HiveEndPoint {
                 heartbeaterMSClient.close();
                 return null;
               }
-            } );
+            });
         try {
           FileSystem.closeAllForUGI(ugi);
         } catch (IOException exception) {
@@ -408,91 +415,74 @@ public class HiveEndPoint {
      * Acquires a new batch of transactions from Hive.
      *
      * @param numTransactions is a hint from client indicating how many transactions client needs.
-     * @param recordWriter  Used to write record. The same writer instance can
-     *                      be shared with another TransactionBatch (to the same endpoint)
-     *                      only after the first TransactionBatch has been closed.
-     *                      Writer will be closed when the TransactionBatch is closed.
+     * @param recordWriter    Used to write record. The same writer instance can
+     *                        be shared with another TransactionBatch (to the same endpoint)
+     *                        only after the first TransactionBatch has been closed.
+     *                        Writer will be closed when the TransactionBatch is closed.
      * @return
-     * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
+     * @throws StreamingIOFailure          if failed to create new RecordUpdater for batch
      * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
-     * @throws ImpersonationFailed failed to run command as proxyUser
+     * @throws ImpersonationFailed         failed to run command as proxyUser
      * @throws InterruptedException
      */
     @Override
     public TransactionBatch fetchTransactionBatch(final int numTransactions,
-                                                      final RecordWriter recordWriter)
-            throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
-                  , InterruptedException {
-      if (ugi==null) {
+        final RecordWriter recordWriter)
+        throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
+        , InterruptedException {
+      if (ugi == null) {
         return fetchTransactionBatchImpl(numTransactions, recordWriter);
       }
       try {
-        return ugi.doAs (
-                new PrivilegedExceptionAction<TransactionBatch>() {
-                  @Override
-                  public TransactionBatch run() throws StreamingException, InterruptedException {
-                    return fetchTransactionBatchImpl(numTransactions, recordWriter);
-                  }
-                }
+        return ugi.doAs(
+            new PrivilegedExceptionAction<TransactionBatch>() {
+              @Override
+              public TransactionBatch run() throws StreamingException, InterruptedException {
+                return fetchTransactionBatchImpl(numTransactions, recordWriter);
+              }
+            }
         );
       } catch (IOException e) {
         throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName()
-                + "' when acquiring Transaction Batch on endPoint " + endPt, e);
+            + "' when acquiring Transaction Batch on endPoint " + endPt, e);
       }
     }
 
     private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
-                                                  RecordWriter recordWriter)
-            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+        RecordWriter recordWriter)
+        throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
       return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient,
           heartbeaterMSClient, recordWriter, agentInfo);
     }
 
-
     private static void createPartitionIfNotExists(HiveEndPoint ep,
-                                                   IMetaStoreClient msClient, HiveConf conf)
-            throws InvalidTable, PartitionCreationFailed {
+        IMetaStoreClient msClient, HiveConf conf) throws PartitionCreationFailed {
       if (ep.partitionVals.isEmpty()) {
         return;
       }
-      SessionState localSession = null;
-      if(SessionState.get() == null) {
-        localSession = SessionState.start(new CliSessionState(conf));
-      }
-      IDriver driver = DriverFactory.newDriver(conf);
 
       try {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Attempting to create partition (if not existent) " + ep);
-        }
-
-        List<FieldSchema> partKeys = msClient.getTable(ep.database, ep.table)
-                .getPartitionKeys();
-        runDDL(driver, "use " + ep.database);
-        String query = "alter table " + ep.table + " add if not exists partition "
-                + partSpecStr(partKeys, ep.partitionVals);
-        runDDL(driver, query);
-      } catch (MetaException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new PartitionCreationFailed(ep, e);
-      } catch (NoSuchObjectException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new InvalidTable(ep.database, ep.table);
-      } catch (TException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new PartitionCreationFailed(ep, e);
-      } catch (QueryFailedException e) {
+        org.apache.hadoop.hive.ql.metadata.Table tableObject =
+            new org.apache.hadoop.hive.ql.metadata.Table(msClient.getTable(ep.database, ep.table));
+        Map<String, String> partSpec =
+            Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), ep.partitionVals);
+
+        AddPartitionDesc addPartitionDesc = new AddPartitionDesc(ep.database, ep.table, true);
+        String partLocation = new Path(tableObject.getDataLocation(),
+            Warehouse.makePartPath(partSpec)).toString();
+        addPartitionDesc.addPartition(partSpec, partLocation);
+        Partition partition = Hive.convertAddSpecToMetaPartition(tableObject,
+            addPartitionDesc.getPartition(0), conf);
+        msClient.add_partition(partition);
+      }
+      catch (AlreadyExistsException e) {
+        //ignore this - multiple clients may be trying to create the same partition
+        //AddPartitionDesc has ifExists flag but it's not propagated to
+        // HMSHandler.add_partitions_core() and so it throws...
+      }
+      catch(HiveException|TException e) {
         LOG.error("Failed to create partition : " + ep, e);
         throw new PartitionCreationFailed(ep, e);
-      } finally {
-        driver.close();
-        try {
-          if(localSession != null) {
-            localSession.close();
-          }
-        } catch (IOException e) {
-          LOG.warn("Error closing SessionState used to run Hive DDL.");
-        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/be1955f6/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 5e5bc83..137323c 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -1355,6 +1355,18 @@ public class TestStreaming {
     }
   }
 
+  /**
+   * Make sure that creating an already existing partition is handled gracefully
+   * @throws Exception
+   */
+  @Test
+  public void testCreatePartition() throws Exception {
+    final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+    StreamingConnection conn = ep.newConnection(true);
+    conn.close();
+    conn = ep.newConnection(true);
+    conn.close();
+  }
   @Test
   public void testConcurrentTransactionBatchCommits() throws Exception {
     final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
