Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r199975945 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java --- @@ -89,32 +188,103 @@ public Boolean run() throws IOException, ServletException { }; // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user - UserGroupInformation proxyUGI = null; + TimedProxyUGI timedProxyUGI = getTimedProxyUGI(user, session); try { - LOG.debug("Creating proxy user = " + user); - proxyUGI = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); - proxyUGI.doAs(action); + timedProxyUGI.proxyUGI.doAs(action); } catch (UndeclaredThrowableException ute) { // unwrap the real exception thrown by the action throw new ServletException(ute.getCause()); } catch (InterruptedException ie) { throw new ServletException(ie); - } finally { - try { - if (proxyUGI != null) { - LOG.debug("Closing FileSystem for proxy user = " + proxyUGI.getUserName()); - FileSystem.closeAllForUGI(proxyUGI); - } - } catch (Throwable t) { - LOG.warn("Error closing FileSystem for proxy user = " + proxyUGI.getUserName()); - } + } + finally { + release(timedProxyUGI, fragmentIndex, fragmentCount); } } else { // no user impersonation is configured chain.doFilter(request, response); } } + private TimedProxyUGI getTimedProxyUGI(String user, SegmentTransactionId session) throws IOException { + synchronized (session.segmentTransactionId.intern()) { + TimedProxyUGI timedProxyUGI = cache.get(session); + if (timedProxyUGI == null || timedProxyUGI.getDelayMillis() < 0) { + cleanup(); + LOG.info(session.toString() + " Creating proxy user = " + user); + UserGroupInformation proxyUGI = + UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); + timedProxyUGI = new TimedProxyUGI(proxyUGI, session); + delayQueue.offer(timedProxyUGI); + cache.put(session, 
timedProxyUGI); + } else { + timedProxyUGI.incrementCounter(); + } + return timedProxyUGI; + } + } + + private void release(TimedProxyUGI timedProxyUGI, Integer fragmentIndex, Integer fragmentCount) { + synchronized (timedProxyUGI.session.segmentTransactionId.intern()) { + timedProxyUGI.resetTime(); + timedProxyUGI.decrementCounter(); + if (fragmentIndex != null && fragmentCount.equals(fragmentIndex)) + closeUGI(timedProxyUGI); + } + } + + private void cleanup() { + TimedProxyUGI timedProxyUGI = delayQueue.poll(); + while (timedProxyUGI != null) { + closeUGI(timedProxyUGI); + LOG.info(timedProxyUGI.session.toString() + " Delay Queue Size = " + delayQueue.size()); + timedProxyUGI = delayQueue.poll(); + } + } + + private void closeUGI(TimedProxyUGI timedProxyUGI) { + synchronized (timedProxyUGI.session.segmentTransactionId.intern()) { --- End diff -- This can deadlock. Suppose two cleanup attempts happen at the same time for session A and session B: session A comes first and gets a lock on, say, "seg1:tx1" in the release() method, and then comes here and tries to get a lock on session B's "seg2:tx1", while session B has already acquired that lock and will, in turn, try to get the lock on session A. This might happen when new requests for both sessions arrive simultaneously after a delay during which their previous UGIs have expired. Granted, the probability of this is low, but it is non-zero, meaning this is dangerous and cannot be ignored. In other words, to prevent deadlocks, do not acquire new locks while holding other locks if another thread can do the same in reverse order, which is exactly the case here.
---