Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r201517046 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java --- @@ -0,0 +1,143 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; + +public class UGICache { + + private static final Log LOG = LogFactory.getLog(UGICache.class); + private Map<SessionId, UGICacheEntry> cache = new ConcurrentHashMap<>(); + @SuppressWarnings("unchecked") + // There is a separate DelayQueue for each segment (also being used for locking) + private DelayQueue<UGICacheEntry>[] delayQueues = (DelayQueue<UGICacheEntry>[])new DelayQueue[64]; + private final UGIProvider ugiProvider; + + public UGICache(UGIProvider provider) { + this.ugiProvider = provider; + for (int i = 0; i < delayQueues.length; i++) { + delayQueues[i] = new DelayQueue<>(); + } + } + + public UGICache() { + this(new UGIProvider()); + } + + // Create new proxy UGI if not found in cache and increment reference count + public UGICacheEntry getTimedProxyUGI(SessionId session) + throws IOException { + + Integer segmentId = session.getSegmentId(); + String user = session.getUser(); + synchronized (delayQueues[segmentId]) { + // Use the opportunity to cleanup any expired entries + cleanup(segmentId); + UGICacheEntry timedProxyUGI = cache.get(session); + if (timedProxyUGI == null) { + LOG.info(session.toString() + " Creating proxy user = " + user); + UserGroupInformation proxyUGI = ugiProvider.createProxyUGI(user); + timedProxyUGI = new UGICacheEntry(proxyUGI, session); + delayQueues[segmentId].offer(timedProxyUGI); + cache.put(session, timedProxyUGI); + } + timedProxyUGI.incrementCounter(); + return timedProxyUGI; + } + } + + // Poll segment expiration queue for all expired entries + // and clean them if possible + private void cleanup(Integer segmentId) { --- End diff -- proper javadoc, please
---