[
https://issues.apache.org/jira/browse/RANGER-5222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17956834#comment-17956834
]
Wenyue Li commented on RANGER-5222:
-----------------------------------
[~abhi_2110] Oh, I see. The issue was indeed fixed in version 2.6. Thank you
very much.
> The Kafka plugin throws a NoClassDefFoundError due to a missing log4j
> dependency.
> ---------------------------------------------------------------------------------
>
> Key: RANGER-5222
> URL: https://issues.apache.org/jira/browse/RANGER-5222
> Project: Ranger
> Issue Type: Bug
> Components: audit
> Affects Versions: 2.1.0
> Reporter: Wenyue Li
> Priority: Major
> Original Estimate: 96h
> Remaining Estimate: 96h
>
> When {{xasecure.audit.destination.elasticsearch}} is set to {{true}} in the
> Kafka plugin configuration and only one Elasticsearch node is online, or
> Elasticsearch is not functioning properly, an exception is thrown while
> initializing the {{RestHighLevelClient}}, and {{newClient()}} returns
> {{null}}. The relevant code is as follows:
> {code:java}
> package org.apache.ranger.audit.destination;
>
> public class ElasticSearchAuditDestination extends AuditDestination {
>     private RestHighLevelClient newClient() {
>         try {
>             if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)
>                     && password.contains("keytab") && new File(password).exists()) {
>                 subject = CredentialsProviderUtil.login(user, password);
>             }
>             RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, protocol, user, password, port);
>             RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
>             if (LOG.isDebugEnabled()) {
>                 LOG.debug("Initialized client");
>             }
>             boolean exits = false;
>             try {
>                 exits = restHighLevelClient.indices()
>                         .open(new OpenIndexRequest(this.index), RequestOptions.DEFAULT)
>                         .isShardsAcknowledged();
>             } catch (Exception e) {
>                 LOG.warn("Error validating index " + this.index);
>             }
>             if (exits) {
>                 if (LOG.isDebugEnabled()) {
>                     LOG.debug("Index exists");
>                 }
>             } else {
>                 LOG.info("Index does not exist");
>             }
>             return restHighLevelClient;
>         } catch (Throwable t) {
>             lastLoggedAt.updateAndGet(lastLoggedAt -> {
>                 long now = System.currentTimeMillis();
>                 long elapsed = now - lastLoggedAt;
>                 if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
>                     LOG.fatal("Can't connect to ElasticSearch server: " + connectionString(), t);
>                     return now;
>                 } else {
>                     return lastLoggedAt;
>                 }
>             });
>             return null;
>         }
>     }
> }
> {code}
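> For context, here is a minimal sketch of how the {{null}} return from {{newClient()}}
> plays out on the caller side, assuming the destination caches the client and rebuilds
> it whenever the cached reference is {{null}} (the field and method body below are
> simplified assumptions, not the actual {{getClient()}} implementation):
> {code:java}
> // Hedged sketch, not the actual Ranger code: a null result is never cached, so every
> // audit write re-runs newClient(), and each failed run abandons a RestHighLevelClient
> // (and the I/O threads of its underlying RestClient) without closing it.
> private volatile RestHighLevelClient client;
>
> private RestHighLevelClient getClient() {
>     if (client == null) {
>         synchronized (this) {
>             if (client == null) {
>                 client = newClient(); // returns null when the index check throws NoClassDefFoundError
>             }
>         }
>     }
>     return client; // still null on failure, so the next audit event repeats the whole sequence
> }
> {code}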
> The root cause is that when the Elasticsearch cluster runs with a single node,
> the index-open request fails with a {{ResponseException}} whose entity is not
> null, so the client tries to parse the error body via {{BytesRestResponse}}.
> Loading that class fails because its static initializer calls
> {{LogManager.getLogger}} and the log4j dependency is missing from the plugin
> classpath, producing a {{NoClassDefFoundError}}. Being an {{Error}}, it escapes
> the {{catch (Exception e)}} around the index check in {{newClient()}} and falls
> through to {{catch (Throwable t)}}, so the method returns {{null}} without
> closing the client it has already built. As a result, the
> {{RestHighLevelClient}} is re-initialized every time an audit log is written,
> creating a large number of threads and leaking them, until the Kafka cluster
> can no longer function properly. The relevant code is as follows:
> {code:java}
> package org.elasticsearch.client;
>
> protected final ElasticsearchStatusException parseResponseException(ResponseException responseException) {
>     Response response = responseException.getResponse();
>     HttpEntity entity = response.getEntity();
>     ElasticsearchStatusException elasticsearchException;
>     RestStatus restStatus = RestStatus.fromCode(response.getStatusLine().getStatusCode());
>
>     if (entity == null) {
>         elasticsearchException = new ElasticsearchStatusException(
>                 responseException.getMessage(), restStatus, responseException);
>     } else {
>         try {
>             elasticsearchException = parseEntity(entity, BytesRestResponse::errorFromXContent);
>             elasticsearchException.addSuppressed(responseException);
>         } catch (Exception e) {
>             elasticsearchException = new ElasticsearchStatusException(
>                     "Unable to parse response body", restStatus, responseException);
>             elasticsearchException.addSuppressed(e);
>         }
>     }
>     return elasticsearchException;
> }
> {code}
> {code:java}
> package org.elasticsearch.rest;
>
> private static final Logger SUPPRESSED_ERROR_LOGGER = LogManager.getLogger("rest.suppressed");
> {code}
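> To confirm the root cause, a small standalone check can be run with the plugin's jars
> on the classpath (this class is illustrative and not part of Ranger; it only asks
> whether {{org.apache.logging.log4j.LogManager}} is resolvable from the classloader
> that loaded the Elasticsearch client):
> {code:java}
> import org.elasticsearch.client.RestHighLevelClient;
>
> // Hedged diagnostic sketch, not part of Ranger: probes whether log4j-api is visible
> // to the classloader that loaded the Elasticsearch client classes.
> public final class Log4jVisibilityCheck {
>     public static void main(String[] args) {
>         ClassLoader loader = RestHighLevelClient.class.getClassLoader();
>         try {
>             // initialize = false: we only want to know whether the class resolves
>             Class.forName("org.apache.logging.log4j.LogManager", false, loader);
>             System.out.println("log4j-api is visible to " + loader);
>         } catch (ClassNotFoundException e) {
>             System.out.println("log4j-api is NOT visible to " + loader
>                     + "; loading BytesRestResponse will fail with NoClassDefFoundError");
>         }
>     }
> }
> {code}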
> The full error follows:
> {code:java}
> FATAL Can't connect to ElasticSearch server: User:elastic, http://xxx,xxx,xxx:9201/ranger_audits (org.apache.ranger.audit.destination.ElasticSearchAuditDestination)
> java.lang.NoClassDefFoundError: org/apache/logging/log4j/LogManager
>     at org.elasticsearch.rest.BytesRestResponse.<clinit>(BytesRestResponse.java:120)
>     at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793)
>     at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770)
>     at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527)
>     at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
>     at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
>     at org.elasticsearch.client.IndicesClient.open(IndicesClient.java:467)
>     at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.newClient(ElasticSearchAuditDestination.java:253)
>     at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.getClient(ElasticSearchAuditDestination.java:184)
>     at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.init(ElasticSearchAuditDestination.java:98)
>     at org.apache.ranger.audit.provider.AuditProviderFactory.init(AuditProviderFactory.java:181)
>     at org.apache.ranger.plugin.service.RangerBasePlugin.init(RangerBasePlugin.java:175)
>     at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.configure(RangerKafkaAuthorizer.java:118)
>     at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.configure(RangerKafkaAuthorizer.java:94)
>     at kafka.security.authorizer.AuthorizerWrapper.configure(AuthorizerWrapper.scala:40)
>     at kafka.server.KafkaServer.$anonfun$startup$4(KafkaServer.scala:297)
>     at kafka.server.KafkaServer.$anonfun$startup$4$adapted(KafkaServer.scala:297)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:297)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>     at kafka.Kafka$.main(Kafka.scala:84)
>     at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.ClassNotFoundException: org.apache.logging.log4j.LogManager
>     at java.lang.ClassLoader.findClass(ClassLoader.java:530)
>     at org.apache.ranger.plugin.classloader.RangerPluginClassLoader$MyClassLoader.findClass(RangerPluginClassLoader.java:290)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at org.apache.ranger.plugin.classloader.RangerPluginClassLoader.loadClass(RangerPluginClassLoader.java:132)
>     ... 22 more
> {code}