GitHub user benchristel commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r200797045
  
    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
    @@ -0,0 +1,129 @@
    +package org.apache.hawq.pxf.service;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import java.io.IOException;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.DelayQueue;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.security.UserGroupInformation;
    +
    +public class UGICache {
    +
    +    private static final Log LOG = LogFactory.getLog(UGICache.class);
    +    private static Map<SegmentTransactionId, TimedProxyUGI> cache = new ConcurrentHashMap<>();
    +    //private static DelayQueue<TimedProxyUGI> delayQueue = new DelayQueue<>();
    +    private static DelayQueue<TimedProxyUGI>[] delayQueues = new DelayQueue<>[64];
    +    public static long UGI_CACHE_EXPIRY = 15 * 1 * 1000L; // 15 Minutes
    +
    +    public UGICache() {
    +        for (int i = 0; i < delayQueues.length; i++) {
    +            delayQueues[i] = new DelayQueue<>();
    +        }
    +    }
    +
    +    public TimedProxyUGI getTimedProxyUGI(String user, SegmentTransactionId session) throws IOException {
    +
    +        Integer segmentId = session.getSegmentId();
    +        synchronized (delayQueues[segmentId]) {
    +            // use the opportunity to cleanup any expired entries
    +            cleanup(segmentId);
    +            TimedProxyUGI timedProxyUGI = cache.get(session);
    +            if (timedProxyUGI == null) {
    +                LOG.info(session.toString() + " Creating proxy user = " + user);
    +                UserGroupInformation proxyUGI =
    +                        UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
    +                timedProxyUGI = new TimedProxyUGI(proxyUGI, session);
    +                delayQueues[segmentId].offer(timedProxyUGI);
    +                cache.put(session, timedProxyUGI);
    +            }
    +            timedProxyUGI.incrementCounter();
    +            return timedProxyUGI;
    +        }
    +    }
    +
    +    private cleanup(Integer segmentId) {
    +        // poll segment expiration queue for all expired entries and clean them if possible
    +        TimedProxyUGI ugi = null;
    +        while ((ugi = delayQueues[segmentId].poll()) != null) {
    +            // place it back in the queue if still in use and was not closed
    +            if (!closeUGI(ugi)) {
    +                delayQueues[segmentId].offer(ugi);
    +            }
    +            LOG.info("Delay Queue Size for segment " +
    +                    segmentId + " = " + delayQueues[segmentId].size());
    +        }
    +    }
    +    
    +    public void release(TimedProxyUGI timedProxyUGI, boolean forceClean) {
    +
    +        Integer segmentId = timedProxyUGI.getSession().getSegmentId();
    +        
    +        timedProxyUGI.resetTime();
    +        timedProxyUGI.decrementCounter();
    +        
    +        if (forceClean) {
    +            synchronized (delayQueues[segmentId]) {
    +                timedProxyUGI.setExpired();
    +                closeUGI(timedProxyUGI);
    +            }
    +        }
    +    }
    +
    +    private static boolean closeUGI(TimedProxyUGI expiredProxyUGI) {
    +
    +        SegmentTransactionId session = expiredProxyUGI.getSession();
    +        Integer segmentId = session.getSegmentId();
    +        //synchronized (delayQueues[segmentId]) {
    +            String fsMsg = "FileSystem for proxy user = " + expiredProxyUGI.getProxyUGI().getUserName();
    +            try {
    +                // The UGI object is still being used by another thread
    +                if (expiredProxyUGI.inProgress.get() != 0) {
    +                    LOG.info(session.toString() + " Skipping close of " + fsMsg);
    +                    expiredProxyUGI.resetTime(); // update time so that it doesn't expire until release updates time again
    --- End diff --
    
    Is this really the logic we want? With the current implementation, I think
    we're checking each UGI roughly every 15 minutes and destroying it if it
    happens not to be in use. This means that sometimes a UGI will live for 15+
    minutes after `inProgress` goes to 0, and sometimes it will be destroyed
    immediately, e.g. if another query happens to cause `closeUGI` to be called
    right after `inProgress` goes to 0. In effect, the cache has no consistent
    TTL.
    
    I think it's simpler (and easier to document, debug, etc.) to start the
    timer for each UGI at the moment its reference count drops to 0 and, if the
    count is still 0 when the timer goes off, destroy the UGI. Incrementing the
    reference count stops the timer. This guarantees that every UGI stays cached
    for at least 15 minutes after its last use.
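
A minimal Java sketch of the timer-on-zero approach proposed above, for illustration only. It is not the PXF `UGICache` API. The class `IdleExpiringCache` and its methods `acquire`, `release`, and `expireIdle` are hypothetical names, and a single lock stands in for the per-segment `delayQueues`. The clock is injected so that time can be controlled in tests. The sketch also assumes that "the timer going off" is modeled as an idle deadline checked by a periodic sweep, rather than as one scheduled task per entry.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch, not the PXF UGICache API: a reference-counted cache
// whose TTL timer starts only when an entry's reference count drops to 0.
class IdleExpiringCache<K, V> {

    private static final class Entry<T> {
        final T value;
        int refCount;
        boolean timerRunning; // true only while refCount == 0
        long idleSince;       // clock reading when the timer was started

        Entry(T value) {
            this.value = value;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;     // assumption: injected so tests control time
    private final Function<K, V> factory;  // e.g. would create a proxy UGI
    private final Consumer<V> destroyer;   // e.g. would close the UGI's FileSystems

    IdleExpiringCache(long ttlMillis, LongSupplier clock,
                      Function<K, V> factory, Consumer<V> destroyer) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        this.factory = factory;
        this.destroyer = destroyer;
    }

    // Returns the cached value (creating it if absent) and stops its idle timer.
    public synchronized V acquire(K key) {
        Entry<V> e = entries.computeIfAbsent(key, k -> new Entry<>(factory.apply(k)));
        e.refCount++;
        e.timerRunning = false;
        return e.value;
    }

    // Drops one reference; the idle timer starts when the count reaches 0.
    public synchronized void release(K key) {
        Entry<V> e = entries.get(key);
        if (e == null || e.refCount == 0) {
            throw new IllegalStateException("release without acquire: " + key);
        }
        if (--e.refCount == 0) {
            e.timerRunning = true;
            e.idleSince = clock.getAsLong();
        }
    }

    // Models "the timer going off": destroys every entry whose timer has run
    // for at least the TTL. Returns the number of entries destroyed.
    public synchronized int expireIdle() {
        long now = clock.getAsLong();
        int destroyed = 0;
        Iterator<Entry<V>> it = entries.values().iterator();
        while (it.hasNext()) {
            Entry<V> e = it.next();
            if (e.timerRunning && now - e.idleSince >= ttlMillis) {
                it.remove();
                destroyer.accept(e.value);
                destroyed++;
            }
        }
        return destroyed;
    }

    public synchronized boolean contains(K key) {
        return entries.containsKey(key);
    }
}
```

Because `expireIdle` removes only entries whose count is 0 and whose timer has run for at least the TTL, calling it more or less often (for example from a `ScheduledExecutorService` via `scheduleAtFixedRate`) changes only how long past the TTL an idle entry may linger. It can never destroy an entry early.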

