[ https://issues.apache.org/jira/browse/FLINK-20377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu updated FLINK-20377: ---------------------------- Fix Version/s: (was: 1.12.0) > flink-1.11.2 -kerberos config on kafka connector not working > ------------------------------------------------------------ > > Key: FLINK-20377 > URL: https://issues.apache.org/jira/browse/FLINK-20377 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Table SQL / Ecosystem > Affects Versions: 1.11.2 > Environment: flink on yarn > kafka with kerberos > flink-1.11.2_2.11 > Reporter: 谢波 > Priority: Major > > I refer to the configuration on the official website to configure Kafka and > flink-conf.yaml ,but the configuration does not work. > my table config : > WITH ( > 'connector' = 'kafka', > 'properties.bootstrap.servers' = 'xxxxxxxx', > 'topic' = 'kafka_hepecc_ekko_cut_json', > 'properties.group.id' = 'ekko.group', > 'properties.security.protocol' = 'SASL_PLAINTEXT', > 'properties.sasl.kerberos.service.name' = 'kafka', > – 'properties.sasl.mechanism' = 'GSSAPI', > 'format' = 'json' > ); > yaml: > security.kerberos.login.use-ticket-cache: false > security.kerberos.login.keytab: /home/xiebo/module/flink/keytab/xiebo.keytab > security.kerberos.login.principal: xi...@xxx.cn > # The configuration below defines which JAAS login contexts > security.kerberos.login.contexts: Client,KafkaClient > > dir content: > [xiebo@ww021 keytab]$ pwd > /home/xiebo/module/flink/keytab > [xiebo@ww021 keytab]$ ll > total 12 > -rw-r--r-- 1 xiebo bigdata_dev 486 Nov 26 18:15 kafka_client_jaas.conf > -rw-r--r-- 1 xiebo bigdata_dev 568 Nov 26 14:10 krb5.conf > -rw-r--r-- 1 xiebo bigdata_dev 436 Nov 26 15:14 xiebo.keytab > > I get an error: > > 2020-11-26 19:01:55 > org.apache.kafka.common.KafkaException: Failed to construct kafka producer > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) > at > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1111) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.common.KafkaException: > javax.security.auth.login.LoginException: Unable to obtain password from user > at > org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158) > at > org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146) > at > org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67) > at > org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99) > at > org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421) > ... 23 more > Caused by: javax.security.auth.login.LoginException: Unable to obtain > password from user > at > com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:901) > at > com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:764) > at > com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755) > at > javax.security.auth.login.LoginContext.access$000(LoginContext.java:195) > at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682) > at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680) > at java.security.AccessController.doPrivileged(Native Method) > at > javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680) > at javax.security.auth.login.LoginContext.login(LoginContext.java:587) > at > org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60) > at > org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:103) > at > org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62) > at > org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112) > at > org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:147) > > reference : > [https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-kerberos.html] > [https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems] > [https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#enabling-kerberos-authentication] > -- This message was sent by Atlassian Jira (v8.3.4#803005)