http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index c8b0d4c..3670de1 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore; import static org.apache.commons.lang.StringUtils.join; import static org.apache.commons.lang.StringUtils.repeat; +import java.sql.Clob; import java.sql.Connection; import java.sql.Statement; import java.sql.SQLException; @@ -60,6 +61,8 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.cache.CacheUtils; +import org.apache.hadoop.hive.metastore.cache.CachedStore; import org.apache.hadoop.hive.metastore.model.MConstraint; import org.apache.hadoop.hive.metastore.model.MDatabase; import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics; @@ -78,6 +81,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * This class contains the optimizations for MetaStore that rely on direct SQL access to @@ -648,7 +652,7 @@ class MetaStoreDirectSql { loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() { @Override public void apply(StorageDescriptor t, Object[] fields) { - t.putToParameters((String)fields[1], (String)fields[2]); + t.putToParameters((String)fields[1], extractSqlClob(fields[2])); }}); // Perform conversion of null map values for (StorageDescriptor t : sds.values()) { @@ -779,7 +783,7 @@ class MetaStoreDirectSql { loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() { @Override public void apply(List<FieldSchema> t, Object[] fields) { - t.add(new FieldSchema((String)fields[2], (String)fields[3], (String)fields[1])); + t.add(new FieldSchema((String)fields[2], extractSqlClob(fields[3]), (String)fields[1])); }}); } @@ -790,7 +794,7 @@ class MetaStoreDirectSql { loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() { @Override public void apply(SerDeInfo t, Object[] fields) { - t.putToParameters((String)fields[1], (String)fields[2]); + t.putToParameters((String)fields[1], extractSqlClob(fields[2])); }}); // Perform conversion of null map values for (SerDeInfo t : serdes.values()) { @@ -878,6 +882,21 @@ class MetaStoreDirectSql { return ((Number) obj).doubleValue(); } + private String extractSqlClob(Object value) { + if (value == null) return null; + try { + if (value instanceof Clob) { + // we trim the Clob value to a max length an int can hold + int maxLength = (((Clob)value).length() < Integer.MAX_VALUE - 2) ? (int)((Clob)value).length() : Integer.MAX_VALUE - 2; + return ((Clob)value).getSubString(1L, maxLength); + } else { + return value.toString(); + } + } catch (SQLException sqle) { + return null; + } + } + private static String trimCommaList(StringBuilder sb) { if (sb.length() > 0) { sb.setLength(sb.length() - 1); @@ -1190,7 +1209,7 @@ class MetaStoreDirectSql { } public AggrStats aggrColStatsForPartitions(String dbName, String tableName, - List<String> partNames, List<String> colNames, boolean useDensityFunctionForNDVEstimation) + List<String> partNames, List<String> colNames, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { if (colNames.isEmpty() || partNames.isEmpty()) { LOG.debug("Columns is empty or partNames is empty : Short-circuiting stats eval"); @@ -1225,7 +1244,7 @@ class MetaStoreDirectSql { // Read aggregated stats for one column colStatsAggrFromDB = columnStatisticsObjForPartitions(dbName, tableName, partNames, colNamesForDB, - partsFound, useDensityFunctionForNDVEstimation); + partsFound, useDensityFunctionForNDVEstimation, ndvTuner); if (!colStatsAggrFromDB.isEmpty()) { ColumnStatisticsObj colStatsAggr = colStatsAggrFromDB.get(0); colStatsList.add(colStatsAggr); @@ -1238,7 +1257,7 @@ class MetaStoreDirectSql { partsFound = partsFoundForPartitions(dbName, tableName, partNames, colNames); colStatsList = columnStatisticsObjForPartitions(dbName, tableName, partNames, colNames, partsFound, - useDensityFunctionForNDVEstimation); + useDensityFunctionForNDVEstimation, ndvTuner); } LOG.info("useDensityFunctionForNDVEstimation = " + useDensityFunctionForNDVEstimation + "\npartsFound = " + partsFound + "\nColumnStatisticsObj = " @@ -1301,24 +1320,81 @@ class MetaStoreDirectSql { private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(final String dbName, final String tableName, final List<String> partNames, List<String> colNames, long partsFound, - final boolean useDensityFunctionForNDVEstimation) throws MetaException { + final boolean useDensityFunctionForNDVEstimation, final double ndvTuner) throws MetaException { final boolean areAllPartsFound = (partsFound == partNames.size()); return runBatched(colNames, new Batchable<String, ColumnStatisticsObj>() { public List<ColumnStatisticsObj> run(final List<String> inputColNames) throws MetaException { return runBatched(partNames, new Batchable<String, ColumnStatisticsObj>() { public List<ColumnStatisticsObj> run(List<String> inputPartNames) throws MetaException { return columnStatisticsObjForPartitionsBatch(dbName, tableName, inputPartNames, - inputColNames, areAllPartsFound, useDensityFunctionForNDVEstimation); + inputColNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); } }); } }); } + // Get aggregated column stats for a table per partition for all columns in the partition + // This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm) + public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + String tblName, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { + String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", " + + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), " + + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), " + + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), " + + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), " + // The following data is used to compute a partitioned table's NDV based + // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be + // accurately derived from partition NDVs, because the domain of column value two partitions + // can overlap. If there is no overlap then global NDV is just the sum + // of partition NDVs (UpperBound). But if there is some overlay then + // global NDV can be anywhere between sum of partition NDVs (no overlap) + // and same as one of the partition NDV (domain of column value in all other + // partitions is subset of the domain value in one of the partition) + // (LowerBound).But under uniform distribution, we can roughly estimate the global + // NDV by leveraging the min/max values. + // And, we also guarantee that the estimation makes sense by comparing it to the + // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")") + // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")") + + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," + + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," + + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," + + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\"" + + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\""; + long start = 0; + long end = 0; + Query query = null; + boolean doTrace = LOG.isDebugEnabled(); + Object qResult = null; + ForwardQueryResult fqr = null; + start = doTrace ? System.nanoTime() : 0; + query = pm.newQuery("javax.jdo.query.SQL", queryText); + qResult = executeWithArray(query, + prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()), queryText); + if (qResult == null) { + query.closeAll(); + return Maps.newHashMap(); + } + end = doTrace ? System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, end); + List<Object[]> list = ensureList(qResult); + Map<String, ColumnStatisticsObj> partColStatsMap = new HashMap<String, ColumnStatisticsObj>(); + for (Object[] row : list) { + String partName = (String) row[0]; + String colName = (String) row[1]; + partColStatsMap.put( + CacheUtils.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName), + prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation, ndvTuner)); + Deadline.checkTimeout(); + } + query.closeAll(); + return partColStatsMap; + } + /** Should be called with the list short enough to not trip up Oracle/etc. */ private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(String dbName, String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound, - boolean useDensityFunctionForNDVEstimation) throws MetaException { + boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { // TODO: all the extrapolation logic should be moved out of this class, // only mechanical data retrieval should remain here. String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", " @@ -1370,7 +1446,7 @@ class MetaStoreDirectSql { List<Object[]> list = ensureList(qResult); List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(list.size()); for (Object[] row : list) { - colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation)); + colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner)); Deadline.checkTimeout(); } query.closeAll(); @@ -1429,7 +1505,7 @@ class MetaStoreDirectSql { } list = ensureList(qResult); for (Object[] row : list) { - colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation)); + colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner)); Deadline.checkTimeout(); } end = doTrace ? System.nanoTime() : 0; @@ -1576,7 +1652,7 @@ class MetaStoreDirectSql { query.closeAll(); } } - colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation)); + colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner)); Deadline.checkTimeout(); } } @@ -1596,13 +1672,13 @@ class MetaStoreDirectSql { } private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(Object[] row, int i, - boolean useDensityFunctionForNDVEstimation) throws MetaException { + boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { ColumnStatisticsData data = new ColumnStatisticsData(); ColumnStatisticsObj cso = new ColumnStatisticsObj((String) row[i++], (String) row[i++], data); Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++], declow = row[i++], dechigh = row[i++], nulls = row[i++], dist = row[i++], avglen = row[i++], maxlen = row[i++], trues = row[i++], falses = row[i++], avgLong = row[i++], avgDouble = row[i++], avgDecimal = row[i++], sumDist = row[i++]; StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, llow, lhigh, dlow, dhigh, declow, dechigh, nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble, - avgDecimal, sumDist, useDensityFunctionForNDVEstimation); + avgDecimal, sumDist, useDensityFunctionForNDVEstimation, ndvTuner); return cso; }
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java index b0defb5..868e5a5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java @@ -75,19 +75,17 @@ public abstract class MetaStoreEventListener implements Configurable { } /** - * @param add partition event - * @throws MetaException - */ - - /** * @param tableEvent alter table event * @throws MetaException */ public void onAlterTable (AlterTableEvent tableEvent) throws MetaException { } - public void onAddPartition (AddPartitionEvent partitionEvent) - throws MetaException { + /** + * @param partitionEvent add partition event + * @throws MetaException + */ + public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException { } /** http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java new file mode 100644 index 0000000..20011cc --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import org.apache.hadoop.hive.common.classification.InterfaceAudience.Private; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.events.AddIndexEvent; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; +import org.apache.hadoop.hive.metastore.events.DropIndexEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; + +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; + +/** + * This class is used to notify a list of listeners about specific MetaStore events. + */ +@Private +public class MetaStoreListenerNotifier { + private interface EventNotifier { + void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException; + } + + private static Map<EventType, EventNotifier> notificationEvents = Maps.newHashMap( + ImmutableMap.<EventType, EventNotifier>builder() + .put(EventType.CREATE_DATABASE, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onCreateDatabase((CreateDatabaseEvent)event); + } + }) + .put(EventType.DROP_DATABASE, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onDropDatabase((DropDatabaseEvent)event); + } + }) + .put(EventType.CREATE_TABLE, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onCreateTable((CreateTableEvent)event); + } + }) + .put(EventType.DROP_TABLE, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onDropTable((DropTableEvent)event); + } + }) + .put(EventType.ADD_PARTITION, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onAddPartition((AddPartitionEvent)event); + } + }) + .put(EventType.DROP_PARTITION, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onDropPartition((DropPartitionEvent)event); + } + }) + .put(EventType.ALTER_TABLE, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onAlterTable((AlterTableEvent)event); + } + }) + .put(EventType.ALTER_PARTITION, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onAlterPartition((AlterPartitionEvent)event); + } + }) + .put(EventType.INSERT, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onInsert((InsertEvent)event); + } + }) + .put(EventType.CREATE_FUNCTION, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onCreateFunction((CreateFunctionEvent)event); + } + }) + .put(EventType.DROP_FUNCTION, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onDropFunction((DropFunctionEvent)event); + } + }) + .put(EventType.CREATE_INDEX, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onAddIndex((AddIndexEvent)event); + } + }) + .put(EventType.DROP_INDEX, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onDropIndex((DropIndexEvent)event); + } + }) + .put(EventType.ALTER_INDEX, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onAlterIndex((AlterIndexEvent)event); + } + }) + .build() + ); + + /** + * Notify a list of listeners about a specific metastore event. Each listener notified might update + * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will + * be returned to the caller. + * + * @param listeners List of MetaStoreEventListener listeners. + * @param eventType Type of the notification event. + * @param event The ListenerEvent with information about the event. + * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty + * map if no parameters were updated or if no listeners were notified. + * @throws MetaException If an error occurred while calling the listeners. + */ + public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners, + EventType eventType, + ListenerEvent event) throws MetaException { + + Preconditions.checkNotNull(listeners, "Listeners must not be null."); + Preconditions.checkNotNull(event, "The event must not be null."); + + for (MetaStoreEventListener listener : listeners) { + notificationEvents.get(eventType).notify(listener, event); + } + + // Each listener called above might set a different parameter on the event. + // This write permission is allowed on the listener side to avoid breaking compatibility if we change the API + // method calls. + return event.getParameters(); + } + + /** + * Notify a list of listeners about a specific metastore event. Each listener notified might update + * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will + * be returned to the caller. + * + * @param listeners List of MetaStoreEventListener listeners. + * @param eventType Type of the notification event. + * @param event The ListenerEvent with information about the event. + * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client. + * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty + * map if no parameters were updated or if no listeners were notified. + * @throws MetaException If an error occurred while calling the listeners. + */ + public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners, + EventType eventType, + ListenerEvent event, + EnvironmentContext environmentContext) throws MetaException { + + Preconditions.checkNotNull(event, "The event must not be null."); + + event.setEnvironmentContext(environmentContext); + return notifyEvent(listeners, eventType, event); + } + + /** + * Notify a list of listeners about a specific metastore event. Each listener notified might update + * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will + * be returned to the caller. + * + * @param listeners List of MetaStoreEventListener listeners. + * @param eventType Type of the notification event. + * @param event The ListenerEvent with information about the event. + * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client. + * @param parameters A list of key/value pairs with the new parameters to add. + * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty + * map if no parameters were updated or if no listeners were notified. + * @throws MetaException If an error occurred while calling the listeners. + */ + public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners, + EventType eventType, + ListenerEvent event, + EnvironmentContext environmentContext, + Map<String, String> parameters) throws MetaException { + + Preconditions.checkNotNull(event, "The event must not be null."); + + event.putParameters(parameters); + return notifyEvent(listeners, eventType, event, environmentContext); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java index 9c30ee7..320902b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.metastore; import java.io.BufferedReader; import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; @@ -27,21 +26,19 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.common.util.HiveVersionInfo; import com.google.common.collect.ImmutableMap; public class MetaStoreSchemaInfo { - private static String SQL_FILE_EXTENSION=".sql"; - private static String UPGRADE_FILE_PREFIX="upgrade-"; - private static String INIT_FILE_PREFIX="hive-schema-"; - private static String VERSION_UPGRADE_LIST = "upgrade.order"; - private static String PRE_UPGRADE_PREFIX = "pre-"; + private static final String SQL_FILE_EXTENSION = ".sql"; + private static final String UPGRADE_FILE_PREFIX = "upgrade-"; + private static final String INIT_FILE_PREFIX = "hive-schema-"; + private static final String VERSION_UPGRADE_LIST = "upgrade.order"; + private static final String PRE_UPGRADE_PREFIX = "pre-"; private final String dbType; private final String hiveSchemaVersions[]; - private final HiveConf hiveConf; private final String hiveHome; // Some version upgrades often don't change schema. So they are equivalent to @@ -55,10 +52,9 @@ public class MetaStoreSchemaInfo { "1.2.1", "1.2.0" ); - public MetaStoreSchemaInfo(String hiveHome, HiveConf hiveConf, String dbType) throws HiveMetaException { + public MetaStoreSchemaInfo(String hiveHome, String dbType) throws HiveMetaException { this.hiveHome = hiveHome; this.dbType = dbType; - this.hiveConf = hiveConf; // load upgrade order for the given dbType List<String> upgradeOrderList = new ArrayList<String>(); String upgradeListFile = getMetaStoreScriptDir() + File.separator + http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java index 6259cda..3ee7977 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java @@ -47,6 +47,7 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -86,6 +87,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; +import org.apache.hadoop.security.SaslRpcServer; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.ReflectionUtil; @@ -638,14 +640,6 @@ public class MetaStoreUtils { } } - static boolean isCascadeNeededInAlterTable(Table oldTable, Table newTable) { - //currently cascade only supports add/replace columns and - //changing column type/position/name/comments - List<FieldSchema> oldCols = oldTable.getSd().getCols(); - List<FieldSchema> newCols = newTable.getSd().getCols(); - return !areSameColumns(oldCols, newCols); - } - static boolean areSameColumns(List<FieldSchema> oldCols, List<FieldSchema> newCols) { if (oldCols.size() != newCols.size()) { return false; @@ -696,8 +690,6 @@ public class MetaStoreUtils { TypeInfoUtils.getTypeInfoFromTypeString(newType)); } - public static final int MAX_MS_TYPENAME_LENGTH = 2000; // 4000/2, for an unlikely unicode case - public static final String TYPE_FROM_DESERIALIZER = "<derived from deserializer>"; /** * validate column type @@ -708,9 +700,6 @@ public class MetaStoreUtils { */ static public String validateColumnType(String type) { if (type.equals(TYPE_FROM_DESERIALIZER)) return null; - if (type.length() > MAX_MS_TYPENAME_LENGTH) { - return "type name is too long: " + type; - } int last = 0; boolean lastAlphaDigit = isValidTypeChar(type.charAt(last)); for (int i = 1; i <= type.length(); i++) { @@ -1769,8 +1758,19 @@ public class MetaStoreUtils { * @param conf * @return The SASL configuration */ - public static Map<String, String> getMetaStoreSaslProperties(HiveConf conf) { + public static Map<String, String> getMetaStoreSaslProperties(HiveConf conf, boolean useSSL) { // As of now Hive Meta Store uses the same configuration as Hadoop SASL configuration + + // If SSL is enabled, override the given value of "hadoop.rpc.protection" and set it to "authentication" + // This disables any encryption provided by SASL, since SSL already provides it + String hadoopRpcProtectionVal = conf.get(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION); + String hadoopRpcProtectionAuth = SaslRpcServer.QualityOfProtection.AUTHENTICATION.toString(); + + if (useSSL && hadoopRpcProtectionVal != null && !hadoopRpcProtectionVal.equals(hadoopRpcProtectionAuth)) { + LOG.warn("Overriding value of " + CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION + " setting it from " + + hadoopRpcProtectionVal + " to " + hadoopRpcProtectionAuth + " because SSL is enabled"); + conf.set(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION, hadoopRpcProtectionAuth); + } return ShimLoader.getHadoopThriftAuthBridge().getHadoopSaslProperties(conf); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 51bc6d0..c351ffd 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -160,9 +160,13 @@ import org.apache.hive.common.util.HiveStringUtils; import org.apache.thrift.TException; import org.datanucleus.AbstractNucleusContext; import org.datanucleus.ClassLoaderResolver; +import org.datanucleus.ClassLoaderResolverImpl; import org.datanucleus.NucleusContext; +import org.datanucleus.api.jdo.JDOPersistenceManager; import org.datanucleus.api.jdo.JDOPersistenceManagerFactory; import org.datanucleus.store.rdbms.exceptions.MissingTableException; +import org.datanucleus.store.scostore.Store; +import org.datanucleus.util.WeakValueMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -195,6 +199,7 @@ public class ObjectStore implements RawStore, Configurable { private static final Map<String, Class> PINCLASSMAP; private static final String HOSTNAME; private static final String USER; + private static final String JDO_PARAM = ":param"; static { Map<String, Class> map = new HashMap<String, Class>(); map.put("table", MTable.class); @@ -234,26 +239,22 @@ public class ObjectStore implements RawStore, Configurable { private Pattern partitionValidationPattern; /** - * A class to pass the Query object to the caller to let the caller release - * resources by calling QueryWrapper.query.closeAll() after consuming all the query results. + * A Autocloseable wrapper around Query class to pass the Query object to the caller and let the caller release + * the resources when the QueryWrapper goes out of scope */ - public static class QueryWrapper { + public static class QueryWrapper implements AutoCloseable { public Query query; /** * Explicitly closes the query object to release the resources */ + @Override public void close() { if (query != null) { query.closeAll(); query = null; } } - - @Override - protected void finalize() { - this.close(); - } } public ObjectStore() { @@ -284,6 +285,9 @@ public class ObjectStore implements RawStore, Configurable { boolean propsChanged = !propsFromConf.equals(prop); if (propsChanged) { + if (pmf != null){ + clearOutPmfClassLoaderCache(pmf); + } pmf = null; prop = null; } @@ -748,12 +752,7 @@ public class ObjectStore implements RawStore, Configurable { pm.retrieve(mdb); commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } if (mdb == null) { throw new NoSuchObjectException("There is no database named " + name); @@ -872,10 +871,7 @@ public class ObjectStore implements RawStore, Configurable { } success = commitTransaction(); } finally { - if (!success) { - rollbackTransaction(); - } - queryWrapper.close(); + rollbackAndCleanup(success, queryWrapper); } return success; } @@ -893,33 +889,20 @@ public class ObjectStore implements RawStore, Configurable { // Take the pattern and split it on the | to get all the composing // patterns String[] subpatterns = pattern.trim().split("\\|"); - String queryStr = "select name from org.apache.hadoop.hive.metastore.model.MDatabase where ("; - boolean first = true; - for (String subpattern : subpatterns) { - subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*"); - if (!first) { - queryStr = queryStr + " || "; - } - queryStr = queryStr + " name.matches(\"" + subpattern + "\")"; - first = false; - } - queryStr = queryStr + ")"; - query = pm.newQuery(queryStr); + StringBuilder filterBuilder = new StringBuilder(); + List<String> parameterVals = new ArrayList<>(subpatterns.length); + appendPatternCondition(filterBuilder, "name", subpatterns, parameterVals); + query = pm.newQuery(MDatabase.class, filterBuilder.toString()); query.setResult("name"); query.setOrdering("name ascending"); - Collection names = (Collection) query.execute(); + Collection names = (Collection) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()])); databases = new ArrayList<String>(); for (Iterator i = names.iterator(); i.hasNext();) { databases.add((String) i.next()); } commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return databases; } @@ -939,12 +922,7 @@ public class ObjectStore implements RawStore, Configurable { databases = new ArrayList<String>((Collection<String>) query.execute()); commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } Collections.sort(databases); return databases; @@ -1012,12 +990,7 @@ public class ObjectStore implements RawStore, Configurable { } commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return type; } @@ -1041,12 +1014,7 @@ public class ObjectStore implements RawStore, Configurable { success = commitTransaction(); LOG.debug("type not found " + typeName, e); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return success; } @@ -1206,6 +1174,9 @@ public class ObjectStore implements RawStore, Configurable { private List<MConstraint> listAllTableConstraintsWithOptionalConstraintName (String dbName, String tableName, String constraintname) { + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tableName = HiveStringUtils.normalizeIdentifier(tableName); + constraintname = constraintname!=null?HiveStringUtils.normalizeIdentifier(constraintname):null; List<MConstraint> mConstraints = null; List<String> constraintNames = new ArrayList<String>(); Query query = null; @@ -1296,40 +1267,28 @@ public class ObjectStore implements RawStore, Configurable { dbName = HiveStringUtils.normalizeIdentifier(dbName); // Take the pattern and split it on the | to get all the composing // patterns - String[] subpatterns = pattern.trim().split("\\|"); - String queryStr = - "select tableName from org.apache.hadoop.hive.metastore.model.MTable " - + "where database.name == dbName && ("; - boolean first = true; - for (String subpattern : subpatterns) { - subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*"); - if (!first) { - queryStr = queryStr + " || "; - } - queryStr = queryStr + " tableName.matches(\"" + subpattern + "\")"; - first = false; + List<String> parameterVals = new ArrayList<>(); + StringBuilder filterBuilder = new StringBuilder(); + //adds database.name == dbName to the filter + appendSimpleCondition(filterBuilder, "database.name", new String[] {dbName}, parameterVals); + if(pattern != null) { + appendPatternCondition(filterBuilder, "tableName", pattern, parameterVals); } - queryStr = queryStr + ")"; - if (tableType != null) { - queryStr = queryStr + " && tableType.matches(\"" + tableType.toString() + "\")"; + if(tableType != null) { + appendPatternCondition(filterBuilder, "tableType", new String[] {tableType.toString()}, parameterVals); } - query = pm.newQuery(queryStr); - query.declareParameters("java.lang.String dbName"); + + query = pm.newQuery(MTable.class, filterBuilder.toString()); query.setResult("tableName"); query.setOrdering("tableName ascending"); - Collection names = (Collection) query.execute(dbName); + Collection names = (Collection) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()])); tbls = new ArrayList<String>(); for (Iterator i = names.iterator(); i.hasNext();) { tbls.add((String) i.next()); } commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return tbls; } @@ -1361,12 +1320,7 @@ public class ObjectStore implements RawStore, Configurable { result = (Long) query.execute(); commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return result.intValue(); } @@ -1382,19 +1336,20 @@ public class ObjectStore implements RawStore, Configurable { openTransaction(); // Take the pattern and split it on the | to get all the composing // patterns - StringBuilder builder = new StringBuilder(); + StringBuilder filterBuilder = new StringBuilder(); + List<String> parameterVals = new ArrayList<>(); if (dbNames != null && !dbNames.equals("*")) { - appendPatternCondition(builder, "database.name", dbNames); + appendPatternCondition(filterBuilder, "database.name", dbNames, parameterVals); } if (tableNames != null && !tableNames.equals("*")) { - appendPatternCondition(builder, "tableName", tableNames); + appendPatternCondition(filterBuilder, "tableName", tableNames, parameterVals); } if (tableTypes != null && !tableTypes.isEmpty()) { - appendSimpleCondition(builder, "tableType", tableTypes.toArray(new String[0])); + appendSimpleCondition(filterBuilder, "tableType", tableTypes.toArray(new String[0]), parameterVals); } - query = pm.newQuery(MTable.class, builder.toString()); - Collection<MTable> tables = (Collection<MTable>) query.execute(); + query = pm.newQuery(MTable.class, filterBuilder.toString()); + Collection<MTable> tables = (Collection<MTable>) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()])); for (MTable table : tables) { TableMeta metaData = new TableMeta( table.getDatabase().getName(), table.getTableName(), table.getTableType()); @@ -1403,29 +1358,29 @@ public class ObjectStore implements RawStore, Configurable { } commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return metas; } + private StringBuilder appendPatternCondition(StringBuilder filterBuilder, String fieldName, + String[] elements, List<String> parameterVals) { + return appendCondition(filterBuilder, fieldName, elements, true, parameterVals); + } + private StringBuilder appendPatternCondition(StringBuilder builder, - String fieldName, String elements) { + String fieldName, String elements, List<String> parameters) { elements = HiveStringUtils.normalizeIdentifier(elements); - return appendCondition(builder, fieldName, elements.split("\\|"), true); + return appendCondition(builder, fieldName, elements.split("\\|"), true, parameters); } private StringBuilder appendSimpleCondition(StringBuilder builder, - String fieldName, String[] elements) { - return appendCondition(builder, fieldName, elements, false); + String fieldName, String[] elements, List<String> parameters) { + return appendCondition(builder, fieldName, elements, false, parameters); } private StringBuilder appendCondition(StringBuilder builder, - String fieldName, String[] elements, boolean pattern) { + String fieldName, String[] elements, boolean pattern, List<String> parameters) { if (builder.length() > 0) { builder.append(" && "); } @@ -1435,14 +1390,15 @@ public class ObjectStore implements RawStore, Configurable { if (pattern) { element = "(?i)" + element.replaceAll("\\*", ".*"); } + parameters.add(element); if (builder.length() > length) { builder.append(" || "); } builder.append(fieldName); if (pattern) { - builder.append(".matches(\"").append(element).append("\")"); + builder.append(".matches(").append(JDO_PARAM).append(parameters.size()).append(")"); } else { - builder.append(" == \"").append(element).append("\""); + builder.append(" == ").append(JDO_PARAM).append(parameters.size()); } } builder.append(" )"); @@ -1488,12 +1444,7 @@ public class ObjectStore implements RawStore, Configurable { } commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } nmtbl.mtbl = mtbl; return nmtbl; @@ -1536,15 +1487,10 @@ public class ObjectStore implements RawStore, Configurable { } committed = commitTransaction(); } finally { - if (!committed) { - rollbackTransaction(); - } + rollbackAndCleanup(committed, query); if (dbExistsQuery != null) { dbExistsQuery.closeAll(); } - if (query != null) { - query.closeAll(); - } } return tables; } @@ -2065,12 +2011,7 @@ public class ObjectStore implements RawStore, Configurable { } } } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return ret; } @@ -2302,10 +2243,7 @@ public class ObjectStore implements RawStore, Configurable { success = commitTransaction(); return parts; } finally { - if (!success) { - rollbackTransaction(); - } - queryWrapper.close(); + rollbackAndCleanup(success, queryWrapper); } } @@ -2407,6 +2345,7 @@ public class ObjectStore implements RawStore, Configurable { for (Iterator i = names.iterator(); i.hasNext();) { pns.add((String) i.next()); } + if (query != null) { query.closeAll(); } @@ -2501,10 +2440,7 @@ public class ObjectStore implements RawStore, Configurable { } success = commitTransaction(); } finally { - if (!success) { - rollbackTransaction(); - } - queryWrapper.close(); + rollbackAndCleanup(success, queryWrapper); } return partitions; } @@ -2526,10 +2462,7 @@ public class ObjectStore implements RawStore, Configurable { } success = commitTransaction(); } finally { - if (!success) { - rollbackTransaction(); - } - queryWrapper.close(); + rollbackAndCleanup(success, queryWrapper); } return partitionNames; } @@ -3294,12 +3227,7 @@ public class ObjectStore implements RawStore, Configurable { success = commitTransaction(); LOG.debug("Done retrieving all objects for listTableNamesByFilter"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return tableNames; } @@ -3345,12 +3273,7 @@ public class ObjectStore implements RawStore, Configurable { success = commitTransaction(); LOG.debug("Done retrieving all objects for listMPartitionNamesByFilter"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return partNames; } @@ -3571,10 +3494,7 @@ public class ObjectStore implements RawStore, Configurable { success = commitTransaction(); LOG.debug("successfully deleted a CD in removeUnusedColumnDescriptor"); } finally { - if (!success) { - rollbackTransaction(); - } - queryWrapper.close(); + rollbackAndCleanup(success, queryWrapper); } } @@ -3658,12 +3578,7 @@ public class ObjectStore implements RawStore, Configurable { constraintNameIfExists = (String) constraintExistsQuery.execute(name); commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (constraintExistsQuery != null) { - constraintExistsQuery.closeAll(); - } + rollbackAndCleanup(commited, constraintExistsQuery); } return constraintNameIfExists != null && !constraintNameIfExists.isEmpty(); } @@ -3911,12 +3826,7 @@ public class ObjectStore implements RawStore, Configurable { pm.retrieve(midx); commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return midx; } @@ -3979,12 +3889,7 @@ public class ObjectStore implements RawStore, Configurable { return indexes; } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } } @@ -4011,12 +3916,7 @@ public class ObjectStore implements RawStore, Configurable { } success = commitTransaction(); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return pns; } @@ -4139,12 +4039,7 @@ public class ObjectStore implements RawStore, Configurable { pm.retrieve(mRoleMember); commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return mRoleMember; } @@ -4213,11 +4108,7 @@ public class ObjectStore implements RawStore, Configurable { } success = commitTransaction(); } finally { - if (!success) { - rollbackTransaction(); - } - - queryWrapper.close(); + rollbackAndCleanup(success, queryWrapper); } return success; } @@ -4287,12 +4178,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listRoles"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } if (principalType == PrincipalType.USER) { @@ -4358,7 +4244,6 @@ public class ObjectStore implements RawStore, Configurable { mRoleMemebership = (List<MRoleMap>) query.execute(roleName, principalType.toString()); pm.retrieveAll(mRoleMemebership); success = commitTransaction(); - LOG.debug("Done retrieving all objects for listMSecurityPrincipalMembershipRole"); } finally { if (!success) { @@ -4392,12 +4277,7 @@ public class ObjectStore implements RawStore, Configurable { pm.retrieve(mrole); commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return mrole; } @@ -4419,12 +4299,7 @@ public class ObjectStore implements RawStore, Configurable { success = commitTransaction(); return roleNames; } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } } @@ -5250,12 +5125,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listRoleMembers"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return mRoleMemeberList; } @@ -5306,12 +5176,7 @@ public class ObjectStore implements RawStore, Configurable { userNameDbPriv.addAll(mPrivs); } } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return userNameDbPriv; } @@ -5351,12 +5216,7 @@ public class ObjectStore implements RawStore, Configurable { commited = commitTransaction(); return convertGlobal(userNameDbPriv); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } } @@ -5399,12 +5259,7 @@ public class ObjectStore implements RawStore, Configurable { mSecurityDBList.addAll(mPrivs); LOG.debug("Done retrieving all objects for listPrincipalDBGrants"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return mSecurityDBList; } @@ -5527,12 +5382,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listAllTableGrants"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return mSecurityTabList; } @@ -5559,12 +5409,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listTableAllPartitionGrants"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return mSecurityTabPartList; } @@ -5592,12 +5437,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listTableAllColumnGrants"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return mTblColPrivilegeList; } @@ -5626,12 +5466,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listTableAllPartitionColumnGrants"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return mSecurityColList; } @@ -5674,7 +5509,6 @@ public class ObjectStore implements RawStore, Configurable { private List<MDBPrivilege> listDatabaseGrants(String dbName, QueryWrapper queryWrapper) { dbName = HiveStringUtils.normalizeIdentifier(dbName); boolean success = false; - try { LOG.debug("Executing listDatabaseGrants"); @@ -5782,12 +5616,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listAllTableGrants"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return mSecurityTabPartList; } @@ -5847,12 +5676,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPrincipalPartitionGrants"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return mSecurityTabPartList; } @@ -5916,12 +5740,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPrincipalTableColumnGrants"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return mSecurityColList; } @@ -5983,12 +5802,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPrincipalPartitionColumnGrants"); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } return mSecurityColList; } @@ -6050,12 +5864,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPrincipalPartitionColumnGrantsAll"); return result; } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } } @@ -6083,12 +5892,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPartitionColumnGrantsAll"); return result; } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } } @@ -6163,12 +5967,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPrincipalAllTableGrants"); return result; } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } } @@ -6191,12 +5990,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPrincipalAllTableGrants"); return result; } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } } @@ -6236,7 +6030,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPrincipalAllPartitionGrants"); } finally { if (!success) { - rollbackTransaction(); + rollbackTransaction(); } } return mSecurityTabPartList; @@ -6268,12 +6062,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPrincipalPartitionGrantsAll"); return result; } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } } @@ -6299,12 +6088,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPrincipalPartitionGrantsAll"); return result; } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } } @@ -6382,12 +6166,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPrincipalTableColumnGrantsAll"); return result; } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } } @@ -6412,12 +6191,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done retrieving all objects for listPrincipalTableColumnGrantsAll"); return result; } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } } @@ -6494,12 +6268,7 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Done executing isPartitionMarkedForEvent"); return (partEvents != null && !partEvents.isEmpty()) ? true : false; } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } } @@ -6553,7 +6322,6 @@ public class ObjectStore implements RawStore, Configurable { public Collection<?> executeJDOQLSelect(String queryStr, QueryWrapper queryWrapper) { boolean committed = false; Collection<?> result = null; - try { openTransaction(); Query query = queryWrapper.query = pm.newQuery(queryStr); @@ -6594,12 +6362,7 @@ public class ObjectStore implements RawStore, Configurable { return -1; } } finally { - if (!committed) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(committed, query); } } @@ -6629,12 +6392,7 @@ public class ObjectStore implements RawStore, Configurable { return null; } } finally { - if (!committed) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(committed, query); } } @@ -6745,12 +6503,7 @@ public class ObjectStore implements RawStore, Configurable { } return retVal; } finally { - if (!committed) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(committed, query); } } @@ -6838,12 +6591,7 @@ public class ObjectStore implements RawStore, Configurable { } return retVal; } finally { - if (!committed) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(committed, query); } } @@ -6877,12 +6625,7 @@ public class ObjectStore implements RawStore, Configurable { } return retVal; } finally { - if (!committed) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(committed, query); } } @@ -6973,12 +6716,7 @@ public class ObjectStore implements RawStore, Configurable { } return retVal; } finally { - if (!committed) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(committed, query); } } @@ -7055,12 +6793,7 @@ public class ObjectStore implements RawStore, Configurable { } return retVal; } finally { - if (!committed) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(committed, query); } } @@ -7271,7 +7004,6 @@ public class ObjectStore implements RawStore, Configurable { } boolean committed = false; - try { openTransaction(); @@ -7318,7 +7050,7 @@ public class ObjectStore implements RawStore, Configurable { for (String colName : colNames) { boolean foundCol = false; for (FieldSchema mCol : colList) { - if (mCol.getName().equals(colName.trim())) { + if (mCol.getName().equals(colName)) { foundCol = true; break; } @@ -7430,13 +7162,16 @@ public class ObjectStore implements RawStore, Configurable { @Override public AggrStats get_aggr_stats_for(String dbName, String tblName, final List<String> partNames, final List<String> colNames) throws MetaException, NoSuchObjectException { - final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(), HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); + final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(), + HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); + final double ndvTuner = HiveConf.getFloatVar(getConf(), + HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER); return new GetHelper<AggrStats>(dbName, tblName, true, false) { @Override protected AggrStats getSqlResult(GetHelper<AggrStats> ctx) throws MetaException { return directSql.aggrColStatsForPartitions(dbName, tblName, partNames, - colNames, useDensityFunctionForNDVEstimation); + colNames, useDensityFunctionForNDVEstimation, ndvTuner); } @Override protected AggrStats getJdoResult(GetHelper<AggrStats> ctx) @@ -7454,6 +7189,38 @@ public class ObjectStore implements RawStore, Configurable { } @Override + public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException { + final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(), + HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); + final double ndvTuner = HiveConf.getFloatVar(getConf(), + HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER); + return new GetHelper<Map<String, ColumnStatisticsObj>>(dbName, tableName, true, false) { + @Override + protected Map<String, ColumnStatisticsObj> getSqlResult( + GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException { + return directSql.getAggrColStatsForTablePartitions(dbName, tblName, + useDensityFunctionForNDVEstimation, ndvTuner); + } + + @Override + protected Map<String, ColumnStatisticsObj> getJdoResult( + GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException, + NoSuchObjectException { + // This is fast path for query optimizations, if we can find this info + // quickly using directSql, do it. No point in failing back to slow path + // here. + throw new MetaException("Jdo path is not implemented for stats aggr."); + } + + @Override + protected String describeResult() { + return null; + } + }.run(true); + } + + @Override public void flushCache() { // NOP as there's no caching } @@ -7466,7 +7233,12 @@ public class ObjectStore implements RawStore, Configurable { try { openTransaction(); // We are not going to verify SD for each partition. Just verify for the table. - validateTableCols(table, colNames); + // ToDo: we need verify the partition column instead + try { + validateTableCols(table, colNames); + } catch (MetaException me) { + LOG.warn("The table does not have the same column definition as its partition."); + } Query query = queryWrapper.query = pm.newQuery(MPartitionColumnStatistics.class); String paramStr = "java.lang.String t1, java.lang.String t2"; String filter = "tableName == t1 && dbName == t2 && ("; @@ -7593,12 +7365,7 @@ public class ObjectStore implements RawStore, Configurable { rollbackTransaction(); throw e; } finally { - if (!ret) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(ret, query); } return ret; } @@ -7668,12 +7435,7 @@ public class ObjectStore implements RawStore, Configurable { rollbackTransaction(); throw e; } finally { - if (!ret) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(ret, query); } return ret; } @@ -7695,12 +7457,7 @@ public class ObjectStore implements RawStore, Configurable { delCnt = query.deletePersistentAll(curTime, expiryTime); commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); LOG.debug("Done executing cleanupEvents"); } return delCnt; @@ -7804,12 +7561,7 @@ public class ObjectStore implements RawStore, Configurable { return tokenIdents; } finally { LOG.debug("Done executing getAllTokenIdentifers with status : " + committed); - if (!committed) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(committed, query); } } @@ -7852,12 +7604,7 @@ public class ObjectStore implements RawStore, Configurable { } committed = commitTransaction(); } finally { - if (!committed) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(committed, query); } LOG.debug("Done executing updateMasterKey with status : " + committed); if (null == masterKey) { @@ -7885,12 +7632,7 @@ public class ObjectStore implements RawStore, Configurable { } success = commitTransaction(); } finally { - if (!success) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(success, query); } LOG.debug("Done executing removeMasterKey with status : " + success); return (null != masterKey) && success; @@ -7916,12 +7658,7 @@ public class ObjectStore implements RawStore, Configurable { return masterKeys; } finally { LOG.debug("Done executing getMasterKeys with status : " + committed); - if (!committed) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(committed, query); } } @@ -8033,12 +7770,7 @@ public class ObjectStore implements RawStore, Configurable { } return mVerTables.get(0); } finally { - if (!committed) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(committed, query); } } @@ -8264,12 +7996,7 @@ public class ObjectStore implements RawStore, Configurable { pm.retrieve(mfunc); commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return mfunc; } @@ -8317,37 +8044,23 @@ public class ObjectStore implements RawStore, Configurable { dbName = HiveStringUtils.normalizeIdentifier(dbName); // Take the pattern and split it on the | to get all the composing // patterns - String[] subpatterns = pattern.trim().split("\\|"); - String queryStr = - "select functionName from org.apache.hadoop.hive.metastore.model.MFunction " - + "where database.name == dbName && ("; - boolean first = true; - for (String subpattern : subpatterns) { - subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*"); - if (!first) { - queryStr = queryStr + " || "; - } - queryStr = queryStr + " functionName.matches(\"" + subpattern + "\")"; - first = false; + List<String> parameterVals = new ArrayList<>(); + StringBuilder filterBuilder = new StringBuilder(); + appendSimpleCondition(filterBuilder, "database.name", new String[] { dbName }, parameterVals); + if(pattern != null) { + appendPatternCondition(filterBuilder, "functionName", pattern, parameterVals); } - queryStr = queryStr + ")"; - query = pm.newQuery(queryStr); - query.declareParameters("java.lang.String dbName"); + query = pm.newQuery(MFunction.class, filterBuilder.toString()); query.setResult("functionName"); query.setOrdering("functionName ascending"); - Collection names = (Collection) query.execute(dbName); + Collection names = (Collection) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()])); funcs = new ArrayList<String>(); for (Iterator i = names.iterator(); i.hasNext();) { funcs.add((String) i.next()); } commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return funcs; } @@ -8356,6 +8069,9 @@ public class ObjectStore implements RawStore, Configurable { public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) { boolean commited = false; Query query = null; + + NotificationEventResponse result = new NotificationEventResponse(); + result.setEvents(new ArrayList<NotificationEvent>()); try { openTransaction(); long lastEvent = rqst.getLastEvent(); @@ -8365,11 +8081,9 @@ public class ObjectStore implements RawStore, Configurable { Collection<MNotificationLog> events = (Collection) query.execute(lastEvent); commited = commitTransaction(); if (events == null) { - return null; + return result; } Iterator<MNotificationLog> i = events.iterator(); - NotificationEventResponse result = new NotificationEventResponse(); - result.setEvents(new ArrayList<NotificationEvent>()); int maxEvents = rqst.getMaxEvents() > 0 ? rqst.getMaxEvents() : Integer.MAX_VALUE; int numEvents = 0; while (i.hasNext() && numEvents++ < maxEvents) { @@ -8377,11 +8091,8 @@ public class ObjectStore implements RawStore, Configurable { } return result; } finally { - if (query != null) { - query.closeAll(); - } if (!commited) { - rollbackTransaction(); + rollbackAndCleanup(commited, query); return null; } } @@ -8411,12 +8122,7 @@ public class ObjectStore implements RawStore, Configurable { pm.makePersistent(translateThriftToDb(entry)); commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } } @@ -8436,12 +8142,7 @@ public class ObjectStore implements RawStore, Configurable { } commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } } @@ -8460,12 +8161,7 @@ public class ObjectStore implements RawStore, Configurable { commited = commitTransaction(); return new CurrentNotificationEventId(id); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } } @@ -8533,20 +8229,99 @@ public class ObjectStore implements RawStore, Configurable { */ public static void unCacheDataNucleusClassLoaders() { PersistenceManagerFactory pmf = ObjectStore.getPMF(); - if ((pmf != null) && (pmf instanceof JDOPersistenceManagerFactory)) { - JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf; - NucleusContext nc = jdoPmf.getNucleusContext(); - try { - Field classLoaderResolverMap = AbstractNucleusContext.class.getDeclaredField( - "classLoaderResolverMap"); - classLoaderResolverMap.setAccessible(true); - classLoaderResolverMap.set(nc, new HashMap<String, ClassLoaderResolver>()); - LOG.debug("Removed cached classloaders from DataNucleus NucleusContext"); - } catch (Exception e) { - LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext ", e); + clearOutPmfClassLoaderCache(pmf); + } + + private static void clearOutPmfClassLoaderCache(PersistenceManagerFactory pmf) { + if ((pmf == null) || (!(pmf instanceof JDOPersistenceManagerFactory))) { + return; + } + // NOTE : This is hacky, and this section of code is fragile depending on DN code varnames + // so it's likely to stop working at some time in the future, especially if we upgrade DN + // versions, so we actively need to find a better way to make sure the leak doesn't happen + // instead of just clearing out the cache after every call. + JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf; + NucleusContext nc = jdoPmf.getNucleusContext(); + try { + Field pmCache = pmf.getClass().getDeclaredField("pmCache"); + pmCache.setAccessible(true); + Set<JDOPersistenceManager> pmSet = (Set<JDOPersistenceManager>)pmCache.get(pmf); + for (JDOPersistenceManager pm : pmSet) { + org.datanucleus.ExecutionContext ec = (org.datanucleus.ExecutionContext)pm.getExecutionContext(); + if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) { + ClassLoaderResolver clr = ((org.datanucleus.ExecutionContextThreadedImpl)ec).getClassLoaderResolver(); + clearClr(clr); + } + } + org.datanucleus.plugin.PluginManager pluginManager = jdoPmf.getNucleusContext().getPluginManager(); + Field registryField = pluginManager.getClass().getDeclaredField("registry"); + registryField.setAccessible(true); + org.datanucleus.plugin.PluginRegistry registry = (org.datanucleus.plugin.PluginRegistry)registryField.get(pluginManager); + if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry) { + org.datanucleus.plugin.NonManagedPluginRegistry nRegistry = (org.datanucleus.plugin.NonManagedPluginRegistry)registry; + Field clrField = nRegistry.getClass().getDeclaredField("clr"); + clrField.setAccessible(true); + ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(nRegistry); + clearClr(clr); + } + if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) { + org.datanucleus.PersistenceNucleusContextImpl pnc = (org.datanucleus.PersistenceNucleusContextImpl)nc; + org.datanucleus.store.types.TypeManagerImpl tm = (org.datanucleus.store.types.TypeManagerImpl)pnc.getTypeManager(); + Field clrField = tm.getClass().getDeclaredField("clr"); + clrField.setAccessible(true); + ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(tm); + clearClr(clr); + Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr"); + storeMgrField.setAccessible(true); + org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr = (org.datanucleus.store.rdbms.RDBMSStoreManager)storeMgrField.get(pnc); + Field backingStoreField = storeMgr.getClass().getDeclaredField("backingStoreByMemberName"); + backingStoreField.setAccessible(true); + Map<String, Store> backingStoreByMemberName = (Map<String, Store>)backingStoreField.get(storeMgr); + for (Store store : backingStoreByMemberName.values()) { + org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore = (org.datanucleus.store.rdbms.scostore.BaseContainerStore)store; + clrField = org.datanucleus.store.rdbms.scostore.BaseContainerStore.class.getDeclaredField("clr"); + clrField.setAccessible(true); + clr = (ClassLoaderResolver)clrField.get(baseStore); + clearClr(clr); + } + } + Field classLoaderResolverMap = AbstractNucleusContext.class.getDeclaredField( + "classLoaderResolverMap"); + classLoaderResolverMap.setAccessible(true); + Map<String,ClassLoaderResolver> loaderMap = + (Map<String, ClassLoaderResolver>) classLoaderResolverMap.get(nc); + for (ClassLoaderResolver clr : loaderMap.values()){ + clearClr(clr); + } + classLoaderResolverMap.set(nc, new HashMap<String, ClassLoaderResolver>()); + LOG.debug("Removed cached classloaders from DataNucleus NucleusContext"); + } catch (Exception e) { + LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext ", e); + } + } + + private static void clearClr(ClassLoaderResolver clr) throws Exception { + if (clr != null){ + if (clr instanceof ClassLoaderResolverImpl){ + ClassLoaderResolverImpl clri = (ClassLoaderResolverImpl) clr; + long resourcesCleared = clearFieldMap(clri,"resources"); + long loadedClassesCleared = clearFieldMap(clri,"loadedClasses"); + long unloadedClassesCleared = clearFieldMap(clri, "unloadedClasses"); + LOG.debug("Cleared ClassLoaderResolverImpl: " + + resourcesCleared + "," + loadedClassesCleared + "," + unloadedClassesCleared); } } } + private static long clearFieldMap(ClassLoaderResolverImpl clri, String mapFieldName) throws Exception { + Field mapField = ClassLoaderResolverImpl.class.getDeclaredField(mapFieldName); + mapField.setAccessible(true); + + Map<String,Class> map = (Map<String, Class>) mapField.get(clri); + long sz = map.size(); + mapField.set(clri, Collections.synchronizedMap(new WeakValueMap())); + return sz; + } + @Override public List<SQLPrimaryKey> getPrimaryKeys(String db_name, String tbl_name) throws MetaException { @@ -8557,10 +8332,12 @@ public class ObjectStore implements RawStore, Configurable { } } - protected List<SQLPrimaryKey> getPrimaryKeysInternal(final String db_name, - final String tbl_name, + protected List<SQLPrimaryKey> getPrimaryKeysInternal(final String db_name_input, + final String tbl_name_input, boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException { + final String db_name = HiveStringUtils.normalizeIdentifier(db_name_input); + final String tbl_name = HiveStringUtils.normalizeIdentifier(tbl_name_input); return new GetListHelper<SQLPrimaryKey>(db_name, tbl_name, allowSql, allowJdo) { @Override @@ -8603,12 +8380,7 @@ public class ObjectStore implements RawStore, Configurable { } commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return primaryKeys; } @@ -8633,12 +8405,7 @@ public class ObjectStore implements RawStore, Configurable { } commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return ret; } @@ -8654,9 +8421,13 @@ public class ObjectStore implements RawStore, Configurable { } } - protected List<SQLForeignKey> getForeignKeysInternal(final String parent_db_name, - final String parent_tbl_name, final String foreign_db_name, final String foreign_tbl_name, - boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException { + protected List<SQLForeignKey> getForeignKeysInternal(final String parent_db_name_input, + final String parent_tbl_name_input, final String foreign_db_name_input, + final String foreign_tbl_name_input, boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException { + final String parent_db_name = parent_db_name_input; + final String parent_tbl_name = parent_tbl_name_input; + final String foreign_db_name = foreign_db_name_input; + final String foreign_tbl_name = foreign_tbl_name_input; return new GetListHelper<SQLForeignKey>(foreign_db_name, foreign_tbl_name, allowSql, allowJdo) { @Override @@ -8757,12 +8528,7 @@ public class ObjectStore implements RawStore, Configurable { } commited = commitTransaction(); } finally { - if (!commited) { - rollbackTransaction(); - } - if (query != null) { - query.closeAll(); - } + rollbackAndCleanup(commited, query); } return foreignKeys; } @@ -8790,6 +8556,46 @@ public class ObjectStore implements RawStore, Configurable { } } + /** + * This is a cleanup method which is used to rollback a active transaction + * if the success flag is false and close the associated Query object. This method is used + * internally and visible for testing purposes only + * @param success Rollback the current active transaction if false + * @param query Query object which needs to be closed + */ + @VisibleForTesting + void rollbackAndCleanup(boolean success, Query query) { + try { + if (!success) { + rollbackTransaction(); + } + } finally { + if (query != null) { + query.closeAll(); + } + } + } + + /** + * This is a cleanup method which is used to rollback a active transaction + * if the success flag is false and close the associated QueryWrapper object. This method is used + * internally and visible for testing purposes only + * @param success Rollback the current active transaction if false + * @param queryWrapper QueryWrapper object which needs to be closed + */ + @VisibleForTesting + void rollbackAndCleanup(boolean success, QueryWrapper queryWrapper) { + try { + if (!success) { + rollbackTransaction(); + } + } finally { + if (queryWrapper != null) { + queryWrapper.close(); + } + } + } + @Override public void createTableWrite(Table tbl, long writeId, char state, long heartbeat) { boolean success = false; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index 63b696d..ded978c 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; @@ -589,6 +590,17 @@ public interface RawStore extends Configurable { List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException; /** + * Get all partition column statistics for a table + * @param dbName + * @param tableName + * @return Map of partition column statistics + * @throws MetaException + * @throws NoSuchObjectException + */ + public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException; + + /** * Get the next notification event. * @param rqst Request containing information on the last processed notification. * @return list of notifications, sorted by eventId