LENS-1464: One or two queued queries failing with ConcurrentModificationException on restart
Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/1d667975
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/1d667975
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/1d667975

Branch: refs/heads/current-release-line
Commit: 1d6679754240f2980e9bffd278a3ee04ac98db02
Parents: 0f58445
Author: Rajat Khandelwal <pro...@apache.org>
Authored: Wed Aug 30 15:59:23 2017 +0530
Committer: rajub <raju.bairishe...@lazada.com>
Committed: Thu Oct 5 11:12:56 2017 +0800

----------------------------------------------------------------------
 lens-server/pom.xml                             |  4 +--
 .../server/query/QueryExecutionServiceImpl.java | 32 +++++++++-----------
 2 files changed, 16 insertions(+), 20 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lens/blob/1d667975/lens-server/pom.xml
----------------------------------------------------------------------
diff --git a/lens-server/pom.xml b/lens-server/pom.xml
index 2065f04..f56f7bc 100644
--- a/lens-server/pom.xml
+++ b/lens-server/pom.xml
@@ -402,8 +402,8 @@
           </execution>
         </executions>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
           <configFile>enunciate.xml</configFile>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/lens/blob/1d667975/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index e70d290..4d141e5 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -3476,27 +3476,23 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
 
       // Add resources if either they haven't been marked as added on the session, or if Hive driver says they need
       // to be added to the corresponding hive driver
-      if (!hiveDriver.areDBResourcesAddedForSession(sessionIdentifier, ctx.getDatabase())) {
-        Collection<ResourceEntry> dbResources = session.getDBResources(ctx.getDatabase());
-
-        if (CollectionUtils.isNotEmpty(dbResources)) {
-          log.info("Proceeding to add resources for DB {} for query {} resources: {}", session.getCurrentDatabase(),
-            ctx.getLogHandle(), dbResources);
-
-          List<ResourceEntry> failedDBResources = addResources(dbResources, sessionHandle, hiveDriver);
-          Iterator<ResourceEntry> itr = dbResources.iterator();
-          while (itr.hasNext()) {
-            ResourceEntry res = itr.next();
-            if (!failedDBResources.contains(res)) {
-              itr.remove();
-            }
+      synchronized (session) {
+        if (!hiveDriver.areDBResourcesAddedForSession(sessionIdentifier, ctx.getDatabase())) {
+          Collection<ResourceEntry> dbResources = session.getDBResources(ctx.getDatabase());
+
+          if (CollectionUtils.isNotEmpty(dbResources)) {
+            log.info("Proceeding to add resources for DB {} for query {} resources: {}", session.getCurrentDatabase(),
+              ctx.getLogHandle(), dbResources);
+
+            List<ResourceEntry> failedDBResources = addResources(dbResources, sessionHandle, hiveDriver);
+            dbResources.removeIf(res -> !failedDBResources.contains(res));
+          } else {
+            log.info("No need to add DB resources for session: {} db= {}", sessionIdentifier,
+              session.getCurrentDatabase());
           }
-        } else {
-          log.info("No need to add DB resources for session: {} db= {}", sessionIdentifier, session.getCurrentDatabase());
+          hiveDriver.setResourcesAddedForSession(sessionIdentifier, ctx.getDatabase());
         }
-        hiveDriver.setResourcesAddedForSession(sessionIdentifier, ctx.getDatabase());
       }
-
       // Get pending session resources which needed to be added for this database
       Collection<ResourceEntry> pendingResources =
         session.getPendingSessionResourcesForDatabase(ctx.getDatabase());