[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-26 Thread lavjain
Github user lavjain closed the pull request at:

https://github.com/apache/incubator-hawq/pull/1379


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-16 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202824082
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -19,94 +19,114 @@
  * under the License.
  */
 
-
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 
-import javax.servlet.*;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hawq.pxf.service.SessionId;
+import org.apache.hawq.pxf.service.UGICache;
 import org.apache.hawq.pxf.service.utilities.SecureLogin;
 
-
 /**
  * Listener on lifecycle events of our webapp
  */
 public class SecurityServletFilter implements Filter {
 
 private static final Log LOG = 
LogFactory.getLog(SecurityServletFilter.class);
 private static final String USER_HEADER = "X-GP-USER";
-private static final String MISSING_HEADER_ERROR = 
String.format("Header %s is missing in the request", USER_HEADER);
-private static final String EMPTY_HEADER_ERROR = String.format("Header 
%s is empty in the request", USER_HEADER);
+private static final String SEGMENT_ID_HEADER = "X-GP-SEGMENT-ID";
+private static final String TRANSACTION_ID_HEADER = "X-GP-XID";
+private static final String FRAGMENT_INDEX_HEADER = 
"X-GP-FRAGMENT-INDEX";
+private static final String FRAGMENT_COUNT_HEADER = 
"X-GP-FRAGMENT-COUNT";
+private static final String MISSING_HEADER_ERROR = "Header %s is 
missing in the request";
+private static final String EMPTY_HEADER_ERROR = "Header %s is empty 
in the request";
+private static UGICache proxyUGICache;
--- End diff --

@denalex @lavjain What is the lifetime of the `SecurityServletFilter` 
instance? As I was thinking about making this non-static, I realized that if 
there's more than one instance per JVM, we might get redundant caches (and 
possible UGI resource leaks, if a `SecurityServletFilter` gets 
garbage-collected before all the UGIs in its cache are cleaned up).
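
To make the concern concrete, here is a minimal sketch (not code from this PR) of one way to guarantee a single cache per JVM no matter how many filter instances the servlet container creates. `Cache` and `FilterLike` are hypothetical stand-ins for `UGICache` and `SecurityServletFilter`, and the holder idiom is only one possible approach.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SharedCacheSketch {

    /** Hypothetical stand-in for UGICache. */
    public static final class Cache {
        private final Map<String, Object> entries = new ConcurrentHashMap<>();

        Object get(String key) {
            return entries.computeIfAbsent(key, k -> new Object());
        }
    }

    /** Initialization-on-demand holder: the JVM creates INSTANCE once, lazily and thread-safely. */
    private static final class Holder {
        static final Cache INSTANCE = new Cache();
    }

    /** Hypothetical stand-in for SecurityServletFilter. */
    public static final class FilterLike {
        // Non-static field, but every instance points at the same shared cache.
        private final Cache cache = Holder.INSTANCE;

        public Cache cache() {
            return cache;
        }
    }

    public static void main(String[] args) {
        FilterLike a = new FilterLike();
        FilterLike b = new FilterLike();
        // Both filters share one cache, so entries are never orphaned by a collected filter.
        System.out.println(a.cache() == b.cache());
    }
}
```

With this shape the field can be non-static for testability, while the cached entries still outlive any individual filter instance.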


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-13 Thread frankgh
Github user frankgh commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202409510
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used 
for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new 
HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
+ *
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
+ */
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
+
+if (timedProxyUGI == null) return;
+
+timedProxyUGI.resetTime();
+timedProxyUGI.releaseReference();
  

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202195089
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used 
for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new 
HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
+ *
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
+ */
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
--- End diff --

hmm ... not sure that would be better. We have a dilemma here -- the cache 
should not know about the caller, and the caller should not care about the 
internal cache implementation, such as that clea

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202193131
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used 
for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new 
HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
+ *
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
+ */
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
--- End diff --

I'd like the parameter names to allow someone reading this code to consider 
the cache in isolation from its callers, and from that perspective, 
`cleanImmediatelyIfNoRefs` or s

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202185416
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used 
for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new 
HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
+ *
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
+ */
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
--- End diff --

We are renaming `timedProxyUGI` to `entry`.
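
For readers following the thread, the acquire/release behaviour described in the Javadoc above (reference counting, a 15-minute expiry that is reset on release, and optional immediate cleanup) can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: the `Entry` internals are not visible in this excerpt, so its fields, the `String` stand-in for a UGI, the single lock, and the injected `AtomicLong` clock are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class RefCountedCacheSketch {
    public static final long EXPIRY_MILLIS = 15 * 60 * 1000L; // 15 minutes

    /** Hypothetical entry: a value, a reference count and an expiry start time. */
    static final class Entry implements Delayed {
        final String key;
        final String value;                       // stand-in for a UserGroupInformation
        final AtomicInteger refs = new AtomicInteger();
        private final AtomicLong clock;           // injectable clock, in milliseconds
        private volatile long startMillis;

        Entry(String key, AtomicLong clock) {
            this.key = key;
            this.value = "ugi-for-" + key;
            this.clock = clock;
            resetTime();
        }

        void resetTime() { startMillis = clock.get(); }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(startMillis + EXPIRY_MILLIS - clock.get(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();
    private final DelayQueue<Entry> queue = new DelayQueue<>();
    private final AtomicLong clock;

    public RefCountedCacheSketch(AtomicLong clock) { this.clock = clock; }

    /** Returns the cached value, creating it if needed, and takes one reference. */
    public synchronized String acquire(String key) {
        cleanup();
        Entry entry = cache.get(key);
        if (entry == null) {
            entry = new Entry(key, clock);
            cache.put(key, entry);
            queue.offer(entry);
        }
        entry.refs.incrementAndGet();
        return entry.value;
    }

    /** Drops one reference and restarts the expiry window; callers must pair this with acquire. */
    public synchronized void release(String key, boolean cleanImmediatelyIfNoRefs) {
        Entry entry = cache.get(key);
        if (entry == null) return;
        queue.remove(entry);          // remove before changing the delay so queue order stays valid
        entry.resetTime();
        int remaining = entry.refs.decrementAndGet();
        if (cleanImmediatelyIfNoRefs && remaining == 0) {
            cache.remove(key);        // a real cache would also destroy the UGI here
        } else {
            queue.offer(entry);
        }
    }

    /** Evicts expired entries that nobody references; referenced ones get a fresh window. */
    private void cleanup() {
        Entry expired;
        while ((expired = queue.poll()) != null) {
            if (expired.refs.get() == 0) {
                cache.remove(expired.key);
            } else {
                expired.resetTime();
                queue.offer(expired);
            }
        }
    }

    public synchronized int size() { return cache.size(); }

    public synchronized boolean contains(String key) { return cache.containsKey(key); }

    public static void main(String[] args) {
        AtomicLong clock = new AtomicLong(0);
        RefCountedCacheSketch cache = new RefCountedCacheSketch(clock);
        cache.acquire("alice");
        cache.release("alice", false);     // unreferenced, but kept until it expires
        clock.set(EXPIRY_MILLIS + 1);      // pretend more than 15 minutes have passed
        cache.acquire("bob");              // acquire() opportunistically evicts "alice"
        System.out.println(cache.contains("alice"));
    }
}
```

Injecting the clock mirrors the role the `Ticker` constructor argument plays in the diff: tests can advance time without sleeping.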


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202134716
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -52,69 +62,90 @@
  */
 @Override
 public void init(FilterConfig filterConfig) throws ServletException {
+//TODO: initialize cache here
 }
 
 /**
  * If user impersonation is configured, examines the request for the 
presense of the expected security headers
  * and create a proxy user to execute further request chain. Responds 
with an HTTP error if the header is missing
  * or the chain processing throws an exception.
  *
- * @param request http request
+ * @param request  http request
  * @param response http response
- * @param chain filter chain
+ * @param chain    filter chain
  */
 @Override
-public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain) throws IOException, 
ServletException {
+public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain)
+throws IOException, ServletException {
 
 if (SecureLogin.isUserImpersonationEnabled()) {
 
 // retrieve user header and make sure header is present and is 
not empty
-final String user = ((HttpServletRequest) 
request).getHeader(USER_HEADER);
-if (user == null) {
-throw new IllegalArgumentException(MISSING_HEADER_ERROR);
-} else if (user.trim().isEmpty()) {
-throw new IllegalArgumentException(EMPTY_HEADER_ERROR);
+final String gpdbUser = getHeaderValue(request, USER_HEADER);
+String transactionId = getHeaderValue(request, 
TRANSACTION_ID_HEADER);
+Integer segmentId = getHeaderValueInt(request, 
SEGMENT_ID_HEADER, true);
+Integer fragmentCount = getHeaderValueInt(request, 
FRAGMENT_COUNT_HEADER, false);
+Integer fragmentIndex = getHeaderValueInt(request, 
FRAGMENT_INDEX_HEADER, false);
+
+SessionId session = new SessionId(segmentId, transactionId, 
gpdbUser);
+if (LOG.isDebugEnabled() && fragmentCount != null) {
+LOG.debug(session.toString() + " Fragment = " + 
fragmentIndex + " of " + fragmentCount);
 }
 
 // TODO refresh Kerberos token when security is enabled
 
-// prepare pivileged action to run on behalf of proxy user
+// prepare privileged action to run on behalf of proxy user
 PrivilegedExceptionAction<Boolean> action = new 
PrivilegedExceptionAction<Boolean>() {
 @Override
 public Boolean run() throws IOException, ServletException {
-LOG.debug("Performing request chain call for proxy 
user = " + user);
+LOG.debug("Performing request chain call for proxy 
user = " + gpdbUser);
 chain.doFilter(request, response);
 return true;
 }
 };
 
 // create proxy user UGI from the UGI of the logged in user 
and execute the servlet chain as that user
-UserGroupInformation proxyUGI = null;
+UserGroupInformation ugi = 
cache.getUserGroupInformation(session);
 try {
-LOG.debug("Creating proxy user = " + user);
-proxyUGI = UserGroupInformation.createProxyUser(user, 
UserGroupInformation.getLoginUser());
-proxyUGI.doAs(action);
+ugi.doAs(action);
 } catch (UndeclaredThrowableException ute) {
 // unwrap the real exception thrown by the action
 throw new ServletException(ute.getCause());
 } catch (InterruptedException ie) {
 throw new ServletException(ie);
 } finally {
-try {
-if (proxyUGI != null) {
-LOG.debug("Closing FileSystem for proxy user = " + 
proxyUGI.getUserName());
-FileSystem.closeAllForUGI(proxyUGI);
-}
-} catch (Throwable t) {
-LOG.warn("Error closing FileSystem for proxy user = " 
+ proxyUGI.getUserName());
-}
+// Optimization to cleanup the cache if it is the last 
fragment
+boolean forceClean = (fragmentIndex != null && 
fragmentIndex.equals(fragmentCount));
+cache.release(session, forceClean);
--- End diff --

here you're n

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202131530
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used 
for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new 
HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
+ *
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
+ */
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
+
+if (timedProxyUGI == null) return;
+
+timedProxyUGI.resetTime();
+timedProxyUGI.releaseReference();
  

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202132916
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used 
for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new 
HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
+ *
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
+ */
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
+
+if (timedProxyUGI == null) return;
+
+timedProxyUGI.resetTime();
+timedProxyUGI.releaseReference();
  

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202130797
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
+ *
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
+ */
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
+
+if (timedProxyUGI == null) return;
+
+timedProxyUGI.resetTime();
+timedProxyUGI.releaseReference();
  

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202127945
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
--- End diff --

Dial this down to debug level for production code?


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202132529
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
+ *
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
+ */
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
+
+if (timedProxyUGI == null) return;
+
+timedProxyUGI.resetTime();
+timedProxyUGI.releaseReference();
  

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202126694
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
--- End diff --

Do you still need this? It was needed for arrays, but it should be unnecessary for maps.
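
A minimal sketch of the difference, assuming the earlier revision kept the per-segment queues in a generic array (that revision is not shown in this thread). QueueHolder, Item and queueFor are hypothetical names used only for illustration. Creating an array of a parameterized type forces an unchecked cast, while a map built with the diamond operator compiles without warnings, so the annotation is only needed in the first case:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical holder class; not part of the PR.
class QueueHolder {

    // Hypothetical stand-in for the cache entry type.
    static final class Item implements Delayed {
        @Override
        public long getDelay(TimeUnit unit) {
            return 0L;
        }

        @Override
        public int compareTo(Delayed other) {
            return 0;
        }
    }

    // Generic array creation is not allowed, so a raw array must be cast:
    // this is the case that needs @SuppressWarnings("unchecked").
    @SuppressWarnings("unchecked")
    private final DelayQueue<Item>[] queueArray = (DelayQueue<Item>[]) new DelayQueue[4];

    // A map with the diamond operator is fully type-checked; no annotation needed.
    private final Map<Integer, DelayQueue<Item>> queueMap = new HashMap<>();

    // Returns the queue for a segment, creating it on first use.
    DelayQueue<Item> queueFor(int segmentId) {
        synchronized (queueMap) {
            DelayQueue<Item> queue = queueMap.get(segmentId);
            if (queue == null) {
                queue = new DelayQueue<>();
                queueMap.put(segmentId, queue);
            }
            return queue;
        }
    }
}
```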


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202126336
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java ---
@@ -65,12 +74,15 @@ public int hashCode() {
  * {@inheritDoc}
  */
 @Override
-public boolean equals(Object other) {
-if (!(other instanceof SessionId)) {
-return false;
-}
-SessionId that = (SessionId) other;
-return this.sessionId.equals(that.sessionId);
+public boolean equals(Object obj) {
+if (obj == null) return false;
+if (obj == this) return true;
+if (obj.getClass() != getClass()) return false;
+
+SessionId that = (SessionId) obj;
+return new EqualsBuilder()
+.append(sessionId, that.sessionId)
+.isEquals();
--- End diff --

Yes, agreed. My previous comment about EqualsBuilder concerned the pattern 
of comparisons at the top of the method, and using the helper when there are 
multiple fields. Here, with only one member, a direct comparison should suffice.
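
For illustration, a sketch of what the direct comparison could look like while keeping the null, identity and class checks from the diff. SessionKey is a hypothetical stand-in for SessionId that models only the single sessionId field; the hashCode shown is an assumption, not taken from the PR:

```java
// Hypothetical stand-in for SessionId with a single identifying field.
final class SessionKey {
    private final String sessionId;

    SessionKey(String sessionId) {
        this.sessionId = sessionId;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) return false;
        if (obj == this) return true;
        if (obj.getClass() != getClass()) return false;

        // Only one member to compare, so no builder is needed.
        SessionKey that = (SessionKey) obj;
        return sessionId.equals(that.sessionId);
    }

    @Override
    public int hashCode() {
        // Kept consistent with equals: derived from the same single field.
        return sessionId.hashCode();
    }
}
```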


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202128478
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
--- End diff --

Not the best name; IMO increaseRefCount was more meaningful.
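
A rough sketch of how count-oriented names might read on an entry. The Entry class body is not shown in this thread, so RefCountedEntry, decreaseRefCount and isInUse are hypothetical; only increaseRefCount comes from the comment above, and the AtomicInteger counter is an assumption based on the imports in the diff:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical cache entry that only tracks how many requests are using it.
final class RefCountedEntry {
    private final AtomicInteger refCount = new AtomicInteger(0);

    // Called when a request starts using the cached value.
    void increaseRefCount() {
        refCount.incrementAndGet();
    }

    // Called when a request is done; the count never drops below zero.
    void decreaseRefCount() {
        refCount.updateAndGet(n -> n > 0 ? n - 1 : 0);
    }

    // True while at least one request still holds a reference.
    boolean isInUse() {
        return refCount.get() > 0;
    }
}
```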


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202129998
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
+ *
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
+ */
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
--- End diff --

Rename the variable to cacheEntry?


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202131246
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
+ *
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
+ */
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
+
+if (timedProxyUGI == null) return;
+
+timedProxyUGI.resetTime();
+timedProxyUGI.releaseReference();
  

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202133845
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -52,69 +62,90 @@
  */
 @Override
 public void init(FilterConfig filterConfig) throws ServletException {
+//TODO: initialize cache here
 }
 
 /**
  * If user impersonation is configured, examines the request for the 
presense of the expected security headers
  * and create a proxy user to execute further request chain. Responds 
with an HTTP error if the header is missing
  * or the chain processing throws an exception.
  *
- * @param request http request
+ * @param request  http request
  * @param response http response
- * @param chain filter chain
+ * @param chain    filter chain
  */
 @Override
-public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain) throws IOException, 
ServletException {
+public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain)
+throws IOException, ServletException {
 
 if (SecureLogin.isUserImpersonationEnabled()) {
 
 // retrieve user header and make sure header is present and is 
not empty
-final String user = ((HttpServletRequest) 
request).getHeader(USER_HEADER);
-if (user == null) {
-throw new IllegalArgumentException(MISSING_HEADER_ERROR);
-} else if (user.trim().isEmpty()) {
-throw new IllegalArgumentException(EMPTY_HEADER_ERROR);
+final String gpdbUser = getHeaderValue(request, USER_HEADER);
+String transactionId = getHeaderValue(request, 
TRANSACTION_ID_HEADER);
+Integer segmentId = getHeaderValueInt(request, 
SEGMENT_ID_HEADER, true);
+Integer fragmentCount = getHeaderValueInt(request, 
FRAGMENT_COUNT_HEADER, false);
+Integer fragmentIndex = getHeaderValueInt(request, 
FRAGMENT_INDEX_HEADER, false);
+
+SessionId session = new SessionId(segmentId, transactionId, 
gpdbUser);
+if (LOG.isDebugEnabled() && fragmentCount != null) {
+LOG.debug(session.toString() + " Fragment = " + 
fragmentIndex + " of " + fragmentCount);
 }
 
 // TODO refresh Kerberos token when security is enabled
 
-// prepare pivileged action to run on behalf of proxy user
+// prepare privileged action to run on behalf of proxy user
 PrivilegedExceptionAction<Boolean> action = new 
PrivilegedExceptionAction<Boolean>() {
 @Override
 public Boolean run() throws IOException, ServletException {
-LOG.debug("Performing request chain call for proxy 
user = " + user);
+LOG.debug("Performing request chain call for proxy 
user = " + gpdbUser);
 chain.doFilter(request, response);
 return true;
 }
 };
 
 // create proxy user UGI from the UGI of the logged in user 
and execute the servlet chain as that user
-UserGroupInformation proxyUGI = null;
+UserGroupInformation ugi = 
cache.getUserGroupInformation(session);
--- End diff --

I'd call this variable proxyUGI, as that is what we are after.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202133483
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -52,69 +62,90 @@
  */
 @Override
 public void init(FilterConfig filterConfig) throws ServletException {
+//TODO: initialize cache here
--- End diff --

Please do that.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202129124
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs 
are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The 
alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used 
for locking)
+private final Map> queueMap = new 
HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 
Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for 
use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that 
this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference 
count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
+ *
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
+ */
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
--- End diff --

cleanImmediatelyIfNoRefs is our interpretation of the fact that we do not 
expect any more requests from this session, and this is only what's known to 
the caller of the function, 

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202124776
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -61,48 +64,51 @@
  * create and destroy UserGroupInformation instances.
  */
 public UGICache() {
-this(new UGIProvider());
+this(new UGIProvider(), Ticker.systemTicker());
 }
 
 /**
  * Create new proxy UGI if not found in cache and increment reference 
count
  */
-public Entry getTimedProxyUGI(SessionId session)
-throws IOException {
-
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
 Integer segmentId = session.getSegmentId();
 String user = session.getUser();
 DelayQueue delayQueue = getExpirationQueue(segmentId);
 synchronized (delayQueue) {
 // Use the opportunity to cleanup any expired entries
-cleanup(segmentId);
+cleanup(session);
 Entry entry = cache.get(session);
 if (entry == null) {
 LOG.info(session.toString() + " Creating proxy user = " + 
user);
-entry = new Entry(ugiProvider.createProxyUGI(user), 
session);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
 delayQueue.offer(entry);
 cache.put(session, entry);
 }
 entry.acquireReference();
-return entry;
+return entry.getUGI();
 }
 }
 
 /**
- * Decrement reference count for the given Entry.
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
  *
- * @param timedProxyUGI the cache entry to release
- * @param forceClean if true, destroys the UGI for the given Entry 
(only if it is now unreferenced).
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
  */
-public void release(Entry timedProxyUGI, boolean forceClean) {
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
+
+if (timedProxyUGI == null) return;
--- End diff --

I'd rather not have any assert statements in Java code, if that's what you 
mean.
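
A minimal sketch of the explicit-check alternative, assuming a plain map stands in for the cache (the class and method names are hypothetical, not taken from the PR). Unlike `assert`, which the JVM skips unless it is started with `-ea`, the check below always runs:

```java
import java.util.Map;

// Hypothetical stand-in for the cache lookup in release(); not the PR's code.
final class ReleaseGuardSketch {

    // Returns the cached value, or fails loudly if the key was never cached.
    static <K, V> V requireCached(Map<K, V> cache, K key) {
        V entry = cache.get(key);
        if (entry == null) {
            throw new IllegalStateException("No cached entry for " + key);
        }
        return entry;
    }
}
```

Whether a missing entry should throw, log, or be ignored silently is a design decision for the PR; the sketch only shows that the condition can be enforced without relying on JVM flags.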


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread lavjain
Github user lavjain commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202121962
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -52,69 +62,90 @@
  */
 @Override
 public void init(FilterConfig filterConfig) throws ServletException {
+//TODO: initialize cache here
 }
 
 /**
  * If user impersonation is configured, examines the request for the 
presense of the expected security headers
  * and create a proxy user to execute further request chain. Responds 
with an HTTP error if the header is missing
  * or the chain processing throws an exception.
  *
- * @param request http request
+ * @param request  http request
  * @param response http response
- * @param chain filter chain
+ * @param chain    filter chain
  */
 @Override
-public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain) throws IOException, 
ServletException {
+public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain)
+throws IOException, ServletException {
 
 if (SecureLogin.isUserImpersonationEnabled()) {
 
 // retrieve user header and make sure header is present and is 
not empty
-final String user = ((HttpServletRequest) 
request).getHeader(USER_HEADER);
-if (user == null) {
-throw new IllegalArgumentException(MISSING_HEADER_ERROR);
-} else if (user.trim().isEmpty()) {
-throw new IllegalArgumentException(EMPTY_HEADER_ERROR);
+final String gpdbUser = getHeaderValue(request, USER_HEADER);
+String transactionId = getHeaderValue(request, 
TRANSACTION_ID_HEADER);
+Integer segmentId = getHeaderValueInt(request, 
SEGMENT_ID_HEADER, true);
+Integer fragmentCount = getHeaderValueInt(request, 
FRAGMENT_COUNT_HEADER, false);
+Integer fragmentIndex = getHeaderValueInt(request, 
FRAGMENT_INDEX_HEADER, false);
+
+SessionId session = new SessionId(segmentId, transactionId, 
gpdbUser);
+if (LOG.isDebugEnabled() && fragmentCount != null) {
+LOG.debug(session.toString() + " Fragment = " + 
fragmentIndex + " of " + fragmentCount);
 }
 
 // TODO refresh Kerberos token when security is enabled
 
-// prepare pivileged action to run on behalf of proxy user
+// prepare privileged action to run on behalf of proxy user
 PrivilegedExceptionAction action = new 
PrivilegedExceptionAction() {
 @Override
 public Boolean run() throws IOException, ServletException {
-LOG.debug("Performing request chain call for proxy 
user = " + user);
+LOG.debug("Performing request chain call for proxy 
user = " + gpdbUser);
 chain.doFilter(request, response);
 return true;
 }
 };
 
 // create proxy user UGI from the UGI of the logged in user 
and execute the servlet chain as that user
-UserGroupInformation proxyUGI = null;
+UserGroupInformation ugi = 
cache.getUserGroupInformation(session);
 try {
-LOG.debug("Creating proxy user = " + user);
-proxyUGI = UserGroupInformation.createProxyUser(user, 
UserGroupInformation.getLoginUser());
-proxyUGI.doAs(action);
+ugi.doAs(action);
 } catch (UndeclaredThrowableException ute) {
 // unwrap the real exception thrown by the action
 throw new ServletException(ute.getCause());
 } catch (InterruptedException ie) {
 throw new ServletException(ie);
 } finally {
-try {
-if (proxyUGI != null) {
-LOG.debug("Closing FileSystem for proxy user = " + 
proxyUGI.getUserName());
-FileSystem.closeAllForUGI(proxyUGI);
-}
-} catch (Throwable t) {
-LOG.warn("Error closing FileSystem for proxy user = " 
+ proxyUGI.getUserName());
-}
+// Optimization to cleanup the cache if it is the last 
fragment
+boolean forceClean = (fragmentIndex != null && 
fragmentIndex.equals(fragmentCount));
--- End diff --

For the same segment, they are always called in order.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202121620
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -61,48 +64,51 @@
  * create and destroy UserGroupInformation instances.
  */
 public UGICache() {
-this(new UGIProvider());
+this(new UGIProvider(), Ticker.systemTicker());
 }
 
 /**
  * Create new proxy UGI if not found in cache and increment reference 
count
  */
-public Entry getTimedProxyUGI(SessionId session)
-throws IOException {
-
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
 Integer segmentId = session.getSegmentId();
 String user = session.getUser();
 DelayQueue delayQueue = getExpirationQueue(segmentId);
 synchronized (delayQueue) {
 // Use the opportunity to cleanup any expired entries
-cleanup(segmentId);
+cleanup(session);
 Entry entry = cache.get(session);
 if (entry == null) {
 LOG.info(session.toString() + " Creating proxy user = " + 
user);
-entry = new Entry(ugiProvider.createProxyUGI(user), 
session);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
 delayQueue.offer(entry);
 cache.put(session, entry);
 }
 entry.acquireReference();
-return entry;
+return entry.getUGI();
 }
 }
 
 /**
- * Decrement reference count for the given Entry.
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
  *
- * @param timedProxyUGI the cache entry to release
- * @param forceClean if true, destroys the UGI for the given Entry 
(only if it is now unreferenced).
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
  */
-public void release(Entry timedProxyUGI, boolean forceClean) {
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
+
+if (timedProxyUGI == null) return;
--- End diff --

We'll change this to `assert timedProxyUGI != null`.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread lavjain
Github user lavjain commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202121358
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -244,10 +246,10 @@ void setCleaned() {
 }
 
 /**
- * @return the number of active requests using the {@link 
UserGroupInformation}.
+ * @return true if the UGI is being referenced by a session, false 
otherwise
  */
-int countReferences() {
-return referenceCount.get();
+private boolean isInUse() {
--- End diff --

All of the modified functions should be of the same type, since they are 
used in a similar fashion.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread lavjain
Github user lavjain commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202120502
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -129,90 +135,93 @@ public void release(Entry timedProxyUGI, boolean 
forceClean) {
 }
 
 /**
- * Iterate through all the entries in the queue for the given segment
+ * Iterate through all the entries in the queue for the given 
session's segment
  * and close expired {@link UserGroupInformation}, otherwise it resets
  * the timer for every non-expired entry.
  *
- * @param segmentId
+ * @param session
  */
-private void cleanup(Integer segmentId) {
+private void cleanup(SessionId session) {
 
 Entry ugi;
-DelayQueue delayQueue = getExpirationQueue(segmentId);
+DelayQueue delayQueue = 
getExpirationQueue(session.getSegmentId());
--- End diff --

It might be better to pass the delayQueue to this method instead of doing a 
lookup again.
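
A rough sketch of that suggestion, assuming the caller has already looked up the queue and holds its lock. `Item` is a hypothetical stand-in for the cache's `Entry`, and the loop simply discards expired items instead of re-offering in-use ones as the real `cleanup` does:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; Item stands in for the cache's Entry type.
final class CleanupSketch {

    static final class Item implements Delayed {
        private final long expiresAtNanos;

        // A zero or negative delay produces an item that is already expired.
        Item(long delayMillis) {
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Takes the queue directly, so no second map lookup is needed.
    // poll() is non-blocking and returns null once the head has not expired yet.
    static int cleanup(DelayQueue<Item> delayQueue) {
        int removed = 0;
        while (delayQueue.poll() != null) {
            removed++;
        }
        return removed;
    }
}
```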


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread lavjain
Github user lavjain commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202119803
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -129,90 +135,93 @@ public void release(Entry timedProxyUGI, boolean 
forceClean) {
 }
 
 /**
- * Iterate through all the entries in the queue for the given segment
+ * Iterate through all the entries in the queue for the given 
session's segment
  * and close expired {@link UserGroupInformation}, otherwise it resets
  * the timer for every non-expired entry.
  *
- * @param segmentId
+ * @param session
  */
-private void cleanup(Integer segmentId) {
+private void cleanup(SessionId session) {
--- End diff --

The cleanup here is confusing because it uses the current session together 
with any expired UGI.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread lavjain
Github user lavjain commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202118803
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -61,48 +64,51 @@
  * create and destroy UserGroupInformation instances.
  */
 public UGICache() {
-this(new UGIProvider());
+this(new UGIProvider(), Ticker.systemTicker());
 }
 
 /**
  * Create new proxy UGI if not found in cache and increment reference 
count
  */
-public Entry getTimedProxyUGI(SessionId session)
-throws IOException {
-
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
 Integer segmentId = session.getSegmentId();
 String user = session.getUser();
 DelayQueue delayQueue = getExpirationQueue(segmentId);
 synchronized (delayQueue) {
 // Use the opportunity to cleanup any expired entries
-cleanup(segmentId);
+cleanup(session);
 Entry entry = cache.get(session);
 if (entry == null) {
 LOG.info(session.toString() + " Creating proxy user = " + 
user);
-entry = new Entry(ugiProvider.createProxyUGI(user), 
session);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
 delayQueue.offer(entry);
 cache.put(session, entry);
 }
 entry.acquireReference();
-return entry;
+return entry.getUGI();
 }
 }
 
 /**
- * Decrement reference count for the given Entry.
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
  *
- * @param timedProxyUGI the cache entry to release
- * @param forceClean if true, destroys the UGI for the given Entry 
(only if it is now unreferenced).
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
  */
-public void release(Entry timedProxyUGI, boolean forceClean) {
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
+
+if (timedProxyUGI == null) return;
--- End diff --

Is there a case where the above condition would hold true?


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread lavjain
Github user lavjain commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202118226
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -61,48 +64,51 @@
  * create and destroy UserGroupInformation instances.
  */
 public UGICache() {
-this(new UGIProvider());
+this(new UGIProvider(), Ticker.systemTicker());
 }
 
 /**
  * Create new proxy UGI if not found in cache and increment reference 
count
  */
-public Entry getTimedProxyUGI(SessionId session)
-throws IOException {
-
+public UserGroupInformation getUserGroupInformation(SessionId session) 
throws IOException {
 Integer segmentId = session.getSegmentId();
 String user = session.getUser();
 DelayQueue delayQueue = getExpirationQueue(segmentId);
 synchronized (delayQueue) {
 // Use the opportunity to cleanup any expired entries
-cleanup(segmentId);
+cleanup(session);
 Entry entry = cache.get(session);
 if (entry == null) {
 LOG.info(session.toString() + " Creating proxy user = " + 
user);
-entry = new Entry(ugiProvider.createProxyUGI(user), 
session);
+entry = new Entry(ticker, 
ugiProvider.createProxyUGI(user));
 delayQueue.offer(entry);
 cache.put(session, entry);
 }
 entry.acquireReference();
-return entry;
+return entry.getUGI();
 }
 }
 
 /**
- * Decrement reference count for the given Entry.
+ * Decrement reference count for the given session's UGI. Resets the 
time at which the UGI will
+ * expire to 15 minutes in the future.
  *
- * @param timedProxyUGI the cache entry to release
- * @param forceClean if true, destroys the UGI for the given Entry 
(only if it is now unreferenced).
+ * @param session  the session for which we want to 
release the UGI.
+ * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the 
given session (only if it is
+ * now unreferenced).
  */
-public void release(Entry timedProxyUGI, boolean forceClean) {
+public void release(SessionId session, boolean 
cleanImmediatelyIfNoRefs) {
+
+Entry timedProxyUGI = cache.get(session);
+
+if (timedProxyUGI == null) return;
 
-Integer segmentId = timedProxyUGI.getSession().getSegmentId();
 timedProxyUGI.resetTime();
 timedProxyUGI.releaseReference();
-if (forceClean) {
-synchronized (getExpirationQueue(segmentId)) {
+if (cleanImmediatelyIfNoRefs) {
--- End diff --

cleanImmediately here does not know if there are any refs.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread lavjain
Github user lavjain commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202117665
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -129,90 +135,93 @@ public void release(Entry timedProxyUGI, boolean 
forceClean) {
 }
 
 /**
- * Iterate through all the entries in the queue for the given segment
+ * Iterate through all the entries in the queue for the given 
session's segment
  * and close expired {@link UserGroupInformation}, otherwise it resets
  * the timer for every non-expired entry.
  *
- * @param segmentId
+ * @param session
  */
-private void cleanup(Integer segmentId) {
+private void cleanup(SessionId session) {
 
 Entry ugi;
-DelayQueue delayQueue = getExpirationQueue(segmentId);
+DelayQueue delayQueue = 
getExpirationQueue(session.getSegmentId());
 while ((ugi = delayQueue.poll()) != null) {
 // Place it back in the queue if still in use and was not 
closed
-if (!closeUGI(ugi)) {
+if (!closeUGI(session, ugi)) {
 ugi.resetTime();
 delayQueue.offer(ugi);
 }
 LOG.debug("Delay Queue Size for segment " +
-segmentId + " = " + delayQueue.size());
+session.getSegmentId() + " = " + delayQueue.size());
 }
 }
 
-// There is no need to synchronize this method because it should be 
called
-// from within a synchronized block
-private boolean closeUGI(Entry expiredProxyUGI) {
-
-SessionId session = expiredProxyUGI.getSession();
-String fsMsg = "FileSystem for proxy user = " + 
expiredProxyUGI.getUGI().getUserName();
+/**
+ * This method must be called from a synchronized block for the 
delayQueue for the given
+ * session.getSegmentId(). When the reference count is 0, the Entry is 
removed from the
+ * cache where it will then be processed to be destroyed by the 
UGIProvider.
+ *
+ * @param session
+ * @param toDelete
+ * @return true if the UGI entry was cleaned, false when the UGI entry 
was still in use
+ * and cleaning up was skipped
+ */
+private boolean closeUGI(SessionId session, Entry toDelete) {
+// There is no need to synchronize this method because it should 
be called
+// from within a synchronized block
+String fsMsg = "FileSystem for proxy user = " + 
toDelete.getUGI().getUserName();
 try {
 // The UGI object is still being used by another thread
-if (expiredProxyUGI.countReferences() != 0) {
+if (toDelete.isInUse()) {
 LOG.info(session.toString() + " Skipping close of " + 
fsMsg);
-// Reset time so that it doesn't expire until release
-// updates the time again
-//expiredProxyUGI.resetTime();
 return false;
 }
 
 // Expired UGI object can be cleaned since it is not used
 // Determine if it can be removed from cache also
 Entry cachedUGI = cache.get(session);
-if (expiredProxyUGI == cachedUGI) {
+if (toDelete == cachedUGI) {
--- End diff --

Not sure this logic still holds. The cache could be holding a reference to 
a newer UGI, which is why we were comparing references. I ask because release 
now also gets the entry from the cache.
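
To illustrate the reference comparison, here is a small sketch with hypothetical names and a generic map in place of the real cache. It assumes the caller already holds whatever lock guards the cache, and it removes the mapping only when the map still points at the same instance that expired:

```java
import java.util.Map;

// Hypothetical sketch of identity-based eviction; not the PR's code.
final class IdentityEvictSketch {

    // Evicts key only if the cache still maps it to the exact expired instance.
    // A newer value stored under the same key is left untouched.
    static <K, V> boolean evictIfSame(Map<K, V> cache, K key, V expired) {
        if (cache.get(key) == expired) {
            cache.remove(key);
            return true;
        }
        return false;
    }
}
```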


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread lavjain
Github user lavjain commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202108154
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java ---
@@ -65,12 +74,15 @@ public int hashCode() {
  * {@inheritDoc}
  */
 @Override
-public boolean equals(Object other) {
-if (!(other instanceof SessionId)) {
-return false;
-}
-SessionId that = (SessionId) other;
-return this.sessionId.equals(that.sessionId);
+public boolean equals(Object obj) {
+if (obj == null) return false;
+if (obj == this) return true;
+if (obj.getClass() != getClass()) return false;
+
+SessionId that = (SessionId) obj;
+return new EqualsBuilder()
+.append(sessionId, that.sessionId)
+.isEquals();
--- End diff --

EqualsBuilder is not needed for comparing strings
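
For a single String field, something like the following would be enough. This is a sketch under the assumption that `sessionId` is the only field compared, with a hypothetical class name standing in for `SessionId`:

```java
import java.util.Objects;

// Hypothetical stand-in for SessionId; only the String field from the diff is modeled.
final class SessionKeySketch {
    private final String sessionId;

    SessionKeySketch(String sessionId) {
        this.sessionId = sessionId;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) return true;
        if (obj == null || obj.getClass() != getClass()) return false;
        SessionKeySketch that = (SessionKeySketch) obj;
        // Objects.equals is null-safe and delegates to String.equals.
        return Objects.equals(sessionId, that.sessionId);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(sessionId);
    }
}
```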


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread shivzone
Github user shivzone commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202065515
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,264 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Ticker;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private static final long MINUTES = 60 * 1000L;
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+private FakeTicker fakeTicker;
+
+private static class FakeTicker extends Ticker {
+private final AtomicLong nanos = new AtomicLong();
+
+@Override
+public long read() {
+return nanos.get();
+}
+
+long advanceTime(long milliseconds) {
+return nanos.addAndGet(milliseconds * 1000) / 1000;
+}
+}
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+when(provider.createProxyUGI(any(String.class))).thenAnswer(new 
Answer() {
+@Override
+public UserGroupInformation answer(InvocationOnMock 
invocation) {
+return mock(UserGroupInformation.class);
+}
+});
+
+session = new SessionId(0, "txn-id", "the-user");
+fakeTicker = new FakeTicker();
+cache = new UGICache(provider, fakeTicker);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UserGroupInformation ugi = cache.getUserGroupInformation(session);
+assertNotNull(ugi);
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UserGroupInformation ugi1 = cache.getUserGroupInformation(session);
+UserGroupInformation ugi2 = cache.getUserGroupInformation(session);
+assertEquals(ugi1, ugi2);
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentSessionsForSameUser() throws 
Exception {
+SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+UserGroupInformation proxyUGI1 = 
cache.getUserGroupInformation(session);
+UserGroupInformation proxyUGI2 = 
cache.getUserGroupInformation(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(2)).createProxyUGI("the-user");
+// TODO: this seems weird. We're creating two UGIs with the same 
params,
+// even though we have two different sessions. Why?
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsers() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UserGroupInformation proxyUGI1 = 
cache.getUserGroupInformation(session);
+UserGroupInformation proxyUGI2 = 
cache.getUserGroupInformation(otherSes

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread shivzone
Github user shivzone commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202072273
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -52,69 +62,90 @@
  */
 @Override
 public void init(FilterConfig filterConfig) throws ServletException {
+//TODO: initialize cache here
 }
 
 /**
  * If user impersonation is configured, examines the request for the 
presense of the expected security headers
  * and create a proxy user to execute further request chain. Responds 
with an HTTP error if the header is missing
  * or the chain processing throws an exception.
  *
- * @param request http request
+ * @param request  http request
  * @param response http response
- * @param chain filter chain
+ * @param chain    filter chain
  */
 @Override
-public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain) throws IOException, 
ServletException {
+public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain)
+throws IOException, ServletException {
 
 if (SecureLogin.isUserImpersonationEnabled()) {
 
 // retrieve user header and make sure header is present and is 
not empty
-final String user = ((HttpServletRequest) 
request).getHeader(USER_HEADER);
-if (user == null) {
-throw new IllegalArgumentException(MISSING_HEADER_ERROR);
-} else if (user.trim().isEmpty()) {
-throw new IllegalArgumentException(EMPTY_HEADER_ERROR);
+final String gpdbUser = getHeaderValue(request, USER_HEADER);
+String transactionId = getHeaderValue(request, 
TRANSACTION_ID_HEADER);
+Integer segmentId = getHeaderValueInt(request, 
SEGMENT_ID_HEADER, true);
+Integer fragmentCount = getHeaderValueInt(request, 
FRAGMENT_COUNT_HEADER, false);
+Integer fragmentIndex = getHeaderValueInt(request, 
FRAGMENT_INDEX_HEADER, false);
+
+SessionId session = new SessionId(segmentId, transactionId, 
gpdbUser);
+if (LOG.isDebugEnabled() && fragmentCount != null) {
+LOG.debug(session.toString() + " Fragment = " + 
fragmentIndex + " of " + fragmentCount);
 }
 
 // TODO refresh Kerberos token when security is enabled
 
-// prepare pivileged action to run on behalf of proxy user
+// prepare privileged action to run on behalf of proxy user
 PrivilegedExceptionAction action = new 
PrivilegedExceptionAction() {
 @Override
 public Boolean run() throws IOException, ServletException {
-LOG.debug("Performing request chain call for proxy 
user = " + user);
+LOG.debug("Performing request chain call for proxy 
user = " + gpdbUser);
 chain.doFilter(request, response);
 return true;
 }
 };
 
 // create proxy user UGI from the UGI of the logged in user 
and execute the servlet chain as that user
-UserGroupInformation proxyUGI = null;
+UserGroupInformation ugi = 
cache.getUserGroupInformation(session);
 try {
-LOG.debug("Creating proxy user = " + user);
-proxyUGI = UserGroupInformation.createProxyUser(user, 
UserGroupInformation.getLoginUser());
-proxyUGI.doAs(action);
+ugi.doAs(action);
 } catch (UndeclaredThrowableException ute) {
 // unwrap the real exception thrown by the action
 throw new ServletException(ute.getCause());
 } catch (InterruptedException ie) {
 throw new ServletException(ie);
 } finally {
-try {
-if (proxyUGI != null) {
-LOG.debug("Closing FileSystem for proxy user = " + 
proxyUGI.getUserName());
-FileSystem.closeAllForUGI(proxyUGI);
-}
-} catch (Throwable t) {
-LOG.warn("Error closing FileSystem for proxy user = " 
+ proxyUGI.getUserName());
-}
+// Optimization to cleanup the cache if it is the last 
fragment
+boolean forceClean = (fragmentIndex != null && 
fragmentIndex.equals(fragmentCount));
--- End diff --

Why would fragmentIndex matter here? Are we certain that the request

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread shivzone
Github user shivzone commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202065133
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they
+ * have not been accessed for 15 minutes.
+ * 
+ * The motivation for caching is that destroying UGIs is slow. The alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used for locking)
+private final Map<Integer, DelayQueue<Entry>> queueMap = new HashMap<>();
+private final UGIProvider ugiProvider;
+private Ticker ticker;
+private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes
+
+/**
+ * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need
+ * to substitute a mock UGIProvider.
+ */
+UGICache(UGIProvider provider, Ticker ticker) {
+this.ticker = ticker;
+this.ugiProvider = provider;
+}
+
+/**
+ * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to
+ * create and destroy UserGroupInformation instances.
+ */
+public UGICache() {
+this(new UGIProvider(), Ticker.systemTicker());
+}
+
+/**
+ * Create new proxy UGI if not found in cache and increment reference count
+ */
+public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException {
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(session);
+Entry entry = cache.get(session);
+if (entry == null) {
+LOG.info(session.toString() + " Creating proxy user = " + user);
+entry = new Entry(ticker, ugiProvider.createProxyUGI(user));
+delayQueue.offer(entry);
+cache.put(session, entry);
+}
+entry.acquireReference();
+return entry.getUGI();
+}
+}
+
+/**
+ * Decrement reference count for the given session's UGI. Resets the time at which the UGI will
+ * expire to 15 minutes in the future.
--- End diff --

Remove references to 15 minutes, as this can change over time. Mention 
`UGI_CACHE_EXPIRY` instead.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-12 Thread shivzone
Github user shivzone commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r202065667
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they
+ * have not been accessed for 15 minutes.
--- End diff --

Remove the reference to the actual time.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-11 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201877270
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UGICacheEntry entry1 = cache.getTimedProxyUGI(session);
+UGICacheEntry entry2 = cache.getTimedProxyUGI(session);
+assertEquals(entry1, entry2);
+assertNotNull(entry1.getUGI());
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentSessionsForSameUser() throws 
Exception {
+SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(2)).createProxyUGI("the-user");
+// TODO: this seems weird. We're creating two UGIs with the same 
params,
+// even though we have two different sessions. Why?
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsers() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession);
+UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession);
+assertEquals(proxyUGI1a, proxyUGI1b);
+assertEquals(proxyUGI2a, proxyUGI2b);
+assertNotEquals(proxyUGI1a, proxyUGI2a);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getUGIWhenRequestedUserDoesNotExist() throws Exception {
+// what does UserGroupInformation.createProxyUser() do in this 
scenario?
+// how about getLoginUser()?
+}
+
+@Test
+public void anySegmentIdIsValid() throws Exception {
+session = new SessionId(65, "txn-id", "the-user");
--- End diff --

We used this test to prove that our switch to using a `ConcurrentHashMap` 
instead of an array fixed the problem. Maybe it can be deleted now (or we could 
change the number 65 to something crazier like 2^31).
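
As a rough illustration of why the map removes the limit (a sketch only: the class and method names are hypothetical, and the capacity of 64 is assumed from the "magic number 64" mentioned elsewhere in this review, not taken from the PR's code), an array indexed by segment ID rejects any ID outside its bounds, while a `ConcurrentHashMap` keyed by `Integer` accepts any `int`. Note that 2^31 itself does not fit in an `int`; `Integer.MAX_VALUE` (2^31 - 1) is the largest value such a test could use.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical comparison of the two storage strategies; not the PR's UGICache.
final class SegmentLookup {

    // Assumed fixed capacity, standing in for the old array-based design.
    private static final int ARRAY_CAPACITY = 64;

    private final String[] byArray = new String[ARRAY_CAPACITY];
    private final Map<Integer, String> byMap = new ConcurrentHashMap<>();

    // Throws ArrayIndexOutOfBoundsException for any segmentId outside 0..63.
    void putInArray(int segmentId, String value) {
        byArray[segmentId] = value;
    }

    // Accepts any int key, so there is no upper bound on the segment ID.
    void putInMap(int segmentId, String value) {
        byMap.put(segmentId, value);
    }

    String getFromMap(int segmentId) {
        return byMap.get(segmentId);
    }
}
```

With this shape, a test using segment ID 65 (or `Integer.MAX_VALUE`) passes against the map and fails against the array, which is the regression the original test guarded against.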


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-11 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201876799
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
--- End diff --

The ref count and expiration time are internal details of the cache, so 
rather than directly test them, we have other tests that simulate time passing, 
acquire multiple references etc. and assert that the UGIs are destroyed at the 
right time.

I believe the mock verification 
`verify(provider).createProxyUGI("the-user")` is as close as we can get to 
verifying the principals of the UGI, because we're using a mock UGI here.
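
To make the "simulate time passing" approach concrete, here is a minimal sketch under stated assumptions: `ExpiringRefCache`, its methods, and the `LongSupplier` clock are all hypothetical stand-ins, not the PR's `UGICache` or Guava's `Ticker`. The point is only that the reference count and expiry time stay private, while a test observes creations and destructions by advancing an injected clock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical miniature of a reference-counted, expiring cache. It is not the
// PR's UGICache; it only shows how an injected clock lets a test "pass time".
final class ExpiringRefCache {

    private static final class Entry {
        int refCount;
        long lastReleasedAt;
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final LongSupplier clockMillis;
    private final long expiryMillis;
    private int createdCount;
    private int destroyedCount;

    ExpiringRefCache(LongSupplier clockMillis, long expiryMillis) {
        this.clockMillis = clockMillis;
        this.expiryMillis = expiryMillis;
    }

    // Takes a reference on the entry for the key, creating the entry if needed.
    synchronized void acquire(String key) {
        evictExpired();
        Entry entry = entries.get(key);
        if (entry == null) {
            entry = new Entry();
            entries.put(key, entry);
            createdCount++;
        }
        entry.refCount++;
    }

    // Drops one reference and restarts the expiry countdown for the entry.
    synchronized void release(String key) {
        Entry entry = entries.get(key);
        if (entry != null && entry.refCount > 0) {
            entry.refCount--;
            entry.lastReleasedAt = clockMillis.getAsLong();
        }
    }

    // Removes entries that are unreferenced and past their expiry time.
    private void evictExpired() {
        long now = clockMillis.getAsLong();
        int before = entries.size();
        entries.values().removeIf(
                e -> e.refCount == 0 && now - e.lastReleasedAt >= expiryMillis);
        destroyedCount += before - entries.size();
    }

    synchronized int createdCount() {
        return createdCount;
    }

    synchronized int destroyedCount() {
        return destroyedCount;
    }
}
```

A test would pass a clock such as `() -> now[0]`, mutate `now[0]` between calls, and assert on `createdCount()` and `destroyedCount()` rather than on the entry's internal fields.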


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-11 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201875770
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -89,32 +106,49 @@ public Boolean run() throws IOException, 
ServletException {
 };
 
 // create proxy user UGI from the UGI of the logged in user 
and execute the servlet chain as that user
-UserGroupInformation proxyUGI = null;
+UGICacheEntry timedProxyUGI = cache.getTimedProxyUGI(session);
 try {
-LOG.debug("Creating proxy user = " + user);
-proxyUGI = UserGroupInformation.createProxyUser(user, 
UserGroupInformation.getLoginUser());
-proxyUGI.doAs(action);
+timedProxyUGI.getUGI().doAs(action);
 } catch (UndeclaredThrowableException ute) {
 // unwrap the real exception thrown by the action
 throw new ServletException(ute.getCause());
 } catch (InterruptedException ie) {
 throw new ServletException(ie);
-} finally {
-try {
-if (proxyUGI != null) {
-LOG.debug("Closing FileSystem for proxy user = " + 
proxyUGI.getUserName());
-FileSystem.closeAllForUGI(proxyUGI);
-}
-} catch (Throwable t) {
-LOG.warn("Error closing FileSystem for proxy user = " 
+ proxyUGI.getUserName());
-}
+}
+finally {
+// Optimization to cleanup the cache if it is the last 
fragment
+boolean forceClean = (fragmentIndex != null && 
fragmentCount.equals(fragmentIndex));
+cache.release(timedProxyUGI, forceClean);
 }
 } else {
 // no user impersonation is configured
 chain.doFilter(request, response);
 }
 }
 
+
+private Integer getHeaderValueInt(ServletRequest request, String 
headerKey, boolean required)
+throws IllegalArgumentException {
+String value = getHeaderValue(request, headerKey, required);
+return value != null ? Integer.valueOf(value) : null;
--- End diff --

`Integer.valueOf` throws a `NumberFormatException` for non-numeric input, 
which is a subclass of `IllegalArgumentException`. Should we handle the 
`NumberFormatException` differently?
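
One possible way to handle it differently, sketched here under assumptions: the helper name `parseIntHeader` and the error message wording are hypothetical and not taken from the PR. The sketch catches the `NumberFormatException` and rethrows an `IllegalArgumentException` that names the offending header, keeping the original exception as the cause.

```java
// Hypothetical helper, not part of the PR: wraps NumberFormatException so the
// error message names the header that failed to parse.
final class HeaderParsing {

    private HeaderParsing() {
    }

    /**
     * Parses an optional integer header value.
     *
     * @return the parsed value, or null when the header value is absent
     * @throws IllegalArgumentException when the value is not a valid integer
     */
    static Integer parseIntHeader(String headerKey, String value) {
        if (value == null) {
            return null;
        }
        try {
            return Integer.valueOf(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Header " + headerKey + " is not a valid integer: " + value, e);
        }
    }
}
```

Callers that already catch `IllegalArgumentException` would keep working, since the exception type they see does not change; only the message becomes more specific.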


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-11 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201875409
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -64,21 +75,28 @@ public void init(FilterConfig filterConfig) throws 
ServletException {
  * @param chain filter chain
  */
 @Override
-public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain) throws IOException, 
ServletException {
+public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain)
+throws IOException, ServletException {
 
 if (SecureLogin.isUserImpersonationEnabled()) {
 
 // retrieve user header and make sure header is present and is 
not empty
-final String user = ((HttpServletRequest) 
request).getHeader(USER_HEADER);
-if (user == null) {
-throw new IllegalArgumentException(MISSING_HEADER_ERROR);
-} else if (user.trim().isEmpty()) {
-throw new IllegalArgumentException(EMPTY_HEADER_ERROR);
+final String user = getHeaderValue(request, USER_HEADER);
+String transactionId = getHeaderValue(request, 
TRANSACTION_ID_HEADER);
--- End diff --

@lavjain pointed out that the default value for `required` is `true`. Maybe 
it's better to be explicit, though.
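
For illustration, a sketch of the two call shapes being discussed. Everything here is an assumption made for the example: a `Map` stands in for the servlet request, and the `HeaderLookup` class and its validation rules are hypothetical rather than the filter's actual helpers. Only the error message formats are borrowed from the original filter.

```java
import java.util.Map;

// Hypothetical stand-in for the filter's header helpers; a Map plays the role
// of the servlet request so the sketch runs without a servlet container.
final class HeaderLookup {

    private HeaderLookup() {
    }

    // Two-argument form: the header is implicitly required.
    static String getHeaderValue(Map<String, String> headers, String key) {
        return getHeaderValue(headers, key, true);
    }

    // Three-argument form: the caller states whether the header is required.
    static String getHeaderValue(Map<String, String> headers, String key, boolean required) {
        String value = headers.get(key);
        if (required && value == null) {
            throw new IllegalArgumentException(
                    String.format("Header %s is missing in the request", key));
        }
        if (required && value.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    String.format("Header %s is empty in the request", key));
        }
        return value;
    }
}
```

Both forms behave the same for a required header; writing `getHeaderValue(headers, key, true)` at the call site simply makes that requirement visible to the reader.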


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-11 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201875131
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICacheEntry.java ---
@@ -0,0 +1,94 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICacheEntry implements Delayed {
+
+private volatile long startTime;
+private UserGroupInformation proxyUGI;
+private SessionId session;
+private boolean cleaned = false;
+private AtomicInteger inProgress = new AtomicInteger();
+private static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes
--- End diff --

For testing? Or for some other reason?


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-11 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201767807
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UGICacheEntry entry1 = cache.getTimedProxyUGI(session);
+UGICacheEntry entry2 = cache.getTimedProxyUGI(session);
+assertEquals(entry1, entry2);
+assertNotNull(entry1.getUGI());
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentSessionsForSameUser() throws 
Exception {
+SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(2)).createProxyUGI("the-user");
+// TODO: this seems weird. We're creating two UGIs with the same 
params,
+// even though we have two different sessions. Why?
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsers() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession);
+UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession);
+assertEquals(proxyUGI1a, proxyUGI1b);
+assertEquals(proxyUGI2a, proxyUGI2b);
+assertNotEquals(proxyUGI1a, proxyUGI2a);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getUGIWhenRequestedUserDoesNotExist() throws Exception {
+// what does UserGroupInformation.createProxyUser() do in this 
scenario?
+// how about getLoginUser()?
+}
+
+@Test
+public void anySegmentIdIsValid() throws Exception {
+session = new SessionId(65, "txn-id", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+assertNotNull(proxyUGI1.getUGI());
+}
+
+@Test
+public void releaseWithoutForceClean() throws Exception {
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+
+cache.release(proxyUGI1, false);
+// UGI wasn't cleaned up, so we can still get it
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(session);
+assertEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void releaseWithForceClea

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201523938
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UGICacheEntry entry1 = cache.getTimedProxyUGI(session);
+UGICacheEntry entry2 = cache.getTimedProxyUGI(session);
+assertEquals(entry1, entry2);
+assertNotNull(entry1.getUGI());
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentSessionsForSameUser() throws 
Exception {
+SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(2)).createProxyUGI("the-user");
+// TODO: this seems weird. We're creating two UGIs with the same 
params,
+// even though we have two different sessions. Why?
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsers() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession);
+UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession);
+assertEquals(proxyUGI1a, proxyUGI1b);
+assertEquals(proxyUGI2a, proxyUGI2b);
+assertNotEquals(proxyUGI1a, proxyUGI2a);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getUGIWhenRequestedUserDoesNotExist() throws Exception {
+// what does UserGroupInformation.createProxyUser() do in this 
scenario?
+// how about getLoginUser()?
+}
+
+@Test
+public void anySegmentIdIsValid() throws Exception {
+session = new SessionId(65, "txn-id", "the-user");
--- End diff --

Is this still relevant now that the magic number 64 is gone?


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201525176
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UGICacheEntry entry1 = cache.getTimedProxyUGI(session);
+UGICacheEntry entry2 = cache.getTimedProxyUGI(session);
+assertEquals(entry1, entry2);
+assertNotNull(entry1.getUGI());
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentSessionsForSameUser() throws 
Exception {
+SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(2)).createProxyUGI("the-user");
+// TODO: this seems weird. We're creating two UGIs with the same 
params,
+// even though we have two different sessions. Why?
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsers() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession);
+UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession);
+assertEquals(proxyUGI1a, proxyUGI1b);
+assertEquals(proxyUGI2a, proxyUGI2b);
+assertNotEquals(proxyUGI1a, proxyUGI2a);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getUGIWhenRequestedUserDoesNotExist() throws Exception {
+// what does UserGroupInformation.createProxyUser() do in this 
scenario?
+// how about getLoginUser()?
+}
+
+@Test
+public void anySegmentIdIsValid() throws Exception {
+session = new SessionId(65, "txn-id", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+assertNotNull(proxyUGI1.getUGI());
+}
+
+@Test
+public void releaseWithoutForceClean() throws Exception {
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+
+cache.release(proxyUGI1, false);
+// UGI wasn't cleaned up, so we can still get it
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(session);
+assertEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void releaseWithForceClean() 

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201523243
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer) invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UGICacheEntry entry1 = cache.getTimedProxyUGI(session);
+UGICacheEntry entry2 = cache.getTimedProxyUGI(session);
+assertEquals(entry1, entry2);
--- End diff --

Use `assertSame` for reference comparison?
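
A small sketch of the distinction, written in plain Java so it runs without JUnit; the class and method names are hypothetical. JUnit's `assertEquals` compares objects with `equals()`, while `assertSame` checks that both references point to the same object (`==`). For a cache test, the second check is the stronger statement that the very same entry was returned.

```java
// Hypothetical helpers mirroring the checks behind assertEquals and assertSame.
final class SameVersusEqual {

    private SameVersusEqual() {
    }

    // The kind of check assertEquals performs on non-null objects.
    static boolean isEqual(Object a, Object b) {
        return a.equals(b);
    }

    // The check assertSame performs: reference identity.
    static boolean isSame(Object a, Object b) {
        return a == b;
    }

    public static void main(String[] args) {
        String first = new String("the-user");
        String second = new String("the-user"); // equal content, distinct object

        System.out.println(isEqual(first, second)); // true
        System.out.println(isSame(first, second));  // false
        System.out.println(isSame(first, first));   // true
    }
}
```

If the entry class does not override `equals()`, both assertions behave identically, but `assertSame` still documents the intent more directly.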


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201519410
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -89,32 +106,49 @@ public Boolean run() throws IOException, 
ServletException {
 };
 
 // create proxy user UGI from the UGI of the logged in user 
and execute the servlet chain as that user
-UserGroupInformation proxyUGI = null;
+UGICacheEntry timedProxyUGI = cache.getTimedProxyUGI(session);
--- End diff --

Name it `proxyUGIEntry`, since the actual UGI is a member of that object?


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201517606
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICacheEntry.java ---
@@ -0,0 +1,94 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICacheEntry implements Delayed {
+
+private volatile long startTime;
+private UserGroupInformation proxyUGI;
+private SessionId session;
+private boolean cleaned = false;
+private AtomicInteger inProgress = new AtomicInteger();
+private static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes
--- End diff --

We need to parameterize this instead of hard-coding the expiry.
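
One possible way to do that, sketched under assumptions: read the expiry from a JVM system property and fall back to the current 15 minutes. The property name pxf.ugi.cache.expiry.ms and the class name are hypothetical and not part of PXF; the real configuration mechanism may well differ.

```java
import java.util.concurrent.TimeUnit;

final class ExpiryConfig {
    // Hypothetical property name, chosen for this sketch only.
    static final String EXPIRY_PROPERTY = "pxf.ugi.cache.expiry.ms";

    // Same default as the hard-coded value in the diff: 15 minutes.
    static final long DEFAULT_EXPIRY_MS = TimeUnit.MINUTES.toMillis(15);

    /**
     * Returns the configured expiry in milliseconds. Long.getLong falls back
     * to the default when the property is unset or not a valid long; values
     * that are zero or negative are also replaced by the default.
     */
    static long expiryMillis() {
        long value = Long.getLong(EXPIRY_PROPERTY, DEFAULT_EXPIRY_MS);
        return value > 0 ? value : DEFAULT_EXPIRY_MS;
    }

    public static void main(String[] args) {
        System.out.println("UGI cache expiry (ms): " + expiryMillis());
    }
}
```

With this shape the value could be overridden at startup, for example with -Dpxf.ugi.cache.expiry.ms=60000.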


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201523091
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+
when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer)
 invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
--- End diff --

Do we want to assert the ref count and the expiration time that are set as a 
result of this get? Do we also want to check that the principal name under the 
proxy UGI matches the user passed in?


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201512843
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java ---
@@ -0,0 +1,60 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class SessionId {
--- End diff --

Please add Javadoc to the class and all methods.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201524673
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+
when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer)
 invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UGICacheEntry entry1 = cache.getTimedProxyUGI(session);
+UGICacheEntry entry2 = cache.getTimedProxyUGI(session);
+assertEquals(entry1, entry2);
+assertNotNull(entry1.getUGI());
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentSessionsForSameUser() throws 
Exception {
+SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(2)).createProxyUGI("the-user");
+// TODO: this seems weird. We're creating two UGIs with the same 
params,
+// even though we have two different sessions. Why?
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsers() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession);
+UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession);
+assertEquals(proxyUGI1a, proxyUGI1b);
+assertEquals(proxyUGI2a, proxyUGI2b);
+assertNotEquals(proxyUGI1a, proxyUGI2a);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getUGIWhenRequestedUserDoesNotExist() throws Exception {
+// what does UserGroupInformation.createProxyUser() do in this 
scenario?
+// how about getLoginUser()?
+}
+
+@Test
+public void anySegmentIdIsValid() throws Exception {
+session = new SessionId(65, "txn-id", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+assertNotNull(proxyUGI1.getUGI());
+}
+
+@Test
+public void releaseWithoutForceClean() throws Exception {
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+
+cache.release(proxyUGI1, false);
+// UGI wasn't cleaned up, so we can still get it
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(session);
+assertEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void releaseWithForceClean() 

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201520779
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -89,32 +106,49 @@ public Boolean run() throws IOException, 
ServletException {
 };
 
 // create proxy user UGI from the UGI of the logged in user 
and execute the servlet chain as that user
-UserGroupInformation proxyUGI = null;
+UGICacheEntry timedProxyUGI = cache.getTimedProxyUGI(session);
 try {
-LOG.debug("Creating proxy user = " + user);
-proxyUGI = UserGroupInformation.createProxyUser(user, 
UserGroupInformation.getLoginUser());
-proxyUGI.doAs(action);
+timedProxyUGI.getUGI().doAs(action);
 } catch (UndeclaredThrowableException ute) {
 // unwrap the real exception thrown by the action
 throw new ServletException(ute.getCause());
 } catch (InterruptedException ie) {
 throw new ServletException(ie);
-} finally {
-try {
-if (proxyUGI != null) {
-LOG.debug("Closing FileSystem for proxy user = " + 
proxyUGI.getUserName());
-FileSystem.closeAllForUGI(proxyUGI);
-}
-} catch (Throwable t) {
-LOG.warn("Error closing FileSystem for proxy user = " 
+ proxyUGI.getUserName());
-}
+}
+finally {
+// Optimization to cleanup the cache if it is the last 
fragment
+boolean forceClean = (fragmentIndex != null && 
fragmentCount.equals(fragmentIndex));
+cache.release(timedProxyUGI, forceClean);
 }
 } else {
 // no user impersonation is configured
 chain.doFilter(request, response);
 }
 }
 
+
+private Integer getHeaderValueInt(ServletRequest request, String 
headerKey, boolean required)
+throws IllegalArgumentException {
+String value = getHeaderValue(request, headerKey, required);
+return value != null ? Integer.valueOf(value) : null;
--- End diff --

Integer.valueOf will throw NumberFormatException if the header value is not a 
valid integer.
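
A minimal sketch of one way to handle it, assuming the goal is an error that names the offending header. NumberFormatException is a subclass of IllegalArgumentException, so the declared throws clause already covers it, but the default message does not say which header was bad. The class and method names here are hypothetical.

```java
final class HeaderParsing {
    /**
     * Parses an optional integer header value. A null value stays null; a
     * malformed value is reported as an IllegalArgumentException that names
     * the header and keeps the original NumberFormatException as its cause.
     */
    static Integer parseIntHeader(String headerKey, String value) {
        if (value == null) {
            return null;
        }
        try {
            return Integer.valueOf(value.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    String.format("Header %s has a non-integer value '%s'", headerKey, value), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseIntHeader("X-GP-SEGMENT-ID", "7"));
        try {
            parseIntHeader("X-GP-SEGMENT-ID", "seven");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```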


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201524234
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+
when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer)
 invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UGICacheEntry entry1 = cache.getTimedProxyUGI(session);
+UGICacheEntry entry2 = cache.getTimedProxyUGI(session);
+assertEquals(entry1, entry2);
+assertNotNull(entry1.getUGI());
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentSessionsForSameUser() throws 
Exception {
+SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(2)).createProxyUGI("the-user");
+// TODO: this seems weird. We're creating two UGIs with the same 
params,
+// even though we have two different sessions. Why?
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsers() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession);
+UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession);
+assertEquals(proxyUGI1a, proxyUGI1b);
+assertEquals(proxyUGI2a, proxyUGI2b);
+assertNotEquals(proxyUGI1a, proxyUGI2a);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getUGIWhenRequestedUserDoesNotExist() throws Exception {
+// what does UserGroupInformation.createProxyUser() do in this 
scenario?
+// how about getLoginUser()?
+}
+
+@Test
+public void anySegmentIdIsValid() throws Exception {
+session = new SessionId(65, "txn-id", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+assertNotNull(proxyUGI1.getUGI());
+}
+
+@Test
+public void releaseWithoutForceClean() throws Exception {
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+
+cache.release(proxyUGI1, false);
+// UGI wasn't cleaned up, so we can still get it
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(session);
+assertEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
--- End diff --

Check that the refcount is still one and that there is a new expiration date.

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201515004
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,143 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, UGICacheEntry> cache = new 
ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used 
for locking)
+private DelayQueue<UGICacheEntry>[] delayQueues = 
(DelayQueue<UGICacheEntry>[])new DelayQueue[64];
+private final UGIProvider ugiProvider;
--- End diff --

Again, only one member is final. If you bother with final members, declare the 
others final as well, since they will not be changing?


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201513628
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java ---
@@ -0,0 +1,60 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class SessionId {
+
+private final String user;
--- End diff --

Why is only one field final? There are no setter methods for any of the other 
members either.
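
As an illustration of the point, here is a sketch of a fully immutable key class with every field final. It is not the PR's SessionId: the field names other than user are assumptions inferred from the three-argument constructor used in the tests, and equals()/hashCode() are included because the object is used as a map key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Illustrative immutable session key; all fields are final and there are no setters. */
final class SessionIdSketch {
    private final Integer segmentId;
    private final String transactionId; // assumed name
    private final String user;

    SessionIdSketch(Integer segmentId, String transactionId, String user) {
        this.segmentId = segmentId;
        this.transactionId = transactionId;
        this.user = user;
    }

    Integer getSegmentId() {
        return segmentId;
    }

    String getUser() {
        return user;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof SessionIdSketch)) {
            return false;
        }
        SessionIdSketch that = (SessionIdSketch) other;
        return Objects.equals(segmentId, that.segmentId)
                && Objects.equals(transactionId, that.transactionId)
                && Objects.equals(user, that.user);
    }

    @Override
    public int hashCode() {
        return Objects.hash(segmentId, transactionId, user);
    }

    public static void main(String[] args) {
        Map<SessionIdSketch, String> cache = new HashMap<>();
        cache.put(new SessionIdSketch(0, "txn-id", "the-user"), "entry");
        // A second, equal key finds the same entry.
        System.out.println(cache.get(new SessionIdSketch(0, "txn-id", "the-user"))); // prints "entry"
    }
}
```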


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201517214
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,143 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, UGICacheEntry> cache = new 
ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used 
for locking)
+private DelayQueue<UGICacheEntry>[] delayQueues = 
(DelayQueue<UGICacheEntry>[])new DelayQueue[64];
+private final UGIProvider ugiProvider;
+
+public UGICache(UGIProvider provider) {
+this.ugiProvider = provider;
+for (int i = 0; i < delayQueues.length; i++) {
+delayQueues[i] = new DelayQueue<>();
+}
+}
+
+public UGICache() {
+this(new UGIProvider());
+}
+
+// Create new proxy UGI if not found in cache and increment reference 
count
+public UGICacheEntry getTimedProxyUGI(SessionId session)
+throws IOException {
+
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+synchronized (delayQueues[segmentId]) {
+// Use the opportunity to cleanup any expired entries
+cleanup(segmentId);
+UGICacheEntry timedProxyUGI = cache.get(session);
+if (timedProxyUGI == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+UserGroupInformation proxyUGI = 
ugiProvider.createProxyUGI(user);
+timedProxyUGI = new UGICacheEntry(proxyUGI, session);
+delayQueues[segmentId].offer(timedProxyUGI);
+cache.put(session, timedProxyUGI);
+}
+timedProxyUGI.incrementCounter();
+return timedProxyUGI;
+}
+}
+
+// Poll segment expiration queue for all expired entries
+// and clean them if possible
+private void cleanup(Integer segmentId) {
+
+UGICacheEntry ugi = null;
+while ((ugi = delayQueues[segmentId].poll()) != null) {
+// Place it back in the queue if still in use and was not 
closed
+if (!closeUGI(ugi)) {
+delayQueues[segmentId].offer(ugi);
+}
+LOG.debug("Delay Queue Size for segment " +
--- End diff --

Surround this with an isDebugEnabled() check.
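
The reason for the guard is that the message string is concatenated (and the queue size is read) before the logger decides whether to emit it. Commons Logging's Log interface provides isDebugEnabled() for this. The sketch below uses java.util.logging as a stand-in so that it runs without extra dependencies, with isLoggable(Level.FINE) playing the role of isDebugEnabled(); the class, method, and counter names are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class GuardedLogging {
    static final Logger LOG = Logger.getLogger(GuardedLogging.class.getName());

    // Counts how often the message was actually built, to make the saving visible.
    static int messagesBuilt = 0;

    /** Builds the debug message; stands in for the string concatenation in the diff. */
    static String describeQueue(int segmentId, int size) {
        messagesBuilt++;
        return "Delay Queue Size for segment " + segmentId + " = " + size;
    }

    /** Builds and logs the message only when debug-level logging is enabled. */
    static void logQueueSize(int segmentId, int size) {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine(describeQueue(segmentId, size));
        }
    }

    public static void main(String[] args) {
        LOG.setLevel(Level.INFO);
        logQueueSize(0, 3);
        System.out.println("built with INFO level: " + messagesBuilt);
        LOG.setLevel(Level.FINE);
        logQueueSize(0, 3);
        System.out.println("built with FINE level: " + messagesBuilt);
    }
}
```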


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201517046
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,143 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, UGICacheEntry> cache = new 
ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used 
for locking)
+private DelayQueue<UGICacheEntry>[] delayQueues = 
(DelayQueue<UGICacheEntry>[])new DelayQueue[64];
+private final UGIProvider ugiProvider;
+
+public UGICache(UGIProvider provider) {
+this.ugiProvider = provider;
+for (int i = 0; i < delayQueues.length; i++) {
+delayQueues[i] = new DelayQueue<>();
+}
+}
+
+public UGICache() {
+this(new UGIProvider());
+}
+
+// Create new proxy UGI if not found in cache and increment reference 
count
+public UGICacheEntry getTimedProxyUGI(SessionId session)
+throws IOException {
+
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+synchronized (delayQueues[segmentId]) {
+// Use the opportunity to cleanup any expired entries
+cleanup(segmentId);
+UGICacheEntry timedProxyUGI = cache.get(session);
+if (timedProxyUGI == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+UserGroupInformation proxyUGI = 
ugiProvider.createProxyUGI(user);
+timedProxyUGI = new UGICacheEntry(proxyUGI, session);
+delayQueues[segmentId].offer(timedProxyUGI);
+cache.put(session, timedProxyUGI);
+}
+timedProxyUGI.incrementCounter();
+return timedProxyUGI;
+}
+}
+
+// Poll segment expiration queue for all expired entries
+// and clean them if possible
+private void cleanup(Integer segmentId) {
--- End diff --

proper javadoc, please


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201518811
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -42,8 +47,13 @@
 
 private static final Log LOG = 
LogFactory.getLog(SecurityServletFilter.class);
 private static final String USER_HEADER = "X-GP-USER";
-private static final String MISSING_HEADER_ERROR = 
String.format("Header %s is missing in the request", USER_HEADER);
-private static final String EMPTY_HEADER_ERROR = String.format("Header 
%s is empty in the request", USER_HEADER);
+private static final String SEGMENT_ID_HEADER = "X-GP-SEGMENT-ID";
+private static final String TRANSACTION_ID_HEADER = "X-GP-XID";
+private static final String FRAGMENT_INDEX_HEADER = 
"X-GP-FRAGMENT-INDEX";
+private static final String FRAGMENT_COUNT_HEADER = 
"X-GP-FRAGMENT-COUNT";
+private static final String MISSING_HEADER_ERROR = "Header %s is 
missing in the request";
+private static final String EMPTY_HEADER_ERROR = "Header %s is empty 
in the request";
+private static UGICache cache = new UGICache();
--- End diff --

This doesn't have to be static; create a single instance in the init() 
method.
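
A rough sketch of the suggested shape, assuming the container creates one filter instance and calls init() once before any request is served. To stay runnable without the servlet API, the class below only imitates the init/doFilter/destroy lifecycle; the class name, the method names other than init() and destroy(), and the string-valued cache are all hypothetical stand-ins.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class FilterLifecycleSketch {
    // Instance field instead of a static one; assigned once in init().
    private Map<String, String> cache;

    /** Stand-in for Filter.init(FilterConfig): creates the single cache instance. */
    void init() {
        cache = new ConcurrentHashMap<>();
    }

    /** Stand-in for doFilter: looks up the per-user entry, creating it on first use. */
    String handle(String user) {
        return cache.computeIfAbsent(user, u -> "ugi-for-" + u);
    }

    /** Number of users currently cached; exposed only for the demonstration. */
    int cachedUsers() {
        return cache.size();
    }

    /** Stand-in for Filter.destroy(): releases whatever the cache holds. */
    void destroy() {
        cache.clear();
    }

    public static void main(String[] args) {
        FilterLifecycleSketch filter = new FilterLifecycleSketch();
        filter.init();
        filter.handle("the-user");
        filter.handle("the-user");
        System.out.println(filter.cachedUsers()); // prints "1"
        filter.destroy();
    }
}
```

Tying the cache to the filter instance also gives it a natural place to be cleaned up in destroy().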


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201525708
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+
when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer)
 invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UGICacheEntry entry1 = cache.getTimedProxyUGI(session);
+UGICacheEntry entry2 = cache.getTimedProxyUGI(session);
+assertEquals(entry1, entry2);
+assertNotNull(entry1.getUGI());
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentSessionsForSameUser() throws 
Exception {
+SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(2)).createProxyUGI("the-user");
+// TODO: this seems weird. We're creating two UGIs with the same 
params,
+// even though we have two different sessions. Why?
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsers() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession);
+UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession);
+assertEquals(proxyUGI1a, proxyUGI1b);
+assertEquals(proxyUGI2a, proxyUGI2b);
+assertNotEquals(proxyUGI1a, proxyUGI2a);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getUGIWhenRequestedUserDoesNotExist() throws Exception {
+// what does UserGroupInformation.createProxyUser() do in this 
scenario?
+// how about getLoginUser()?
+}
+
+@Test
+public void anySegmentIdIsValid() throws Exception {
+session = new SessionId(65, "txn-id", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+assertNotNull(proxyUGI1.getUGI());
+}
+
+@Test
+public void releaseWithoutForceClean() throws Exception {
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+
+cache.release(proxyUGI1, false);
+// UGI wasn't cleaned up, so we can still get it
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(session);
+assertEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void releaseWithForceClean() 

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201521673
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,154 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, UGICacheEntry> cache = new 
ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used 
for locking)
+private final Map<Integer, DelayQueue<UGICacheEntry>> queueMap = new 
HashMap<>();
+private final UGIProvider ugiProvider;
+
+public UGICache(UGIProvider provider) {
+this.ugiProvider = provider;
+}
+
+public UGICache() {
+this(new UGIProvider());
+}
+
+private DelayQueue<UGICacheEntry> getDelayQueue(Integer segmentId) {
+DelayQueue<UGICacheEntry> queue = queueMap.get(segmentId);
+if (queue == null) {
+synchronized (queueMap) {
+queue = queueMap.get(segmentId);
+if (queue == null) {
+queue = new DelayQueue<>();
+queueMap.put(segmentId, queue);
+}
+}
+}
+return queue;
+}
+
+// Create new proxy UGI if not found in cache and increment reference 
count
+public UGICacheEntry getTimedProxyUGI(SessionId session)
+throws IOException {
+
+Integer segmentId = session.getSegmentId();
+String user = session.getUser();
+DelayQueue<UGICacheEntry> delayQueue = getDelayQueue(segmentId);
+synchronized (delayQueue) {
+// Use the opportunity to cleanup any expired entries
+cleanup(segmentId);
+UGICacheEntry timedProxyUGI = cache.get(session);
+if (timedProxyUGI == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+timedProxyUGI = new 
UGICacheEntry(ugiProvider.createProxyUGI(user), session);
+delayQueue.offer(timedProxyUGI);
+cache.put(session, timedProxyUGI);
+}
+timedProxyUGI.incrementCounter();
+return timedProxyUGI;
+}
+}
+
+// Poll segment expiration queue for all expired entries
+// and clean them if possible
+private void cleanup(Integer segmentId) {
+
+UGICacheEntry ugi = null;
+DelayQueue<UGICacheEntry> delayQueue = getDelayQueue(segmentId);
+while ((ugi = delayQueue.poll()) != null) {
+// Place it back in the queue if still in use and was not 
closed
+if (!closeUGI(ugi)) {
+delayQueue.offer(ugi);
+}
+LOG.debug("Delay Queue Size for segment " +
+segmentId + " = " + delayQueue.size());
--- End diff --

Wrap this LOG.debug call in an isDebugEnabled() check, so the message string 
is not concatenated when debug logging is off.
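
A minimal sketch of the guard, in case it helps. `DebugLog` below is a hypothetical stand-in for the two commons-logging `Log` methods involved (`isDebugEnabled()` and `debug(Object)`), so the snippet runs without the library. The counter is there only to show that the message is never built when debug is off.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class DebugGuardSketch {

    /** Hypothetical stand-in for commons-logging's Log (only the two methods used here). */
    interface DebugLog {
        boolean isDebugEnabled();

        void debug(Object message);
    }

    /** Counts how often the log message string was actually built. */
    static final AtomicInteger MESSAGES_BUILT = new AtomicInteger();

    static String queueSizeMessage(int segmentId, int queueSize) {
        MESSAGES_BUILT.incrementAndGet();
        return "Delay Queue Size for segment " + segmentId + " = " + queueSize;
    }

    static void logQueueSize(DebugLog log, int segmentId, int queueSize) {
        // Build and emit the message only when debug logging is enabled.
        if (log.isDebugEnabled()) {
            log.debug(queueSizeMessage(segmentId, queueSize));
        }
    }
}
```

In `cleanup` the same `if` would simply surround the existing `LOG.debug(...)` statement.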


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201516439
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,143 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, UGICacheEntry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used for locking)
+private DelayQueue<UGICacheEntry>[] delayQueues = (DelayQueue<UGICacheEntry>[])new DelayQueue[64];
+private final UGIProvider ugiProvider;
+
+public UGICache(UGIProvider provider) {
+this.ugiProvider = provider;
+for (int i = 0; i < delayQueues.length; i++) {
+delayQueues[i] = new DelayQueue<>();
+}
+}
+
+public UGICache() {
+this(new UGIProvider());
+}
+
+// Create new proxy UGI if not found in cache and increment reference 
count
+public UGICacheEntry getTimedProxyUGI(SessionId session)
--- End diff --

Since we return UGICacheEntry but the method is called getTimedProxyUGI, the 
naming looks inconsistent.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201523390
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer<UserGroupInformation>) invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UGICacheEntry entry1 = cache.getTimedProxyUGI(session);
+UGICacheEntry entry2 = cache.getTimedProxyUGI(session);
+assertEquals(entry1, entry2);
--- End diff --

Also assert that the ref count == 2?
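
For illustration, a stripped-down sketch of the reference-counting behaviour such an assertion would check. `RefCountedCache`, `Entry`, and `getRefCount()` are hypothetical names, not the PR's API; the real `UGICacheEntry` accessor may be named differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class RefCountedCache {

    /** Hypothetical cache entry that tracks how many callers currently hold it. */
    static final class Entry {
        private final AtomicInteger refCount = new AtomicInteger();

        int getRefCount() {
            return refCount.get();
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();

    /** Returns the cached entry for the key, creating it if needed, and bumps its ref count. */
    Entry get(String key) {
        Entry entry = entries.computeIfAbsent(key, k -> new Entry());
        entry.refCount.incrementAndGet();
        return entry;
    }

    /** Drops one reference; in this sketch the entry stays cached. */
    void release(Entry entry) {
        entry.refCount.decrementAndGet();
    }
}
```

With something like this, the test would get the same key twice and then check that the count on the shared entry is 2.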


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201519133
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -64,21 +75,28 @@ public void init(FilterConfig filterConfig) throws 
ServletException {
  * @param chain filter chain
  */
 @Override
-public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain) throws IOException, 
ServletException {
+public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain)
+throws IOException, ServletException {
 
 if (SecureLogin.isUserImpersonationEnabled()) {
 
 // retrieve user header and make sure header is present and is 
not empty
-final String user = ((HttpServletRequest) 
request).getHeader(USER_HEADER);
-if (user == null) {
-throw new IllegalArgumentException(MISSING_HEADER_ERROR);
-} else if (user.trim().isEmpty()) {
-throw new IllegalArgumentException(EMPTY_HEADER_ERROR);
+final String user = getHeaderValue(request, USER_HEADER);
+String transactionId = getHeaderValue(request, 
TRANSACTION_ID_HEADER);
--- End diff --

Why not mandate that transaction id and segment id be non-null?
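
A sketch of what mandatory header validation could look like, reusing the missing/empty error wording from the removed constants. A plain `Map` stands in for `HttpServletRequest` so the snippet is self-contained, and `getRequiredHeader` / `getRequiredSegmentId` are hypothetical helper names, not the PR's `getHeaderValue`. The "not a number" message is also an assumption.

```java
import java.util.Map;

final class RequiredHeaderSketch {

    static final String USER_HEADER = "X-GP-USER";
    static final String SEGMENT_ID_HEADER = "X-GP-SEGMENT-ID";

    /** Returns the trimmed header value, or throws if the header is missing or blank. */
    static String getRequiredHeader(Map<String, String> headers, String name) {
        String value = headers.get(name);
        if (value == null) {
            throw new IllegalArgumentException(
                    String.format("Header %s is missing in the request", name));
        }
        if (value.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    String.format("Header %s is empty in the request", name));
        }
        return value.trim();
    }

    /** Parses the segment id, rejecting missing, blank, or non-numeric values. */
    static int getRequiredSegmentId(Map<String, String> headers) {
        String raw = getRequiredHeader(headers, SEGMENT_ID_HEADER);
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    String.format("Header %s is not a number: %s", SEGMENT_ID_HEADER, raw), e);
        }
    }
}
```

The transaction id header could go through the same helper once its name constant is available.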


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201522755
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
--- End diff --

If you use @RunWith(MockitoJUnitRunner.class), you can use @Mock 
annotations, and you do not have to verify calls if you trained the mock with 
specific values rather than any(). We discussed it with Scala, which didn't 
have that support.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201519675
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -89,32 +106,49 @@ public Boolean run() throws IOException, 
ServletException {
 };
 
 // create proxy user UGI from the UGI of the logged in user 
and execute the servlet chain as that user
-UserGroupInformation proxyUGI = null;
+UGICacheEntry timedProxyUGI = cache.getTimedProxyUGI(session);
 try {
-LOG.debug("Creating proxy user = " + user);
-proxyUGI = UserGroupInformation.createProxyUser(user, 
UserGroupInformation.getLoginUser());
-proxyUGI.doAs(action);
+timedProxyUGI.getUGI().doAs(action);
 } catch (UndeclaredThrowableException ute) {
 // unwrap the real exception thrown by the action
 throw new ServletException(ute.getCause());
 } catch (InterruptedException ie) {
 throw new ServletException(ie);
-} finally {
-try {
-if (proxyUGI != null) {
-LOG.debug("Closing FileSystem for proxy user = " + 
proxyUGI.getUserName());
-FileSystem.closeAllForUGI(proxyUGI);
-}
-} catch (Throwable t) {
-LOG.warn("Error closing FileSystem for proxy user = " 
+ proxyUGI.getUserName());
-}
+}
+finally {
+// Optimization to cleanup the cache if it is the last 
fragment
+boolean forceClean = (fragmentIndex != null && 
fragmentCount.equals(fragmentIndex));
+cache.release(timedProxyUGI, forceClean);
--- End diff --

If release throws an exception, we do not want to propagate it to the user, 
so restore the catch (Throwable t) block?
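
A self-contained sketch of the shape this would take, assuming cleanup failures should only be logged. `Releaser` is a hypothetical stand-in for the `cache.release(...)` call, and the `WARNINGS` list stands in for `LOG.warn`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class SafeReleaseSketch {

    /** Hypothetical cleanup hook standing in for cache.release(...). */
    interface Releaser {
        void release() throws Exception;
    }

    /** Collected warnings; a real filter would call LOG.warn instead. */
    static final List<String> WARNINGS = new ArrayList<>();

    static <T> T runThenRelease(Supplier<T> action, Releaser releaser) {
        try {
            return action.get();
        } finally {
            try {
                releaser.release();
            } catch (Throwable t) {
                // Swallow cleanup failures so they never mask the action's result or exception.
                WARNINGS.add("Error releasing resource: " + t.getMessage());
            }
        }
    }
}
```

Because the inner `catch` handles everything thrown by the release, the caller sees either the action's return value or the action's own exception.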


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201523834
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer<UserGroupInformation>) invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UGICacheEntry entry1 = cache.getTimedProxyUGI(session);
+UGICacheEntry entry2 = cache.getTimedProxyUGI(session);
+assertEquals(entry1, entry2);
+assertNotNull(entry1.getUGI());
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentSessionsForSameUser() throws 
Exception {
+SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(2)).createProxyUGI("the-user");
+// TODO: this seems weird. We're creating two UGIs with the same 
params,
+// even though we have two different sessions. Why?
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsers() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession);
+UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession);
+assertEquals(proxyUGI1a, proxyUGI1b);
+assertEquals(proxyUGI2a, proxyUGI2b);
+assertNotEquals(proxyUGI1a, proxyUGI2a);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getUGIWhenRequestedUserDoesNotExist() throws Exception {
+// what does UserGroupInformation.createProxyUser() do in this 
scenario?
+// how about getLoginUser()?
--- End diff --

User existence is not required for proxy users, so remove the method.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201513956
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java ---
@@ -0,0 +1,60 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class SessionId {
+
+private final String user;
+private Integer segmentId;
+private String sessionId;
+
+public SessionId(Integer segmentId, String transactionId, String 
gpdbUser) {
+this.segmentId = segmentId;
+this.user = gpdbUser;
+this.sessionId = segmentId + ":" + transactionId + ":" + gpdbUser;
--- End diff --

I mentally scope in the other order: user > transaction > segment. Maybe 
reverse the order of the components so the id is easier to analyze when 
debugging or logging?
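
Roughly what I have in mind, as a sketch only; `SessionKeySketch` and `sessionKey` are hypothetical names used to show the ordering, not a proposed API.

```java
final class SessionKeySketch {

    /** Builds the key from widest to narrowest scope: user, then transaction, then segment. */
    static String sessionKey(String user, String transactionId, Integer segmentId) {
        return user + ":" + transactionId + ":" + segmentId;
    }
}
```

With the test values used in UGICacheTest this would give `the-user:txn-id:0`, so log lines for the same user sort and grep together.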


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201516265
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,143 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
--- End diff --

Javadocs, please.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201524862
  
--- Diff: 
pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/UGICacheTest.java ---
@@ -0,0 +1,197 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class UGICacheTest {
+private UGIProvider provider = null;
+private SessionId session = null;
+private UGICache cache = null;
+
+@Before
+public void setUp() throws Exception {
+provider = mock(UGIProvider.class);
+
+when(provider.createProxyUGI(any(String.class))).thenAnswer((Answer<UserGroupInformation>) invocation -> mock(UserGroupInformation.class));
+
+session = new SessionId(0, "txn-id", "the-user");
+
+cache = new UGICache(provider);
+}
+
+@Test
+public void getUGIFromEmptyCache() throws Exception {
+UGICacheEntry entry = cache.getTimedProxyUGI(session);
+assertNotNull(entry.getUGI());
+verify(provider).createProxyUGI("the-user");
+}
+
+@Test
+public void getSameUGITwiceUsesCache() throws Exception {
+UGICacheEntry entry1 = cache.getTimedProxyUGI(session);
+UGICacheEntry entry2 = cache.getTimedProxyUGI(session);
+assertEquals(entry1, entry2);
+assertNotNull(entry1.getUGI());
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentSessionsForSameUser() throws 
Exception {
+SessionId otherSession = new SessionId(0, "txn-id-2", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(2)).createProxyUGI("the-user");
+// TODO: this seems weird. We're creating two UGIs with the same 
params,
+// even though we have two different sessions. Why?
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsers() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(otherSession);
+assertNotEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getTwoUGIsWithDifferentUsersCachesBoth() throws Exception {
+SessionId otherSession = new SessionId(0, "txn-id", 
"different-user");
+UGICacheEntry proxyUGI1a = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI1b = cache.getTimedProxyUGI(session);
+UGICacheEntry proxyUGI2a = cache.getTimedProxyUGI(otherSession);
+UGICacheEntry proxyUGI2b = cache.getTimedProxyUGI(otherSession);
+assertEquals(proxyUGI1a, proxyUGI1b);
+assertEquals(proxyUGI2a, proxyUGI2b);
+assertNotEquals(proxyUGI1a, proxyUGI2a);
+verify(provider, times(1)).createProxyUGI("the-user");
+verify(provider, times(1)).createProxyUGI("different-user");
+}
+
+@Test
+public void getUGIWhenRequestedUserDoesNotExist() throws Exception {
+// what does UserGroupInformation.createProxyUser() do in this 
scenario?
+// how about getLoginUser()?
+}
+
+@Test
+public void anySegmentIdIsValid() throws Exception {
+session = new SessionId(65, "txn-id", "the-user");
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+assertNotNull(proxyUGI1.getUGI());
+}
+
+@Test
+public void releaseWithoutForceClean() throws Exception {
+UGICacheEntry proxyUGI1 = cache.getTimedProxyUGI(session);
+
+cache.release(proxyUGI1, false);
+// UGI wasn't cleaned up, so we can still get it
+UGICacheEntry proxyUGI2 = cache.getTimedProxyUGI(session);
+assertEquals(proxyUGI1, proxyUGI2);
+verify(provider, times(1)).createProxyUGI("the-user");
+}
+
+@Test
+public void releaseWithForceClean() 

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201513391
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java ---
@@ -0,0 +1,60 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class SessionId {
+
+private final String user;
+private Integer segmentId;
+private String sessionId;
+
+public SessionId(Integer segmentId, String transactionId, String 
gpdbUser) {
+this.segmentId = segmentId;
+this.user = gpdbUser;
+this.sessionId = segmentId + ":" + transactionId + ":" + gpdbUser;
+}
+
+public Integer getSegmentId() {
+return segmentId;
+}
+
+@Override
+public int hashCode() {
+return sessionId.hashCode();
+}
+
+@Override
+public boolean equals(Object other) {
+if (!(other instanceof SessionId)) {
+return false;
+}
+SessionId that = (SessionId) other;
+return this.sessionId.equals(that.sessionId);
--- End diff --

I usually follow the pattern and utility methods from: 
https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/builder/EqualsBuilder.html
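
A sketch of the field-by-field pattern, written with `java.util.Objects` instead of `EqualsBuilder` so it runs without commons-lang3; with `EqualsBuilder` the comparisons would become chained `append(...)` calls ending in `isEquals()`. `SessionKey` is a hypothetical class name, and keeping `transactionId` as its own field is an assumption.

```java
import java.util.Objects;

final class SessionKey {

    private final String user;
    private final String transactionId;
    private final Integer segmentId;

    SessionKey(Integer segmentId, String transactionId, String user) {
        this.segmentId = segmentId;
        this.transactionId = transactionId;
        this.user = user;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof SessionKey)) {
            return false;
        }
        SessionKey that = (SessionKey) other;
        // Compare field by field instead of comparing a concatenated string.
        return Objects.equals(user, that.user)
                && Objects.equals(transactionId, that.transactionId)
                && Objects.equals(segmentId, that.segmentId);
    }

    @Override
    public int hashCode() {
        // Hash the same fields that equals() compares.
        return Objects.hash(user, transactionId, segmentId);
    }
}
```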


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201515880
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,143 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private Map<SessionId, UGICacheEntry> cache = new ConcurrentHashMap<>();
+@SuppressWarnings("unchecked")
+// There is a separate DelayQueue for each segment (also being used for locking)
+private DelayQueue<UGICacheEntry>[] delayQueues = (DelayQueue<UGICacheEntry>[])new DelayQueue[64];
--- End diff --

Can we use a List of DelayQueues backed by an ArrayList? It has similar 
performance, without working around the lack of generics support for arrays.
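
Something along these lines, as a sketch. `Entry` is a hypothetical element type that is always expired, used only so the snippet compiles on its own; the PR's element type would take its place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class QueueListSketch {

    /** Hypothetical element type; always expired, so poll() returns it immediately. */
    static final class Entry implements Delayed {
        @Override
        public long getDelay(TimeUnit unit) {
            return 0L;
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(
                    getDelay(TimeUnit.MILLISECONDS),
                    other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Builds one DelayQueue per segment without any unchecked array cast. */
    static List<DelayQueue<Entry>> newDelayQueues(int segments) {
        List<DelayQueue<Entry>> queues = new ArrayList<>(segments);
        for (int i = 0; i < segments; i++) {
            queues.add(new DelayQueue<>());
        }
        return queues;
    }
}
```

Lookups become `queues.get(segmentId)` instead of `delayQueues[segmentId]`, and the `@SuppressWarnings("unchecked")` annotation is no longer needed.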


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-10 Thread lavjain
Github user lavjain commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201510484
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/TimedProxyUGI.java ---
@@ -0,0 +1,93 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class TimedProxyUGI implements Delayed {
+
+private volatile long startTime;
+private UserGroupInformation proxyUGI;
+private SegmentTransactionId session;
+private boolean cleaned = false;
+AtomicInteger inProgress = new AtomicInteger();
+
+public TimedProxyUGI(UserGroupInformation proxyUGI, 
SegmentTransactionId session) {
+this.startTime = System.currentTimeMillis();
+this.proxyUGI = proxyUGI;
+this.session = session;
+}
+
+public UserGroupInformation getProxyUGI() {
+return proxyUGI;
+}
+
+public SegmentTransactionId getSession() {
+return session;
+}
+
+public boolean isCleaned() {
+return cleaned;
+}
+
+public void setCleaned() {
+cleaned = true;
+}
+
+public int getCounter() {
+return inProgress.get();
+}
+
+public void incrementCounter() {
+inProgress.incrementAndGet();
+}
+
+public void decrementCounter() {
+inProgress.decrementAndGet();
+}
+
+public void resetTime() {
+startTime = System.currentTimeMillis();
+}
+
+public void setExpired() {
+startTime = -1L;
+}
+
+@Override
+public long getDelay(TimeUnit unit) {
+return unit.convert(getDelayMillis(), TimeUnit.MILLISECONDS);
+}
+
+@Override
+public int compareTo(Delayed other) {
+TimedProxyUGI that = (TimedProxyUGI) other;
+return Long.compare(this.getDelayMillis(), that.getDelayMillis());
+}
+
+public long getDelayMillis() {
--- End diff --

`Delayed` just expects you to implement `compareTo(Delayed other)`
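
For context, a self-contained sketch of a `Delayed` element: the interface declares `getDelay(TimeUnit)` and inherits `compareTo(Delayed)` from `Comparable<Delayed>`. `TimedEntrySketch` and its `expiryMillis` parameter are hypothetical, and the `getDelayMillis()` formula is an assumption, since the body is not shown in the quoted diff.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class TimedEntrySketch implements Delayed {

    private final long expiryMillis;
    private volatile long startTime = System.currentTimeMillis();

    TimedEntrySketch(long expiryMillis) {
        this.expiryMillis = expiryMillis;
    }

    /** Forces the entry to count as expired on the next queue poll. */
    void setExpired() {
        startTime = -1L;
    }

    /** Remaining time before expiry; zero or negative means expired (assumed formula). */
    long getDelayMillis() {
        return (startTime + expiryMillis) - System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(getDelayMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Comparing through the interface avoids a cast to the concrete class.
        return Long.compare(
                getDelay(TimeUnit.MILLISECONDS),
                other.getDelay(TimeUnit.MILLISECONDS));
    }
}
```

A `DelayQueue` holding such entries returns `null` from `poll()` until the head entry's delay reaches zero or below.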


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-09 Thread lavjain
Github user lavjain commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201091933
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,129 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private static Map cache = new 
ConcurrentHashMap<>();
+//private static DelayQueue delayQueue = new 
DelayQueue<>();
+private static DelayQueue[] delayQueues = new 
DelayQueue<>[64];
+public static long UGI_CACHE_EXPIRY = 15 * 1 * 1000L; // 15 Minutes
+
+public UGICache() {
+for (int i = 0; i < delayQueues.length; i++) {
+delayQueues[i] = new DelayQueue<>();
--- End diff --

w/ @benchristel: Will change `delayQueues` to be non-static.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-09 Thread shivzone
Github user shivzone commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r201090455
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SegmentTransactionId.java
 ---
@@ -0,0 +1,60 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class SegmentTransactionId {
+
+private Integer segmentId;
+private String transactionId;
--- End diff --

We might need it in the future. It doesn't hurt to leave in there for now.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200794005
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/TimedProxyUGI.java ---
@@ -0,0 +1,93 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class TimedProxyUGI implements Delayed {
+
+private volatile long startTime;
+private UserGroupInformation proxyUGI;
+private SegmentTransactionId session;
+private boolean cleaned = false;
+AtomicInteger inProgress = new AtomicInteger();
+
+public TimedProxyUGI(UserGroupInformation proxyUGI, 
SegmentTransactionId session) {
+this.startTime = System.currentTimeMillis();
+this.proxyUGI = proxyUGI;
+this.session = session;
+}
+
+public UserGroupInformation getProxyUGI() {
+return proxyUGI;
+}
+
+public SegmentTransactionId getSession() {
+return session;
+}
+
+public boolean isCleaned() {
+return cleaned;
+}
+
+public void setCleaned() {
+cleaned = true;
+}
+
+public int getCounter() {
+return inProgress.get();
+}
+
+public void incrementCounter() {
+inProgress.incrementAndGet();
+}
+
+public void decrementCounter() {
+inProgress.decrementAndGet();
+}
+
+public void resetTime() {
+startTime = System.currentTimeMillis();
+}
+
+public void setExpired() {
+startTime = -1L;
+}
+
+@Override
+public long getDelay(TimeUnit unit) {
+return unit.convert(getDelayMillis(), TimeUnit.MILLISECONDS);
+}
+
+@Override
+public int compareTo(Delayed other) {
+TimedProxyUGI that = (TimedProxyUGI) other;
+return Long.compare(this.getDelayMillis(), that.getDelayMillis());
+}
+
+public long getDelayMillis() {
--- End diff --

Is this an `@Override` from `Delayed`?


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200797045
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,129 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private static Map cache = new 
ConcurrentHashMap<>();
+//private static DelayQueue delayQueue = new 
DelayQueue<>();
+private static DelayQueue[] delayQueues = new 
DelayQueue<>[64];
+public static long UGI_CACHE_EXPIRY = 15 * 1 * 1000L; // 15 Minutes
+
+public UGICache() {
+for (int i = 0; i < delayQueues.length; i++) {
+delayQueues[i] = new DelayQueue<>();
+}
+}
+
+public TimedProxyUGI getTimedProxyUGI(String user, 
SegmentTransactionId session) throws IOException {
+
+Integer segmentId = session.getSegmentId();
+synchronized (delayQueues[segmentId]) {
+// use the opportunity to cleanup any expired entries
+cleanup(segmentId);
+TimedProxyUGI timedProxyUGI = cache.get(session);
+if (timedProxyUGI == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+UserGroupInformation proxyUGI =
+UserGroupInformation.createProxyUser(user, 
UserGroupInformation.getLoginUser());
+timedProxyUGI = new TimedProxyUGI(proxyUGI, session);
+delayQueues[segmentId].offer(timedProxyUGI);
+cache.put(session, timedProxyUGI);
+}
+timedProxyUGI.incrementCounter();
+return timedProxyUGI;
+}
+}
+
+private cleanup(Integer segmentId) {
+// poll segment expiration queue for all expired entries and clean 
them if possible   
+TimedProxyUGI ugi = null;
+while ((ugi = delayQueues[segmentId].poll()) != null) {
+// place it back in the queue if still in use and was not 
closed
+if (!closeUGI(ugi)) {
+delayQueues[segmentId].offer(ugi);
+}
+LOG.info("Delay Queue Size for segment " +
+segmentId + " = " + delayQueues[segmentId].size());
+}
+}
+
+public void release(TimedProxyUGI timedProxyUGI, boolean forceClean) {
+
+Integer segmentId = timedProxyUGI.getSession().getSegmentId();
+
+timedProxyUGI.resetTime();
+timedProxyUGI.decrementCounter();
+
+if (forceClean) {
+synchronized (delayQueues[segmentId]) {
+timedProxyUGI.setExpired();
+closeUGI(timedProxyUGI);
+}
+}
+}
+
+private static boolean closeUGI(TimedProxyUGI expiredProxyUGI) {
+
+SegmentTransactionId session = expiredProxyUGI.getSession();
+Integer segmentId = session.getSegmentId();
+//synchronized (delayQueues[segmentId]) {
+String fsMsg = "FileSystem for proxy user = " + 
expiredProxyUGI.getProxyUGI().getUserName();
+try {
+// The UGI object is still being used by another thread
+if (expiredProxyUGI.inProgress.get() != 0) {
+LOG.info(session.toString() + " Skipping close of " + 
fsMsg);
+expiredProx

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200793393
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/TimedProxyUGI.java ---
@@ -0,0 +1,93 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class TimedProxyUGI implements Delayed {
+
+private volatile long startTime;
+private UserGroupInformation proxyUGI;
+private SegmentTransactionId session;
+private boolean cleaned = false;
+AtomicInteger inProgress = new AtomicInteger();
+
+public TimedProxyUGI(UserGroupInformation proxyUGI, 
SegmentTransactionId session) {
+this.startTime = System.currentTimeMillis();
+this.proxyUGI = proxyUGI;
+this.session = session;
+}
+
+public UserGroupInformation getProxyUGI() {
+return proxyUGI;
+}
+
+public SegmentTransactionId getSession() {
+return session;
+}
+
+public boolean isCleaned() {
+return cleaned;
+}
+
+public void setCleaned() {
+cleaned = true;
+}
+
+public int getCounter() {
+return inProgress.get();
+}
+
+public void incrementCounter() {
+inProgress.incrementAndGet();
+}
+
+public void decrementCounter() {
+inProgress.decrementAndGet();
+}
+
+public void resetTime() {
+startTime = System.currentTimeMillis();
+}
+
+public void setExpired() {
+startTime = -1L;
+}
+
+@Override
+public long getDelay(TimeUnit unit) {
+return unit.convert(getDelayMillis(), TimeUnit.MILLISECONDS);
+}
+
+@Override
+public int compareTo(Delayed other) {
+TimedProxyUGI that = (TimedProxyUGI) other;
+return Long.compare(this.getDelayMillis(), that.getDelayMillis());
+}
+
+public long getDelayMillis() {
+return (startTime + UGICache.UGI_CACHE_EXPIRY) - 
System.currentTimeMillis();
--- End diff --

It's a little strange that we're referencing `UGI_CACHE_EXPIRY` directly 
here. It hints at a tight coupling between this class and `UGICache` that 
isn't explicit elsewhere, and it also creates a dependency cycle between the 
two classes. That may be fine if we want these classes to be coupled, but if 
so I'd want to make the coupling more explicit, e.g. by making this class a 
public static nested class of `UGICache`. If we want to decouple them, we could 
instead pass the expiry time to the constructor of `TimedProxyUGI`.
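
The decoupled option could look roughly like the sketch below. `ExpiringEntry` is a hypothetical, trimmed-down stand-in for `TimedProxyUGI` (the UGI and session fields are left out), and the constructor parameter `expiryMillis` is an assumed name; the point is only that the entry no longer reads a static field of the cache.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TimedProxyUGI: the expiry interval is handed in
// by whoever creates the entry instead of being read from UGICache.
final class ExpiringEntry implements Delayed {

    private final long expiryMillis;   // supplied by the owning cache
    private volatile long startTime;

    ExpiringEntry(long expiryMillis) {
        this.expiryMillis = expiryMillis;
        this.startTime = System.currentTimeMillis();
    }

    // Restart the expiry window, as resetTime() does in the diff above.
    void resetTime() {
        startTime = System.currentTimeMillis();
    }

    long getDelayMillis() {
        return (startTime + expiryMillis) - System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(getDelayMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Uses only the Delayed interface, so no cast is needed.
        return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                            other.getDelay(TimeUnit.MILLISECONDS));
    }
}
```

With this shape the cache would create entries as `new ExpiringEntry(expiry)`, and tests could use a short expiry without touching a shared static.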


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200794109
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/TimedProxyUGI.java ---
@@ -0,0 +1,93 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class TimedProxyUGI implements Delayed {
+
+private volatile long startTime;
+private UserGroupInformation proxyUGI;
+private SegmentTransactionId session;
+private boolean cleaned = false;
+AtomicInteger inProgress = new AtomicInteger();
+
+public TimedProxyUGI(UserGroupInformation proxyUGI, 
SegmentTransactionId session) {
+this.startTime = System.currentTimeMillis();
+this.proxyUGI = proxyUGI;
+this.session = session;
+}
+
+public UserGroupInformation getProxyUGI() {
+return proxyUGI;
+}
+
+public SegmentTransactionId getSession() {
+return session;
+}
+
+public boolean isCleaned() {
+return cleaned;
+}
+
+public void setCleaned() {
+cleaned = true;
+}
+
+public int getCounter() {
+return inProgress.get();
+}
+
+public void incrementCounter() {
+inProgress.incrementAndGet();
+}
+
+public void decrementCounter() {
+inProgress.decrementAndGet();
+}
+
+public void resetTime() {
+startTime = System.currentTimeMillis();
+}
+
+public void setExpired() {
+startTime = -1L;
+}
+
+@Override
+public long getDelay(TimeUnit unit) {
+return unit.convert(getDelayMillis(), TimeUnit.MILLISECONDS);
+}
+
+@Override
+public int compareTo(Delayed other) {
+TimedProxyUGI that = (TimedProxyUGI) other;
+return Long.compare(this.getDelayMillis(), that.getDelayMillis());
--- End diff --

This comparison isn't safe, because `getDelayMillis()` gets the current 
time and therefore two `TimedProxyUGI`s that compare equal at one moment may 
not be equal the next. We're also doing an unsafe cast from `Delayed` to 
`TimedProxyUGI`.
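
One way to address both points, sketched under assumptions: store an absolute deadline and compare deadlines, so the ordering of two entries does not depend on when the clock is read, and fall back to `getDelay()` for any other `Delayed`. `DeadlineEntry` and `expiresAtMillis` are hypothetical names, not part of the PR.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical entry that orders itself by a fixed, absolute deadline.
final class DeadlineEntry implements Delayed {

    private final long expiresAtMillis;

    DeadlineEntry(long expiresAtMillis) {
        this.expiresAtMillis = expiresAtMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expiresAtMillis - System.currentTimeMillis(),
                            TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof DeadlineEntry) {
            // No clock read here: the result is the same every time
            // for a given pair of entries.
            return Long.compare(expiresAtMillis,
                                ((DeadlineEntry) other).expiresAtMillis);
        }
        // Unknown Delayed implementation: use the interface contract
        // instead of an unchecked cast.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                            other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

The deadline is final in this sketch on purpose. Extending the lifetime of an entry that is already in a `DelayQueue` would most likely have to be done by removing it and offering a new entry, since the queue orders elements when they are inserted.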


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200797423
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java
 ---
@@ -64,21 +75,28 @@ public void init(FilterConfig filterConfig) throws 
ServletException {
  * @param chain filter chain
  */
 @Override
-public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain) throws IOException, 
ServletException {
+public void doFilter(final ServletRequest request, final 
ServletResponse response, final FilterChain chain)
+throws IOException, ServletException {
 
 if (SecureLogin.isUserImpersonationEnabled()) {
 
 // retrieve user header and make sure header is present and is 
not empty
-final String user = ((HttpServletRequest) 
request).getHeader(USER_HEADER);
-if (user == null) {
-throw new IllegalArgumentException(MISSING_HEADER_ERROR);
-} else if (user.trim().isEmpty()) {
-throw new IllegalArgumentException(EMPTY_HEADER_ERROR);
+final String user = getHeaderValue(request, USER_HEADER);
+String transactionId = getHeaderValue(request, 
TRANSACTION_ID_HEADER);
--- End diff --

We are not passing `required=true` to `getHeaderValue()` here. Is it 
really valid for `transactionId` to be null?

If `transactionId` can be null, we're going to create 
`SegmentTransactionId`s with nothing but a segment ID in them, and we could 
get the wrong UGI from the cache if another request with no 
`transactionId` was recently served.
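
For illustration, a required-header check might be shaped like this. The three-argument `getHeaderValue` here is a hypothetical helper, a plain `Map` stands in for `HttpServletRequest#getHeader` so the sketch is self-contained, and the header name `X-GP-XID` used in the usage note is an assumption. The error messages reuse the wording of the strings removed in this diff.

```java
import java.util.Map;

// Hypothetical helper: a Map stands in for the servlet request headers.
final class HeaderReader {

    private HeaderReader() {
    }

    // Returns the header value; if required, rejects missing or blank values.
    static String getHeaderValue(Map<String, String> headers, String name, boolean required) {
        String value = headers.get(name);
        if (required && value == null) {
            throw new IllegalArgumentException(
                    String.format("Header %s is missing in the request", name));
        }
        if (required && value.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    String.format("Header %s is empty in the request", name));
        }
        return value;
    }
}
```

Calling `HeaderReader.getHeaderValue(headers, "X-GP-XID", true)` would then fail fast instead of letting a null transaction ID reach the cache key.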


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200792239
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/TimedProxyUGI.java ---
@@ -0,0 +1,81 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class TimedProxyUGI implements Delayed {
+
+private long startTime;
+private UserGroupInformation proxyUGI;
+private SegmentTransactionId session;
+AtomicInteger inProgress = new AtomicInteger();
+
+public TimedProxyUGI(UserGroupInformation proxyUGI, 
SegmentTransactionId session) {
+this.startTime = System.currentTimeMillis();
+this.proxyUGI = proxyUGI;
+this.session = session;
+inProgress.incrementAndGet();
+}
+
+public UserGroupInformation getProxyUGI() {
--- End diff --

If a "ProxyUGI" is a wrapper around UGI (as the name of this class 
implies), maybe the name of this method should be `getUGI`. The corresponding 
field could be named `ugi` or `delegateUGI` or something.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200796211
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,129 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private static Map cache = new 
ConcurrentHashMap<>();
+//private static DelayQueue delayQueue = new 
DelayQueue<>();
+private static DelayQueue[] delayQueues = new 
DelayQueue<>[64];
+public static long UGI_CACHE_EXPIRY = 15 * 1 * 1000L; // 15 Minutes
+
+public UGICache() {
+for (int i = 0; i < delayQueues.length; i++) {
+delayQueues[i] = new DelayQueue<>();
+}
+}
+
+public TimedProxyUGI getTimedProxyUGI(String user, 
SegmentTransactionId session) throws IOException {
--- End diff --

I think there's a potential session hijacking vulnerability here. If you 
pass a `session` that exists in the cache, for any user, the `user` param is 
ignored when looking up the corresponding UGI. This means that if I know the 
transaction ID of one of your recent transactions, I can authenticate as you 
and use your UGI.

At a minimum, I think there should be some information in the session that 
is more difficult to guess than a transaction ID. I'm not sure what else needs 
to be done to make this secure.
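
As a partial mitigation, the user could be made part of the cache key, so that a lookup with another user's transaction ID misses the cache. The sketch below is an assumption about how such a key might look; `SessionKey` is a hypothetical name. It does not verify that the caller really is the user named in the header, so it would not close the hole on its own.

```java
import java.util.Objects;

// Hypothetical cache key: user, segment ID, and transaction ID must all match.
final class SessionKey {

    private final String user;
    private final int segmentId;
    private final String transactionId;

    SessionKey(String user, int segmentId, String transactionId) {
        this.user = Objects.requireNonNull(user, "user");
        this.segmentId = segmentId;
        this.transactionId = Objects.requireNonNull(transactionId, "transactionId");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SessionKey)) {
            return false;
        }
        SessionKey that = (SessionKey) o;
        return segmentId == that.segmentId
                && user.equals(that.user)
                && transactionId.equals(that.transactionId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(user, segmentId, transactionId);
    }
}
```

With a `Map<SessionKey, ...>` as the cache, a request for user `bob` with `alice`'s transaction ID would build a different key and get a fresh proxy UGI for `bob` rather than `alice`'s cached one.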


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200793656
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/TimedProxyUGI.java ---
@@ -0,0 +1,93 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class TimedProxyUGI implements Delayed {
+
+private volatile long startTime;
+private UserGroupInformation proxyUGI;
+private SegmentTransactionId session;
+private boolean cleaned = false;
+AtomicInteger inProgress = new AtomicInteger();
--- End diff --

Is this essentially a reference count? If so, could it be named 
`referenceCount` or similar? We might also want to rename the accessor methods 
to match.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200791798
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SegmentTransactionId.java
 ---
@@ -0,0 +1,60 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class SegmentTransactionId {
--- End diff --

The code that uses this class refers to its instances as `session`s. Could 
the class be called `Session`?


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200794348
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,129 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private static Map cache = new 
ConcurrentHashMap<>();
+//private static DelayQueue delayQueue = new 
DelayQueue<>();
+private static DelayQueue[] delayQueues = new 
DelayQueue<>[64];
+public static long UGI_CACHE_EXPIRY = 15 * 1 * 1000L; // 15 Minutes
--- End diff --

Should this be `15 * 60 * 1000L`?

We could define a constant `MINUTES = 60 * 1000L` to make this more 
readable and obviate the comment.
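
An alternative to a hand-rolled `MINUTES` constant is `java.util.concurrent.TimeUnit`, which the PR already imports in `TimedProxyUGI`. A small sketch, with `CacheExpiry` and both constant names being illustrative only:

```java
import java.util.concurrent.TimeUnit;

// Illustrative holder class; only the TimeUnit call is the point here.
final class CacheExpiry {

    private CacheExpiry() {
    }

    // 15 minutes expressed in milliseconds (15 * 60 * 1000).
    static final long UGI_CACHE_EXPIRY_MILLIS = TimeUnit.MINUTES.toMillis(15);

    // The value as written in the diff: this is 15 seconds, not 15 minutes.
    static final long AS_WRITTEN_MILLIS = 15 * 1 * 1000L;
}
```

The unit then lives in the code rather than in a trailing comment.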


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200797134
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,129 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private static Map cache = new 
ConcurrentHashMap<>();
+//private static DelayQueue delayQueue = new 
DelayQueue<>();
+private static DelayQueue[] delayQueues = new 
DelayQueue<>[64];
+public static long UGI_CACHE_EXPIRY = 15 * 1 * 1000L; // 15 Minutes
+
+public UGICache() {
+for (int i = 0; i < delayQueues.length; i++) {
+delayQueues[i] = new DelayQueue<>();
+}
+}
+
+public TimedProxyUGI getTimedProxyUGI(String user, 
SegmentTransactionId session) throws IOException {
+
+Integer segmentId = session.getSegmentId();
+synchronized (delayQueues[segmentId]) {
+// use the opportunity to cleanup any expired entries
+cleanup(segmentId);
+TimedProxyUGI timedProxyUGI = cache.get(session);
+if (timedProxyUGI == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+UserGroupInformation proxyUGI =
+UserGroupInformation.createProxyUser(user, 
UserGroupInformation.getLoginUser());
+timedProxyUGI = new TimedProxyUGI(proxyUGI, session);
+delayQueues[segmentId].offer(timedProxyUGI);
+cache.put(session, timedProxyUGI);
+}
+timedProxyUGI.incrementCounter();
+return timedProxyUGI;
+}
+}
+
+private cleanup(Integer segmentId) {
+// poll segment expiration queue for all expired entries and clean 
them if possible   
+TimedProxyUGI ugi = null;
+while ((ugi = delayQueues[segmentId].poll()) != null) {
+// place it back in the queue if still in use and was not 
closed
+if (!closeUGI(ugi)) {
+delayQueues[segmentId].offer(ugi);
+}
+LOG.info("Delay Queue Size for segment " +
+segmentId + " = " + delayQueues[segmentId].size());
+}
+}
+
+public void release(TimedProxyUGI timedProxyUGI, boolean forceClean) {
+
+Integer segmentId = timedProxyUGI.getSession().getSegmentId();
+
+timedProxyUGI.resetTime();
+timedProxyUGI.decrementCounter();
+
+if (forceClean) {
+synchronized (delayQueues[segmentId]) {
+timedProxyUGI.setExpired();
+closeUGI(timedProxyUGI);
+}
+}
+}
+
+private static boolean closeUGI(TimedProxyUGI expiredProxyUGI) {
+
+SegmentTransactionId session = expiredProxyUGI.getSession();
+Integer segmentId = session.getSegmentId();
+//synchronized (delayQueues[segmentId]) {
+String fsMsg = "FileSystem for proxy user = " + 
expiredProxyUGI.getProxyUGI().getUserName();
+try {
+// The UGI object is still being used by another thread
+if (expiredProxyUGI.inProgress.get() != 0) {
+LOG.info(session.toString() + " Skipping close of " + 
fsMsg);
+expiredProx

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200792669
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,123 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private static Map cache = new 
ConcurrentHashMap<>();
+private static DelayQueue delayQueue = new 
DelayQueue<>();
+private static Object[] segmentLocks = new Object[100];
+public static long UGI_CACHE_EXPIRY = 15 * 1 * 1000L; // 15 Minutes
+
+static {
+Thread t = new Thread(new Runnable() {
+
+public void run() {
+while (true) {
+try {
+Thread.sleep(UGI_CACHE_EXPIRY);
+TimedProxyUGI timedProxyUGI = delayQueue.poll();
+while (timedProxyUGI != null) {
+closeUGI(timedProxyUGI);
+LOG.info("Delay Queue Size = " + 
delayQueue.size());
+timedProxyUGI = delayQueue.poll();
+}
+} catch (InterruptedException ie) {
+LOG.warn("Terminating reaper thread");
+return;
+}
+}
+}
+});
+
+t.setName("UGI Reaper Thread");
+t.start();
+for (int i = 0; i < segmentLocks.length; i++) {
+segmentLocks[i] = new Object();
+}
+}
+
+public TimedProxyUGI getTimedProxyUGI(String user, 
SegmentTransactionId session) throws IOException {
+
+Integer segmentId = session.getSegmentId();
+synchronized (segmentLocks[segmentId]) {
+TimedProxyUGI timedProxyUGI = cache.get(session);
+if (timedProxyUGI == null) {
+LOG.info(session.toString() + " Creating proxy user = " + 
user);
+UserGroupInformation proxyUGI =
+UserGroupInformation.createProxyUser(user, 
UserGroupInformation.getLoginUser());
+timedProxyUGI = new TimedProxyUGI(proxyUGI, session);
+delayQueue.offer(timedProxyUGI);
+cache.put(session, timedProxyUGI);
+} else if (timedProxyUGI.getDelayMillis() < 0) {
+closeUGI(timedProxyUGI);
+} else {
+timedProxyUGI.incrementCounter();
+}
+return timedProxyUGI;
+}
+}
+
+public void release(TimedProxyUGI timedProxyUGI, Integer fragmentIndex, Integer fragmentCount) {
+
+Integer segmentId = timedProxyUGI.getSession().getSegmentId();
+synchronized (segmentLocks[segmentId]) {
+timedProxyUGI.resetTime();
+timedProxyUGI.decrementCounter();
+if (fragmentIndex != null && fragmentCount.equals(fragmentIndex))
+closeUGI(timedProxyUGI);
+}
+}
+
+private static void closeUGI(TimedProxyUGI timedProxyUGI) {
+
+Integer segmentId = timedProxyUGI.getSession().getSegmentId();
+synchronized (segmentLocks[segmentId]) {
+String fsMsg = "FileSystem for proxy user = " + timedProxyUGI.getProxyUGI().getUserName();
+try {
+if (timedProxyU

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200795039
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,129 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private static Map cache = new ConcurrentHashMap<>();
+//private static DelayQueue delayQueue = new DelayQueue<>();
+private static DelayQueue[] delayQueues = new DelayQueue<>[64];
+public static long UGI_CACHE_EXPIRY = 15 * 1 * 1000L; // 15 Minutes
+
+public UGICache() {
+for (int i = 0; i < delayQueues.length; i++) {
+delayQueues[i] = new DelayQueue<>();
+}
+}
+
+public TimedProxyUGI getTimedProxyUGI(String user, SegmentTransactionId session) throws IOException {
+
+Integer segmentId = session.getSegmentId();
+synchronized (delayQueues[segmentId]) {
+// use the opportunity to cleanup any expired entries
+cleanup(segmentId);
+TimedProxyUGI timedProxyUGI = cache.get(session);
+if (timedProxyUGI == null) {
+LOG.info(session.toString() + " Creating proxy user = " + user);
+UserGroupInformation proxyUGI =
+UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
+timedProxyUGI = new TimedProxyUGI(proxyUGI, session);
+delayQueues[segmentId].offer(timedProxyUGI);
+cache.put(session, timedProxyUGI);
+}
+timedProxyUGI.incrementCounter();
+return timedProxyUGI;
+}
+}
+
+private cleanup(Integer segmentId) {
+// poll segment expiration queue for all expired entries and clean them if possible
+TimedProxyUGI ugi = null;
+while ((ugi = delayQueues[segmentId].poll()) != null) {
+// place it back in the queue if still in use and was not closed
+if (!closeUGI(ugi)) {
+delayQueues[segmentId].offer(ugi);
+}
+LOG.info("Delay Queue Size for segment " +
+segmentId + " = " + delayQueues[segmentId].size());
+}
+}
+
+public void release(TimedProxyUGI timedProxyUGI, boolean forceClean) {
+
+Integer segmentId = timedProxyUGI.getSession().getSegmentId();
+
+timedProxyUGI.resetTime();
+timedProxyUGI.decrementCounter();
+
+if (forceClean) {
+synchronized (delayQueues[segmentId]) {
+timedProxyUGI.setExpired();
+closeUGI(timedProxyUGI);
+}
+}
+}
+
+private static boolean closeUGI(TimedProxyUGI expiredProxyUGI) {
+
+SegmentTransactionId session = expiredProxyUGI.getSession();
+Integer segmentId = session.getSegmentId();
+//synchronized (delayQueues[segmentId]) {
+String fsMsg = "FileSystem for proxy user = " + expiredProxyUGI.getProxyUGI().getUserName();
+try {
+// The UGI object is still being used by another thread
+if (expiredProxyUGI.inProgress.get() != 0) {
--- End diff --

Should this be `expiredProxyUGI.getCounter()`? Maybe there should be a 
method `isInUse()` that

[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200794241
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,129 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private static Map cache = new ConcurrentHashMap<>();
+//private static DelayQueue delayQueue = new DelayQueue<>();
+private static DelayQueue[] delayQueues = new DelayQueue<>[64];
--- End diff --

What's the significance of `64`?

**edit**: oh, I see... it's the max segment ID? How does that work? Are 
there at most 64 segments?
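
As the diff is written, any segment ID of 64 or more would index past the end of `delayQueues` and throw `ArrayIndexOutOfBoundsException`. If segment IDs are not guaranteed to stay below that bound, one possible alternative is to create the per-segment queues lazily in a map keyed by segment ID. The following is only a sketch under that assumption; `SegmentQueuesSketch` and `queueFor` are hypothetical names, not part of the PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;

public class SegmentQueuesSketch {

    // One queue per segment ID, created on first use, so there is no fixed upper bound.
    private static final ConcurrentMap<Integer, DelayQueue<Delayed>> QUEUES =
            new ConcurrentHashMap<>();

    // Hypothetical helper: returns the queue for a segment, creating it atomically if absent.
    static DelayQueue<Delayed> queueFor(int segmentId) {
        return QUEUES.computeIfAbsent(segmentId, id -> new DelayQueue<>());
    }

    public static void main(String[] args) {
        // The same segment ID always yields the same queue instance.
        System.out.println(queueFor(100) == queueFor(100)); // prints "true"
        // Different segment IDs get independent queues.
        System.out.println(queueFor(100) == queueFor(7));   // prints "false"
    }
}
```

Because the same segment ID always maps to the same queue object, the queue could still double as the per-segment lock monitor, as the array element does in the diff.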


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread benchristel
Github user benchristel commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200791285
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SegmentTransactionId.java
 ---
@@ -0,0 +1,60 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class SegmentTransactionId {
+
+private Integer segmentId;
+private String transactionId;
+private String segmentTransactionId;
+
+public SegmentTransactionId(Integer segmentId, String transactionId) {
+this.segmentId = segmentId;
+this.transactionId = transactionId;
+this.segmentTransactionId = segmentId + ":" + transactionId;
+}
+
+public Integer getSegmentId() {
+return segmentId;
+}
+
+public String getSegmentTransactionId() {
--- End diff --

This method doesn't appear to be used.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread frankgh
Github user frankgh commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200791461
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SegmentTransactionId.java
 ---
@@ -0,0 +1,60 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class SegmentTransactionId {
+
+private Integer segmentId;
+private String transactionId;
--- End diff --

`transactionId` is not used anywhere; it can be removed.


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread frankgh
Github user frankgh commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200793645
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,129 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private static Map cache = new ConcurrentHashMap<>();
+//private static DelayQueue delayQueue = new DelayQueue<>();
+private static DelayQueue[] delayQueues = new DelayQueue<>[64];
+public static long UGI_CACHE_EXPIRY = 15 * 1 * 1000L; // 15 Minutes
+
+public UGICache() {
+for (int i = 0; i < delayQueues.length; i++) {
+delayQueues[i] = new DelayQueue<>();
--- End diff --

`delayQueues` is static, but every time a `UGICache` is instantiated the 
constructor re-initializes the array, replacing the existing queues. We may 
want to initialize it once in a static initializer block instead.
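
A minimal sketch of that idea, assuming the queues only need to be created once per class load. `StaticInitSketch` and `Entry` are hypothetical stand-ins (for `UGICache` and `TimedProxyUGI`), not code from the PR.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class StaticInitSketch {

    // Hypothetical stand-in for TimedProxyUGI; only the Delayed contract matters here.
    static final class Entry implements Delayed {
        private final long expiresAtMillis;

        Entry(long delayMillis) {
            this.expiresAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtMillis - System.currentTimeMillis(),
                    TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                    other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private static final int QUEUE_COUNT = 64; // same size as in the diff

    // Java does not allow generic array creation, hence the unchecked cast.
    @SuppressWarnings("unchecked")
    private static final DelayQueue<Entry>[] delayQueues =
            (DelayQueue<Entry>[]) new DelayQueue[QUEUE_COUNT];

    // Runs once, when the class is initialized, however many instances are created.
    static {
        for (int i = 0; i < delayQueues.length; i++) {
            delayQueues[i] = new DelayQueue<>();
        }
    }

    static DelayQueue<Entry> queueFor(int segmentId) {
        return delayQueues[segmentId];
    }

    public static void main(String[] args) {
        DelayQueue<Entry> before = queueFor(3);
        before.offer(new Entry(60_000L));
        new StaticInitSketch(); // constructing instances leaves the queues untouched
        new StaticInitSketch();
        System.out.println(queueFor(3) == before); // prints "true"
        System.out.println(queueFor(3).size());    // prints "1"
    }
}
```

With the constructor-based initialization in the diff, the second instantiation would have swapped in a fresh queue and the queued entry would have been lost.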


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread frankgh
Github user frankgh commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200793774
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,129 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private static Map cache = new ConcurrentHashMap<>();
+//private static DelayQueue delayQueue = new DelayQueue<>();
+private static DelayQueue[] delayQueues = new DelayQueue<>[64];
--- End diff --

Are we limiting to 64 segments?


---


[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...

2018-07-06 Thread frankgh
Github user frankgh commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1379#discussion_r200793183
  
--- Diff: 
pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,129 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class UGICache {
+
+private static final Log LOG = LogFactory.getLog(UGICache.class);
+private static Map cache = new ConcurrentHashMap<>();
+//private static DelayQueue delayQueue = new DelayQueue<>();
+private static DelayQueue[] delayQueues = new DelayQueue<>[64];
+public static long UGI_CACHE_EXPIRY = 15 * 1 * 1000L; // 15 Minutes
--- End diff --

Can `UGI_CACHE_EXPIRY` be parameterized?
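
One way this could look, as a sketch only: read the expiry from a JVM system property and fall back to a default. The property name `pxf.ugi.cache.expiry.ms` and the class `ExpiryConfigSketch` are hypothetical, not existing PXF settings. Note also that `15 * 1 * 1000L` in the diff evaluates to 15,000 ms, i.e. 15 seconds; 15 minutes would be `15 * 60 * 1000L`, which is what the default below assumes was intended.

```java
public class ExpiryConfigSketch {

    // Hypothetical property name; not an existing PXF configuration key.
    static final String EXPIRY_PROPERTY = "pxf.ugi.cache.expiry.ms";

    // 15 minutes expressed in milliseconds.
    static final long DEFAULT_EXPIRY_MS = 15 * 60 * 1000L;

    // Resolves the expiry from the system property, falling back to the default
    // when the property is unset, not a valid number, or not positive.
    static long resolveExpiryMillis() {
        long value = Long.getLong(EXPIRY_PROPERTY, DEFAULT_EXPIRY_MS);
        return value > 0 ? value : DEFAULT_EXPIRY_MS;
    }

    public static void main(String[] args) {
        // Uses the default unless the JVM was started with -Dpxf.ugi.cache.expiry.ms=...
        System.out.println(resolveExpiryMillis());

        System.setProperty(EXPIRY_PROPERTY, "30000");
        System.out.println(resolveExpiryMillis()); // prints "30000"
    }
}
```

Resolving the value once at startup keeps the hot path free of property lookups; a test could still override it by setting the property before the class is used.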


---