Hi Juan,

to summarize and clarify the various emails: currently, you can only use Kerberos authentication via tickets (i.e. kinit) or via a keytab. The relevant bit of code is in the Hadoop security module: [1]. There you can see that we either log in from the keytab, if one is configured, or fall back to the existing ticket cache.
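
Roughly, that logic amounts to something like the following (a simplified,
hypothetical sketch using Hadoop's UserGroupInformation directly, not the
actual Flink code; the keytab path and principal are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopLoginSketch {
    public static void main(String[] args) throws Exception {
        Configuration hadoopConf = new Configuration();
        UserGroupInformation.setConfiguration(hadoopConf);

        String keytabPath = "/path/to/user.keytab";  // placeholder
        String principal = "user@EXAMPLE.COM";       // placeholder

        if (keytabPath != null && !keytabPath.isEmpty()) {
            // Keytab configured: log in directly from the keytab.
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
        } else {
            // No keytab: rely on the current login user,
            // e.g. a TGT obtained via kinit.
            UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
            System.out.println("Using existing login user: " + loginUser);
        }
    }
}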

I think we should be able to extend this to also work with delegation tokens (DTs). In Spark, how do you pass the DTs to the system as a user?

Best,
Aljoscha

[1] https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68

On 06.01.20 09:52, Yang Wang wrote:
I guess you have set some Kerberos-related configuration in your Spark jobs.
For Flink, you need to do the same via the following config options (an
example snippet is below the list). The keytab file must exist on the Flink
client; in your environment, that means the scheduler (Oozie) must be able
to access the keytab file.

security.kerberos.login.keytab
security.kerberos.login.principal
security.kerberos.login.contexts
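
For example, in flink-conf.yaml this could look roughly like the following
(the path, principal, and contexts are only placeholders):

security.kerberos.login.keytab: /path/to/scheduler-user.keytab
security.kerberos.login.principal: scheduler-user@EXAMPLE.COM
security.kerberos.login.contexts: Client,KafkaClient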



Best,
Yang

Juan Gentile <j.gent...@criteo.com> wrote on Mon, Jan 6, 2020 at 3:55 PM:

Hello Rong, Chesnay,



Thank you for your answers. We are trying to launch the job through a
scheduler (similar to Oozie): we have a keytab for the scheduler user, and
with that keytab we obtain delegation tokens that impersonate the actual
owner of the job (roughly as sketched below). However, the only way I was
able to make this work is by getting a ticket (through kinit).
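
To illustrate, the scheduler-side flow is roughly the following (a
hypothetical sketch using plain Hadoop APIs; the principal, paths, and
renewer are placeholders, not our actual code):

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

public class SchedulerTokenSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        UserGroupInformation.setConfiguration(conf);

        // The scheduler authenticates with its own keytab...
        UserGroupInformation scheduler =
                UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                        "scheduler@EXAMPLE.COM", "/path/to/scheduler.keytab");

        // ...and impersonates the job owner (requires proxy-user rules on the cluster).
        UserGroupInformation jobOwner =
                UserGroupInformation.createProxyUser("job-owner", scheduler);

        // Collect HDFS delegation tokens on behalf of the job owner and write
        // them to a token file that the job submission can later pick up.
        jobOwner.doAs((PrivilegedExceptionAction<Void>) () -> {
            Credentials creds = new Credentials();
            FileSystem fs = FileSystem.get(conf);
            fs.addDelegationTokens("yarn", creds);
            creds.writeTokenStorageFile(new Path("file:///tmp/job-owner.tokens"), conf);
            return null;
        });
    }
}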

As a comparison, if I launch a Spark job (without doing kinit) with just the
delegation tokens, it works fine, so I guess Spark does something extra.

This is as far as I could get, and at this point I'm not sure whether this
is simply not supported by Flink or I'm doing something wrong.



Thank you,

Juan



From: Rong Rong <walter...@gmail.com>
Date: Saturday, January 4, 2020 at 6:06 PM
To: Chesnay Schepler <ches...@apache.org>
Cc: Juan Gentile <j.gent...@criteo.com>, "user@flink.apache.org" <user@flink.apache.org>, Oleksandr Nitavskyi <o.nitavs...@criteo.com>
Subject: Re: Yarn Kerberos issue



Hi Juan,



Chesnay was right. If you are using the CLI to launch your session cluster
as described in the documentation [1], then following the instructions to
use kinit [2] first is one correct way to go.

Another approach is to set up the Kerberos settings in the flink-conf.yaml
file [3]. FlinkYarnSessionCli will then be able to pick up your keytab file
and run the CLI securely.



As far as I know, the option `security.kerberos.login.use-ticket-cache`
doesn't actually change the behavior of the authentication process; it is
more of a hint as to whether to use the ticket cache instantiated by
`kinit`. If you disable the ticket cache, you will have to use the
keytab/principal approach - this doc [4] might explain it better.
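
For instance, the keytab/principal approach could look roughly like this in
flink-conf.yaml (illustrative values only, not from this thread):

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/user.keytab
security.kerberos.login.principal: user@EXAMPLE.COM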



Thanks,

Rong





[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#start-flink-session

[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#using-kinit-yarn-only

[3]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#yarnmesos-mode

[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-09-and-above-only



On Fri, Jan 3, 2020 at 7:20 AM Chesnay Schepler <ches...@apache.org>
wrote:

 From what I understand from the documentation, if you want to use
delegation tokens you always first have to issue a ticket using kinit; so
you did everything correctly?



On 02/01/2020 13:00, Juan Gentile wrote:

Hello,



I'm trying to submit a job (batch wordcount) to a YARN cluster. I'm trying
to use delegation tokens and I'm getting the following error:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7560)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:548)
        at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getDelegationToken(AuthorizationProviderProxyClientProtocol.java:663)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:981)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2221)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2215)
        at org.apache.hadoop.ipc.Client.call(Client.java:1472)
        at org.apache.hadoop.ipc.Client.call(Client.java:1409)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy18.getDelegationToken(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:928)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy19.getDelegationToken(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1082)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1499)
        at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:546)
        at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:557)
        at org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:524)
        at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:140)
        at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
        at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
        at org.apache.flink.yarn.Utils.setTokensFor(Utils.java:235)
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:972)
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)



The Kerberos configuration in this case is the default one. I then tried
with the option 'security.kerberos.login.use-ticket-cache' set to false,
but I got the same error.

I was able to work around the problem by issuing a ticket (with kinit), but
I'd like to know whether it's possible to make Flink work with delegation
tokens and, if so, what the right configuration is.



Thank you,

Juan




