Repository: hive
Updated Branches:
  refs/heads/branch-3 0e1de2e5b -> bf39dcb4d

HIVE-19965: Make HiveEndPoint use IMetaStoreClient.add_partition (Eugene Koifman, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bf39dcb4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bf39dcb4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bf39dcb4

Branch: refs/heads/branch-3
Commit: bf39dcb4d4da2fa1fc5258f6204f0ec0213ffb52
Parents: 0e1de2e
Author: Eugene Koifman <ekoif...@apache.org>
Authored: Sun Jun 24 15:27:50 2018 -0700
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Sun Jun 24 15:27:50 2018 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/HiveEndPoint.java   | 142 +++++++++----------
 .../hive/hcatalog/streaming/TestStreaming.java  |  12 ++
 2 files changed, 78 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bf39dcb4/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 3ee19dd..3604630 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -19,9 +19,15 @@ package org.apache.hive.hcatalog.streaming;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.cli.CliSessionState;
@@ -55,6 +61,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Information about the hive end point (i.e. table or partition) to write to.
@@ -294,42 +301,42 @@ public class HiveEndPoint {
     private final String agentInfo;
 
     /**
-     * @param endPoint end point to connect to
-     * @param ugi on behalf of whom streaming is done. cannot be null
-     * @param conf HiveConf object
+     * @param endPoint   end point to connect to
+     * @param ugi        on behalf of whom streaming is done. cannot be null
+     * @param conf       HiveConf object
      * @param createPart create the partition if it does not exist
-     * @throws ConnectionError if there is trouble connecting
-     * @throws InvalidPartition if specified partition does not exist (and createPart=false)
-     * @throws InvalidTable if specified table does not exist
+     * @throws ConnectionError         if there is trouble connecting
+     * @throws InvalidPartition        if specified partition does not exist (and createPart=false)
+     * @throws InvalidTable            if specified table does not exist
     * @throws PartitionCreationFailed if createPart=true and not able to create partition
      */
     private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
-                           HiveConf conf, boolean createPart, String agentInfo)
-            throws ConnectionError, InvalidPartition, InvalidTable
-            , PartitionCreationFailed {
+        HiveConf conf, boolean createPart, String agentInfo)
+        throws ConnectionError, InvalidPartition, InvalidTable
+        , PartitionCreationFailed {
       this.endPt = endPoint;
       this.ugi = ugi;
      this.agentInfo = agentInfo;
-      this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
-      if (conf==null) {
+      this.username = ugi == null ? System.getProperty("user.name") : ugi.getShortUserName();
+      if (conf == null) {
         conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
+      } else {
+        overrideConfSettings(conf);
       }
-      else {
-        overrideConfSettings(conf);
-      }
-      this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
+      this.secureMode = ugi == null ? false : ugi.hasKerberosCredentials();
       this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
       // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
       // isolated from the other transaction related RPC calls.
       this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode);
       checkEndPoint(endPoint, msClient);
-      if (createPart  && !endPoint.partitionVals.isEmpty()) {
+      if (createPart && !endPoint.partitionVals.isEmpty()) {
         createPartitionIfNotExists(endPoint, msClient, conf);
       }
     }
 
     /**
      * Checks the validity of endpoint
+     *
      * @param endPoint the HiveEndPoint to be checked
      * @param msClient the metastore client
      * @throws InvalidTable
@@ -372,13 +379,13 @@ public class HiveEndPoint {
      */
     @Override
     public void close() {
-      if (ugi==null) {
+      if (ugi == null) {
         msClient.close();
         heartbeaterMSClient.close();
         return;
       }
       try {
-        ugi.doAs (
+        ugi.doAs(
             new PrivilegedExceptionAction<Void>() {
               @Override
               public Void run() throws Exception {
@@ -386,7 +393,7 @@
                 heartbeaterMSClient.close();
                 return null;
               }
-            } );
+            });
         try {
           FileSystem.closeAllForUGI(ugi);
         } catch (IOException exception) {
@@ -408,91 +415,74 @@ public class HiveEndPoint {
      * Acquires a new batch of transactions from Hive.
      *
      * @param numTransactions is a hint from client indicating how many transactions client needs.
-     * @param recordWriter Used to write record. The same writer instance can
-     *                     be shared with another TransactionBatch (to the same endpoint)
-     *                     only after the first TransactionBatch has been closed.
-     *                     Writer will be closed when the TransactionBatch is closed.
+     * @param recordWriter    Used to write record. The same writer instance can
+     *                        be shared with another TransactionBatch (to the same endpoint)
+     *                        only after the first TransactionBatch has been closed.
+     *                        Writer will be closed when the TransactionBatch is closed.
      * @return
-     * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
+     * @throws StreamingIOFailure          if failed to create new RecordUpdater for batch
      * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
-     * @throws ImpersonationFailed failed to run command as proxyUser
+     * @throws ImpersonationFailed         failed to run command as proxyUser
      * @throws InterruptedException
      */
     @Override
     public TransactionBatch fetchTransactionBatch(final int numTransactions,
-                                                  final RecordWriter recordWriter)
-            throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
-            , InterruptedException {
-      if (ugi==null) {
+        final RecordWriter recordWriter)
+        throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
+        , InterruptedException {
+      if (ugi == null) {
         return fetchTransactionBatchImpl(numTransactions, recordWriter);
       }
       try {
-        return ugi.doAs (
-                new PrivilegedExceptionAction<TransactionBatch>() {
-                  @Override
-                  public TransactionBatch run() throws StreamingException, InterruptedException {
-                    return fetchTransactionBatchImpl(numTransactions, recordWriter);
-                  }
-                }
+        return ugi.doAs(
+            new PrivilegedExceptionAction<TransactionBatch>() {
+              @Override
+              public TransactionBatch run() throws StreamingException, InterruptedException {
+                return fetchTransactionBatchImpl(numTransactions, recordWriter);
+              }
+            }
         );
       } catch (IOException e) {
         throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName()
-                + "' when acquiring Transaction Batch on endPoint " + endPt, e);
+            + "' when acquiring Transaction Batch on endPoint " + endPt, e);
       }
     }
 
     private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
-                                                      RecordWriter recordWriter)
-            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+        RecordWriter recordWriter)
+        throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
       return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient,
           heartbeaterMSClient, recordWriter, agentInfo);
     }
 
-
     private static void createPartitionIfNotExists(HiveEndPoint ep,
-                                                   IMetaStoreClient msClient, HiveConf conf)
-            throws InvalidTable, PartitionCreationFailed {
+        IMetaStoreClient msClient, HiveConf conf) throws PartitionCreationFailed {
       if (ep.partitionVals.isEmpty()) {
         return;
       }
-      SessionState localSession = null;
-      if(SessionState.get() == null) {
-        localSession = SessionState.start(new CliSessionState(conf));
-      }
-      IDriver driver = DriverFactory.newDriver(conf);
       try {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Attempting to create partition (if not existent) " + ep);
-        }
-
-        List<FieldSchema> partKeys = msClient.getTable(ep.database, ep.table)
-                .getPartitionKeys();
-        runDDL(driver, "use " + ep.database);
-        String query = "alter table " + ep.table + " add if not exists partition "
-                + partSpecStr(partKeys, ep.partitionVals);
-        runDDL(driver, query);
-      } catch (MetaException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new PartitionCreationFailed(ep, e);
-      } catch (NoSuchObjectException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new InvalidTable(ep.database, ep.table);
-      } catch (TException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new PartitionCreationFailed(ep, e);
-      } catch (QueryFailedException e) {
+        org.apache.hadoop.hive.ql.metadata.Table tableObject =
+            new org.apache.hadoop.hive.ql.metadata.Table(msClient.getTable(ep.database, ep.table));
+        Map<String, String> partSpec =
+            Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), ep.partitionVals);
+
+        AddPartitionDesc addPartitionDesc = new AddPartitionDesc(ep.database, ep.table, true);
+        String partLocation = new Path(tableObject.getDataLocation(),
+            Warehouse.makePartPath(partSpec)).toString();
+        addPartitionDesc.addPartition(partSpec, partLocation);
+        Partition partition = Hive.convertAddSpecToMetaPartition(tableObject,
+            addPartitionDesc.getPartition(0), conf);
+        msClient.add_partition(partition);
+      }
+      catch (AlreadyExistsException e) {
+        //ignore this - multiple clients may be trying to create the same partition
+        //AddPartitionDesc has ifExists flag but it's not propagated to
+        // HMSHandler.add_partitions_core() and so it throws...
+      }
+      catch (HiveException|TException e) {
         LOG.error("Failed to create partition : " + ep, e);
         throw new PartitionCreationFailed(ep, e);
-      } finally {
-        driver.close();
-        try {
-          if(localSession != null) {
-            localSession.close();
-          }
-        } catch (IOException e) {
-          LOG.warn("Error closing SessionState used to run Hive DDL.");
-        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bf39dcb4/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 5e5bc83..137323c 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -1355,6 +1355,18 @@ public class TestStreaming {
     }
   }
 
+  /**
+   * Make sure that creating an already existing partition is handled gracefully
+   * @throws Exception
+   */
+  @Test
+  public void testCreatePartition() throws Exception {
+    final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+    StreamingConnection conn = ep.newConnection(true);
+    conn.close();
+    conn = ep.newConnection(true);
+    conn.close();
+  }
   @Test
   public void testConcurrentTransactionBatchCommits() throws Exception {
     final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
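
For context, here is a minimal end-to-end sketch of the behavior the new
testCreatePartition() test pins down: opening two connections against the same
partition with createPartIfNotExists=true must both succeed, because the second
add_partition() call raises AlreadyExistsException, which the patched
createPartitionIfNotExists() deliberately swallows. The metastore URI,
database, table, and partition values below are placeholders, and the table is
assumed to already exist in a form the streaming API accepts (transactional,
bucketed, stored as ORC).

import java.util.Arrays;

import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;

public class CreatePartitionTwice {
  public static void main(String[] args) throws Exception {
    // Placeholder endpoint: point this at your own metastore, database,
    // table, and partition values.
    HiveEndPoint ep = new HiveEndPoint("thrift://localhost:9083",
        "testdb", "alerts", Arrays.asList("Asia", "India"));

    // newConnection(true) asks the endpoint to create the partition if it is
    // missing; after this patch that goes through
    // IMetaStoreClient.add_partition() instead of running
    // "alter table ... add if not exists partition ..." through a Driver.
    StreamingConnection conn = ep.newConnection(true);
    conn.close();

    // The partition now exists; the metastore throws AlreadyExistsException
    // on the second add_partition(), which the patched code ignores, so this
    // connection must succeed as well.
    conn = ep.newConnection(true);
    conn.close();
  }
}

Besides saving a round trip through the query compiler, the direct metastore
call removes the need to bootstrap a SessionState and IDriver inside the
streaming client just to run one DDL statement, which is exactly the setup and
teardown the removed finally block had been handling.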