This is an automated email from the ASF dual-hosted git repository. nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1 in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push: new 0a619e0 METRON-2297 Enrichment Topology Unable to Load Geo IP Data from HDFS (nickwallen) closes apache/metron#1539 0a619e0 is described below commit 0a619e082f34d65269f03ed4c6e16ba8bb5a0dbb Author: nickwallen <nickal...@apache.org> AuthorDate: Wed Oct 23 15:39:19 2019 -0400 METRON-2297 Enrichment Topology Unable to Load Geo IP Data from HDFS (nickwallen) closes apache/metron#1539 --- metron-analytics/metron-profiler-storm/pom.xml | 5 + .../CURRENT/configuration/metron-security-env.xml | 10 +- .../CURRENT/package/scripts/params/params_linux.py | 7 +- .../storm/security/auth/kerberos/AutoTGT.java | 254 +++++++++++++++++++++ .../metron-elasticsearch-storm/pom.xml | 5 + .../metron-enrichment-storm/pom.xml | 5 + .../metron-indexing/metron-indexing-storm/pom.xml | 5 + metron-platform/metron-pcap-backend/pom.xml | 5 + .../metron-solr/metron-solr-storm/pom.xml | 5 + 9 files changed, 299 insertions(+), 2 deletions(-) diff --git a/metron-analytics/metron-profiler-storm/pom.xml b/metron-analytics/metron-profiler-storm/pom.xml index 84c0631..da049ac 100644 --- a/metron-analytics/metron-profiler-storm/pom.xml +++ b/metron-analytics/metron-profiler-storm/pom.xml @@ -148,6 +148,11 @@ </dependency> <dependency> <groupId>org.apache.metron</groupId> + <artifactId>metron-common-storm</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> <artifactId>metron-writer-storm</artifactId> <version>${project.parent.version}</version> <exclusions> diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml index beb0451..fc91a4a 100644 --- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml @@ -159,7 +159,6 @@ </value-attributes> <on-ambari-upgrade add="true"/> </property> - <property> <name>metron.ldap.ssl.truststore</name> <display-name>LDAP Truststore</display-name> @@ -240,4 +239,13 @@ <description>Name of the role at the authentication provider that provides administrative access to Metron.</description> <on-ambari-upgrade add="true"/> </property> + <property> + <name>topology_auto_credentials</name> + <display-name>Topology Auto Credentials</display-name> + <description>The value of Storm's `topology.auto-credentials`. A list of plugins used to unpack credentials on the Storm worker. This value is only used when Kerberos has been enabled.</description> + <value>['org.apache.metron.storm.security.auth.kerberos.AutoTGT']</value> + <value-attributes> + <empty-value-valid>true</empty-value-valid> + </value-attributes> + </property> </configuration> \ No newline at end of file diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index b7fbcf4..d0026dd 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@ -258,7 +258,12 @@ profiler_topology_worker_childopts = client_jaas_arg if security_enabled else '' indexing_topology_worker_childopts = client_jaas_arg if security_enabled else '' 
pcap_topology_worker_childopts = client_jaas_arg if security_enabled else '' metron_jvm_flags += (' ' + client_jaas_arg) if security_enabled else '' -topology_auto_credentials = config['configurations']['storm-site'].get('nimbus.credential.renewers.classes', []) + +# the user-defined `topology.auto-credentials` are only used if security is enabled +topology_auto_credentials = [] +if security_enabled: + topology_auto_credentials = config['configurations']['metron-security-env'].get('topology_auto_credentials', []) + # Needed for storm.config, because it needs Java String topology_auto_credentials_double_quotes = str(topology_auto_credentials).replace("'", '"') diff --git a/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/security/auth/kerberos/AutoTGT.java b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/security/auth/kerberos/AutoTGT.java new file mode 100644 index 0000000..adf0327 --- /dev/null +++ b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/security/auth/kerberos/AutoTGT.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.storm.security.auth.kerberos; + +import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.IAutoCredentials; +import org.apache.storm.security.auth.ICredentialsRenewer; +import org.apache.storm.security.auth.kerberos.ClientCallbackHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.DestroyFailedException; +import javax.security.auth.RefreshFailedException; +import javax.security.auth.Subject; +import javax.security.auth.kerberos.KerberosPrincipal; +import javax.security.auth.kerberos.KerberosTicket; +import javax.security.auth.login.Configuration; +import javax.security.auth.login.LoginContext; +import javax.xml.bind.DatatypeConverter; +import java.lang.reflect.Method; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Automatically take a user's TGT, and push it, and renew it in Nimbus. + * + * <p>To allow a topology running in a Storm Supervisor to authenticate with Hadoop using + * the TGT pushed by Nimbus, this class should be configured as part of a topology's + * `topology.auto-credentials` parameter. + * + * <p>When using Storm's {@link org.apache.storm.security.auth.kerberos.AutoTGT}, Hadoop + * authentication fails because it is unable to dynamically load Hadoop's + * {@link org.apache.hadoop.security.UserGroupInformation} class at runtime. This issue is + * avoided by using this custom AutoTGT implementation that is packaged in a topology's uber jar. + * + * <p>This work is derived from the {@link org.apache.storm.security.auth.kerberos.AutoTGT} class + * in storm-core version 1.2.1. 
+ */ +public class AutoTGT implements IAutoCredentials, ICredentialsRenewer { + private static final Logger LOG = LoggerFactory.getLogger(AutoTGT.class); + private static final float TICKET_RENEW_WINDOW = 0.80f; + protected static final AtomicReference<KerberosTicket> kerbTicket = new AtomicReference<>(); + private Map conf; + + public void prepare(Map conf) { + this.conf = conf; + } + + private static KerberosTicket getTGT(Subject subject) { + Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class); + for(KerberosTicket ticket: tickets) { + KerberosPrincipal server = ticket.getServer(); + if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) { + return ticket; + } + } + return null; + } + + @Override + public void populateCredentials(Map<String, String> credentials) { + //Log the user in and get the TGT + try { + Configuration login_conf = AuthUtils.GetConfiguration(conf); + ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf); + + //login our user + Configuration.setConfiguration(login_conf); + LoginContext lc = new LoginContext(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler); + try { + lc.login(); + final Subject subject = lc.getSubject(); + KerberosTicket tgt = getTGT(subject); + + if (tgt == null) { //error + throw new RuntimeException("Fail to verify user principal with section \"" + +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf); + } + + if (!tgt.isForwardable()) { + throw new RuntimeException("The TGT found is not forwardable"); + } + + if (!tgt.isRenewable()) { + throw new RuntimeException("The TGT found is not renewable"); + } + + LOG.info("Pushing TGT for "+tgt.getClient()+" to topology."); + saveTGT(tgt, credentials); + } finally { + lc.logout(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void saveTGT(KerberosTicket tgt, Map<String, String> credentials) { + try { + + byte[] 
bytes = AuthUtils.serializeKerberosTicket(tgt); + credentials.put("TGT", DatatypeConverter.printBase64Binary(bytes)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static KerberosTicket getTGT(Map<String, String> credentials) { + KerberosTicket ret = null; + if (credentials != null && credentials.containsKey("TGT") && credentials.get("TGT") != null) { + ret = AuthUtils.deserializeKerberosTicket(DatatypeConverter.parseBase64Binary(credentials.get("TGT"))); + } + return ret; + } + + @Override + public void updateSubject(Subject subject, Map<String, String> credentials) { + populateSubjectWithTGT(subject, credentials); + } + + @Override + public void populateSubject(Subject subject, Map<String, String> credentials) { + populateSubjectWithTGT(subject, credentials); + loginHadoopUser(subject); + } + + private void populateSubjectWithTGT(Subject subject, Map<String, String> credentials) { + KerberosTicket tgt = getTGT(credentials); + if (tgt != null) { + clearCredentials(subject, tgt); + subject.getPrincipals().add(tgt.getClient()); + kerbTicket.set(tgt); + } else { + LOG.info("No TGT found in credentials"); + } + } + + public static void clearCredentials(Subject subject, KerberosTicket tgt) { + Set<Object> creds = subject.getPrivateCredentials(); + synchronized(creds) { + Iterator<Object> iterator = creds.iterator(); + while (iterator.hasNext()) { + Object o = iterator.next(); + if (o instanceof KerberosTicket) { + KerberosTicket t = (KerberosTicket)o; + iterator.remove(); + try { + t.destroy(); + } catch (DestroyFailedException e) { + LOG.warn("Failed to destory ticket ", e); + } + } + } + if(tgt != null) { + creds.add(tgt); + } + } + } + + /** + * Hadoop does not just go off of a TGT, it needs a bit more. This + * should fill in the rest. + * @param subject the subject that should have a TGT in it. 
+ */ + private void loginHadoopUser(Subject subject) { + final String clazz = "org.apache.hadoop.security.UserGroupInformation"; + Class<?> ugi; + try { + ugi = Class.forName(clazz); + } catch (ClassNotFoundException e) { + /* + * When using Storm's `org.apache.storm.security.auth.kerberos.AutoTGT` class, Hadoop + * authentication fails because it is unable to load Hadoop's UserGroupInformation class + * at runtime. This issue is avoided when using a custom AutoTGT implementation like + * this, packaged into a topology's uber jar. + */ + LOG.error("Hadoop authentication failed. Hadoop was not found on the class path. " + + "Unable to load '{}' because '{}'", clazz, e.getMessage(), e); + return; + } + try { + Method isSecEnabled = ugi.getMethod("isSecurityEnabled"); + if (!((Boolean)isSecEnabled.invoke(null))) { + LOG.warn("Hadoop is on the classpath but not configured for " + + "security, if you want security you need to be sure that " + + "hadoop.security.authentication=kerberos in core-site.xml " + + "in your jar"); + return; + } + Method login = ugi.getMethod("loginUserFromSubject", Subject.class); + login.invoke(null, subject); + } catch (Exception e) { + LOG.warn("Something went wrong while trying to initialize Hadoop through reflection. 
This version of hadoop may not be compatible.", e); + } + } + + private long getRefreshTime(KerberosTicket tgt) { + long start = tgt.getStartTime().getTime(); + long end = tgt.getEndTime().getTime(); + return start + (long) ((end - start) * TICKET_RENEW_WINDOW); + } + + @Override + public void renew(Map<String,String> credentials, Map<String, Object> topologyConf, String topologyOwnerPrincipal) { + KerberosTicket tgt = getTGT(credentials); + if (tgt != null) { + long refreshTime = getRefreshTime(tgt); + long now = System.currentTimeMillis(); + if (now >= refreshTime) { + try { + LOG.info("Renewing TGT for "+tgt.getClient()); + tgt.refresh(); + saveTGT(tgt, credentials); + } catch (RefreshFailedException e) { + LOG.warn("Failed to refresh TGT", e); + } + } + } + } + + public void renew(Map<String, String> credentials, Map topologyConf) { + throw new IllegalStateException("SHOULD NOT BE CALLED"); + } + + public static void main(String[] args) throws Exception { + AutoTGT at = new AutoTGT(); + Map conf = new java.util.HashMap(); + conf.put("java.security.auth.login.config", args[0]); + at.prepare(conf); + Map<String,String> creds = new java.util.HashMap<String,String>(); + at.populateCredentials(creds); + Subject s = new Subject(); + at.populateSubject(s, creds); + LOG.info("Got a Subject "+s); + } +} \ No newline at end of file diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml index 9fccf68..a84e45d 100644 --- a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml +++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml @@ -100,6 +100,11 @@ </dependency> <dependency> <groupId>org.apache.metron</groupId> + <artifactId>metron-common-storm</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> <artifactId>metron-indexing-storm</artifactId> 
<version>${project.parent.version}</version> <exclusions> diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml index 0628609..f441daa 100644 --- a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml +++ b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml @@ -74,6 +74,11 @@ </dependency> <dependency> <groupId>org.apache.metron</groupId> + <artifactId>metron-common-storm</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> <artifactId>metron-writer-storm</artifactId> <version>${project.parent.version}</version> </dependency> diff --git a/metron-platform/metron-indexing/metron-indexing-storm/pom.xml b/metron-platform/metron-indexing/metron-indexing-storm/pom.xml index ce3fb30..aadd021 100644 --- a/metron-platform/metron-indexing/metron-indexing-storm/pom.xml +++ b/metron-platform/metron-indexing/metron-indexing-storm/pom.xml @@ -42,6 +42,11 @@ </dependency> <dependency> <groupId>org.apache.metron</groupId> + <artifactId>metron-common-storm</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> <artifactId>metron-writer-storm</artifactId> <version>${project.parent.version}</version> </dependency> diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml index a9dd765..361ebaf 100644 --- a/metron-platform/metron-pcap-backend/pom.xml +++ b/metron-platform/metron-pcap-backend/pom.xml @@ -70,6 +70,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-common-storm</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${global_junit_version}</version> diff --git 
a/metron-platform/metron-solr/metron-solr-storm/pom.xml b/metron-platform/metron-solr/metron-solr-storm/pom.xml index da12b0e..ba46af3 100644 --- a/metron-platform/metron-solr/metron-solr-storm/pom.xml +++ b/metron-platform/metron-solr/metron-solr-storm/pom.xml @@ -90,6 +90,11 @@ </dependency> <dependency> <groupId>org.apache.metron</groupId> + <artifactId>metron-common-storm</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> <artifactId>metron-indexing-storm</artifactId> <version>${project.parent.version}</version> </dependency>