This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit 2a38a4317505058ee21812741036a57e5fe435c8 Author: Zhihua Deng <dengzhhu...@gmail.com> AuthorDate: Mon Jun 15 08:09:24 2020 +0000 HIVE-23633: Metastore some JDO query objects do not close properly (Zhihua Deng via Zoltan Haindrich) Signed-off-by: Zoltan Haindrich <k...@rxd.hu> Closes apache/hive#1071 --- .../hadoop/hive/metastore/MetaStoreDirectSql.java | 30 ++++++++++++----- .../apache/hadoop/hive/metastore/ObjectStore.java | 38 ++++++++++++++-------- 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index a0021f6..2f9150d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -286,10 +286,9 @@ class MetaStoreDirectSql { initQueries.add(pm.newQuery(MCreationMetadata.class, "dbName == ''")); initQueries.add(pm.newQuery(MPartitionPrivilege.class, "principalName == ''")); initQueries.add(pm.newQuery(MPartitionColumnPrivilege.class, "principalName == ''")); - Query q; - while ((q = initQueries.peekFirst()) != null) { + + for (Query q : initQueries) { q.execute(); - initQueries.pollFirst(); } return true; @@ -472,8 +471,11 @@ class MetaStoreDirectSql { } Query<?> queryParams = pm.newQuery("javax.jdo.query.SQL", queryText); - return executeWithArray( + List<String> tableNames = executeWithArray( queryParams, pms.toArray(), queryText, limit); + List<String> results = new ArrayList<String>(tableNames); + queryParams.closeAll(); + return results; } /** @@ -493,8 +495,11 @@ class MetaStoreDirectSql { pms.add(TableType.MATERIALIZED_VIEW.toString()); Query<?> queryParams = pm.newQuery("javax.jdo.query.SQL", queryText); - return executeWithArray( + List<String> 
mvs = executeWithArray( queryParams, pms.toArray(), queryText); + List<String> results = new ArrayList<String>(mvs); + queryParams.closeAll(); + return results; } /** @@ -1129,6 +1134,7 @@ class MetaStoreDirectSql { int sqlResult = MetastoreDirectSqlUtils.extractSqlInt(query.executeWithArray(params)); long queryTime = doTrace ? System.nanoTime() : 0; MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, queryTime); + query.closeAll(); return sqlResult; } @@ -2225,7 +2231,7 @@ class MetaStoreDirectSql { } Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText); - List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray( + List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray( queryParams, pms.toArray(), queryText)); if (!sqlResult.isEmpty()) { @@ -2254,6 +2260,7 @@ class MetaStoreDirectSql { ret.add(currKey); } } + queryParams.closeAll(); return ret; } @@ -2292,7 +2299,7 @@ class MetaStoreDirectSql { } Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText); - List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray( + List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray( queryParams, pms.toArray(), queryText)); if (!sqlResult.isEmpty()) { @@ -2313,6 +2320,7 @@ class MetaStoreDirectSql { ret.add(currKey); } } + queryParams.closeAll(); return ret; } @@ -2350,7 +2358,7 @@ class MetaStoreDirectSql { } Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText); - List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray( + List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray( queryParams, pms.toArray(), queryText)); if (!sqlResult.isEmpty()) { @@ -2370,6 +2378,7 @@ class MetaStoreDirectSql { rely)); } } + queryParams.closeAll(); return ret; } @@ -2407,7 +2416,7 @@ class MetaStoreDirectSql { } Query queryParams = pm.newQuery("javax.jdo.query.SQL", queryText); - List<Object[]> sqlResult = 
MetastoreDirectSqlUtils.ensureList(executeWithArray( + List<Object[]> sqlResult = MetastoreDirectSqlUtils.ensureList(executeWithArray( queryParams, pms.toArray(), queryText)); if (!sqlResult.isEmpty()) { @@ -2427,6 +2436,7 @@ class MetaStoreDirectSql { rely)); } } + queryParams.closeAll(); return ret; } @@ -2490,6 +2500,7 @@ class MetaStoreDirectSql { ret.add(currConstraint); } } + queryParams.closeAll(); return ret; } @@ -2553,6 +2564,7 @@ class MetaStoreDirectSql { ret.add(currConstraint); } } + queryParams.closeAll(); return ret; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 7ca2a4a..5af7169 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -4853,6 +4853,7 @@ public class ObjectStore implements RawStore, Configurable { } } pm.makePersistentAll(mConstraintsList); + query.closeAll(); } // Finally replace CD oldSd.setCD(newSd.getCD()); @@ -7375,6 +7376,7 @@ public class ObjectStore implements RawStore, Configurable { "partition.table.tableName", "partition.table.database.name", "partition.partitionName", "partition.table.database.catalogName"); queryWithParams.getLeft().deletePersistentAll(queryWithParams.getRight()); + queryWithParams.getLeft().closeAll(); } private List<MDBPrivilege> listDatabaseGrants(String catName, String dbName, String authorizer) throws Exception { @@ -7439,6 +7441,7 @@ public class ObjectStore implements RawStore, Configurable { "partition.table.database.name", "partition.partitionName", "partition.table.database.catalogName"); queryWithParams.getLeft().deletePersistentAll(queryWithParams.getRight()); + queryWithParams.getLeft().closeAll(); } private <T> List<T> queryByPartitionNames(String 
catName, String dbName, String tableName, @@ -7446,7 +7449,10 @@ public class ObjectStore implements RawStore, Configurable { String catCol) { Pair<Query, Object[]> queryAndParams = makeQueryByPartitionNames(catName, dbName, tableName, partNames, clazz, tbCol, dbCol, partCol, catCol); - return (List<T>)queryAndParams.getLeft().executeWithArray(queryAndParams.getRight()); + List<T> results = new ArrayList<T>( + (List)queryAndParams.getLeft().executeWithArray(queryAndParams.getRight())); + queryAndParams.getLeft().closeAll(); + return results; } private Pair<Query, Object[]> makeQueryByPartitionNames( @@ -9473,6 +9479,7 @@ public class ObjectStore implements RawStore, Configurable { catName, dbName, tableName, partNames, MPartitionColumnStatistics.class, "tableName", "dbName", "partition.partitionName", "catName"); queryWithParams.getLeft().deletePersistentAll(queryWithParams.getRight()); + queryWithParams.getLeft().closeAll(); } @Override @@ -12849,19 +12856,18 @@ public class ObjectStore implements RawStore, Configurable { return 0; } boolean committed = false; + Query q = null; try { openTransaction(); int maxCreateTime = (int) (System.currentTimeMillis() / 1000) - maxRetainSecs; - Query q = pm.newQuery(MRuntimeStat.class); + q = pm.newQuery(MRuntimeStat.class); q.setFilter("createTime <= maxCreateTime"); q.declareParameters("int maxCreateTime"); long deleted = q.deletePersistentAll(maxCreateTime); committed = commitTransaction(); return (int) deleted; } finally { - if (!committed) { - rollbackTransaction(); - } + rollbackAndCleanup(committed, q); } } @@ -12900,6 +12906,7 @@ public class ObjectStore implements RawStore, Configurable { break; } } + query.closeAll(); return ret; } @@ -12991,9 +12998,10 @@ public class ObjectStore implements RawStore, Configurable { String namespace = request.getClusterNamespace(); boolean commited = false; ScheduledQueryPollResponse ret = new ScheduledQueryPollResponse(); + Query q = null; try { openTransaction(); - Query q = 
pm.newQuery(MScheduledQuery.class, + q = pm.newQuery(MScheduledQuery.class, "nextExecution <= now && enabled && clusterNamespace == ns && activeExecution == null"); q.setSerializeRead(true); q.declareParameters("java.lang.Integer now, java.lang.String ns"); @@ -13026,6 +13034,9 @@ public class ObjectStore implements RawStore, Configurable { LOG.debug("Caught jdo exception; exclusive", e); commited = false; } finally { + if (q != null) { + q.closeAll(); + } if (commited) { return ret; } else { @@ -13340,19 +13351,18 @@ public class ObjectStore implements RawStore, Configurable { return 0; } boolean committed = false; + Query q = null; try { openTransaction(); int maxCreateTime = (int) (System.currentTimeMillis() / 1000) - maxRetainSecs; - Query q = pm.newQuery(MScheduledExecution.class); + q = pm.newQuery(MScheduledExecution.class); q.setFilter("startTime <= maxCreateTime"); q.declareParameters("int maxCreateTime"); long deleted = q.deletePersistentAll(maxCreateTime); committed = commitTransaction(); return (int) deleted; } finally { - if (!committed) { - rollbackTransaction(); - } + rollbackAndCleanup(committed, q); } } @@ -13363,10 +13373,11 @@ public class ObjectStore implements RawStore, Configurable { return 0; } boolean committed = false; + Query q = null; try { openTransaction(); int maxLastUpdateTime = (int) (System.currentTimeMillis() / 1000) - timeoutSecs; - Query q = pm.newQuery(MScheduledExecution.class); + q = pm.newQuery(MScheduledExecution.class); q.setFilter("lastUpdateTime <= maxLastUpdateTime && (state == 'INITED' || state == 'EXECUTING')"); q.declareParameters("int maxLastUpdateTime"); @@ -13386,9 +13397,7 @@ public class ObjectStore implements RawStore, Configurable { committed = commitTransaction(); return results.size(); } finally { - if (!committed) { - rollbackTransaction(); - } + rollbackAndCleanup(committed, q); } } @@ -13407,5 +13416,6 @@ public class ObjectStore implements RawStore, Configurable { pm.makePersistent(e); } } + q.closeAll(); 
} }