mneethiraj commented on code in PR #847:
URL: https://github.com/apache/ranger/pull/847#discussion_r2922683068


##########
agents-audit/dest-auditserver/src/main/java/org/apache/ranger/audit/destination/RangerAuditServerDestination.java:
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.destination;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+public class RangerAuditServerDestination extends AuditDestination {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RangerAuditServerDestination.class);
+
+    public static final String PROP_AUDITSERVER_URL                    = 
"xasecure.audit.destination.auditserver.url";
+    public static final String PROP_AUDITSERVER_USER_NAME              = 
"xasecure.audit.destination.auditserver.username";
+    public static final String PROP_AUDITSERVER_USER_PASSWORD          = 
"xasecure.audit.destination.auditserver.password";
+    public static final String PROP_AUDITSERVER_AUTH_TYPE              = 
"xasecure.audit.destination.auditserver.authentication.type";
+    public static final String PROP_AUDITSERVER_JWT_TOKEN              = 
"xasecure.audit.destination.auditserver.jwt.token";
+    public static final String PROP_AUDITSERVER_JWT_TOKEN_FILE         = 
"xasecure.audit.destination.auditserver.jwt.token.file";
+    public static final String PROP_AUDITSERVER_CLIENT_CONN_TIMEOUT_MS = 
"xasecure.audit.destination.auditserver.connection.timeout.ms";
+    public static final String PROP_AUDITSERVER_CLIENT_READ_TIMEOUT_MS = 
"xasecure.audit.destination.auditserver.read.timeout.ms";
+    public static final String PROP_AUDITSERVER_SSL_CONFIG_FILE        = 
"xasecure.audit.destination.auditserver.ssl.config.file";
+    public static final String PROP_AUDITSERVER_MAX_RETRY_ATTEMPTS     = 
"xasecure.audit.destination.auditserver.max.retry.attempts";
+    public static final String PROP_AUDITSERVER_RETRY_INTERVAL_MS      = 
"xasecure.audit.destination.auditserver.retry.interval.ms";
+    public static final String REST_RELATIVE_PATH_POST                 = 
"/api/audit/access";
+    public static final String QUERY_PARAM_SERVICE_NAME                = 
"serviceName";
+    public static final String QUERY_PARAM_APP_ID                      = 
"appId";
+
+    // Authentication types
+    public static final String AUTH_TYPE_KERBEROS = "kerberos";
+    public static final String AUTH_TYPE_BASIC    = "basic";
+    public static final String AUTH_TYPE_JWT      = "jwt";
+
+    private RangerRESTClient restClient;
+
+    @Override
+    public void init(Properties props, String propPrefix) {
+        LOG.info("==> RangerAuditServerDestination:init()");
+        super.init(props, propPrefix);
+
+        String url               = MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_URL);
+        String sslConfigFileName = MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_SSL_CONFIG_FILE);
+        String userName          = MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_USER_NAME);
+        String password          = MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_USER_PASSWORD);
+        int    connTimeoutMs     = MiscUtil.getIntProperty(props, 
PROP_AUDITSERVER_CLIENT_CONN_TIMEOUT_MS, 120000);
+        int    readTimeoutMs     = MiscUtil.getIntProperty(props, 
PROP_AUDITSERVER_CLIENT_READ_TIMEOUT_MS, 30000);
+        int    maxRetryAttempts  = MiscUtil.getIntProperty(props, 
PROP_AUDITSERVER_MAX_RETRY_ATTEMPTS, 3);
+        int    retryIntervalMs   = MiscUtil.getIntProperty(props, 
PROP_AUDITSERVER_RETRY_INTERVAL_MS, 1000);
+
+        String authType = MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_AUTH_TYPE);
+
+        if (StringUtils.isEmpty(authType)) { // Authentication priority: JWT → 
Kerberos → Basic

Review Comment:
   It is okay to require the authNType be provided in the configuration. I 
suggest removing auto-detection of authNType.



##########
agents-audit/dest-auditserver/src/main/java/org/apache/ranger/audit/destination/RangerAuditServerDestination.java:
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.destination;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+public class RangerAuditServerDestination extends AuditDestination {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RangerAuditServerDestination.class);
+
+    public static final String PROP_AUDITSERVER_URL                    = 
"xasecure.audit.destination.auditserver.url";
+    public static final String PROP_AUDITSERVER_USER_NAME              = 
"xasecure.audit.destination.auditserver.username";
+    public static final String PROP_AUDITSERVER_USER_PASSWORD          = 
"xasecure.audit.destination.auditserver.password";
+    public static final String PROP_AUDITSERVER_AUTH_TYPE              = 
"xasecure.audit.destination.auditserver.authentication.type";
+    public static final String PROP_AUDITSERVER_JWT_TOKEN              = 
"xasecure.audit.destination.auditserver.jwt.token";
+    public static final String PROP_AUDITSERVER_JWT_TOKEN_FILE         = 
"xasecure.audit.destination.auditserver.jwt.token.file";
+    public static final String PROP_AUDITSERVER_CLIENT_CONN_TIMEOUT_MS = 
"xasecure.audit.destination.auditserver.connection.timeout.ms";
+    public static final String PROP_AUDITSERVER_CLIENT_READ_TIMEOUT_MS = 
"xasecure.audit.destination.auditserver.read.timeout.ms";
+    public static final String PROP_AUDITSERVER_SSL_CONFIG_FILE        = 
"xasecure.audit.destination.auditserver.ssl.config.file";
+    public static final String PROP_AUDITSERVER_MAX_RETRY_ATTEMPTS     = 
"xasecure.audit.destination.auditserver.max.retry.attempts";
+    public static final String PROP_AUDITSERVER_RETRY_INTERVAL_MS      = 
"xasecure.audit.destination.auditserver.retry.interval.ms";
+    public static final String REST_RELATIVE_PATH_POST                 = 
"/api/audit/access";
+    public static final String QUERY_PARAM_SERVICE_NAME                = 
"serviceName";
+    public static final String QUERY_PARAM_APP_ID                      = 
"appId";
+
+    // Authentication types
+    public static final String AUTH_TYPE_KERBEROS = "kerberos";
+    public static final String AUTH_TYPE_BASIC    = "basic";
+    public static final String AUTH_TYPE_JWT      = "jwt";
+
+    private RangerRESTClient restClient;
+
+    @Override
+    public void init(Properties props, String propPrefix) {
+        LOG.info("==> RangerAuditServerDestination:init()");
+        super.init(props, propPrefix);
+
+        String url               = MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_URL);
+        String sslConfigFileName = MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_SSL_CONFIG_FILE);
+        String userName          = MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_USER_NAME);
+        String password          = MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_USER_PASSWORD);
+        int    connTimeoutMs     = MiscUtil.getIntProperty(props, 
PROP_AUDITSERVER_CLIENT_CONN_TIMEOUT_MS, 120000);
+        int    readTimeoutMs     = MiscUtil.getIntProperty(props, 
PROP_AUDITSERVER_CLIENT_READ_TIMEOUT_MS, 30000);
+        int    maxRetryAttempts  = MiscUtil.getIntProperty(props, 
PROP_AUDITSERVER_MAX_RETRY_ATTEMPTS, 3);
+        int    retryIntervalMs   = MiscUtil.getIntProperty(props, 
PROP_AUDITSERVER_RETRY_INTERVAL_MS, 1000);
+
+        String authType = MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_AUTH_TYPE);
+
+        if (StringUtils.isEmpty(authType)) { // Authentication priority: JWT → 
Kerberos → Basic
+            try {
+                if (StringUtils.isNotEmpty(MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_JWT_TOKEN)) ||
+                        
StringUtils.isNotEmpty(MiscUtil.getStringProperty(props, 
PROP_AUDITSERVER_JWT_TOKEN_FILE))) {
+                    authType = AUTH_TYPE_JWT;
+                } else if (isKerberosAuthenticated()) {
+                    authType = AUTH_TYPE_KERBEROS;
+                } else if (StringUtils.isNotEmpty(userName)) {
+                    authType = AUTH_TYPE_BASIC;
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to auto-detect authentication type", e);
+            }
+        }
+
+        LOG.info("Audit destination authentication type: {}", authType);
+
+        if (AUTH_TYPE_KERBEROS.equalsIgnoreCase(authType)) {
+            initKerberos();
+        }
+
+        Configuration config = createConfigurationFromProperties(props, 
authType, userName, password);
+
+        this.restClient = new RangerRESTClient(url, sslConfigFileName, config);
+
+        this.restClient.setRestClientConnTimeOutMs(connTimeoutMs);
+        this.restClient.setRestClientReadTimeOutMs(readTimeoutMs);
+        this.restClient.setMaxRetryAttempts(maxRetryAttempts);
+        this.restClient.setRetryIntervalMs(retryIntervalMs);
+
+        LOG.info("<== RangerAuditServerDestination:init()");
+    }
+
+    @Override
+    public void stop() {
+        LOG.info("==> RangerAuditServerDestination.stop() called..");
+
+        logStatus();
+
+        if (restClient != null) {
+            restClient.resetClient();
+
+            restClient = null;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+     */
+    @Override
+    public void flush() {
+    }
+
+    @Override
+    public boolean log(Collection<AuditEventBase> events) {
+        boolean ret = false;
+
+        try {
+            logStatusIfRequired();
+
+            addTotalCount(events.size());
+
+            if (restClient != null) {
+                ret = logAsBatch(events);
+            } else {
+                LOG.error("REST client is not initialized. Cannot send audit 
events");
+
+                addDeferredCount(events.size());
+            }
+        } catch (Throwable t) {
+            logError("Error sending audit to Audit Server", t);
+
+            addDeferredCount(events.size());
+        }
+
+        return ret;
+    }
+
+    public boolean isAsync() {
+        return true;
+    }
+
+    private boolean logAsBatch(Collection<AuditEventBase> events) {
+        int totalEvents = events.size();
+
+        LOG.debug("==> logAsBatch() Sending batch of {} events to Audit 
Server....", totalEvents);
+
+        boolean batchSuccess = sendBatch(events);
+
+        if (batchSuccess) {
+            addSuccessCount(totalEvents);
+        } else {
+            LOG.error("Failed to send batch of {} events", totalEvents);
+
+            addFailedCount(totalEvents);
+        }
+
+        LOG.debug("<== logAsBatch() Batch processing complete: {}/{} events 
sent successfully", batchSuccess ? totalEvents : 0, totalEvents);
+
+        return batchSuccess;
+    }
+
+    private boolean sendBatch(Collection<AuditEventBase> events) {
+        boolean             ret;
+        Map<String, String> queryParams = new HashMap<>();
+        String              serviceName = fetchServiceName(events);
+        String              appId       = fetchAppId(events);
+
+        LOG.debug("Adding serviceName={} to audit request", serviceName);
+
+        queryParams.put(QUERY_PARAM_SERVICE_NAME, serviceName);
+
+        if (StringUtils.isNotEmpty(appId)) {
+            LOG.debug("Adding appId={} to audit request for batch processing", 
appId);
+
+            queryParams.put(QUERY_PARAM_APP_ID, appId);
+        }
+
+        try {
+            final UserGroupInformation user         = 
MiscUtil.getUGILoginUser();
+            final boolean              isSecureMode = 
isKerberosAuthenticated();
+
+            if (isSecureMode && user != null) {
+                LOG.debug("Sending audit batch of {} events using Kerberos. 
Principal: {}, AuthMethod: {}", events.size(), user.getUserName(), 
user.getAuthenticationMethod());
+            } else {
+                LOG.debug("Sending audit batch of {} events. SecureMode: {}, 
User: {}", events.size(), isSecureMode, user != null ? user.getUserName() : 
"null");
+            }
+
+            final ClientResponse response;
+
+            if (isSecureMode) {
+                response = 
MiscUtil.executePrivilegedAction((PrivilegedExceptionAction<ClientResponse>) () 
-> {
+                    try {
+                        return postAuditEvents(REST_RELATIVE_PATH_POST, 
queryParams, events);
+                    } catch (Exception e) {
+                        LOG.error("Failed to post audit events in privileged 
action: {}", e.getMessage());
+                        throw e;
+                    }
+                });
+            } else {
+                response = postAuditEvents(REST_RELATIVE_PATH_POST, 
queryParams, events);
+            }
+
+            if (response != null) {
+                int status = response.getStatus();
+
+                if (status == HttpServletResponse.SC_OK) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Audit batch sent successfully. {} events 
delivered. Response: {}", events.size(), response.getEntity(String.class));
+                    }
+
+                    ret = true;
+                } else {
+                    String errorBody = "";
+
+                    try {
+                        if (response.hasEntity()) {
+                            errorBody = response.getEntity(String.class);
+                        }
+                    } catch (Exception e) {
+                        LOG.debug("Failed to read error response body", e);
+                    }
+
+                    LOG.error("Failed to send audit batch. HTTP status: {}, 
Response: {}", status, errorBody);
+
+                    if (status == HttpServletResponse.SC_UNAUTHORIZED) {
+                        LOG.error("Authentication failure (401). Verify 
credentials are valid and audit server is properly configured.");
+                    }
+
+                    ret = false;
+                }
+            } else {
+                LOG.error("Received null response from audit server for batch 
of {} events", events.size());
+
+                ret = false;
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to send audit batch of {} events. Error: {}", 
events.size(), e.getMessage(), e);
+
+            ret = false;
+        }
+
+        return ret;
+    }
+
+    private int fetchServiceType(Collection<AuditEventBase> events) {
+        Iterator<AuditEventBase> iter       = events.iterator();
+        AuditEventBase           auditEvent = iter.hasNext() ? iter.next() : 
null;
+
+        return (auditEvent instanceof AuthzAuditEvent) ? ((AuthzAuditEvent) 
auditEvent).getRepositoryType() : -1;
+    }
+
+    private String fetchServiceName(Collection<AuditEventBase> events) {
+        Iterator<AuditEventBase> iter       = events.iterator();
+        AuditEventBase           auditEvent = iter.hasNext() ? iter.next() : 
null;
+
+        return (auditEvent instanceof AuthzAuditEvent) ? ((AuthzAuditEvent) 
auditEvent).getRepositoryName() : null;
+    }
+
+    private String fetchAppId(Collection<AuditEventBase> events) {
+        Iterator<AuditEventBase> iter       = events.iterator();
+        AuditEventBase           auditEvent = iter.hasNext() ? iter.next() : 
null;
+
+        return (auditEvent instanceof AuthzAuditEvent) ? ((AuthzAuditEvent) 
auditEvent).getAgentId() : null;
+    }
+
+    private ClientResponse postAuditEvents(String relativeUrl, Map<String, 
String> params, Collection<AuditEventBase> events) throws Exception {
+        LOG.debug("Posting {} audit events to {}", events.size(), relativeUrl);
+
+        WebResource webResource = restClient.getResource(relativeUrl);
+
+        if (params != null && !params.isEmpty()) {
+            for (Map.Entry<String, String> entry : params.entrySet()) {
+                webResource = webResource.queryParam(entry.getKey(), 
entry.getValue());
+            }
+        }
+
+        return webResource
+                .accept("application/json")
+                .type("application/json")
+                .entity(events)
+                .post(ClientResponse.class);
+    }
+
+    private Configuration createConfigurationFromProperties(Properties props, 
String authType, String userName, String password) {
+        Configuration config = new Configuration();
+
+        for (String key : props.stringPropertyNames()) {
+            config.set(key, props.getProperty(key));
+        }
+
+        final String restClientPrefix = "ranger.plugin";
+
+        if (AUTH_TYPE_JWT.equalsIgnoreCase(authType)) {
+            String jwtToken = initJwtToken();
+
+            if (StringUtils.isNotEmpty(jwtToken)) {

Review Comment:
   The value given in `jwt.token` seems to be ignored - insider this `if` block 
if `jwt.token.file` is not set, no configuration is set on `config`. This 
doesn't seem right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to