zeroflag commented on code in PR #1043:
URL: https://github.com/apache/knox/pull/1043#discussion_r2097694327


##########
gateway-provider-identity-assertion-common/src/main/java/org/apache/knox/gateway/identityasserter/common/filter/CommonIdentityAssertionFilter.java:
##########
@@ -62,286 +35,318 @@
 import org.apache.knox.gateway.util.AuthorizationException;
 import org.apache.knox.gateway.util.HttpExceptionUtils;
 
+import javax.security.auth.Subject;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.security.AccessController;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.stream.Collectors;
+
+import static 
org.apache.knox.gateway.identityasserter.common.filter.AbstractIdentityAsserterDeploymentContributor.IMPERSONATION_PARAMS;
+import static 
org.apache.knox.gateway.identityasserter.common.filter.AbstractIdentityAsserterDeploymentContributor.ROLE;
+import static 
org.apache.knox.gateway.identityasserter.common.filter.VirtualGroupMapper.addRequestFunctions;
+
 public class CommonIdentityAssertionFilter extends 
AbstractIdentityAssertionFilter {
-  private static final IdentityAsserterMessages LOG = 
MessagesFactory.get(IdentityAsserterMessages.class);
-
-  public static final String VIRTUAL_GROUP_MAPPING_PREFIX = "group.mapping.";
-  public static final String GROUP_PRINCIPAL_MAPPING = 
"group.principal.mapping";
-  public static final String PRINCIPAL_MAPPING = "principal.mapping";
-  public static final String ADVANCED_PRINCIPAL_MAPPING = 
"expression.principal.mapping";
-  private static final String PRINCIPAL_PARAM = "user.name";
-  private static final String DOAS_PRINCIPAL_PARAM = "doAs";
-  static final String IMPERSONATION_ENABLED_PARAM = 
AuthFilterUtils.PROXYUSER_PREFIX + ".impersonation.enabled";
-
-  private SimplePrincipalMapper mapper = new SimplePrincipalMapper();
-  private final Parser parser = new Parser();
-  private VirtualGroupMapper virtualGroupMapper;
-  /* List of all default and configured impersonation params */
-  protected final List<String> impersonationParamsList = new ArrayList<>();
-  protected boolean impersonationEnabled;
-  private AbstractSyntaxTree expressionPrincipalMapping;
-  private String topologyName;
-
-  @Override
-  public void init(FilterConfig filterConfig) throws ServletException {
-    topologyName = (String) 
filterConfig.getServletContext().getAttribute(GatewayServices.GATEWAY_CLUSTER_ATTRIBUTE);
-    String principalMapping = filterConfig.getInitParameter(PRINCIPAL_MAPPING);
-    if (principalMapping == null || principalMapping.isEmpty()) {
-      principalMapping = 
filterConfig.getServletContext().getInitParameter(PRINCIPAL_MAPPING);
+    public static final String VIRTUAL_GROUP_MAPPING_PREFIX = "group.mapping.";
+    public static final String GROUP_PRINCIPAL_MAPPING = 
"group.principal.mapping";
+    public static final String PRINCIPAL_MAPPING = "principal.mapping";
+    public static final String ADVANCED_PRINCIPAL_MAPPING = 
"expression.principal.mapping";
+    private static final IdentityAsserterMessages LOG = 
MessagesFactory.get(IdentityAsserterMessages.class);
+    private static final String PRINCIPAL_PARAM = "user.name";
+    private static final String DOAS_PRINCIPAL_PARAM = "doAs";
+    /* List of all default and configured impersonation params */
+    protected final List<String> impersonationParamsList = new ArrayList<>();
+    private final Parser parser = new Parser();
+    protected boolean impersonationEnabled;
+    private SimplePrincipalMapper mapper = new SimplePrincipalMapper();
+    private VirtualGroupMapper virtualGroupMapper;
+    private AbstractSyntaxTree expressionPrincipalMapping;
+    private String topologyName;
+
+    private static List<String> virtualGroupParameterNames(List<String> 
initParameterNames) {
+        return initParameterNames == null ? new ArrayList<>()
+                : initParameterNames.stream().filter(name -> 
name.startsWith(VIRTUAL_GROUP_MAPPING_PREFIX)).collect(Collectors.toList());
     }
-    String groupPrincipalMapping = 
filterConfig.getInitParameter(GROUP_PRINCIPAL_MAPPING);
-    if (groupPrincipalMapping == null || groupPrincipalMapping.isEmpty()) {
-      groupPrincipalMapping = 
filterConfig.getServletContext().getInitParameter(GROUP_PRINCIPAL_MAPPING);
-    }
-    if (principalMapping != null && !principalMapping.isEmpty() || 
groupPrincipalMapping != null && !groupPrincipalMapping.isEmpty()) {
-      try {
-        mapper.loadMappingTable(principalMapping, groupPrincipalMapping);
-      } catch (PrincipalMappingException e) {
-        throw new ServletException("Unable to load principal mapping table.", 
e);
-      }
+
+    private static String[] unique(String[] groups) {
+        return new HashSet<>(Arrays.asList(groups)).toArray(new String[0]);
     }
-    expressionPrincipalMapping = parseAdvancedPrincipalMapping(filterConfig);
 
-    final List<String> initParameterNames = 
AuthFilterUtils.getInitParameterNamesAsList(filterConfig);
+    @Override
+    public void init(FilterConfig filterConfig) throws ServletException {
+        topologyName = (String) 
filterConfig.getServletContext().getAttribute(GatewayServices.GATEWAY_CLUSTER_ATTRIBUTE);
+        String principalMapping = 
filterConfig.getInitParameter(PRINCIPAL_MAPPING);
+        if (principalMapping == null || principalMapping.isEmpty()) {
+            principalMapping = 
filterConfig.getServletContext().getInitParameter(PRINCIPAL_MAPPING);
+        }
+        String groupPrincipalMapping = 
filterConfig.getInitParameter(GROUP_PRINCIPAL_MAPPING);
+        if (groupPrincipalMapping == null || groupPrincipalMapping.isEmpty()) {
+            groupPrincipalMapping = 
filterConfig.getServletContext().getInitParameter(GROUP_PRINCIPAL_MAPPING);
+        }
+        if (principalMapping != null && !principalMapping.isEmpty() || 
groupPrincipalMapping != null && !groupPrincipalMapping.isEmpty()) {
+            try {
+                mapper.loadMappingTable(principalMapping, 
groupPrincipalMapping);
+            } catch (PrincipalMappingException e) {
+                throw new ServletException("Unable to load principal mapping 
table.", e);
+            }
+        }
+        expressionPrincipalMapping = 
parseAdvancedPrincipalMapping(filterConfig);
 
-    virtualGroupMapper = new 
VirtualGroupMapper(loadVirtualGroups(filterConfig, initParameterNames));
+        final List<String> initParameterNames = 
AuthFilterUtils.getInitParameterNamesAsList(filterConfig);
 
-    initImpersonationParamsList(filterConfig);
-    initProxyUserConfiguration(filterConfig, initParameterNames);
-  }
+        virtualGroupMapper = new 
VirtualGroupMapper(loadVirtualGroups(filterConfig, initParameterNames));
 
-  private AbstractSyntaxTree parseAdvancedPrincipalMapping(FilterConfig 
filterConfig) {
-    String expression = 
filterConfig.getInitParameter(ADVANCED_PRINCIPAL_MAPPING);
-    if (StringUtils.isBlank(expression)) {
-      expression = 
filterConfig.getServletContext().getInitParameter(ADVANCED_PRINCIPAL_MAPPING);
+        initImpersonationParamsList(filterConfig);
+        initProxyUserConfiguration(filterConfig, initParameterNames);
     }
-    return StringUtils.isBlank(expression) ? null : parser.parse(expression);
-  }
-
-  /*
-   * Initialize the impersonation params list.
-   * This list contains query params that needs to be scrubbed
-   * from the outgoing request.
-   */
-  private void initImpersonationParamsList(FilterConfig filterConfig) {
-    String impersonationListFromConfig = 
filterConfig.getInitParameter(IMPERSONATION_PARAMS);
-    if (impersonationListFromConfig == null || 
impersonationListFromConfig.isEmpty()) {
-      impersonationListFromConfig = 
filterConfig.getServletContext().getInitParameter(IMPERSONATION_PARAMS);
+
+    private AbstractSyntaxTree parseAdvancedPrincipalMapping(FilterConfig 
filterConfig) {
+        String expression = 
filterConfig.getInitParameter(ADVANCED_PRINCIPAL_MAPPING);
+        if (StringUtils.isBlank(expression)) {
+            expression = 
filterConfig.getServletContext().getInitParameter(ADVANCED_PRINCIPAL_MAPPING);
+        }
+        return StringUtils.isBlank(expression) ? null : 
parser.parse(expression);
     }
 
-    /* Add default impersonation params */
-    impersonationParamsList.add(DOAS_PRINCIPAL_PARAM);
-    impersonationParamsList.add(PRINCIPAL_PARAM);
-
-    if (impersonationListFromConfig != null && 
!impersonationListFromConfig.isEmpty()) {
-      /* Add configured impersonation params */
-      LOG.impersonationConfig(impersonationListFromConfig);
-      final StringTokenizer t = new 
StringTokenizer(impersonationListFromConfig, ",");
-      while (t.hasMoreElements()) {
-        final String token = t.nextToken().trim();
-        if (!impersonationParamsList.contains(token)) {
-          impersonationParamsList.add(token);
+    /*
+     * Initialize the impersonation params list.
+     * This list contains query params that needs to be scrubbed
+     * from the outgoing request.
+     */
+    private void initImpersonationParamsList(FilterConfig filterConfig) {
+        String impersonationListFromConfig = 
filterConfig.getInitParameter(IMPERSONATION_PARAMS);
+        if (impersonationListFromConfig == null || 
impersonationListFromConfig.isEmpty()) {
+            impersonationListFromConfig = 
filterConfig.getServletContext().getInitParameter(IMPERSONATION_PARAMS);
+        }
+
+        /* Add default impersonation params */
+        impersonationParamsList.add(DOAS_PRINCIPAL_PARAM);
+        impersonationParamsList.add(PRINCIPAL_PARAM);
+
+        if (impersonationListFromConfig != null && 
!impersonationListFromConfig.isEmpty()) {
+            /* Add configured impersonation params */
+            LOG.impersonationConfig(impersonationListFromConfig);
+            final StringTokenizer t = new 
StringTokenizer(impersonationListFromConfig, ",");
+            while (t.hasMoreElements()) {
+                final String token = t.nextToken().trim();
+                if (!impersonationParamsList.contains(token)) {
+                    impersonationParamsList.add(token);
+                }
+            }
         }
-      }
     }
-  }
-
-  private void initProxyUserConfiguration(FilterConfig filterConfig, 
List<String> initParameterNames) {
-    final String impersonationEnabledValue = 
filterConfig.getInitParameter(IMPERSONATION_ENABLED_PARAM);
-    impersonationEnabled = impersonationEnabledValue == null ? Boolean.FALSE : 
Boolean.parseBoolean(impersonationEnabledValue);
-
-    if (impersonationEnabled) {
-      if (AuthFilterUtils.hasProxyConfig(topologyName, "HadoopAuth")) {
-        LOG.ignoreProxyuserConfig();
-        impersonationEnabled = false; //explicitly set to false to avoid 
redundant authorization attempts at request processing time
-      } else {
-        AuthFilterUtils.refreshSuperUserGroupsConfiguration(filterConfig, 
initParameterNames, topologyName, ROLE);
-        
filterConfig.getServletContext().setAttribute(ContextAttributes.IMPERSONATION_ENABLED_ATTRIBUTE,
 Boolean.TRUE);
-      }
-    } else {
-      
filterConfig.getServletContext().setAttribute(ContextAttributes.IMPERSONATION_ENABLED_ATTRIBUTE,
 Boolean.FALSE);
+
+    private void initProxyUserConfiguration(FilterConfig filterConfig, 
List<String> initParameterNames) {
+        impersonationEnabled = shouldAddImpersonationProvider(filterConfig);
+        if (impersonationEnabled) {
+            if (AuthFilterUtils.hasProxyConfig(topologyName, "HadoopAuth")) {
+                LOG.ignoreProxyuserConfig();
+                impersonationEnabled = false; //explicitly set to false to 
avoid redundant authorization attempts at request processing time
+            } else {
+                
AuthFilterUtils.refreshSuperUserGroupsConfiguration(filterConfig, 
initParameterNames, topologyName, ROLE);
+                
filterConfig.getServletContext().setAttribute(ContextAttributes.IMPERSONATION_ENABLED_ATTRIBUTE,
 Boolean.TRUE);
+            }
+        } else {
+            
filterConfig.getServletContext().setAttribute(ContextAttributes.IMPERSONATION_ENABLED_ATTRIBUTE,
 Boolean.FALSE);
+        }
     }
-  }
 
-  boolean isImpersonationEnabled() {
-    return impersonationEnabled;
-  }
+    /**
+     * Check if the impersonation provider should be added to the filter chain 
based on
+     * user and group impersonation settings.
+     *
+     * @param filterConfig
+     * @return
+     */
+    boolean shouldAddImpersonationProvider(final FilterConfig filterConfig) {

Review Comment:
   Hi Sandeep, if I am not mistaken, the only relevant change in this file is 
how the `impersonationEnabled` flag is determined. Are all the other changes 
just related to formatting?



##########
gateway-spi/src/main/java/org/apache/knox/gateway/util/AuthFilterUtils.java:
##########
@@ -27,227 +39,270 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import javax.servlet.FilterConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletRequest;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletRequestWrapper;
+public class AuthFilterUtils {
+    public static final String DEFAULT_AUTH_UNAUTHENTICATED_PATHS_PARAM = 
"/knoxtoken/api/v1/jwks.json";
+    public static final String PROXYUSER_PREFIX = "hadoop.proxyuser";
+    public static final String PROXYGROUP_PREFIX = "hadoop.proxygroup";
+    public static final String IMPERSONATION_MODE = 
"hadoop.impersonation.mode";
+    public static final String DEFAULT_IMPERSONATION_MODE = "OR";
+    public static final String QUERY_PARAMETER_DOAS = "doAs";
+    public static final String REAL_USER_NAME_ATTRIBUTE = "real.user.name";
+    public static final String DO_GLOBAL_LOGOUT_ATTRIBUTE = "do.global.logout";
+    public static final String IMPERSONATION_ENABLED_PARAM = 
AuthFilterUtils.PROXYUSER_PREFIX + ".impersonation.enabled";
+    public static final String GROUP_IMPERSONATION_ENABLED_PARAM = 
AuthFilterUtils.PROXYGROUP_PREFIX + ".impersonation.enabled";
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
-import org.apache.hadoop.security.authorize.ImpersonationProvider;
-import org.apache.knox.gateway.i18n.GatewaySpiMessages;
-import org.apache.knox.gateway.i18n.messages.MessagesFactory;
 
-public class AuthFilterUtils {
-  public static final String DEFAULT_AUTH_UNAUTHENTICATED_PATHS_PARAM = 
"/knoxtoken/api/v1/jwks.json";
-  public static final String PROXYUSER_PREFIX = "hadoop.proxyuser";
-  public static final String QUERY_PARAMETER_DOAS = "doAs";
-  public static final String REAL_USER_NAME_ATTRIBUTE = "real.user.name";
-  public static final String DO_GLOBAL_LOGOUT_ATTRIBUTE = "do.global.logout";
-
-  private static final GatewaySpiMessages LOG = 
MessagesFactory.get(GatewaySpiMessages.class);
-  private static final Map<String, Map<String, ImpersonationProvider>> 
TOPOLOGY_IMPERSONATION_PROVIDERS = new ConcurrentHashMap<>();
-  private static final Lock refreshSuperUserGroupsLock = new ReentrantLock();
-
-  /**
-   * A helper method that checks whether request contains
-   * unauthenticated path
-   * @param request
-   * @return
-   */
-  public static boolean doesRequestContainUnauthPath(
-      final Set<String> unAuthenticatedPaths, final ServletRequest request) {
-    /* make sure the path matches EXACTLY to prevent auth bypass */
-    return unAuthenticatedPaths.contains(((HttpServletRequest) 
request).getPathInfo());
-  }
-
-  /**
-   * A helper method that parses a string and adds to the
-   * provided unauthenticated set.
-   * @param unAuthenticatedPaths
-   * @param list
-   */
-  public static void parseStringThenAdd(final Set<String> 
unAuthenticatedPaths, final String list) {
-    final StringTokenizer tokenizer = new StringTokenizer(list, ";,");
-    while (tokenizer.hasMoreTokens()) {
-      unAuthenticatedPaths.add(tokenizer.nextToken());
-    }
-  }
-
-  /**
-   * A method that parses a string (delimiters = ;,) and adds them to the
-   * provided un-authenticated path set.
-   * @param unAuthenticatedPaths
-   * @param list
-   * @param defaultList
-   */
-  public static void addUnauthPaths(final Set<String> unAuthenticatedPaths, 
final String list, final String defaultList) {
-    /* add default unauthenticated paths list */
-    parseStringThenAdd(unAuthenticatedPaths, defaultList);
-    /* add provided unauthenticated paths list if specified */
-    if (!StringUtils.isBlank(list)) {
-      AuthFilterUtils.parseStringThenAdd(unAuthenticatedPaths, list);
-    }
-  }
-
-  public static void refreshSuperUserGroupsConfiguration(ServletContext 
context, List<String> initParameterNames, String topologyName, String role) {
-    if (context == null) {
-      throw new IllegalArgumentException("Cannot get proxyuser configuration 
from NULL context");
-    }
-    refreshSuperUserGroupsConfiguration(context, null, initParameterNames, 
topologyName, role);
-  }
-
-  public static void refreshSuperUserGroupsConfiguration(FilterConfig 
filterConfig, List<String> initParameterNames, String topologyName, String 
role) {
-    if (filterConfig == null) {
-      throw new IllegalArgumentException("Cannot get proxyuser configuration 
from NULL filter config");
-    }
-    refreshSuperUserGroupsConfiguration(null, filterConfig, 
initParameterNames, topologyName, role);
-  }
-
-  private static void refreshSuperUserGroupsConfiguration(ServletContext 
context, FilterConfig filterConfig, List<String> initParameterNames, String 
topologyName, String role) {
-    final Configuration conf = new Configuration(false);
-    if (initParameterNames != null) {
-      initParameterNames.stream().filter(name -> 
name.startsWith(PROXYUSER_PREFIX + ".")).forEach(name -> {
-        String value = context == null ? filterConfig.getInitParameter(name) : 
context.getInitParameter(name);
-        conf.set(name, value);
-      });
-    }
-
-    saveImpersonationProvider(topologyName, role, conf);
-  }
-
-  private static void saveImpersonationProvider(String topologyName, String 
role, final Configuration conf) {
-    refreshSuperUserGroupsLock.lock();
-    try {
-      final ImpersonationProvider impersonationProvider = new 
DefaultImpersonationProvider();
-      impersonationProvider.setConf(conf);
-      impersonationProvider.init(PROXYUSER_PREFIX);
-      LOG.createImpersonationProvider(topologyName, role, PROXYUSER_PREFIX, 
conf.getPropsWithPrefix(PROXYUSER_PREFIX + ".").toString());
-      TOPOLOGY_IMPERSONATION_PROVIDERS.putIfAbsent(topologyName, new 
ConcurrentHashMap<String, ImpersonationProvider>());
-      TOPOLOGY_IMPERSONATION_PROVIDERS.get(topologyName).put(role, 
impersonationProvider);
-    } finally {
-      refreshSuperUserGroupsLock.unlock();
-    }
-  }
-
-  public static HttpServletRequest getProxyRequest(HttpServletRequest request, 
String doAsUser, String topologyName, String role) throws 
AuthorizationException {
-    return getProxyRequest(request, request.getUserPrincipal().getName(), 
doAsUser, topologyName, role);
-  }
-
-  public static HttpServletRequest getProxyRequest(HttpServletRequest request, 
String remoteUser, String doAsUser, String topologyName, String role) throws 
AuthorizationException {
-    final UserGroupInformation remoteRequestUgi = 
getRemoteRequestUgi(remoteUser, doAsUser);
-    if (remoteRequestUgi != null) {
-      authorizeImpersonationRequest(request, remoteRequestUgi, topologyName, 
role);
-
-      return new HttpServletRequestWrapper(request) {
-        @Override
-        public String getRemoteUser() {
-          return remoteRequestUgi.getShortUserName();
+    private static final GatewaySpiMessages LOG = 
MessagesFactory.get(GatewaySpiMessages.class);
+    private static final Map<String, Map<String, ImpersonationProvider>> 
TOPOLOGY_IMPERSONATION_PROVIDERS = new ConcurrentHashMap<>();
+    private static final Lock refreshSuperUserGroupsLock = new ReentrantLock();
+
+    /**
+     * A helper method that checks whether request contains
+     * unauthenticated path
+     *
+     * @param request
+     * @return
+     */
+    public static boolean doesRequestContainUnauthPath(
+            final Set<String> unAuthenticatedPaths, final ServletRequest 
request) {
+        /* make sure the path matches EXACTLY to prevent auth bypass */
+        return unAuthenticatedPaths.contains(((HttpServletRequest) 
request).getPathInfo());
+    }
+
+    /**
+     * A helper method that parses a string and adds to the
+     * provided unauthenticated set.
+     *
+     * @param unAuthenticatedPaths
+     * @param list
+     */
+    public static void parseStringThenAdd(final Set<String> 
unAuthenticatedPaths, final String list) {
+        final StringTokenizer tokenizer = new StringTokenizer(list, ";,");
+        while (tokenizer.hasMoreTokens()) {
+            unAuthenticatedPaths.add(tokenizer.nextToken());
+        }
+    }
+
+    /**
+     * A method that parses a string (delimiters = ;,) and adds them to the
+     * provided un-authenticated path set.
+     *
+     * @param unAuthenticatedPaths
+     * @param list
+     * @param defaultList
+     */
+    public static void addUnauthPaths(final Set<String> unAuthenticatedPaths, 
final String list, final String defaultList) {
+        /* add default unauthenticated paths list */
+        parseStringThenAdd(unAuthenticatedPaths, defaultList);
+        /* add provided unauthenticated paths list if specified */
+        if (!StringUtils.isBlank(list)) {
+            AuthFilterUtils.parseStringThenAdd(unAuthenticatedPaths, list);
+        }
+    }
+    public static void refreshSuperUserGroupsConfiguration(FilterConfig 
filterConfig, List<String> initParameterNames, String topologyName, String 
role) {
+        if (filterConfig == null) {
+            throw new IllegalArgumentException("Cannot get proxyuser 
configuration from NULL filter config");
+        }
+        refreshSuperUserGroupsConfiguration(null, filterConfig, 
initParameterNames, topologyName, role);
+    }
+
+    private static void refreshSuperUserGroupsConfiguration(ServletContext 
context, FilterConfig filterConfig, List<String> initParameterNames, String 
topologyName, String role) {
+        if (filterConfig == null) {
+            throw new IllegalArgumentException("Cannot get proxyuser 
configuration from NULL filter config");
+        }
+
+        final Configuration conf = new Configuration(false);
+        if (initParameterNames != null) {
+            initParameterNames.stream().filter(name -> 
name.startsWith(PROXYUSER_PREFIX + ".")).forEach(name -> {
+                String value = context == null ? 
filterConfig.getInitParameter(name) : context.getInitParameter(name);
+                conf.set(name, value);
+            });
+
+            initParameterNames.stream().filter(name -> 
name.startsWith(PROXYGROUP_PREFIX + ".")).forEach(name -> {
+                String value = context == null ? 
filterConfig.getInitParameter(name) : context.getInitParameter(name);
+                conf.set(name, value);
+            });
+
+            initParameterNames.stream().filter(name -> 
name.startsWith(IMPERSONATION_MODE + ".")).forEach(name -> {
+                String value = context == null ? 
filterConfig.getInitParameter(name) : context.getInitParameter(name);
+                conf.set(name, value);
+            });
+
+
+        }
+
+        saveImpersonationProvider(filterConfig, topologyName, role, conf);
+    }
+
+    private static void saveImpersonationProvider(FilterConfig filterConfig, 
String topologyName, String role, final Configuration conf) {
+        refreshSuperUserGroupsLock.lock();
+        try {
+            boolean[] impersonationFlags = 
getImpersonationEnabledFlags(filterConfig);
+            boolean isProxyUserEnabled = impersonationFlags[0];
+            boolean isProxyGroupEnabled = impersonationFlags[1];
+
+            final GroupBasedImpersonationProvider 
groupBasedImpersonationProvider = new 
GroupBasedImpersonationProvider(isProxyUserEnabled, isProxyGroupEnabled);
+            groupBasedImpersonationProvider.setConf(conf);
+            groupBasedImpersonationProvider.init(PROXYUSER_PREFIX, 
PROXYGROUP_PREFIX);
+            LOG.createImpersonationProvider(topologyName, role, 
PROXYUSER_PREFIX, conf.getPropsWithPrefix(PROXYUSER_PREFIX + ".").toString());
+            LOG.createImpersonationProvider(topologyName, role, 
PROXYGROUP_PREFIX, conf.getPropsWithPrefix(PROXYGROUP_PREFIX + ".").toString());
+
+            TOPOLOGY_IMPERSONATION_PROVIDERS.putIfAbsent(topologyName, new 
ConcurrentHashMap<String, ImpersonationProvider>());
+            TOPOLOGY_IMPERSONATION_PROVIDERS.get(topologyName).put(role, 
groupBasedImpersonationProvider);
+        } finally {
+            refreshSuperUserGroupsLock.unlock();
+        }
+    }
+
+    public static HttpServletRequest getProxyRequest(HttpServletRequest 
request, String doAsUser, String topologyName, String role) throws 
AuthorizationException {
+        return getProxyRequest(request, request.getUserPrincipal().getName(), 
doAsUser, topologyName, role);
+    }
+
+    public static HttpServletRequest getProxyRequest(HttpServletRequest 
request, String remoteUser, String doAsUser, String topologyName, String role) 
throws AuthorizationException {
+        final UserGroupInformation remoteRequestUgi = 
getRemoteRequestUgi(remoteUser, doAsUser);
+        if (remoteRequestUgi != null) {
+            authorizeImpersonationRequest(request, remoteRequestUgi, 
topologyName, role);
+
+            return new HttpServletRequestWrapper(request) {
+                @Override
+                public String getRemoteUser() {
+                    return remoteRequestUgi.getShortUserName();
+                }
+
+                @Override
+                public Principal getUserPrincipal() {
+                    return remoteRequestUgi::getUserName;
+                }
+
+                @Override
+                public Object getAttribute(String name) {
+                    if (name != null && name.equals(REAL_USER_NAME_ATTRIBUTE)) 
{
+                        return 
remoteRequestUgi.getRealUser().getShortUserName();
+                    } else {
+                        return super.getAttribute(name);
+                    }
+                }
+            };
+
         }
+        return null;
+    }
 
-        @Override
-        public Principal getUserPrincipal() {
-          return remoteRequestUgi::getUserName;
+    public static void authorizeImpersonationRequest(HttpServletRequest 
request, String remoteUser, String doAsUser, String topologyName, String role) 
throws AuthorizationException {
+        final UserGroupInformation remoteRequestUgi = 
getRemoteRequestUgi(remoteUser, doAsUser);
+        if (remoteRequestUgi != null) {
+            authorizeImpersonationRequest(request, remoteRequestUgi, 
topologyName, role);
         }
+    }
+
+    private static void authorizeImpersonationRequest(HttpServletRequest 
request, UserGroupInformation remoteRequestUgi, String topologyName, String 
role)
+            throws AuthorizationException {
 
-        @Override
-        public Object getAttribute(String name) {
-          if (name != null && name.equals(REAL_USER_NAME_ATTRIBUTE)) {
-            return remoteRequestUgi.getRealUser().getShortUserName();
-          } else {
-            return super.getAttribute(name);
-          }
+        final ImpersonationProvider impersonationProvider = 
getImpersonationProvider(topologyName, role);
+
+        if (impersonationProvider != null) {
+            try {
+                impersonationProvider.authorize(remoteRequestUgi, 
request.getRemoteAddr());
+            } catch 
(org.apache.hadoop.security.authorize.AuthorizationException e) {
+                throw new AuthorizationException(e);
+            }
+        } else {
+            throw new AuthorizationException("ImpersonationProvider for " + 
topologyName + " / " + role + " not found!");
         }
-      };
-
-    }
-    return null;
-  }
-
-  public static void authorizeImpersonationRequest(HttpServletRequest request, 
String remoteUser, String doAsUser, String topologyName, String role) throws 
AuthorizationException {
-    final UserGroupInformation remoteRequestUgi = 
getRemoteRequestUgi(remoteUser, doAsUser);
-    if (remoteRequestUgi != null) {
-      authorizeImpersonationRequest(request, remoteRequestUgi, topologyName, 
role);
-    }
-  }
-
-  private static void authorizeImpersonationRequest(HttpServletRequest 
request, UserGroupInformation remoteRequestUgi, String topologyName, String 
role)
-      throws AuthorizationException {
-
-    final ImpersonationProvider impersonationProvider = 
getImpersonationProvider(topologyName, role);
-
-    if (impersonationProvider != null) {
-      try {
-        impersonationProvider.authorize(remoteRequestUgi, 
request.getRemoteAddr());
-      } catch (org.apache.hadoop.security.authorize.AuthorizationException e) {
-        throw new AuthorizationException(e);
-      }
-    } else {
-      throw new AuthorizationException("ImpersonationProvider for " + 
topologyName + " / " + role + " not found!");
-    }
-  }
-
-  private static ImpersonationProvider getImpersonationProvider(String 
topologyName, String role) {
-    refreshSuperUserGroupsLock.lock();
-    final ImpersonationProvider impersonationProvider;
-    try {
-      impersonationProvider = 
(TOPOLOGY_IMPERSONATION_PROVIDERS.getOrDefault(topologyName, 
Collections.emptyMap())).get(role);
-    } finally {
-      refreshSuperUserGroupsLock.unlock();
-    }
-    return impersonationProvider;
-  }
-
-  private static UserGroupInformation getRemoteRequestUgi(String remoteUser, 
String doAsUser) {
-    if (remoteUser != null) {
-      final UserGroupInformation remoteUserUgi = 
UserGroupInformation.createRemoteUser(remoteUser);
-      return UserGroupInformation.createProxyUser(doAsUser, remoteUserUgi);
-    }
-    return null;
-  }
-
-  public static boolean hasProxyConfig(String topologyName, String role) {
-    return getImpersonationProvider(topologyName, role) != null;
-  }
-
-  public static void removeProxyUserConfig(String topologyName, String role) {
-    if (hasProxyConfig(topologyName, role)) {
-      refreshSuperUserGroupsLock.lock();
-      try {
-        TOPOLOGY_IMPERSONATION_PROVIDERS.get(topologyName).remove(role);
-      } finally {
-        refreshSuperUserGroupsLock.unlock();
-      }
-    }
-  }
-
-  /**
-   * FilterConfig.getInitParameters() returns an enumeration and the first 
time we
-   * iterate thru on its elements we can process the parameter names as desired
-   * (because hasMoreElements returns true). The subsequent calls, however, 
will not
-   * succeed because getInitParameters() returns the same object where the
-   * hasMoreElements returns false.
-   * <p>
-   * In classes where there are multiple iterations should be conducted, a
-   * collection should be used instead.
-   *
-   * @return the names of the filter's initialization parameters as a List of
-   *         String objects, or an empty List if the filter has no 
initialization
-   *         parameters.
-   */
-  public static List<String> getInitParameterNamesAsList(FilterConfig 
filterConfig) {
-    return filterConfig.getInitParameterNames() == null ? 
Collections.emptyList() : 
Collections.list(filterConfig.getInitParameterNames());
-  }
-
-  public static void markDoGlobalLogoutInRequest(HttpServletRequest request) {
-    request.setAttribute(DO_GLOBAL_LOGOUT_ATTRIBUTE, "true");
-  }
-
-  public static boolean shouldDoGlobalLogout(HttpServletRequest request) {
-    return request.getAttribute(DO_GLOBAL_LOGOUT_ATTRIBUTE) == null ? false : 
Boolean.parseBoolean((String) request.getAttribute(DO_GLOBAL_LOGOUT_ATTRIBUTE));
-  }
+    }
+
+    private static ImpersonationProvider getImpersonationProvider(String 
topologyName, String role) {
+        refreshSuperUserGroupsLock.lock();
+        final ImpersonationProvider impersonationProvider;
+        try {
+            impersonationProvider = 
(TOPOLOGY_IMPERSONATION_PROVIDERS.getOrDefault(topologyName, 
Collections.emptyMap())).get(role);
+        } finally {
+            refreshSuperUserGroupsLock.unlock();
+        }
+        return impersonationProvider;
+    }
+
+    private static UserGroupInformation getRemoteRequestUgi(String remoteUser, 
String doAsUser) {
+        if (remoteUser != null) {
+            final UserGroupInformation remoteUserUgi = 
UserGroupInformation.createRemoteUser(remoteUser);
+            return UserGroupInformation.createProxyUser(doAsUser, 
remoteUserUgi);
+        }
+        return null;
+    }
+
+    public static boolean hasProxyConfig(String topologyName, String role) {
+        return getImpersonationProvider(topologyName, role) != null;
+    }
+
+    public static void removeProxyUserConfig(String topologyName, String role) 
{
+        if (hasProxyConfig(topologyName, role)) {
+            refreshSuperUserGroupsLock.lock();
+            try {
+                
TOPOLOGY_IMPERSONATION_PROVIDERS.get(topologyName).remove(role);
+            } finally {
+                refreshSuperUserGroupsLock.unlock();
+            }
+        }
+    }
+
+    /**
+     * FilterConfig.getInitParameters() returns an enumeration and the first 
time we
+     * iterate thru on its elements we can process the parameter names as 
desired
+     * (because hasMoreElements returns true). The subsequent calls, however, 
will not
+     * succeed because getInitParameters() returns the same object where the
+     * hasMoreElements returns false.
+     * <p>
+     * In classes where there are multiple iterations should be conducted, a
+     * collection should be used instead.
+     *
+     * @return the names of the filter's initialization parameters as a List of
+     * String objects, or an empty List if the filter has no initialization
+     * parameters.
+     */
+    public static List<String> getInitParameterNamesAsList(FilterConfig 
filterConfig) {
+        return filterConfig.getInitParameterNames() == null ? 
Collections.emptyList() : 
Collections.list(filterConfig.getInitParameterNames());
+    }
+
+    public static void markDoGlobalLogoutInRequest(HttpServletRequest request) 
{
+        request.setAttribute(DO_GLOBAL_LOGOUT_ATTRIBUTE, "true");
+    }
+
+    public static boolean shouldDoGlobalLogout(HttpServletRequest request) {
+        return request.getAttribute(DO_GLOBAL_LOGOUT_ATTRIBUTE) == null ? 
false : Boolean.parseBoolean((String) 
request.getAttribute(DO_GLOBAL_LOGOUT_ATTRIBUTE));
+    }
+
+    /**
+     * Check if user or group impersonation is enabled based on filter 
configuration.
+     *
+     * @param filterConfig The filter configuration
+     * @return A boolean array where the first element indicates if user 
impersonation is enabled
+     *         and the second element indicates if group impersonation is 
enabled
+     */
+    public static boolean[] getImpersonationEnabledFlags(final FilterConfig 
filterConfig) {

Review Comment:
   Maybe using enums instead of the boolean array would be more robust.
   
   For example:
   
   ```java
   public enum ImperonationFlags {
     USER_IMPERSONATION, GROUP_IMPERSONATION
   }
   ```
   
   Usage:
   
   ```java
   public static EnumSet<ImperonationFlags> getImpersonationEnabledFlags(final 
FilterConfig filterConfig) {
     ...
     return EnumSet.of(USER_IMPERSONATION);
    // or
     return EnumSet.of(GROUP_IMPERSONATION);
    // or
     return EnumSet.of(USER_IMPERSONATION, GROUP_IMPERSONATION);
   }
   ```



##########
gateway-spi/src/main/java/org/apache/knox/gateway/util/AuthFilterUtils.java:
##########
@@ -27,227 +39,270 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import javax.servlet.FilterConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletRequest;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletRequestWrapper;
+public class AuthFilterUtils {
+    public static final String DEFAULT_AUTH_UNAUTHENTICATED_PATHS_PARAM = 
"/knoxtoken/api/v1/jwks.json";
+    public static final String PROXYUSER_PREFIX = "hadoop.proxyuser";
+    public static final String PROXYGROUP_PREFIX = "hadoop.proxygroup";
+    public static final String IMPERSONATION_MODE = 
"hadoop.impersonation.mode";
+    public static final String DEFAULT_IMPERSONATION_MODE = "OR";

Review Comment:
   Could illustrate it with an example, how the OR and AND impersonation modes 
are used?



##########
gateway-spi/src/main/java/org/apache/knox/gateway/util/GroupBasedImpersonationProvider.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.knox.gateway.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
+import org.apache.hadoop.util.MachineList;
+import org.apache.knox.gateway.i18n.GatewaySpiMessages;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static 
org.apache.knox.gateway.util.AuthFilterUtils.DEFAULT_IMPERSONATION_MODE;
+import static org.apache.knox.gateway.util.AuthFilterUtils.IMPERSONATION_MODE;
+
+/**
+ * An extension of Hadoop's DefaultImpersonationProvider that adds support for 
group-based impersonation.
+ * This provider allows users who belong to specific groups to impersonate 
other users.
+ */
+public class GroupBasedImpersonationProvider extends 
DefaultImpersonationProvider {
+    private static final GatewaySpiMessages LOG = 
MessagesFactory.get(GatewaySpiMessages.class);
+    private static final String CONF_HOSTS = ".hosts";
+    private static final String CONF_USERS = ".users";
+    private static final String CONF_GROUPS = ".groups";
+    private final Map<String, AccessControlList> proxyGroupsAcls = new 
HashMap<>();
+    /* is the proxy user configuration enabled  */
+    boolean isProxyUserEnabled = true;
+    /* is the proxy group configuration enabled, defaulting it to false for 
backwards compatibility  */
+    boolean isProxyGroupEnabled;
+    private Map<String, MachineList> groupProxyHosts = new HashMap<>();
+    private String groupConfigPrefix;
+    private String impersonationMode = DEFAULT_IMPERSONATION_MODE;
+
+    public GroupBasedImpersonationProvider() {
+        super();
+    }
+
+    public GroupBasedImpersonationProvider(boolean isProxyUserEnabled, boolean 
isProxyGroupEnabled) {
+        super();
+        this.isProxyUserEnabled = isProxyUserEnabled;
+        this.isProxyGroupEnabled = isProxyGroupEnabled;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return super.getConf();
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
+    }
+
+    public void init(String configurationPrefix, String proxyGroupPrefix) {
+        super.init(configurationPrefix);
+        initGroupBasedProvider(proxyGroupPrefix);
+    }
+
+    private void initGroupBasedProvider(String proxyGroupPrefix) {
+        impersonationMode = getConf().get(IMPERSONATION_MODE, 
DEFAULT_IMPERSONATION_MODE);
+        groupConfigPrefix = proxyGroupPrefix +
+                (proxyGroupPrefix.endsWith(".") ? "" : ".");
+
+        // constructing regex to match the following patterns:
+        //   $configPrefix.[ANY].users
+        //   $configPrefix.[ANY].groups
+        //   $configPrefix.[ANY].hosts
+        //
+        String prefixRegEx = groupConfigPrefix.replace(".", "\\.");
+        String usersGroupsRegEx = prefixRegEx + "[\\S]*(" +
+                Pattern.quote(CONF_USERS) + "|" + Pattern.quote(CONF_GROUPS) + 
")";
+        String hostsRegEx = prefixRegEx + "[\\S]*" + Pattern.quote(CONF_HOSTS);
+
+        // get list of users and groups per proxygroup
+        // Map of <hadoop.proxygroup.[VIRTUAL_GROUP].users|groups, 
group1,group2>
+        Map<String, String> allMatchKeys =
+                getConf().getValByRegex(usersGroupsRegEx);
+
+        for (Map.Entry<String, String> entry : allMatchKeys.entrySet()) {
+            //aclKey = hadoop.proxygroup.[VIRTUAL_GROUP]
+            String aclKey = getAclKey(entry.getKey());
+
+            if (!proxyGroupsAcls.containsKey(aclKey)) {
+                proxyGroupsAcls.put(aclKey, new AccessControlList(
+                        allMatchKeys.get(aclKey + CONF_USERS),
+                        allMatchKeys.get(aclKey + CONF_GROUPS)));
+            }
+        }
+
+        // get hosts per proxygroup
+        allMatchKeys = getConf().getValByRegex(hostsRegEx);
+        for (Map.Entry<String, String> entry : allMatchKeys.entrySet()) {
+            groupProxyHosts.put(entry.getKey(),
+                    new MachineList(entry.getValue()));
+        }
+    }
+
+    private String getAclKey(String key) {
+        int endIndex = key.lastIndexOf('.');
+        if (endIndex != -1) {
+            return key.substring(0, endIndex);
+        }
+        return key;
+    }
+
+    @Override
+    public void authorize(UserGroupInformation user, InetAddress 
remoteAddress) throws AuthorizationException {
+        // If both authorization methods are disabled, allow the operation to 
proceed
+        if (!isProxyUserEnabled && !isProxyGroupEnabled) {
+            LOG.successfulImpersonation(user.getRealUser().getUserName(), 
user.getUserName());
+            return;
+        }
+
+        boolean isProxyUserAuthorized = false;
+        boolean isProxyGroupAuthorized = false;
+        if (isProxyUserEnabled) {
+            /* authorize proxy user */
+            isProxyUserAuthorized = checkProxyUserAuthorization(user, 
remoteAddress);
+            /* In case of AND no reason to check for proxy groups if 
isProxyUserAuthorized=false */
+            if("AND".equalsIgnoreCase(impersonationMode) && 
!isProxyUserAuthorized) {

Review Comment:
   I would either extract this to a string constant, or introduce an enum.



##########
gateway-spi/src/main/java/org/apache/knox/gateway/util/GroupBasedImpersonationProvider.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.knox.gateway.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
+import org.apache.hadoop.util.MachineList;
+import org.apache.knox.gateway.i18n.GatewaySpiMessages;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static 
org.apache.knox.gateway.util.AuthFilterUtils.DEFAULT_IMPERSONATION_MODE;
+import static org.apache.knox.gateway.util.AuthFilterUtils.IMPERSONATION_MODE;
+
+/**
+ * An extension of Hadoop's DefaultImpersonationProvider that adds support for 
group-based impersonation.
+ * This provider allows users who belong to specific groups to impersonate 
other users.
+ */
+public class GroupBasedImpersonationProvider extends 
DefaultImpersonationProvider {
+    private static final GatewaySpiMessages LOG = 
MessagesFactory.get(GatewaySpiMessages.class);
+    private static final String CONF_HOSTS = ".hosts";
+    private static final String CONF_USERS = ".users";
+    private static final String CONF_GROUPS = ".groups";
+    private final Map<String, AccessControlList> proxyGroupsAcls = new 
HashMap<>();
+    /* is the proxy user configuration enabled  */
+    boolean isProxyUserEnabled = true;
+    /* is the proxy group configuration enabled, defaulting it to false for 
backwards compatibility  */
+    boolean isProxyGroupEnabled;
+    private Map<String, MachineList> groupProxyHosts = new HashMap<>();
+    private String groupConfigPrefix;
+    private String impersonationMode = DEFAULT_IMPERSONATION_MODE;
+
+    public GroupBasedImpersonationProvider() {
+        super();
+    }
+
+    public GroupBasedImpersonationProvider(boolean isProxyUserEnabled, boolean 
isProxyGroupEnabled) {
+        super();
+        this.isProxyUserEnabled = isProxyUserEnabled;
+        this.isProxyGroupEnabled = isProxyGroupEnabled;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return super.getConf();
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
+    }
+
+    public void init(String configurationPrefix, String proxyGroupPrefix) {
+        super.init(configurationPrefix);
+        initGroupBasedProvider(proxyGroupPrefix);
+    }
+
+    private void initGroupBasedProvider(String proxyGroupPrefix) {
+        impersonationMode = getConf().get(IMPERSONATION_MODE, 
DEFAULT_IMPERSONATION_MODE);
+        groupConfigPrefix = proxyGroupPrefix +
+                (proxyGroupPrefix.endsWith(".") ? "" : ".");
+
+        // constructing regex to match the following patterns:
+        //   $configPrefix.[ANY].users
+        //   $configPrefix.[ANY].groups
+        //   $configPrefix.[ANY].hosts
+        //
+        String prefixRegEx = groupConfigPrefix.replace(".", "\\.");
+        String usersGroupsRegEx = prefixRegEx + "[\\S]*(" +
+                Pattern.quote(CONF_USERS) + "|" + Pattern.quote(CONF_GROUPS) + 
")";
+        String hostsRegEx = prefixRegEx + "[\\S]*" + Pattern.quote(CONF_HOSTS);
+
+        // get list of users and groups per proxygroup
+        // Map of <hadoop.proxygroup.[VIRTUAL_GROUP].users|groups, 
group1,group2>
+        Map<String, String> allMatchKeys =
+                getConf().getValByRegex(usersGroupsRegEx);
+
+        for (Map.Entry<String, String> entry : allMatchKeys.entrySet()) {
+            //aclKey = hadoop.proxygroup.[VIRTUAL_GROUP]
+            String aclKey = getAclKey(entry.getKey());
+
+            if (!proxyGroupsAcls.containsKey(aclKey)) {
+                proxyGroupsAcls.put(aclKey, new AccessControlList(
+                        allMatchKeys.get(aclKey + CONF_USERS),
+                        allMatchKeys.get(aclKey + CONF_GROUPS)));
+            }
+        }
+
+        // get hosts per proxygroup
+        allMatchKeys = getConf().getValByRegex(hostsRegEx);
+        for (Map.Entry<String, String> entry : allMatchKeys.entrySet()) {
+            groupProxyHosts.put(entry.getKey(),
+                    new MachineList(entry.getValue()));
+        }
+    }
+
+    private String getAclKey(String key) {
+        int endIndex = key.lastIndexOf('.');
+        if (endIndex != -1) {
+            return key.substring(0, endIndex);
+        }
+        return key;
+    }
+
+    @Override
+    public void authorize(UserGroupInformation user, InetAddress 
remoteAddress) throws AuthorizationException {
+        // If both authorization methods are disabled, allow the operation to 
proceed
+        if (!isProxyUserEnabled && !isProxyGroupEnabled) {
+            LOG.successfulImpersonation(user.getRealUser().getUserName(), 
user.getUserName());
+            return;
+        }
+
+        boolean isProxyUserAuthorized = false;
+        boolean isProxyGroupAuthorized = false;
+        if (isProxyUserEnabled) {
+            /* authorize proxy user */
+            isProxyUserAuthorized = checkProxyUserAuthorization(user, 
remoteAddress);
+            /* In case of AND no reason to check for proxy groups if 
isProxyUserAuthorized=false */
+            if("AND".equalsIgnoreCase(impersonationMode) && 
!isProxyUserAuthorized) {
+                throw new AuthorizationException("User: " + 
user.getRealUser().getUserName()
+                        + " is not allowed to impersonate " + 
user.getUserName());
+            }
+        }
+
+        if (isProxyGroupEnabled) {
+            /* check for proxy group impersonation */
+            isProxyGroupAuthorized = checkProxyGroupAuthorization(user, 
remoteAddress);
+        }
+
+        if("AND".equalsIgnoreCase(impersonationMode)) {

Review Comment:
   Same as above.



##########
gateway-spi/src/main/java/org/apache/knox/gateway/util/GroupBasedImpersonationProvider.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.knox.gateway.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
+import org.apache.hadoop.util.MachineList;
+import org.apache.knox.gateway.i18n.GatewaySpiMessages;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static 
org.apache.knox.gateway.util.AuthFilterUtils.DEFAULT_IMPERSONATION_MODE;
+import static org.apache.knox.gateway.util.AuthFilterUtils.IMPERSONATION_MODE;
+
+/**
+ * An extension of Hadoop's DefaultImpersonationProvider that adds support for 
group-based impersonation.
+ * This provider allows users who belong to specific groups to impersonate 
other users.
+ */
+public class GroupBasedImpersonationProvider extends 
DefaultImpersonationProvider {
+    private static final GatewaySpiMessages LOG = 
MessagesFactory.get(GatewaySpiMessages.class);
+    private static final String CONF_HOSTS = ".hosts";
+    private static final String CONF_USERS = ".users";
+    private static final String CONF_GROUPS = ".groups";
+    private final Map<String, AccessControlList> proxyGroupsAcls = new 
HashMap<>();
+    /* is the proxy user configuration enabled  */
+    boolean isProxyUserEnabled = true;
+    /* is the proxy group configuration enabled, defaulting it to false for 
backwards compatibility  */
+    boolean isProxyGroupEnabled;
+    private Map<String, MachineList> groupProxyHosts = new HashMap<>();
+    private String groupConfigPrefix;
+    private String impersonationMode = DEFAULT_IMPERSONATION_MODE;
+
+    public GroupBasedImpersonationProvider() {
+        super();
+    }
+
+    public GroupBasedImpersonationProvider(boolean isProxyUserEnabled, boolean 
isProxyGroupEnabled) {
+        super();
+        this.isProxyUserEnabled = isProxyUserEnabled;
+        this.isProxyGroupEnabled = isProxyGroupEnabled;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return super.getConf();
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
+    }
+
+    public void init(String configurationPrefix, String proxyGroupPrefix) {
+        super.init(configurationPrefix);
+        initGroupBasedProvider(proxyGroupPrefix);
+    }
+
+    private void initGroupBasedProvider(String proxyGroupPrefix) {
+        impersonationMode = getConf().get(IMPERSONATION_MODE, 
DEFAULT_IMPERSONATION_MODE);
+        groupConfigPrefix = proxyGroupPrefix +
+                (proxyGroupPrefix.endsWith(".") ? "" : ".");
+
+        // constructing regex to match the following patterns:
+        //   $configPrefix.[ANY].users
+        //   $configPrefix.[ANY].groups
+        //   $configPrefix.[ANY].hosts
+        //
+        String prefixRegEx = groupConfigPrefix.replace(".", "\\.");
+        String usersGroupsRegEx = prefixRegEx + "[\\S]*(" +
+                Pattern.quote(CONF_USERS) + "|" + Pattern.quote(CONF_GROUPS) + 
")";
+        String hostsRegEx = prefixRegEx + "[\\S]*" + Pattern.quote(CONF_HOSTS);
+
+        // get list of users and groups per proxygroup
+        // Map of <hadoop.proxygroup.[VIRTUAL_GROUP].users|groups, 
group1,group2>
+        Map<String, String> allMatchKeys =
+                getConf().getValByRegex(usersGroupsRegEx);
+
+        for (Map.Entry<String, String> entry : allMatchKeys.entrySet()) {
+            //aclKey = hadoop.proxygroup.[VIRTUAL_GROUP]
+            String aclKey = getAclKey(entry.getKey());
+
+            if (!proxyGroupsAcls.containsKey(aclKey)) {
+                proxyGroupsAcls.put(aclKey, new AccessControlList(
+                        allMatchKeys.get(aclKey + CONF_USERS),
+                        allMatchKeys.get(aclKey + CONF_GROUPS)));
+            }
+        }
+
+        // get hosts per proxygroup
+        allMatchKeys = getConf().getValByRegex(hostsRegEx);
+        for (Map.Entry<String, String> entry : allMatchKeys.entrySet()) {
+            groupProxyHosts.put(entry.getKey(),
+                    new MachineList(entry.getValue()));
+        }
+    }
+
+    private String getAclKey(String key) {
+        int endIndex = key.lastIndexOf('.');
+        if (endIndex != -1) {
+            return key.substring(0, endIndex);
+        }
+        return key;
+    }
+
+    @Override
+    public void authorize(UserGroupInformation user, InetAddress 
remoteAddress) throws AuthorizationException {
+        // If both authorization methods are disabled, allow the operation to 
proceed
+        if (!isProxyUserEnabled && !isProxyGroupEnabled) {
+            LOG.successfulImpersonation(user.getRealUser().getUserName(), 
user.getUserName());
+            return;
+        }
+
+        boolean isProxyUserAuthorized = false;
+        boolean isProxyGroupAuthorized = false;
+        if (isProxyUserEnabled) {
+            /* authorize proxy user */
+            isProxyUserAuthorized = checkProxyUserAuthorization(user, 
remoteAddress);
+            /* In case of AND no reason to check for proxy groups if 
isProxyUserAuthorized=false */
+            if("AND".equalsIgnoreCase(impersonationMode) && 
!isProxyUserAuthorized) {
+                throw new AuthorizationException("User: " + 
user.getRealUser().getUserName()
+                        + " is not allowed to impersonate " + 
user.getUserName());
+            }
+        }
+
+        if (isProxyGroupEnabled) {
+            /* check for proxy group impersonation */
+            isProxyGroupAuthorized = checkProxyGroupAuthorization(user, 
remoteAddress);
+        }
+
+        if("AND".equalsIgnoreCase(impersonationMode)) {
+            if (isProxyUserAuthorized && isProxyGroupAuthorized) {
+                LOG.successfulImpersonation(user.getRealUser().getUserName(), 
user.getUserName());
+            } else {
+                throw new AuthorizationException("User: " + 
user.getRealUser().getUserName()
+                        + " is not allowed to impersonate " + 
user.getUserName());
+            }
+
+        } else {
+            /* OR */
+            if (isProxyUserAuthorized || isProxyGroupAuthorized) {
+                LOG.successfulImpersonation(user.getRealUser().getUserName(), 
user.getUserName());
+            } else {
+                throw new AuthorizationException("User: " + 
user.getRealUser().getUserName()
+                        + " is not allowed to impersonate " + 
user.getUserName());
+            }
+        }
+    }
+
+    /**
+     * Helper method to check if the user is authorized to impersonate
+     * Returns true if the user is authorized, false otherwise.
+     * @param user
+     * @param remoteAddress
+     * @return
+     */
+    private boolean checkProxyUserAuthorization(final UserGroupInformation 
user, final InetAddress remoteAddress) {
+        try {
+            super.authorize(user, remoteAddress);
+            return true;
+        } catch (final AuthorizationException e) {
+            LOG.failedToImpersonateUser(user.getRealUser().getUserName(), 
remoteAddress.getHostAddress());
+            return false;
+        }
+    }
+
+    /**
+     * Helper method to check if the group a given user belongs to is 
authorized to impersonate
+     * Returns true if the user is authorized, false otherwise.
+     * @param user
+     * @param remoteAddress
+     * @return
+     */
+    private boolean checkProxyGroupAuthorization(final UserGroupInformation 
user, final InetAddress remoteAddress) {
+        if (user == null) {
+            throw new IllegalArgumentException("user is null.");
+        }
+
+        final UserGroupInformation realUser = user.getRealUser();
+        if (realUser == null) {
+            return true;
+        }
+
+        // Get the real user's groups (both real and virtual)
+        Set<String> realUserGroups = new HashSet<>();
+        if (user.getRealUser().getGroupNames() != null) {
+            Collections.addAll(realUserGroups, 
user.getRealUser().getGroupNames());
+        }
+
+        boolean proxyGroupFound = false;

Review Comment:
   This method is already fairly long, so maybe extracting this part below 
(`proxyGroupFound`) would make it readable.



##########
gateway-spi/src/main/java/org/apache/knox/gateway/util/GroupBasedImpersonationProvider.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.knox.gateway.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
+import org.apache.hadoop.util.MachineList;
+import org.apache.knox.gateway.i18n.GatewaySpiMessages;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static 
org.apache.knox.gateway.util.AuthFilterUtils.DEFAULT_IMPERSONATION_MODE;
+import static org.apache.knox.gateway.util.AuthFilterUtils.IMPERSONATION_MODE;
+
+/**
+ * An extension of Hadoop's DefaultImpersonationProvider that adds support for 
group-based impersonation.
+ * This provider allows users who belong to specific groups to impersonate 
other users.
+ */
+public class GroupBasedImpersonationProvider extends 
DefaultImpersonationProvider {
+    private static final GatewaySpiMessages LOG = 
MessagesFactory.get(GatewaySpiMessages.class);
+    private static final String CONF_HOSTS = ".hosts";
+    private static final String CONF_USERS = ".users";
+    private static final String CONF_GROUPS = ".groups";
+    private final Map<String, AccessControlList> proxyGroupsAcls = new 
HashMap<>();
+    /* is the proxy user configuration enabled  */
+    boolean isProxyUserEnabled = true;
+    /* is the proxy group configuration enabled, defaulting it to false for 
backwards compatibility  */
+    boolean isProxyGroupEnabled;
+    private Map<String, MachineList> groupProxyHosts = new HashMap<>();
+    private String groupConfigPrefix;
+    private String impersonationMode = DEFAULT_IMPERSONATION_MODE;
+
+    public GroupBasedImpersonationProvider() {
+        super();
+    }
+
+    public GroupBasedImpersonationProvider(boolean isProxyUserEnabled, boolean 
isProxyGroupEnabled) {
+        super();
+        this.isProxyUserEnabled = isProxyUserEnabled;
+        this.isProxyGroupEnabled = isProxyGroupEnabled;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return super.getConf();
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
+    }
+
+    public void init(String configurationPrefix, String proxyGroupPrefix) {
+        super.init(configurationPrefix);
+        initGroupBasedProvider(proxyGroupPrefix);
+    }
+
+    private void initGroupBasedProvider(String proxyGroupPrefix) {
+        impersonationMode = getConf().get(IMPERSONATION_MODE, 
DEFAULT_IMPERSONATION_MODE);
+        groupConfigPrefix = proxyGroupPrefix +
+                (proxyGroupPrefix.endsWith(".") ? "" : ".");
+
+        // constructing regex to match the following patterns:
+        //   $configPrefix.[ANY].users
+        //   $configPrefix.[ANY].groups
+        //   $configPrefix.[ANY].hosts
+        //
+        String prefixRegEx = groupConfigPrefix.replace(".", "\\.");
+        String usersGroupsRegEx = prefixRegEx + "[\\S]*(" +
+                Pattern.quote(CONF_USERS) + "|" + Pattern.quote(CONF_GROUPS) + 
")";
+        String hostsRegEx = prefixRegEx + "[\\S]*" + Pattern.quote(CONF_HOSTS);
+
+        // get list of users and groups per proxygroup
+        // Map of <hadoop.proxygroup.[VIRTUAL_GROUP].users|groups, 
group1,group2>
+        Map<String, String> allMatchKeys =
+                getConf().getValByRegex(usersGroupsRegEx);
+
+        for (Map.Entry<String, String> entry : allMatchKeys.entrySet()) {
+            //aclKey = hadoop.proxygroup.[VIRTUAL_GROUP]
+            String aclKey = getAclKey(entry.getKey());
+
+            if (!proxyGroupsAcls.containsKey(aclKey)) {
+                proxyGroupsAcls.put(aclKey, new AccessControlList(
+                        allMatchKeys.get(aclKey + CONF_USERS),
+                        allMatchKeys.get(aclKey + CONF_GROUPS)));
+            }
+        }
+
+        // get hosts per proxygroup
+        allMatchKeys = getConf().getValByRegex(hostsRegEx);
+        for (Map.Entry<String, String> entry : allMatchKeys.entrySet()) {
+            groupProxyHosts.put(entry.getKey(),
+                    new MachineList(entry.getValue()));
+        }
+    }
+
+    private String getAclKey(String key) {
+        int endIndex = key.lastIndexOf('.');
+        if (endIndex != -1) {
+            return key.substring(0, endIndex);
+        }
+        return key;
+    }
+
+    @Override
+    public void authorize(UserGroupInformation user, InetAddress 
remoteAddress) throws AuthorizationException {
+        // If both authorization methods are disabled, allow the operation to 
proceed
+        if (!isProxyUserEnabled && !isProxyGroupEnabled) {
+            LOG.successfulImpersonation(user.getRealUser().getUserName(), 
user.getUserName());
+            return;
+        }
+
+        boolean isProxyUserAuthorized = false;
+        boolean isProxyGroupAuthorized = false;
+        if (isProxyUserEnabled) {
+            /* authorize proxy user */
+            isProxyUserAuthorized = checkProxyUserAuthorization(user, 
remoteAddress);
+            /* In case of AND no reason to check for proxy groups if 
isProxyUserAuthorized=false */
+            if("AND".equalsIgnoreCase(impersonationMode) && 
!isProxyUserAuthorized) {
+                throw new AuthorizationException("User: " + 
user.getRealUser().getUserName()
+                        + " is not allowed to impersonate " + 
user.getUserName());
+            }
+        }
+
+        if (isProxyGroupEnabled) {
+            /* check for proxy group impersonation */
+            isProxyGroupAuthorized = checkProxyGroupAuthorization(user, 
remoteAddress);
+        }
+
+        if("AND".equalsIgnoreCase(impersonationMode)) {
+            if (isProxyUserAuthorized && isProxyGroupAuthorized) {
+                LOG.successfulImpersonation(user.getRealUser().getUserName(), 
user.getUserName());
+            } else {
+                throw new AuthorizationException("User: " + 
user.getRealUser().getUserName()
+                        + " is not allowed to impersonate " + 
user.getUserName());
+            }
+
+        } else {
+            /* OR */
+            if (isProxyUserAuthorized || isProxyGroupAuthorized) {
+                LOG.successfulImpersonation(user.getRealUser().getUserName(), 
user.getUserName());
+            } else {
+                throw new AuthorizationException("User: " + 
user.getRealUser().getUserName()
+                        + " is not allowed to impersonate " + 
user.getUserName());
+            }
+        }
+    }
+
+    /**
+     * Helper method to check if the user is authorized to impersonate
+     * Returns true if the user is authorized, false otherwise.
+     * @param user
+     * @param remoteAddress
+     * @return
+     */
+    private boolean checkProxyUserAuthorization(final UserGroupInformation 
user, final InetAddress remoteAddress) {
+        try {
+            super.authorize(user, remoteAddress);
+            return true;
+        } catch (final AuthorizationException e) {
+            LOG.failedToImpersonateUser(user.getRealUser().getUserName(), 
remoteAddress.getHostAddress());
+            return false;
+        }
+    }
+
+    /**
+     * Helper method to check if the group a given user belongs to is 
authorized to impersonate
+     * Returns true if the user is authorized, false otherwise.
+     * @param user
+     * @param remoteAddress
+     * @return
+     */
+    private boolean checkProxyGroupAuthorization(final UserGroupInformation 
user, final InetAddress remoteAddress) {
+        if (user == null) {
+            throw new IllegalArgumentException("user is null.");
+        }
+
+        final UserGroupInformation realUser = user.getRealUser();
+        if (realUser == null) {
+            return true;
+        }
+
+        // Get the real user's groups (both real and virtual)
+        Set<String> realUserGroups = new HashSet<>();
+        if (user.getRealUser().getGroupNames() != null) {
+            Collections.addAll(realUserGroups, 
user.getRealUser().getGroupNames());
+        }
+
+        boolean proxyGroupFound = false;
+        // Check if any of the real user's groups have permission to 
impersonate the proxy user
+        for (String group : realUserGroups) {
+            final AccessControlList acl = 
proxyGroupsAcls.get(groupConfigPrefix +
+                    group);
+
+            if (acl == null || !acl.isUserAllowed(user)) {
+                continue;
+            } else {
+                proxyGroupFound = true;
+                break;
+            }
+        }
+
+        if (!proxyGroupFound) {
+            LOG.failedToImpersonateGroups(realUser.getUserName(), 
realUserGroups.toString(), user.getUserName());
+            return false;
+        }
+
+        boolean proxyGroupHostFound = false;

Review Comment:
   That could be another candidate to extract into its own method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@knox.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to