http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 915bce3..64bc1a2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -58,9 +58,13 @@ import java.util.regex.Pattern; import javax.jdo.JDOException; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimaps; import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; @@ -78,6 +82,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.io.HdfsUtils; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.events.AddIndexEvent; @@ -114,6 +119,7 @@ import org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent; import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent; import org.apache.hadoop.hive.metastore.events.PreReadTableEvent; import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler; +import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import 
org.apache.hadoop.hive.metastore.model.MTableWrite; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -159,10 +165,6 @@ import com.facebook.fb303.fb_status; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimaps; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -170,6 +172,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; */ public class HiveMetaStore extends ThriftHiveMetastore { public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStore.class); + public static final String PARTITION_NUMBER_EXCEED_LIMIT_MSG = + "Number of partitions scanned (=%d) on table '%s' exceeds limit (=%d). This is controlled on the metastore server by %s."; // boolean that tells if the HiveMetaStore (remote) server is being used. 
// Can be used to determine if the calls to metastore api (HMSHandler) are being made with @@ -234,7 +238,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { public static class HMSHandler extends FacebookBase implements IHMSHandler, ThreadLocalRawStore { public static final Logger LOG = HiveMetaStore.LOG; - private String rawStoreClassName; private final HiveConf hiveConf; // stores datastore (jpox) properties, // right now they come from jpox.properties @@ -416,7 +419,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { @Override public void init() throws MetaException { - rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL); initListeners = MetaStoreUtils.getMetaStoreListeners( MetaStoreInitListener.class, hiveConf, hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS)); @@ -514,7 +516,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { fileMetadataManager = new FileMetadataManager((ThreadLocalRawStore)this, hiveConf); } - private String addPrefix(String s) { + private static String addPrefix(String s) { return threadLocalId.get() + ": " + s; } @@ -589,9 +591,14 @@ public class HiveMetaStore extends ThriftHiveMetastore { @InterfaceStability.Evolving @Override public RawStore getMS() throws MetaException { + Configuration conf = getConf(); + return getMSForConf(conf); + } + + public static RawStore getMSForConf(Configuration conf) throws MetaException { RawStore ms = threadLocalMS.get(); if (ms == null) { - ms = newRawStore(); + ms = newRawStoreForConf(conf); ms.verifySchema(); threadLocalMS.set(ms); ms = threadLocalMS.get(); @@ -608,24 +615,23 @@ public class HiveMetaStore extends ThriftHiveMetastore { return txn; } - private RawStore newRawStore() throws MetaException { - LOG.info(addPrefix("Opening raw store with implementation class:" - + rawStoreClassName)); - Configuration conf = getConf(); - + private static RawStore newRawStoreForConf(Configuration conf) throws MetaException { + HiveConf hiveConf = 
new HiveConf(conf, HiveConf.class); + String rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL); + LOG.info(addPrefix("Opening raw store with implementation class:" + rawStoreClassName)); if (hiveConf.getBoolVar(ConfVars.METASTORE_FASTPATH)) { LOG.info("Fastpath, skipping raw store proxy"); try { - RawStore rs = ((Class<? extends RawStore>) MetaStoreUtils.getClass( - rawStoreClassName)).newInstance(); - rs.setConf(conf); + RawStore rs = + ((Class<? extends RawStore>) MetaStoreUtils.getClass(rawStoreClassName)) + .newInstance(); + rs.setConf(hiveConf); return rs; } catch (Exception e) { LOG.error("Unable to instantiate raw store directly in fastpath mode", e); throw new RuntimeException(e); } } - return RawStoreProxy.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get()); } @@ -874,10 +880,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { Path dbPath = new Path(db.getLocationUri()); boolean success = false; boolean madeDir = false; + Map<String, String> transactionalListenersResponses = Collections.emptyMap(); try { firePreEvent(new PreCreateDatabaseEvent(db, this)); if (!wh.isDir(dbPath)) { - if (!wh.mkdirs(dbPath, true)) { + if (!wh.mkdirs(dbPath)) { throw new MetaException("Unable to create database path " + dbPath + ", failed to create database " + db.getName()); } @@ -886,11 +893,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.openTransaction(); ms.createDatabase(db); - if (transactionalListeners.size() > 0) { - CreateDatabaseEvent cde = new CreateDatabaseEvent(db, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onCreateDatabase(cde); - } + + if (!transactionalListeners.isEmpty()) { + transactionalListenersResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.CREATE_DATABASE, + new CreateDatabaseEvent(db, true, this)); } success = ms.commitTransaction(); @@ -901,8 +909,13 @@ public class 
HiveMetaStore extends ThriftHiveMetastore { wh.deleteDir(dbPath, true); } } - for (MetaStoreEventListener listener : listeners) { - listener.onCreateDatabase(new CreateDatabaseEvent(db, success, this)); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.CREATE_DATABASE, + new CreateDatabaseEvent(db, success, this), + null, + transactionalListenersResponses); } } } @@ -1017,6 +1030,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { Database db = null; List<Path> tablePaths = new ArrayList<Path>(); List<Path> partitionPaths = new ArrayList<Path>(); + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); db = ms.getDatabase(name); @@ -1099,12 +1113,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { } if (ms.dropDatabase(name)) { - if (transactionalListeners.size() > 0) { - DropDatabaseEvent dde = new DropDatabaseEvent(db, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropDatabase(dde); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_DATABASE, + new DropDatabaseEvent(db, true, this)); } + success = ms.commitTransaction(); } } finally { @@ -1126,8 +1141,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { } // it is not a terrible thing even if the data is not deleted } - for (MetaStoreEventListener listener : listeners) { - listener.onDropDatabase(new DropDatabaseEvent(db, success, this)); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_DATABASE, + new DropDatabaseEvent(db, success, this), + null, + transactionalListenerResponses); } } } @@ -1353,6 +1373,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); Path 
tblPath = null; boolean success = false, madeDir = false; try { @@ -1374,7 +1395,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) { if (tbl.getSd().getLocation() == null || tbl.getSd().getLocation().isEmpty()) { - tblPath = wh.getTablePath( + tblPath = wh.getDefaultTablePath( ms.getDatabase(tbl.getDbName()), tbl.getTableName()); } else { if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) { @@ -1388,7 +1409,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (tblPath != null) { if (!wh.isDir(tblPath)) { - if (!wh.mkdirs(tblPath, true)) { + if (!wh.mkdirs(tblPath)) { throw new MetaException(tblPath + " is not a directory or unable to create one"); } @@ -1413,12 +1434,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys); } - if (transactionalListeners.size() > 0) { - CreateTableEvent createTableEvent = new CreateTableEvent(tbl, true, this); - createTableEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onCreateTable(createTableEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.CREATE_TABLE, + new CreateTableEvent(tbl, true, this), + envContext); } success = ms.commitTransaction(); @@ -1429,11 +1450,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { wh.deleteDir(tblPath, true); } } - for (MetaStoreEventListener listener : listeners) { - CreateTableEvent createTableEvent = - new CreateTableEvent(tbl, success, this); - createTableEvent.setEnvironmentContext(envContext); - listener.onCreateTable(createTableEvent); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.CREATE_TABLE, + new CreateTableEvent(tbl, success, this), + envContext, + 
transactionalListenerResponses); } } } @@ -1598,6 +1621,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { List<Path> partPaths = null; Table tbl = null; boolean ifPurge = false; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); // drop any partitions @@ -1642,7 +1666,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } - checkTrashPurgeCombination(tblPath, dbname + "." + name, ifPurge, deleteData && !isExternal); // Drop the partitions and get a list of locations which need to be deleted partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath, tbl.getPartitionKeys(), deleteData && !isExternal); @@ -1651,12 +1674,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { throw new MetaException(indexName == null ? "Unable to drop table " + tableName: "Unable to drop index table " + tableName + " for index " + indexName); } else { - if (transactionalListeners.size() > 0) { - DropTableEvent dropTableEvent = new DropTableEvent(tbl, true, deleteData, this); - dropTableEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropTable(dropTableEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_TABLE, + new DropTableEvent(tbl, deleteData, true, this), + envContext); } success = ms.commitTransaction(); } @@ -1671,53 +1694,16 @@ public class HiveMetaStore extends ThriftHiveMetastore { deleteTableData(tblPath, ifPurge); // ok even if the data is not deleted } - for (MetaStoreEventListener listener : listeners) { - DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this); - dropTableEvent.setEnvironmentContext(envContext); - listener.onDropTable(dropTableEvent); - } - } - return success; - } - - /** - * Will throw MetaException if combination of 
trash policy/purge can't be satisfied - * @param pathToData path to data which may potentially be moved to trash - * @param objectName db.table, or db.table.part - * @param ifPurge if PURGE options is specified - */ - private void checkTrashPurgeCombination(Path pathToData, String objectName, boolean ifPurge, - boolean deleteData) throws MetaException { - // There is no need to check TrashPurgeCombination in following cases since Purge/Trash - // is not applicable: - // a) deleteData is false -- drop an external table - // b) pathToData is null -- a view - // c) ifPurge is true -- force delete without Trash - if (!deleteData || pathToData == null || ifPurge) { - return; - } - boolean trashEnabled = false; - try { - trashEnabled = 0 < hiveConf.getFloat("fs.trash.interval", -1); - } catch(NumberFormatException ex) { - // nothing to do - } - - if (trashEnabled) { - try { - HadoopShims.HdfsEncryptionShim shim = - ShimLoader.getHadoopShims().createHdfsEncryptionShim(FileSystem.get(hiveConf), hiveConf); - if (shim.isPathEncrypted(pathToData)) { - throw new MetaException("Unable to drop " + objectName + " because it is in an encryption zone" + - " and trash is enabled. Use PURGE option to skip trash."); - } - } catch (IOException ex) { - MetaException e = new MetaException(ex.getMessage()); - e.initCause(ex); - throw e; + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_TABLE, + new DropTableEvent(tbl, deleteData, success, this), + envContext, + transactionalListenerResponses); } } + return success; } /** @@ -1883,6 +1869,151 @@ public class HiveMetaStore extends ThriftHiveMetastore { } + private void updateStatsForTruncate(Map<String,String> props, EnvironmentContext environmentContext) { + if (null == props) { + return; + } + for (String stat : StatsSetupConst.supportedStats) { + String statVal = props.get(stat); + if (statVal != null) { + //In the case of truncate table, we set the stats to be 0. 
+ props.put(stat, "0"); + } + } + //first set basic stats to true + StatsSetupConst.setBasicStatsState(props, StatsSetupConst.TRUE); + environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK); + //then invalidate column stats + StatsSetupConst.clearColumnStatsState(props); + return; + } + + private void alterPartitionForTruncate(final RawStore ms, + final String dbName, + final String tableName, + final Table table, + final Partition partition) throws Exception { + EnvironmentContext environmentContext = new EnvironmentContext(); + updateStatsForTruncate(partition.getParameters(), environmentContext); + + if (!transactionalListeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ALTER_PARTITION, + new AlterPartitionEvent(partition, partition, table, true, true, this)); + } + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ALTER_PARTITION, + new AlterPartitionEvent(partition, partition, table, true, true, this)); + } + + alterHandler.alterPartition(ms, wh, dbName, tableName, null, partition, environmentContext, this); + } + + private void alterTableStatsForTruncate(final RawStore ms, + final String dbName, + final String tableName, + final Table table, + final List<String> partNames) throws Exception { + if (partNames == null) { + if (0 != table.getPartitionKeysSize()) { + for (Partition partition : ms.getPartitions(dbName, tableName, Integer.MAX_VALUE)) { + alterPartitionForTruncate(ms, dbName, tableName, table, partition); + } + } else { + EnvironmentContext environmentContext = new EnvironmentContext(); + updateStatsForTruncate(table.getParameters(), environmentContext); + + if (!transactionalListeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ALTER_TABLE, + new AlterTableEvent(table, table, true, true, this)); + } + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + 
EventType.ALTER_TABLE, + new AlterTableEvent(table, table, true, true, this)); + } + + alterHandler.alterTable(ms, wh, dbName, tableName, table, environmentContext, this); + } + } else { + for (Partition partition : ms.getPartitionsByNames(dbName, tableName, partNames)) { + alterPartitionForTruncate(ms, dbName, tableName, table, partition); + } + } + return; + } + + private List<Path> getLocationsForTruncate(final RawStore ms, + final String dbName, + final String tableName, + final Table table, + final List<String> partNames) throws Exception { + List<Path> locations = new ArrayList<Path>(); + if (partNames == null) { + if (0 != table.getPartitionKeysSize()) { + for (Partition partition : ms.getPartitions(dbName, tableName, Integer.MAX_VALUE)) { + locations.add(new Path(partition.getSd().getLocation())); + } + } else { + locations.add(new Path(table.getSd().getLocation())); + } + } else { + for (Partition partition : ms.getPartitionsByNames(dbName, tableName, partNames)) { + locations.add(new Path(partition.getSd().getLocation())); + } + } + return locations; + } + + @Override + public void truncate_table(final String dbName, final String tableName, List<String> partNames) + throws NoSuchObjectException, MetaException { + try { + Table tbl = get_table_core(dbName, tableName); + boolean isAutopurge = (tbl.isSetParameters() && "true".equalsIgnoreCase(tbl.getParameters().get("auto.purge"))); + + // This is not transactional + for (Path location : getLocationsForTruncate(getMS(), dbName, tableName, tbl, partNames)) { + FileSystem fs = location.getFileSystem(getHiveConf()); + HadoopShims.HdfsEncryptionShim shim + = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, getHiveConf()); + if (!shim.isPathEncrypted(location)) { + HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getHiveConf(), fs, location); + FileStatus targetStatus = fs.getFileStatus(location); + String targetGroup = targetStatus == null ? 
null : targetStatus.getGroup(); + wh.deleteDir(location, true, isAutopurge); + fs.mkdirs(location); + HdfsUtils.setFullFileStatus(getHiveConf(), status, targetGroup, fs, location, false); + } else { + FileStatus[] statuses = fs.listStatus(location, FileUtils.HIDDEN_FILES_PATH_FILTER); + if (statuses == null || statuses.length == 0) { + continue; + } + for (final FileStatus status : statuses) { + wh.deleteDir(status.getPath(), true, isAutopurge); + } + } + } + + // Alter the table/partition stats and also notify truncate table event + alterTableStatsForTruncate(getMS(), dbName, tableName, tbl, partNames); + } catch (IOException e) { + throw new MetaException(e.getMessage()); + } catch (Exception e) { + if (e instanceof MetaException) { + throw (MetaException) e; + } else if (e instanceof NoSuchObjectException) { + throw (NoSuchObjectException) e; + } else { + throw newMetaException(e); + } + } + } + /** * Is this an external table? * @@ -2135,6 +2266,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { boolean success = false, madeDir = false; Path partLocation = null; Table tbl = null; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); part.setDbName(dbName); @@ -2173,7 +2305,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { } if (!wh.isDir(partLocation)) { - if (!wh.mkdirs(partLocation, true)) { + if (!wh.mkdirs(partLocation)) { throw new MetaException(partLocation + " is not a directory or unable to create one"); } @@ -2191,12 +2323,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { } if (ms.addPartition(part)) { - if (transactionalListeners.size() > 0) { - AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tbl, part, true, this); - addPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAddPartition(addPartitionEvent); - } + if (!transactionalListeners.isEmpty()) { 
+ transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, part, true, this), + envContext); } success = ms.commitTransaction(); @@ -2209,11 +2341,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } - for (MetaStoreEventListener listener : listeners) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(tbl, part, success, this); - addPartitionEvent.setEnvironmentContext(envContext); - listener.onAddPartition(addPartitionEvent); + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, part, success, this), + envContext, + transactionalListenerResponses); } } return part; @@ -2358,8 +2491,10 @@ public class HiveMetaStore extends ThriftHiveMetastore { final Map<PartValEqWrapper, Boolean> addedPartitions = Collections.synchronizedMap(new HashMap<PartValEqWrapper, Boolean>()); final List<Partition> newParts = new ArrayList<Partition>(); - final List<Partition> existingParts = new ArrayList<Partition>();; + final List<Partition> existingParts = new ArrayList<Partition>(); Table tbl = null; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); + try { ms.openTransaction(); tbl = ms.getTable(dbName, tblName); @@ -2445,7 +2580,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { success = false; // Notification is generated for newly created partitions only. The subset of partitions // that already exist (existingParts), will not generate notifications. 
- fireMetaStoreAddPartitionEventTransactional(tbl, newParts, null, true); + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, newParts, true, this)); + } + success = ms.commitTransaction(); } finally { if (!success) { @@ -2456,12 +2597,26 @@ public class HiveMetaStore extends ThriftHiveMetastore { wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true); } } - fireMetaStoreAddPartitionEvent(tbl, parts, null, false); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, parts, false, this)); + } } else { - fireMetaStoreAddPartitionEvent(tbl, newParts, null, true); - if (existingParts != null) { - // The request has succeeded but we failed to add these partitions. - fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false); + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, newParts, true, this), + null, + transactionalListenerResponses); + + if (!existingParts.isEmpty()) { + // The request has succeeded but we failed to add these partitions. 
+ MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, existingParts, false, this)); + } } } } @@ -2548,6 +2703,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { final PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy .getPartitionIterator(); Table tbl = null; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); tbl = ms.getTable(dbName, tblName); @@ -2621,7 +2777,14 @@ public class HiveMetaStore extends ThriftHiveMetastore { success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists); //setting success to false to make sure that if the listener fails, rollback happens. success = false; - fireMetaStoreAddPartitionEventTransactional(tbl, partitionSpecProxy, null, true); + + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, partitionSpecProxy, true, this)); + } + success = ms.commitTransaction(); return addedPartitions.size(); } finally { @@ -2634,7 +2797,14 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } } - fireMetaStoreAddPartitionEvent(tbl, partitionSpecProxy, null, true); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, partitionSpecProxy, true, this), + null, + transactionalListenerResponses); + } } } @@ -2686,7 +2856,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { // mkdirs() because if the file system is read-only, mkdirs will // throw an exception even if the directory already exists. 
if (!wh.isDir(partLocation)) { - if (!wh.mkdirs(partLocation, true)) { + if (!wh.mkdirs(partLocation)) { throw new MetaException(partLocation + " is not a directory or unable to create one"); } @@ -2739,6 +2909,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { throws InvalidObjectException, AlreadyExistsException, MetaException, TException { boolean success = false; Table tbl = null; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); tbl = ms.getTable(part.getDbName(), part.getTableName()); @@ -2763,7 +2934,16 @@ public class HiveMetaStore extends ThriftHiveMetastore { // Setting success to false to make sure that if the listener fails, rollback happens. success = false; - fireMetaStoreAddPartitionEventTransactional(tbl, Arrays.asList(part), envContext, true); + + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, Arrays.asList(part), true, this), + envContext); + + } + // we proceed only if we'd actually succeeded anyway, otherwise, // we'd have thrown an exception success = ms.commitTransaction(); @@ -2771,64 +2951,19 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (!success) { ms.rollbackTransaction(); } - fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success); - } - return part; - } - private void fireMetaStoreAddPartitionEvent(final Table tbl, - final List<Partition> parts, final EnvironmentContext envContext, boolean success) - throws MetaException { - if (tbl != null && parts != null && !parts.isEmpty()) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(tbl, parts, success, this); - addPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener listener : listeners) { - listener.onAddPartition(addPartitionEvent); - } - } - } + if (!listeners.isEmpty()) { + 
MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(tbl, Arrays.asList(part), success, this), + envContext, + transactionalListenerResponses); - private void fireMetaStoreAddPartitionEvent(final Table tbl, - final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success) - throws MetaException { - if (tbl != null && partitionSpec != null) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(tbl, partitionSpec, success, this); - addPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener listener : listeners) { - listener.onAddPartition(addPartitionEvent); - } - } - } - - private void fireMetaStoreAddPartitionEventTransactional(final Table tbl, - final List<Partition> parts, final EnvironmentContext envContext, boolean success) - throws MetaException { - if (tbl != null && parts != null && !parts.isEmpty()) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(tbl, parts, success, this); - addPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAddPartition(addPartitionEvent); - } - } - } - - private void fireMetaStoreAddPartitionEventTransactional(final Table tbl, - final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success) - throws MetaException { - if (tbl != null && partitionSpec != null) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(tbl, partitionSpec, success, this); - addPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAddPartition(addPartitionEvent); } } + return part; } - @Override public Partition add_partition(final Partition part) throws InvalidObjectException, AlreadyExistsException, MetaException { @@ -2911,6 +3046,11 @@ public class HiveMetaStore extends 
ThriftHiveMetastore { Path destPath = new Path(destinationTable.getSd().getLocation(), Warehouse.makePartName(partitionKeysPresent, partValsPresent)); List<Partition> destPartitions = new ArrayList<Partition>(); + + Map<String, String> transactionalListenerResponsesForAddPartition = Collections.emptyMap(); + List<Map<String, String>> transactionalListenerResponsesForDropPartition = + Lists.newArrayListWithCapacity(partitionsToExchange.size()); + try { for (Partition partition: partitionsToExchange) { Partition destPartition = new Partition(partition); @@ -2926,7 +3066,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { } Path destParentPath = destPath.getParent(); if (!wh.isDir(destParentPath)) { - if (!wh.mkdirs(destParentPath, true)) { + if (!wh.mkdirs(destParentPath)) { throw new MetaException("Unable to create path " + destParentPath); } } @@ -2938,8 +3078,22 @@ public class HiveMetaStore extends ThriftHiveMetastore { // Setting success to false to make sure that if the listener fails, rollback happens. 
success = false; - fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange, - destinationTable, destPartitions, transactionalListeners, true); + + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponsesForAddPartition = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ADD_PARTITION, + new AddPartitionEvent(destinationTable, destPartitions, true, this)); + + for (Partition partition : partitionsToExchange) { + DropPartitionEvent dropPartitionEvent = + new DropPartitionEvent(sourceTable, partition, true, true, this); + transactionalListenerResponsesForDropPartition.add( + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_PARTITION, + dropPartitionEvent)); + } + } success = ms.commitTransaction(); return destPartitions; @@ -2949,34 +3103,31 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (pathCreated) { wh.renameDir(destPath, sourcePath); } - - fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange, - destinationTable, destPartitions, listeners, success); } - } - } - private void fireMetaStoreExchangePartitionEvent(Table sourceTable, - List<Partition> partitionsToExchange, Table destinationTable, - List<Partition> destPartitions, - List<MetaStoreEventListener> eventListeners, - boolean status) throws MetaException { - if (sourceTable != null && destinationTable != null - && !CollectionUtils.isEmpty(partitionsToExchange) - && !CollectionUtils.isEmpty(destPartitions)) { - if (eventListeners.size() > 0) { - AddPartitionEvent addPartitionEvent = - new AddPartitionEvent(destinationTable, destPartitions, status, this); - for (MetaStoreEventListener eventListener : eventListeners) { - eventListener.onAddPartition(addPartitionEvent); - } + if (!listeners.isEmpty()) { + AddPartitionEvent addPartitionEvent = new AddPartitionEvent(destinationTable, destPartitions, success, this); + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ADD_PARTITION, + 
addPartitionEvent, + null, + transactionalListenerResponsesForAddPartition); + i = 0; for (Partition partition : partitionsToExchange) { DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(sourceTable, partition, true, status, this); - for (MetaStoreEventListener eventListener : eventListeners) { - eventListener.onDropPartition(dropPartitionEvent); - } + new DropPartitionEvent(sourceTable, partition, success, true, this); + Map<String, String> parameters = + (transactionalListenerResponsesForDropPartition.size() > i) + ? transactionalListenerResponsesForDropPartition.get(i) + : null; + + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_PARTITION, + dropPartitionEvent, + null, + parameters); + i++; } } } @@ -2994,6 +3145,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { Path archiveParentDir = null; boolean mustPurge = false; boolean isExternalTbl = false; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); @@ -3012,27 +3164,23 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (isArchived) { archiveParentDir = MetaStoreUtils.getOriginalLocation(part); verifyIsWritablePath(archiveParentDir); - checkTrashPurgeCombination(archiveParentDir, db_name + "." + tbl_name + "." + part_vals, - mustPurge, deleteData && !isExternalTbl); } if ((part.getSd() != null) && (part.getSd().getLocation() != null)) { partPath = new Path(part.getSd().getLocation()); verifyIsWritablePath(partPath); - checkTrashPurgeCombination(partPath, db_name + "." + tbl_name + "." 
+ part_vals, - mustPurge, deleteData && !isExternalTbl); } if (!ms.dropPartition(db_name, tbl_name, part_vals)) { throw new MetaException("Unable to drop partition"); } else { - if (transactionalListeners.size() > 0) { - DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(tbl, part, true, deleteData, this); - dropPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropPartition(dropPartitionEvent); - } + if (!transactionalListeners.isEmpty()) { + + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_PARTITION, + new DropPartitionEvent(tbl, part, true, deleteData, this), + envContext); } success = ms.commitTransaction(); } @@ -3060,11 +3208,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { // ok even if the data is not deleted } } - for (MetaStoreEventListener listener : listeners) { - DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(tbl, part, success, deleteData, this); - dropPartitionEvent.setEnvironmentContext(envContext); - listener.onDropPartition(dropPartitionEvent); + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_PARTITION, + new DropPartitionEvent(tbl, part, success, deleteData, this), + envContext, + transactionalListenerResponses); } } return true; @@ -3126,6 +3275,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { List<Partition> parts = null; boolean mustPurge = false; boolean isExternalTbl = false; + List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList(); + try { // We need Partition-s for firing events and for result; DN needs MPartition-s to drop. // Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes. 
@@ -3195,28 +3346,23 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (MetaStoreUtils.isArchived(part)) { Path archiveParentDir = MetaStoreUtils.getOriginalLocation(part); verifyIsWritablePath(archiveParentDir); - checkTrashPurgeCombination(archiveParentDir, dbName + "." + tblName + "." + - part.getValues(), mustPurge, deleteData && !isExternalTbl); archToDelete.add(archiveParentDir); } if ((part.getSd() != null) && (part.getSd().getLocation() != null)) { Path partPath = new Path(part.getSd().getLocation()); verifyIsWritablePath(partPath); - checkTrashPurgeCombination(partPath, dbName + "." + tblName + "." + part.getValues(), - mustPurge, deleteData && !isExternalTbl); dirsToDelete.add(new PathAndPartValSize(partPath, part.getValues().size())); } } ms.dropPartitions(dbName, tblName, partNames); - if (parts != null && transactionalListeners.size() > 0) { + if (parts != null && !transactionalListeners.isEmpty()) { for (Partition part : parts) { - DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(tbl, part, true, deleteData, this); - dropPartitionEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropPartition(dropPartitionEvent); - } + transactionalListenerResponses.add( + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_PARTITION, + new DropPartitionEvent(tbl, part, true, deleteData, this), + envContext)); } } @@ -3250,12 +3396,19 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } if (parts != null) { - for (Partition part : parts) { - for (MetaStoreEventListener listener : listeners) { - DropPartitionEvent dropPartitionEvent = - new DropPartitionEvent(tbl, part, success, deleteData, this); - dropPartitionEvent.setEnvironmentContext(envContext); - listener.onDropPartition(dropPartitionEvent); + int i = 0; + if (parts != null && !listeners.isEmpty()) { + for (Partition part : parts) { + Map<String, 
String> parameters = + (!transactionalListenerResponses.isEmpty()) ? transactionalListenerResponses.get(i) : null; + + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_PARTITION, + new DropPartitionEvent(tbl, part, success, deleteData, this), + envContext, + parameters); + + i++; } } } @@ -3435,8 +3588,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { int partitionRequest = (maxToFetch < 0) ? numPartitions : maxToFetch; if (partitionRequest > partitionLimit) { String configName = ConfVars.METASTORE_LIMIT_PARTITION_REQUEST.varname; - throw new MetaException(String.format("Number of partitions scanned (=%d) on table '%s' exceeds limit" + - " (=%d). This is controlled on the metastore server by %s.", partitionRequest, tblName, partitionLimit, configName)); + throw new MetaException(String.format(PARTITION_NUMBER_EXCEED_LIMIT_MSG, partitionRequest, + tblName, partitionLimit, configName)); } } } @@ -3678,14 +3831,15 @@ public class HiveMetaStore extends ThriftHiveMetastore { // Only fetch the table if we actually have a listener Table table = null; - for (MetaStoreEventListener listener : listeners) { + if (!listeners.isEmpty()) { if (table == null) { table = getMS().getTable(db_name, tbl_name); } - AlterPartitionEvent alterPartitionEvent = - new AlterPartitionEvent(oldPart, new_part, table, true, this); - alterPartitionEvent.setEnvironmentContext(envContext); - listener.onAlterPartition(alterPartitionEvent); + + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ALTER_PARTITION, + new AlterPartitionEvent(oldPart, new_part, table, false, true, this), + envContext); } } catch (InvalidObjectException e) { ex = e; @@ -3749,13 +3903,15 @@ public class HiveMetaStore extends ThriftHiveMetastore { else { throw new InvalidOperationException("failed to alterpartitions"); } - for (MetaStoreEventListener listener : listeners) { - if (table == null) { - table = getMS().getTable(db_name, tbl_name); - } - AlterPartitionEvent alterPartitionEvent 
= - new AlterPartitionEvent(oldTmpPart, tmpPart, table, true, this); - listener.onAlterPartition(alterPartitionEvent); + + if (table == null) { + table = getMS().getTable(db_name, tbl_name); + } + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ALTER_PARTITION, + new AlterPartitionEvent(oldTmpPart, tmpPart, table, false, true, this)); } } } catch (InvalidObjectException e) { @@ -3792,16 +3948,17 @@ public class HiveMetaStore extends ThriftHiveMetastore { Exception ex = null; Index oldIndex = null; RawStore ms = getMS(); + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); oldIndex = get_index_by_name(dbname, base_table_name, index_name); firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this)); ms.alterIndex(dbname, base_table_name, index_name, newIndex); - if (transactionalListeners.size() > 0) { - AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAlterIndex(alterIndexEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.ALTER_INDEX, + new AlterIndexEvent(oldIndex, newIndex, true, this)); } success = ms.commitTransaction(); @@ -3823,9 +3980,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { } endFunction("alter_index", success, ex, base_table_name); - for (MetaStoreEventListener listener : listeners) { - AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, success, this); - listener.onAlterIndex(alterIndexEvent); + + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ALTER_INDEX, + new AlterIndexEvent(oldIndex, newIndex, success, this), + null, + transactionalListenerResponses); } } } @@ -3893,11 +4054,11 @@ public class HiveMetaStore 
extends ThriftHiveMetastore { alterHandler.alterTable(getMS(), wh, dbname, name, newTable, envContext, this); success = true; - for (MetaStoreEventListener listener : listeners) { - AlterTableEvent alterTableEvent = - new AlterTableEvent(oldt, newTable, success, this); - alterTableEvent.setEnvironmentContext(envContext); - listener.onAlterTable(alterTableEvent); + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.ALTER_TABLE, + new AlterTableEvent(oldt, newTable, false, true, this), + envContext); } } catch (NoSuchObjectException e) { // thrown when the table to be altered does not exist @@ -4465,6 +4626,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { boolean success = false, indexTableCreated = false; String[] qualified = MetaStoreUtils.getQualifiedName(index.getDbName(), index.getIndexTableName()); + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); firePreEvent(new PreAddIndexEvent(index, this)); @@ -4502,11 +4664,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { index.setCreateTime((int) time); index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time)); if (ms.addIndex(index)) { - if (transactionalListeners.size() > 0) { - AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onAddIndex(addIndexEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.CREATE_INDEX, + new AddIndexEvent(index, true, this)); } } @@ -4523,9 +4685,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.rollbackTransaction(); } - for (MetaStoreEventListener listener : listeners) { - AddIndexEvent addIndexEvent = new AddIndexEvent(index, success, this); - listener.onAddIndex(addIndexEvent); + if 
(!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.CREATE_INDEX, + new AddIndexEvent(index, success, this), + null, + transactionalListenerResponses); } } } @@ -4563,6 +4728,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { Index index = null; Path tblPath = null; List<Path> partPaths = null; + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); // drop the underlying index table @@ -4595,11 +4761,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } - if (transactionalListeners.size() > 0) { - DropIndexEvent dropIndexEvent = new DropIndexEvent(index, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropIndex(dropIndexEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_INDEX, + new DropIndexEvent(index, true, this)); } success = ms.commitTransaction(); @@ -4612,11 +4778,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { // ok even if the data is not deleted } // Skip the event listeners if the index is NULL - if (index != null) { - for (MetaStoreEventListener listener : listeners) { - DropIndexEvent dropIndexEvent = new DropIndexEvent(index, success, this); - listener.onDropIndex(dropIndexEvent); - } + if (index != null && !listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_INDEX, + new DropIndexEvent(index, success, this), + null, + transactionalListenerResponses); } } return success; @@ -6052,6 +6219,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { validateFunctionInfo(func); boolean success = false; RawStore ms = getMS(); + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); Database db = ms.getDatabase(func.getDbName()); @@ -6068,11 +6236,11 @@ 
public class HiveMetaStore extends ThriftHiveMetastore { long time = System.currentTimeMillis() / 1000; func.setCreateTime((int) time); ms.createFunction(func); - if (transactionalListeners.size() > 0) { - CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onCreateFunction(createFunctionEvent); - } + if (!transactionalListeners.isEmpty()) { + transactionalListenerResponses = + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.CREATE_FUNCTION, + new CreateFunctionEvent(func, true, this)); } success = ms.commitTransaction(); @@ -6081,11 +6249,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.rollbackTransaction(); } - if (listeners.size() > 0) { - CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, success, this); - for (MetaStoreEventListener listener : listeners) { - listener.onCreateFunction(createFunctionEvent); - } + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.CREATE_FUNCTION, + new CreateFunctionEvent(func, success, this), + null, + transactionalListenerResponses); } } } @@ -6097,6 +6266,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { boolean success = false; Function func = null; RawStore ms = getMS(); + Map<String, String> transactionalListenerResponses = Collections.emptyMap(); try { ms.openTransaction(); func = ms.getFunction(dbName, funcName); @@ -6106,10 +6276,10 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.dropFunction(dbName, funcName); if (transactionalListeners.size() > 0) { - DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, true, this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onDropFunction(dropFunctionEvent); - } + transactionalListenerResponses = + 
MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DROP_FUNCTION, + new DropFunctionEvent(func, true, this)); } success = ms.commitTransaction(); @@ -6119,10 +6289,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { } if (listeners.size() > 0) { - DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, success, this); - for (MetaStoreEventListener listener : listeners) { - listener.onDropFunction(dropFunctionEvent); - } + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DROP_FUNCTION, + new DropFunctionEvent(func, success, this), + null, + transactionalListenerResponses); } } } @@ -6474,13 +6645,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { InsertEvent event = new InsertEvent(rqst.getDbName(), rqst.getTableName(), rqst.getPartitionVals(), rqst .getData().getInsertData(), rqst.isSuccessful(), this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onInsert(event); - } - for (MetaStoreEventListener listener : listeners) { - listener.onInsert(event); - } + /* + * The transactional listener response will be set already on the event, so there is no need + * to pass the response to the non-transactional listener. 
+ */ + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.INSERT, event); + MetaStoreListenerNotifier.notifyEvent(listeners, EventType.INSERT, event); return new FireEventResponse(); @@ -7234,10 +7405,9 @@ public class HiveMetaStore extends ThriftHiveMetastore { ServerMode.METASTORE); saslServer.setSecretManager(delegationTokenManager.getSecretManager()); transFactory = saslServer.createTransportFactory( - MetaStoreUtils.getMetaStoreSaslProperties(conf)); + MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL)); processor = saslServer.wrapProcessor( new ThriftHiveMetastore.Processor<IHMSHandler>(handler)); - serverSocket = HiveAuthUtils.getServerSocket(null, port); LOG.info("Starting DB backed MetaStore Server in Secure Mode"); } else { @@ -7256,25 +7426,27 @@ public class HiveMetaStore extends ThriftHiveMetastore { processor = new TSetIpAddressProcessor<IHMSHandler>(handler); LOG.info("Starting DB backed MetaStore Server"); } + } + + if (!useSSL) { + serverSocket = HiveAuthUtils.getServerSocket(null, port); + } else { + String keyStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH).trim(); + if (keyStorePath.isEmpty()) { + throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH.varname + + " Not configured for SSL connection"); + } + String keyStorePassword = ShimLoader.getHadoopShims().getPassword(conf, + HiveConf.ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname); // enable SSL support for HMS List<String> sslVersionBlacklist = new ArrayList<String>(); for (String sslVersion : conf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) { sslVersionBlacklist.add(sslVersion); } - if (!useSSL) { - serverSocket = HiveAuthUtils.getServerSocket(null, port); - } else { - String keyStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH).trim(); - if (keyStorePath.isEmpty()) { - throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname - + " Not configured for SSL 
connection"); - } - String keyStorePassword = ShimLoader.getHadoopShims().getPassword(conf, - HiveConf.ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname); - serverSocket = HiveAuthUtils.getServerSSLSocket(null, port, keyStorePath, - keyStorePassword, sslVersionBlacklist); - } + + serverSocket = HiveAuthUtils.getServerSSLSocket(null, port, keyStorePath, + keyStorePassword, sslVersionBlacklist); } if (tcpKeepAlive) { @@ -7336,6 +7508,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { HMSHandler.LOG.info("Options.maxWorkerThreads = " + maxWorkerThreads); HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive); + HMSHandler.LOG.info("Enable SSL = " + useSSL); if (startLock != null) { signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing);
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index b0b009a..4912a31 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -395,6 +395,29 @@ public class HiveMetaStoreClient implements IMetaStoreClient { LOG.info("Trying to connect to metastore with URI " + store); try { + if (useSSL) { + try { + String trustStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH).trim(); + if (trustStorePath.isEmpty()) { + throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH.varname + + " Not configured for SSL connection"); + } + String trustStorePassword = ShimLoader.getHadoopShims().getPassword(conf, + HiveConf.ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PASSWORD.varname); + + // Create an SSL socket and connect + transport = HiveAuthUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout, trustStorePath, trustStorePassword ); + LOG.info("Opened an SSL connection to metastore, current connections: " + connCount.incrementAndGet()); + } catch(IOException e) { + throw new IllegalArgumentException(e); + } catch(TTransportException e) { + tte = e; + throw new MetaException(e.toString()); + } + } else { + transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout); + } + if (useSasl) { // Wrap thrift connection with SASL for secure connection. 
try { @@ -409,48 +432,24 @@ public class HiveMetaStoreClient implements IMetaStoreClient { String tokenSig = conf.getVar(ConfVars.METASTORE_TOKEN_SIGNATURE); // tokenSig could be null tokenStrForm = Utils.getTokenStrForm(tokenSig); - transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout); if(tokenStrForm != null) { // authenticate using delegation tokens via the "DIGEST" mechanism transport = authBridge.createClientTransport(null, store.getHost(), "DIGEST", tokenStrForm, transport, - MetaStoreUtils.getMetaStoreSaslProperties(conf)); + MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL)); } else { String principalConfig = conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL); transport = authBridge.createClientTransport( principalConfig, store.getHost(), "KERBEROS", null, - transport, MetaStoreUtils.getMetaStoreSaslProperties(conf)); + transport, MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL)); } } catch (IOException ioe) { LOG.error("Couldn't create client transport", ioe); throw new MetaException(ioe.toString()); } } else { - if (useSSL) { - try { - String trustStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH).trim(); - if (trustStorePath.isEmpty()) { - throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH.varname - + " Not configured for SSL connection"); - } - String trustStorePassword = ShimLoader.getHadoopShims().getPassword(conf, - HiveConf.ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PASSWORD.varname); - - // Create an SSL socket and connect - transport = HiveAuthUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout, trustStorePath, trustStorePassword ); - LOG.info("Opened an SSL connection to metastore, current connections: " + connCount.incrementAndGet()); - } catch(IOException e) { - throw new IllegalArgumentException(e); - } catch(TTransportException e) { - tte = e; - throw new MetaException(e.toString()); - } - } else { - transport = new 
TSocket(store.getHost(), store.getPort(), clientSocketTimeout); - } - if (useFramedTransport) { transport = new TFramedTransport(transport); } @@ -1097,6 +1096,23 @@ public class HiveMetaStoreClient implements IMetaStoreClient { } /** + * Truncate the table/partitions in the given database. + * @param dbName + * The db to which the table to be truncated belongs + * @param tableName + * The table to truncate + * @param partNames + * List of partitions to truncate. NULL will truncate the whole table/all partitions + * @throws MetaException + * @throws TException + * Could not truncate table properly. + */ + @Override + public void truncateTable(String dbName, String tableName, List<String> partNames) throws MetaException, TException { + client.truncate_table(dbName, tableName, partNames); + } + + /** * @param type * @return true if the type is dropped * @throws MetaException http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java index df698c8..b7d7b50 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java @@ -25,35 +25,23 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.Trash; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.api.MetaException; public class HiveMetaStoreFsImpl implements MetaStoreFS { public static final Logger LOG = LoggerFactory - .getLogger("hive.metastore.hivemetastoressimpl"); + .getLogger("hive.metastore.hivemetastoreFsimpl"); @Override 
public boolean deleteDir(FileSystem fs, Path f, boolean recursive, boolean ifPurge, Configuration conf) throws MetaException { - LOG.debug("deleting " + f); - try { - if (ifPurge) { - LOG.info("Not moving "+ f +" to trash"); - } else if (Trash.moveToAppropriateTrash(fs, f, conf)) { - LOG.info("Moved to trash: " + f); - return true; - } - - if (fs.delete(f, true)) { - LOG.debug("Deleted the diretory " + f); - return true; - } - + FileUtils.moveToTrash(fs, f, conf, ifPurge); if (fs.exists(f)) { throw new MetaException("Unable to delete directory: " + f); } + return true; } catch (FileNotFoundException e) { return true; // ok even if there is not data } catch (Exception e) { @@ -61,5 +49,4 @@ public class HiveMetaStoreFsImpl implements MetaStoreFS { } return false; } - } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index d567258..82db281 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -305,6 +305,20 @@ public interface IMetaStoreClient { void dropTable(String dbname, String tableName) throws MetaException, TException, NoSuchObjectException; + /** + * Truncate the table/partitions in the given database. + * @param dbName + * The db to which the table to be truncated belongs + * @param tableName + * The table to truncate + * @param partNames + * List of partitions to truncate. NULL will truncate the whole table/all partitions + * @throws MetaException + * @throws TException + * Could not truncate table properly. 
+ */ + void truncateTable(String dbName, String tableName, List<String> partNames) throws MetaException, TException; + boolean tableExists(String databaseName, String tableName) throws MetaException, TException, UnknownDBException;