Greg Senia created ATLAS-1647:
---------------------------------

             Summary: AtlasHook does not work with Oozie Sqoop Action or with 
Original HiveAction
                 Key: ATLAS-1647
                 URL: https://issues.apache.org/jira/browse/ATLAS-1647
             Project: Atlas
          Issue Type: Bug
            Reporter: Greg Senia
             Fix For: 0.7-incubating, 0.8-incubating, 0.7.1-incubating, 
0.6-incubating


While testing Atlas 0.7.x, I found that AtlasHook does not place messages onto 
the Kafka queues correctly when a SqoopAction or HiveAction executes from 
within a secure Oozie context. The job runs inside the cluster and 
authenticates with delegation tokens, which can be turned back into a UGI 
context by calling UserGroupInformation.loginUserFromSubject. The problem is 
that Kafka does not support UGI or a Java Subject... 
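
A minimal sketch of how one might check whether a Subject carries Kerberos 
credentials at all, which is what a Kerberos-based Kafka login needs. The 
class and method names (SubjectCredentialCheck, hasKerberosTicket) are 
hypothetical and not part of Atlas; the assumption here is that a Subject 
rebuilt from delegation tokens inside an Oozie launcher holds no Kerberos 
ticket, so this check would return false there.

```java
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosTicket;
import java.util.Set;

// Hypothetical helper, not part of Atlas: reports whether a JAAS Subject
// holds any Kerberos ticket among its private credentials.
public class SubjectCredentialCheck {

    public static boolean hasKerberosTicket(Subject subject) {
        if (subject == null) {
            return false; // no Subject at all, so no ticket either
        }
        // Only credentials of type KerberosTicket are returned here;
        // other credential types held by the Subject are ignored.
        Set<KerberosTicket> tickets =
                subject.getPrivateCredentials(KerberosTicket.class);
        return !tickets.isEmpty();
    }

    public static void main(String[] args) {
        // An empty Subject stands in for a token-only context (assumption).
        Subject tokenOnly = new Subject();
        System.out.println("Kerberos ticket present: "
                + hasKerberosTicket(tokenOnly));
    }
}
```

A check like this only shows which kind of credential is available; it does 
not make Kafka accept a delegation token.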

AtlasHook class:
        if (!(isLoginKeytabBased())){
            if (isLoginTicketBased()) {
                InMemoryJAASConfiguration.setConfigSectionRedirect("KafkaClient", "ticketBased-KafkaClient");
                LOG.info("TicketBased=true Kafka");
            } else {
                LOG.info("TicketBased=false and KeyTabBased=false Kafka");
                AccessControlContext context = AccessController.getContext();
                Subject subject = Subject.getSubject(context);
                if (subject == null) {
                    LOG.info("No Subject Available");
                } else {
                    try {
                        UserGroupInformation.loginUserFromSubject(subject);


Example of log output showing debug from Oozie Sqoop Action:
1 [main] INFO  org.apache.sqoop.mapreduce.ImportJobBase  - Publishing Hive/Hcat 
import job data to Listeners
33181 [main] INFO  org.apache.sqoop.mapreduce.ImportJobBase  - Publishing 
Hive/Hcat import job data to Listeners
33196 [main] INFO  org.apache.atlas.ApplicationProperties  - Looking for 
atlas-application.properties in classpath
33196 [main] INFO  org.apache.atlas.ApplicationProperties  - Loading 
atlas-application.properties from 
file:/gss/hadoop/diska/hadoop/yarn/local/usercache/gss2002/appcache/application_1488823620014_0005/container_e135_1488823620014_0005_01_000002/atlas-application.properties
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - Configuration 
loaded:
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.authentication.method.kerberos = True
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - atlas.cluster.name 
= tech
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.hook.hive.keepAliveTime = 10
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.hook.hive.maxThreads = 5
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.hook.hive.minThreads = 5
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.hook.hive.numRetries = 3
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.hook.hive.queueSize = 1000
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.hook.hive.synchronous = false
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.jaas.KafkaClient.loginModuleControlFlag = required
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.jaas.KafkaClient.loginModuleName = 
com.sun.security.auth.module.Krb5LoginModule
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.jaas.KafkaClient.option.renewTicket = True
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.jaas.KafkaClient.option.serviceName = kafka
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.jaas.KafkaClient.option.storeKey = false
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.jaas.KafkaClient.option.useKeyTab = false
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.jaas.KafkaClient.option.useTicketCache = True
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.kafka.bootstrap.servers = ha21t55mn.tech.hdp.example.com:6667
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.kafka.hook.group.id = atlas
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.kafka.sasl.kerberos.service.name = kafka
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.kafka.security.protocol = PLAINTEXTSASL
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.kafka.zookeeper.connect = [ha21t53mn.tech.hdp.example.com:2181, 
ha21t51mn.tech.hdp.example.com:2181, ha21t52mn.tech.hdp.example.com:2181]
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.kafka.zookeeper.connection.timeout.ms = 200
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.kafka.zookeeper.session.timeout.ms = 400
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.kafka.zookeeper.sync.time.ms = 20
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.notification.create.topics = True
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.notification.replicas = 1
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - 
atlas.notification.topics = [ATLAS_HOOK, ATLAS_ENTITIES]
33214 [main] DEBUG org.apache.atlas.ApplicationProperties  - atlas.rest.address 
= http://ha21t55mn.tech.hdp.example.com:21000
33215 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - ==> 
InMemoryJAASConfiguration.init()
33217 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - ==> 
InMemoryJAASConfiguration.init()
33220 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - ==> 
InMemoryJAASConfiguration.initialize()
33223 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - 
Adding client: [KafkaClient{-1}]
        loginModule: [com.sun.security.auth.module.Krb5LoginModule]
        controlFlag: [LoginModuleControlFlag: required]
        Options:  [storeKey] => [false]
        Options:  [renewTicket] => [True]
        Options:  [useKeyTab] => [false]
        Options:  [serviceName] => [kafka]
        Options:  [useTicketCache] => [True]

33223 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - <== 
InMemoryJAASConfiguration.initialize({KafkaClient=[javax.security.auth.login.AppConfigurationEntry@669c2b07]})
33223 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - <== 
InMemoryJAASConfiguration.init()
33223 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - <== 
InMemoryJAASConfiguration.init()
33239 [main] INFO  org.apache.atlas.hook.AtlasHook  - gss TicketBased=false and 
KeyTabBased=false Kafka
33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation  - hadoop 
login
33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation  - hadoop 
login commit
33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation  - using 
existing subject:[gss2002, UnixPrincipal: gss2002, UnixNumericUserPrincipal: 
190186246, UnixNumericGroupPrincipal [Primary Group]: 190000513, 
UnixNumericGroupPrincipal [Supplementary Group]: 190172138, 
UnixNumericGroupPrincipal [Supplementary Group]: 190172480, 
UnixNumericGroupPrincipal [Supplementary Group]: 190179404, 
UnixNumericGroupPrincipal [Supplementary Group]: 190180058, 
UnixNumericGroupPrincipal [Supplementary Group]: 190180097, 
UnixNumericGroupPrincipal [Supplementary Group]: 190180140, 
UnixNumericGroupPrincipal [Supplementary Group]: 190190874]
33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation  - Assuming 
keytab is managed externally since logged in from subject.
33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation  - Reading 
credentials from location set in HADOOP_TOKEN_FILE_LOCATION: 
/gss/hadoop/diska/hadoop/yarn/local/usercache/gss2002/appcache/application_1488823620014_0005/container_e135_1488823620014_0005_01_000002/container_tokens
33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation  - Loaded 6 
tokens
33241 [main] DEBUG org.apache.hadoop.security.UserGroupInformation  - UGI 
loginUser:gss2002 (auth:KERBEROS)
33435 [main] INFO  org.apache.atlas.hook.AtlasHook  - Created Atlas Hook
34062 [IPC Client (1267105885) connection to /10.70.41.7:43513 from 
job_1488823620014_0005] DEBUG org.apache.hadoop.security.SaslRpcClient  - 
reading next wrapped RPC packet
34062 [IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client  - 
IPC Client (1267105885) connection to /10.70.41.7:43513 from 
job_1488823620014_0005 sending #1563
34062 [IPC Parameter Sending Thread #0] DEBUG 
org.apache.hadoop.security.SaslRpcClient  - wrapping token of length:264
34063 [IPC Client (1267105885) connection to /10.70.41.7:43513 from 
job_1488823620014_0005] DEBUG org.apache.hadoop.security.SaslRpcClient  - 
unwrapping token of length:62
34063 [IPC Client (1267105885) connection to /10.70.41.7:43513 from 
job_1488823620014_0005] DEBUG org.apache.hadoop.ipc.Client  - IPC Client 
(1267105885) connection to /10.70.41.7:43513 from job_1488823620014_0005 got 
value #1563
34063 [communication thread] DEBUG org.apache.hadoop.ipc.RPC  - Call: ping 2
34435 [main] INFO  org.apache.kafka.clients.producer.ProducerConfig  - 
ProducerConfig values: 
        metric.reporters = []
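
For reference, the KafkaClient JAAS section reported in the log above can be 
expressed programmatically with the standard JAAS classes. This is only a 
sketch: the class name KafkaClientJaasSketch is hypothetical, the option 
values are copied from the log as strings, and it does not reproduce how 
InMemoryJAASConfiguration builds its entries internally.

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration, not Atlas code: builds a JAAS entry that
// mirrors the atlas.jaas.KafkaClient.* values shown in the debug log.
public class KafkaClientJaasSketch {

    public static AppConfigurationEntry kafkaClientEntry() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("storeKey", "false");
        options.put("renewTicket", "True");
        options.put("useKeyTab", "false");      // no keytab in the container
        options.put("serviceName", "kafka");
        options.put("useTicketCache", "True");  // expects a Kerberos ticket cache

        return new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.REQUIRED,
                options);
    }

    public static void main(String[] args) {
        AppConfigurationEntry entry = kafkaClientEntry();
        System.out.println(entry.getLoginModuleName());
        System.out.println(entry.getOptions());
    }
}
```

With useKeyTab=false and useTicketCache=True, the login depends entirely on a 
ticket cache being available to the process.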


Failure Snippet:
36504 [main] ERROR org.apache.atlas.hook.AtlasHook  - Failed to notify atlas 
for entity [[{Id='(type: sqoop_dbdatastore, id: <unassigned>)', traits=[], 
values={owner=gss2002, storeUri=jdbc:oracle:thin:

Excluded secure information 

, storeUse=TABLE}}, name=sqoop 

excluded secure information

 --hive-cluster tech, startTime=Mon Mar 06 14:32:22 EST 2017, endTime=Mon Mar 
06 14:32:51 EST 2017, userName=gss2002, operation=import}}]] after 3 retries. 
Quitting
35491 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name bufferpool-wait-time
35491 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name buffer-exhausted-records
35491 [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster 
metadata version 1 to Cluster(nodes = [ha21t55mn.tech.hdp.example.com:6667 (id: 
-1 rack: null)], partitions = [])
35491 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - ==> 
InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient)
35491 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - <== 
InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient): 
{javax.security.auth.login.AppConfigurationEntry@669c2b07}
35491 [main] DEBUG org.apache.kafka.common.security.authenticator.AbstractLogin 
 - System property 'java.security.auth.login.config' is not set, using default 
JAAS configuration.
35491 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - ==> 
InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient)
35491 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - <== 
InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient): 
{javax.security.auth.login.AppConfigurationEntry@669c2b07}
35491 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - ==> 
InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient)
35492 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration  - <== 
InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient): 
{javax.security.auth.login.AppConfigurationEntry@669c2b07}
35492 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Closing 
the Kafka producer with timeoutMillis = 0 ms.
35492 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer  - The Kafka 
producer has closed.
35492 [main] ERROR org.apache.atlas.hook.AtlasHook  - Failed to send 
notification - attempt #2; error=Failed to construct kafka producer
35492 [main] DEBUG org.apache.atlas.hook.AtlasHook  - Sleeping for 1000 ms 
before retry
36501 [main] INFO  org.apache.kafka.clients.producer.ProducerConfig  - 
ProducerConfig values: 
        metric.reporters = []
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
        at org.apache.atlas.kafka.KafkaNotification.createProducer(KafkaNotification.java:311)
        at org.apache.atlas.kafka.KafkaNotification.sendInternal(KafkaNotification.java:220)
        at org.apache.atlas.notification.AbstractNotification.send(AbstractNotification.java:84)
        at org.apache.atlas.hook.AtlasHook.notifyEntitiesInternal(AtlasHook.java:158)
        at org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:143)
        at org.apache.atlas.sqoop.hook.SqoopHook.publish(SqoopHook.java:177)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
