[
https://issues.apache.org/jira/browse/RANGER-5222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wenyue Li resolved RANGER-5222.
-------------------------------
Fix Version/s: 2.6.0
Resolution: Fixed
This was fixed in release 2.6.0 and no longer causes thread leaks. On failure,
newClient() now closes the partially initialized RestHighLevelClient before
returning null:
{code:java}
private RestHighLevelClient newClient() {
    RestHighLevelClient restHighLevelClient = null;
    try {
        if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)
                && password.contains("keytab") && new File(password).exists()) {
            subject = CredentialsProviderUtil.login(user, password);
        }
        RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, protocol, user, password, port);
        restHighLevelClient = new RestHighLevelClient(restClientBuilder);
        boolean exists = false;
        try {
            exists = restHighLevelClient.indices()
                    .open(new OpenIndexRequest(this.index), RequestOptions.DEFAULT)
                    .isShardsAcknowledged();
        } catch (Exception e) {
            LOG.warn("Error validating index {}", this.index);
        }
        if (exists) {
            LOG.debug("Index exists");
        } else {
            LOG.info("Index does not exist");
        }
        return restHighLevelClient;
    } catch (Throwable t) {
        lastLoggedAt.updateAndGet(lastLoggedAt -> {
            long now = System.currentTimeMillis();
            long elapsed = now - lastLoggedAt;
            if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
                LOG.error("Can't connect to ElasticSearch server: {}", connectionString(), t);
                return now;
            } else {
                return lastLoggedAt;
            }
        });
        if (restHighLevelClient != null) {
            try {
                restHighLevelClient.close();
                LOG.debug("Closed RestHighLevelClient after failure");
            } catch (IOException e) {
                LOG.warn("Error closing RestHighLevelClient: {}", e.getMessage(), e);
            }
        }
        return null;
    }
}
{code}
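The effect of the added cleanup can be illustrated without Elasticsearch on the classpath. The following is a minimal sketch under stated assumptions, not Ranger code: FakeClient is a hypothetical stand-in for RestHighLevelClient that owns one worker thread, validate() simulates the failing index check by throwing NoClassDefFoundError, and the OPEN counter exists only to make the leak observable.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class CloseOnFailureSketch {
    /** Hypothetical stand-in for RestHighLevelClient: holds a worker thread until closed. */
    static final class FakeClient implements AutoCloseable {
        /** Number of clients created but not yet closed (illustration only). */
        static final AtomicInteger OPEN = new AtomicInteger();

        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        FakeClient() {
            worker.submit(() -> { }); // forces the worker thread to start
            OPEN.incrementAndGet();
        }

        /** Simulates the index check; fails with an Error, as in the report. */
        void validate(boolean fail) {
            if (fail) {
                throw new NoClassDefFoundError("org/apache/logging/log4j/LogManager");
            }
        }

        @Override
        public void close() {
            worker.shutdownNow(); // releases the worker thread
            OPEN.decrementAndGet();
        }
    }

    /** Same shape as the fixed newClient(): close the client on any failure, then return null. */
    static FakeClient newClient(boolean failValidation) {
        FakeClient client = null;
        try {
            client = new FakeClient();
            client.validate(failValidation);
            return client;
        } catch (Throwable t) {
            if (client != null) {
                client.close();
            }
            return null;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            newClient(true); // every attempt fails, as when log4j is missing
        }
        System.out.println("open clients after 100 failed attempts: " + FakeClient.OPEN.get());
    }
}
```

Without the close() call in the catch block, each failed attempt in this sketch would leave one worker thread behind, which mirrors the leak described in the issue.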
> The Kafka plugin throws a NoClassDefFoundError due to a missing log4j
> dependency.
> ---------------------------------------------------------------------------------
>
> Key: RANGER-5222
> URL: https://issues.apache.org/jira/browse/RANGER-5222
> Project: Ranger
> Issue Type: Bug
> Components: audit
> Affects Versions: 2.1.0
> Reporter: Wenyue Li
> Priority: Major
> Fix For: 2.6.0
>
> Original Estimate: 96h
> Remaining Estimate: 96h
>
> When {{xasecure.audit.destination.elasticsearch}} is set to {{true}} in the
> Kafka plugin configuration and only one Elasticsearch node is online (or
> Elasticsearch is otherwise not functioning properly), an exception is thrown
> while the {{RestHighLevelClient}} is being initialized, and {{newClient()}}
> returns {{null}}. The code is as follows:
> {code:java}
> package org.apache.ranger.audit.destination;
> public class ElasticSearchAuditDestination extends AuditDestination {
>     private RestHighLevelClient newClient() {
>         try {
>             if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)
>                     && password.contains("keytab") && new File(password).exists()) {
>                 subject = CredentialsProviderUtil.login(user, password);
>             }
>             RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, protocol, user, password, port);
>             RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
>             if (LOG.isDebugEnabled()) {
>                 LOG.debug("Initialized client");
>             }
>             boolean exits = false;
>             try {
>                 exits = restHighLevelClient.indices()
>                         .open(new OpenIndexRequest(this.index), RequestOptions.DEFAULT)
>                         .isShardsAcknowledged();
>             } catch (Exception e) {
>                 LOG.warn("Error validating index " + this.index);
>             }
>             if (exits) {
>                 if (LOG.isDebugEnabled()) {
>                     LOG.debug("Index exists");
>                 }
>             } else {
>                 LOG.info("Index does not exist");
>             }
>             return restHighLevelClient;
>         } catch (Throwable t) {
>             lastLoggedAt.updateAndGet(lastLoggedAt -> {
>                 long now = System.currentTimeMillis();
>                 long elapsed = now - lastLoggedAt;
>                 if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
>                     LOG.fatal("Can't connect to ElasticSearch server: " + connectionString(), t);
>                     return now;
>                 } else {
>                     return lastLoggedAt;
>                 }
>             });
>             return null;
>         }
>     }
> }{code}
> The root cause is that when the Elasticsearch cluster is running with a
> single node, the error response carries a non-null entity. Parsing that
> entity triggers static initialization of the {{BytesRestResponse}} class,
> which fails because {{LogManager.getLogger}} cannot find the log4j
> dependency. {{newClient()}} then returns {{null}} without closing the client
> it created, so a new {{RestHighLevelClient}} is initialized each time an
> audit log is written. This creates a large number of threads and causes a
> thread leak that eventually prevents the Kafka cluster from functioning
> properly. The code is as follows:
> {code:java}
> package org.elasticsearch.client;
> protected final ElasticsearchStatusException parseResponseException(ResponseException responseException) {
>     Response response = responseException.getResponse();
>     HttpEntity entity = response.getEntity();
>     ElasticsearchStatusException elasticsearchException;
>     RestStatus restStatus = RestStatus.fromCode(response.getStatusLine().getStatusCode());
>     if (entity == null) {
>         elasticsearchException = new ElasticsearchStatusException(
>                 responseException.getMessage(), restStatus, responseException);
>     } else {
>         try {
>             elasticsearchException = parseEntity(entity, BytesRestResponse::errorFromXContent);
>             elasticsearchException.addSuppressed(responseException);
>         } catch (Exception e) {
>             elasticsearchException = new ElasticsearchStatusException(
>                     "Unable to parse response body", restStatus, responseException);
>             elasticsearchException.addSuppressed(e);
>         }
>     }
>     return elasticsearchException;
> } {code}
> {code:java}
> package org.elasticsearch.rest;
> private static final Logger SUPPRESSED_ERROR_LOGGER =
> LogManager.getLogger("rest.suppressed"); {code}
> Full error attached:
> {code:java}
> FATAL Can't connect to ElasticSearch server: User:elastic, http://xxx,xxx,xxx:9201/ranger_audits (org.apache.ranger.audit.destination.ElasticSearchAuditDestination)
> java.lang.NoClassDefFoundError: org/apache/logging/log4j/LogManager
>     at org.elasticsearch.rest.BytesRestResponse.<clinit>(BytesRestResponse.java:120)
>     at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793)
>     at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770)
>     at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527)
>     at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
>     at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
>     at org.elasticsearch.client.IndicesClient.open(IndicesClient.java:467)
>     at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.newClient(ElasticSearchAuditDestination.java:253)
>     at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.getClient(ElasticSearchAuditDestination.java:184)
>     at org.apache.ranger.audit.destination.ElasticSearchAuditDestination.init(ElasticSearchAuditDestination.java:98)
>     at org.apache.ranger.audit.provider.AuditProviderFactory.init(AuditProviderFactory.java:181)
>     at org.apache.ranger.plugin.service.RangerBasePlugin.init(RangerBasePlugin.java:175)
>     at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.configure(RangerKafkaAuthorizer.java:118)
>     at org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer.configure(RangerKafkaAuthorizer.java:94)
>     at kafka.security.authorizer.AuthorizerWrapper.configure(AuthorizerWrapper.scala:40)
>     at kafka.server.KafkaServer.$anonfun$startup$4(KafkaServer.scala:297)
>     at kafka.server.KafkaServer.$anonfun$startup$4$adapted(KafkaServer.scala:297)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:297)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>     at kafka.Kafka$.main(Kafka.scala:84)
>     at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.ClassNotFoundException: org.apache.logging.log4j.LogManager
>     at java.lang.ClassLoader.findClass(ClassLoader.java:530)
>     at org.apache.ranger.plugin.classloader.RangerPluginClassLoader$MyClassLoader.findClass(RangerPluginClassLoader.java:290)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at org.apache.ranger.plugin.classloader.RangerPluginClassLoader.loadClass(RangerPluginClassLoader.java:132)
> ... 22 more {code}
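The trace above also explains why the inner catch (Exception e) around the index check in newClient() does not absorb this failure: NoClassDefFoundError is a subclass of Error, not of Exception, so it passes that handler and is only caught by the outer catch (Throwable t). A minimal sketch of that control flow, assuming nothing beyond the JDK; openIndex() is a hypothetical stand-in for the restHighLevelClient.indices().open(...) call:

```java
final class ErrorVersusExceptionSketch {
    /** Hypothetical stand-in for the index check that fails when log4j is missing. */
    static void openIndex() {
        throw new NoClassDefFoundError("org/apache/logging/log4j/LogManager");
    }

    /** Reports which handler ends up dealing with the failure. */
    static String handledBy() {
        try {
            try {
                openIndex();
                return "none";
            } catch (Exception e) {
                // Not reached: NoClassDefFoundError is an Error, not an Exception.
                return "inner catch (Exception)";
            }
        } catch (Throwable t) {
            // Reached: Throwable is the common supertype of Error and Exception.
            return "outer catch (Throwable)";
        }
    }

    public static void main(String[] args) {
        System.out.println(handledBy()); // prints "outer catch (Throwable)"
    }
}
```

This is why the cleanup has to live in the outer handler: it is the only place in newClient() that sees the error.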