mneethiraj commented on code in PR #847: URL: https://github.com/apache/ranger/pull/847#discussion_r2922683068
########## agents-audit/dest-auditserver/src/main/java/org/apache/ranger/audit/destination/RangerAuditServerDestination.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.destination; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.apache.ranger.audit.provider.MiscUtil; +import org.apache.ranger.plugin.util.RangerRESTClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServletResponse; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +public class 
RangerAuditServerDestination extends AuditDestination { + private static final Logger LOG = LoggerFactory.getLogger(RangerAuditServerDestination.class); + + public static final String PROP_AUDITSERVER_URL = "xasecure.audit.destination.auditserver.url"; + public static final String PROP_AUDITSERVER_USER_NAME = "xasecure.audit.destination.auditserver.username"; + public static final String PROP_AUDITSERVER_USER_PASSWORD = "xasecure.audit.destination.auditserver.password"; + public static final String PROP_AUDITSERVER_AUTH_TYPE = "xasecure.audit.destination.auditserver.authentication.type"; + public static final String PROP_AUDITSERVER_JWT_TOKEN = "xasecure.audit.destination.auditserver.jwt.token"; + public static final String PROP_AUDITSERVER_JWT_TOKEN_FILE = "xasecure.audit.destination.auditserver.jwt.token.file"; + public static final String PROP_AUDITSERVER_CLIENT_CONN_TIMEOUT_MS = "xasecure.audit.destination.auditserver.connection.timeout.ms"; + public static final String PROP_AUDITSERVER_CLIENT_READ_TIMEOUT_MS = "xasecure.audit.destination.auditserver.read.timeout.ms"; + public static final String PROP_AUDITSERVER_SSL_CONFIG_FILE = "xasecure.audit.destination.auditserver.ssl.config.file"; + public static final String PROP_AUDITSERVER_MAX_RETRY_ATTEMPTS = "xasecure.audit.destination.auditserver.max.retry.attempts"; + public static final String PROP_AUDITSERVER_RETRY_INTERVAL_MS = "xasecure.audit.destination.auditserver.retry.interval.ms"; + public static final String REST_RELATIVE_PATH_POST = "/api/audit/access"; + public static final String QUERY_PARAM_SERVICE_NAME = "serviceName"; + public static final String QUERY_PARAM_APP_ID = "appId"; + + // Authentication types + public static final String AUTH_TYPE_KERBEROS = "kerberos"; + public static final String AUTH_TYPE_BASIC = "basic"; + public static final String AUTH_TYPE_JWT = "jwt"; + + private RangerRESTClient restClient; + + @Override + public void init(Properties props, String propPrefix) { + LOG.info("==> 
RangerAuditServerDestination:init()"); + super.init(props, propPrefix); + + String url = MiscUtil.getStringProperty(props, PROP_AUDITSERVER_URL); + String sslConfigFileName = MiscUtil.getStringProperty(props, PROP_AUDITSERVER_SSL_CONFIG_FILE); + String userName = MiscUtil.getStringProperty(props, PROP_AUDITSERVER_USER_NAME); + String password = MiscUtil.getStringProperty(props, PROP_AUDITSERVER_USER_PASSWORD); + int connTimeoutMs = MiscUtil.getIntProperty(props, PROP_AUDITSERVER_CLIENT_CONN_TIMEOUT_MS, 120000); + int readTimeoutMs = MiscUtil.getIntProperty(props, PROP_AUDITSERVER_CLIENT_READ_TIMEOUT_MS, 30000); + int maxRetryAttempts = MiscUtil.getIntProperty(props, PROP_AUDITSERVER_MAX_RETRY_ATTEMPTS, 3); + int retryIntervalMs = MiscUtil.getIntProperty(props, PROP_AUDITSERVER_RETRY_INTERVAL_MS, 1000); + + String authType = MiscUtil.getStringProperty(props, PROP_AUDITSERVER_AUTH_TYPE); + + if (StringUtils.isEmpty(authType)) { // Authentication priority: JWT → Kerberos → Basic Review Comment: It is okay to require the authNType be provided in the configuration. I suggest removing auto-detection of authNType. ########## agents-audit/dest-auditserver/src/main/java/org/apache/ranger/audit/destination/RangerAuditServerDestination.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.destination; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.apache.ranger.audit.provider.MiscUtil; +import org.apache.ranger.plugin.util.RangerRESTClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServletResponse; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +public class RangerAuditServerDestination extends AuditDestination { + private static final Logger LOG = LoggerFactory.getLogger(RangerAuditServerDestination.class); + + public static final String PROP_AUDITSERVER_URL = "xasecure.audit.destination.auditserver.url"; + public static final String PROP_AUDITSERVER_USER_NAME = "xasecure.audit.destination.auditserver.username"; + public static final String PROP_AUDITSERVER_USER_PASSWORD = "xasecure.audit.destination.auditserver.password"; + public static final String PROP_AUDITSERVER_AUTH_TYPE = "xasecure.audit.destination.auditserver.authentication.type"; + public static final String PROP_AUDITSERVER_JWT_TOKEN = "xasecure.audit.destination.auditserver.jwt.token"; + public static final String PROP_AUDITSERVER_JWT_TOKEN_FILE = "xasecure.audit.destination.auditserver.jwt.token.file"; + public static final String 
PROP_AUDITSERVER_CLIENT_CONN_TIMEOUT_MS = "xasecure.audit.destination.auditserver.connection.timeout.ms"; + public static final String PROP_AUDITSERVER_CLIENT_READ_TIMEOUT_MS = "xasecure.audit.destination.auditserver.read.timeout.ms"; + public static final String PROP_AUDITSERVER_SSL_CONFIG_FILE = "xasecure.audit.destination.auditserver.ssl.config.file"; + public static final String PROP_AUDITSERVER_MAX_RETRY_ATTEMPTS = "xasecure.audit.destination.auditserver.max.retry.attempts"; + public static final String PROP_AUDITSERVER_RETRY_INTERVAL_MS = "xasecure.audit.destination.auditserver.retry.interval.ms"; + public static final String REST_RELATIVE_PATH_POST = "/api/audit/access"; + public static final String QUERY_PARAM_SERVICE_NAME = "serviceName"; + public static final String QUERY_PARAM_APP_ID = "appId"; + + // Authentication types + public static final String AUTH_TYPE_KERBEROS = "kerberos"; + public static final String AUTH_TYPE_BASIC = "basic"; + public static final String AUTH_TYPE_JWT = "jwt"; + + private RangerRESTClient restClient; + + @Override + public void init(Properties props, String propPrefix) { + LOG.info("==> RangerAuditServerDestination:init()"); + super.init(props, propPrefix); + + String url = MiscUtil.getStringProperty(props, PROP_AUDITSERVER_URL); + String sslConfigFileName = MiscUtil.getStringProperty(props, PROP_AUDITSERVER_SSL_CONFIG_FILE); + String userName = MiscUtil.getStringProperty(props, PROP_AUDITSERVER_USER_NAME); + String password = MiscUtil.getStringProperty(props, PROP_AUDITSERVER_USER_PASSWORD); + int connTimeoutMs = MiscUtil.getIntProperty(props, PROP_AUDITSERVER_CLIENT_CONN_TIMEOUT_MS, 120000); + int readTimeoutMs = MiscUtil.getIntProperty(props, PROP_AUDITSERVER_CLIENT_READ_TIMEOUT_MS, 30000); + int maxRetryAttempts = MiscUtil.getIntProperty(props, PROP_AUDITSERVER_MAX_RETRY_ATTEMPTS, 3); + int retryIntervalMs = MiscUtil.getIntProperty(props, PROP_AUDITSERVER_RETRY_INTERVAL_MS, 1000); + + String authType = 
MiscUtil.getStringProperty(props, PROP_AUDITSERVER_AUTH_TYPE); + + if (StringUtils.isEmpty(authType)) { // Authentication priority: JWT → Kerberos → Basic + try { + if (StringUtils.isNotEmpty(MiscUtil.getStringProperty(props, PROP_AUDITSERVER_JWT_TOKEN)) || + StringUtils.isNotEmpty(MiscUtil.getStringProperty(props, PROP_AUDITSERVER_JWT_TOKEN_FILE))) { + authType = AUTH_TYPE_JWT; + } else if (isKerberosAuthenticated()) { + authType = AUTH_TYPE_KERBEROS; + } else if (StringUtils.isNotEmpty(userName)) { + authType = AUTH_TYPE_BASIC; + } + } catch (Exception e) { + LOG.warn("Failed to auto-detect authentication type", e); + } + } + + LOG.info("Audit destination authentication type: {}", authType); + + if (AUTH_TYPE_KERBEROS.equalsIgnoreCase(authType)) { + initKerberos(); + } + + Configuration config = createConfigurationFromProperties(props, authType, userName, password); + + this.restClient = new RangerRESTClient(url, sslConfigFileName, config); + + this.restClient.setRestClientConnTimeOutMs(connTimeoutMs); + this.restClient.setRestClientReadTimeOutMs(readTimeoutMs); + this.restClient.setMaxRetryAttempts(maxRetryAttempts); + this.restClient.setRetryIntervalMs(retryIntervalMs); + + LOG.info("<== RangerAuditServerDestination:init()"); + } + + @Override + public void stop() { + LOG.info("==> RangerAuditServerDestination.stop() called.."); + + logStatus(); + + if (restClient != null) { + restClient.resetClient(); + + restClient = null; + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#flush() + */ + @Override + public void flush() { + } + + @Override + public boolean log(Collection<AuditEventBase> events) { + boolean ret = false; + + try { + logStatusIfRequired(); + + addTotalCount(events.size()); + + if (restClient != null) { + ret = logAsBatch(events); + } else { + LOG.error("REST client is not initialized. 
Cannot send audit events"); + + addDeferredCount(events.size()); + } + } catch (Throwable t) { + logError("Error sending audit to Audit Server", t); + + addDeferredCount(events.size()); + } + + return ret; + } + + public boolean isAsync() { + return true; + } + + private boolean logAsBatch(Collection<AuditEventBase> events) { + int totalEvents = events.size(); + + LOG.debug("==> logAsBatch() Sending batch of {} events to Audit Server....", totalEvents); + + boolean batchSuccess = sendBatch(events); + + if (batchSuccess) { + addSuccessCount(totalEvents); + } else { + LOG.error("Failed to send batch of {} events", totalEvents); + + addFailedCount(totalEvents); + } + + LOG.debug("<== logAsBatch() Batch processing complete: {}/{} events sent successfully", batchSuccess ? totalEvents : 0, totalEvents); + + return batchSuccess; + } + + private boolean sendBatch(Collection<AuditEventBase> events) { + boolean ret; + Map<String, String> queryParams = new HashMap<>(); + String serviceName = fetchServiceName(events); + String appId = fetchAppId(events); + + LOG.debug("Adding serviceName={} to audit request", serviceName); + + queryParams.put(QUERY_PARAM_SERVICE_NAME, serviceName); + + if (StringUtils.isNotEmpty(appId)) { + LOG.debug("Adding appId={} to audit request for batch processing", appId); + + queryParams.put(QUERY_PARAM_APP_ID, appId); + } + + try { + final UserGroupInformation user = MiscUtil.getUGILoginUser(); + final boolean isSecureMode = isKerberosAuthenticated(); + + if (isSecureMode && user != null) { + LOG.debug("Sending audit batch of {} events using Kerberos. Principal: {}, AuthMethod: {}", events.size(), user.getUserName(), user.getAuthenticationMethod()); + } else { + LOG.debug("Sending audit batch of {} events. SecureMode: {}, User: {}", events.size(), isSecureMode, user != null ? 
user.getUserName() : "null"); + } + + final ClientResponse response; + + if (isSecureMode) { + response = MiscUtil.executePrivilegedAction((PrivilegedExceptionAction<ClientResponse>) () -> { + try { + return postAuditEvents(REST_RELATIVE_PATH_POST, queryParams, events); + } catch (Exception e) { + LOG.error("Failed to post audit events in privileged action: {}", e.getMessage()); + throw e; + } + }); + } else { + response = postAuditEvents(REST_RELATIVE_PATH_POST, queryParams, events); + } + + if (response != null) { + int status = response.getStatus(); + + if (status == HttpServletResponse.SC_OK) { + if (LOG.isDebugEnabled()) { + LOG.debug("Audit batch sent successfully. {} events delivered. Response: {}", events.size(), response.getEntity(String.class)); + } + + ret = true; + } else { + String errorBody = ""; + + try { + if (response.hasEntity()) { + errorBody = response.getEntity(String.class); + } + } catch (Exception e) { + LOG.debug("Failed to read error response body", e); + } + + LOG.error("Failed to send audit batch. HTTP status: {}, Response: {}", status, errorBody); + + if (status == HttpServletResponse.SC_UNAUTHORIZED) { + LOG.error("Authentication failure (401). Verify credentials are valid and audit server is properly configured."); + } + + ret = false; + } + } else { + LOG.error("Received null response from audit server for batch of {} events", events.size()); + + ret = false; + } + } catch (Exception e) { + LOG.error("Failed to send audit batch of {} events. Error: {}", events.size(), e.getMessage(), e); + + ret = false; + } + + return ret; + } + + private int fetchServiceType(Collection<AuditEventBase> events) { + Iterator<AuditEventBase> iter = events.iterator(); + AuditEventBase auditEvent = iter.hasNext() ? iter.next() : null; + + return (auditEvent instanceof AuthzAuditEvent) ? 
((AuthzAuditEvent) auditEvent).getRepositoryType() : -1; + } + + private String fetchServiceName(Collection<AuditEventBase> events) { + Iterator<AuditEventBase> iter = events.iterator(); + AuditEventBase auditEvent = iter.hasNext() ? iter.next() : null; + + return (auditEvent instanceof AuthzAuditEvent) ? ((AuthzAuditEvent) auditEvent).getRepositoryName() : null; + } + + private String fetchAppId(Collection<AuditEventBase> events) { + Iterator<AuditEventBase> iter = events.iterator(); + AuditEventBase auditEvent = iter.hasNext() ? iter.next() : null; + + return (auditEvent instanceof AuthzAuditEvent) ? ((AuthzAuditEvent) auditEvent).getAgentId() : null; + } + + private ClientResponse postAuditEvents(String relativeUrl, Map<String, String> params, Collection<AuditEventBase> events) throws Exception { + LOG.debug("Posting {} audit events to {}", events.size(), relativeUrl); + + WebResource webResource = restClient.getResource(relativeUrl); + + if (params != null && !params.isEmpty()) { + for (Map.Entry<String, String> entry : params.entrySet()) { + webResource = webResource.queryParam(entry.getKey(), entry.getValue()); + } + } + + return webResource + .accept("application/json") + .type("application/json") + .entity(events) + .post(ClientResponse.class); + } + + private Configuration createConfigurationFromProperties(Properties props, String authType, String userName, String password) { + Configuration config = new Configuration(); + + for (String key : props.stringPropertyNames()) { + config.set(key, props.getProperty(key)); + } + + final String restClientPrefix = "ranger.plugin"; + + if (AUTH_TYPE_JWT.equalsIgnoreCase(authType)) { + String jwtToken = initJwtToken(); + + if (StringUtils.isNotEmpty(jwtToken)) { Review Comment: The value given in `jwt.token` seems to be ignored - inside this `if` block, if `jwt.token.file` is not set, no configuration is set on `config`. This doesn't seem right. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
