[ https://issues.apache.org/jira/browse/SPARK-31685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255462#comment-17255462 ]
L. C. Hsieh commented on SPARK-31685:
-------------------------------------

Is this particularly related to SS? The issue described for {{HadoopFSDelegationTokenProvider}} does not look SS-specific. BTW, the tokens obtained in {{getTokenRenewalInterval}} appear to be used only for computing the renewal interval; why would their expiration cause the application to fail? Spark should use only the first set of tokens, obtained in {{obtainDelegationTokens}}.

> Spark structured streaming with Kafka fails with HDFS_DELEGATION_TOKEN expiration issue
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-31685
>                 URL: https://issues.apache.org/jira/browse/SPARK-31685
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.4
>        Environment: spark-2.4.4-bin-hadoop2.7
>            Reporter: Rajeev Kumar
>            Priority: Major
>
> I am facing an issue with spark-2.4.4-bin-hadoop2.7. I am using Spark structured streaming with Kafka, reading the stream from Kafka and saving it to HBase. I get this error on the driver after 24 hours.
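The renewal-interval flow the comment above asks about can be sketched as a simplified model. This is a hypothetical Python sketch, not the actual Spark/Hadoop API: {{getTokenRenewalInterval}} obtains a second, throwaway set of tokens, renews them once, and takes "new expiration minus issue date" as the interval, while only the first set of tokens should be shipped to the application. All class and function names below are illustrative stand-ins; the token ids and the ~86400000 ms interval are taken from the driver logs in this report.

```python
# Hypothetical model of the two token-acquisition paths discussed above.
# The real logic lives in Spark's HadoopFSDelegationTokenProvider (Scala)
# and Hadoop's Token classes; these names are illustrative stand-ins.

class Token:
    def __init__(self, token_id, issue_date_ms):
        self.token_id = token_id
        self.issue_date_ms = issue_date_ms

    def renew(self):
        # Renewing returns the new expiration time. Here we model a
        # 24-hour renewal interval (86400000 ms, close to the 86400039 ms
        # that HadoopFSDelegationTokenProvider reports in the log below).
        return self.issue_date_ms + 86_400_000


def get_token_renewal_interval(throwaway_tokens):
    # Fresh tokens obtained *only* to learn how often renewal is needed:
    # interval = (expiration after one renew) - (issue date).
    # Per the comment above, these tokens should never be used for I/O.
    return min(t.renew() - t.issue_date_ms for t in throwaway_tokens)


def obtain_delegation_tokens(credentials, now_ms):
    # The first tokens (e.g. 6972072 in the report) go into the
    # credentials actually distributed to the application...
    primary = Token(6972072, now_ms)
    credentials.append(primary)
    # ...while a second set (e.g. 6972073) exists only to compute the
    # renewal interval and is then meant to be discarded.
    return get_token_renewal_interval([Token(6972073, now_ms)])
```

If the logs below are read against this model, the failure mode is that a DFSClient created while fetching the interval-probing tokens appears to keep the original token cached for its LeaseRenewer, so the application fails once that token expires even though a fresh token (7041621) was issued.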
>
> {code:java}
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 6972072 for <user>) is expired
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> 	at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
> 	at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
> 	at org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:130)
> 	at org.apache.hadoop.fs.FileContext$15.next(FileContext.java:1169)
> 	at org.apache.hadoop.fs.FileContext$15.next(FileContext.java:1165)
> 	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
> 	at org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1171)
> 	at org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1630)
> 	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326)
> 	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142)
> 	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
> 	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> 	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
> 	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
> 	... 1 more
> 	 ApplicationMaster host: <host>
> 	 ApplicationMaster RPC port: <port>
> 	 queue: default
> 	 start time: <start time>
> 	 final status: FAILED
> 	 tracking URL: <tracking url>
> 	 user: <user>
> {code}
>
> I am putting the logs from my application below (after removing the IP and username). When the application starts it prints this log. We can see it is creating the HDFS_DELEGATION_TOKEN (token id = 6972072).
> Driver log -
> {code:java}
> 20/03/17 13:24:09 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 6972072 for <user> on ha-hdfs:<name>
> 20/03/17 13:24:09 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_631280530_1, ugi=<user>@<abc.com> (auth:KERBEROS)]]
> 20/03/17 13:24:09 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 6972073 for <user> on ha-hdfs:<name>
> 20/03/17 13:24:09 INFO HadoopFSDelegationTokenProvider: Renewal interval is 86400039 for token HDFS_DELEGATION_TOKEN
> 20/03/17 13:24:10 DEBUG HadoopDelegationTokenManager: Service hive does not require a token. Check your configuration to see if security is disabled or not.
> 20/03/17 13:24:11 DEBUG HBaseDelegationTokenProvider: Attempting to fetch HBase security token.
> 20/03/17 13:24:12 INFO HBaseDelegationTokenProvider: Get token from HBase: Kind: HBASE_AUTH_TOKEN, Service: 812777c29d67, Ident: (org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@d1)
> 20/03/17 13:24:12 INFO AMCredentialRenewer: Scheduling login from keytab in 18.0 h.
> {code}
> After 18 hours, as mentioned in the log, it also created new tokens. The token number increased (7041621).
> Driver log -
> {code:java}
> 20/03/18 07:24:10 INFO AMCredentialRenewer: Attempting to login to KDC using principal: <user>
> 20/03/18 07:24:10 INFO AMCredentialRenewer: Successfully logged into KDC.
> 20/03/18 07:24:16 DEBUG HadoopFSDelegationTokenProvider: Delegation token renewer is: rm/<host_name>@<abc.com>
> 20/03/18 07:24:16 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-2072296893_22, ugi=<user>@<abc.com> (auth:KERBEROS)]]
> 20/03/18 07:24:16 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 7041621 for <user> on ha-hdfs:<name>
> 20/03/18 07:24:16 DEBUG HadoopDelegationTokenManager: Service hive does not require a token. Check your configuration to see if security is disabled or not.
> 20/03/18 07:24:16 DEBUG HBaseDelegationTokenProvider: Attempting to fetch HBase security token.
> 20/03/18 07:24:16 INFO HBaseDelegationTokenProvider: Get token from HBase: Kind: HBASE_AUTH_TOKEN, Service: 812777c29d67, Ident: (org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@102)
> 20/03/18 07:24:16 INFO AMCredentialRenewer: Scheduling login from keytab in 18.0 h.
> 20/03/18 07:24:16 INFO AMCredentialRenewer: Updating delegation tokens.
> 20/03/18 07:24:17 INFO SparkHadoopUtil: Updating delegation tokens for current user.
> 20/03/18 07:24:17 DEBUG SparkHadoopUtil: Adding/updating delegation tokens List(Kind: HBASE_AUTH_TOKEN, Service: 812777c29d67, Ident: (org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@102); org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@102, Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:<name>, Ident: (HDFS_DELEGATION_TOKEN token 7041621 for <user>); HDFS_DELEGATION_TOKEN token 7041621 for <user>; Renewer: yarn; Issued: 3/18/20 7:24 AM; Max Date: 3/25/20 7:24 AM)
> 20/03/18 07:24:17 INFO SparkHadoopUtil: Updating delegation tokens for current user.
> 20/03/18 07:24:17 DEBUG SparkHadoopUtil: Adding/updating delegation tokens List(Kind: HBASE_AUTH_TOKEN, Service: 812777c29d67, Ident: (org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@102); org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@102, Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:<name>, Ident: (HDFS_DELEGATION_TOKEN token 7041621 for <user>); HDFS_DELEGATION_TOKEN token 7041621 for <user>; Renewer: yarn; Issued: 3/18/20 7:24 AM; Max Date: 3/25/20 7:24 AM)
> {code}
> Everything goes fine for 24 hours. After that I see a LeaseRenewer exception.
> But it is picking the older token number (6972072). This behaviour is the same even if I use "--conf spark.hadoop.fs.hdfs.impl.disable.cache=true".
> Driver log -
>
> {code:java}
> 20/03/18 13:24:28 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 6972072 for <user>) is expired
> 20/03/18 13:24:28 WARN LeaseRenewer: Failed to renew lease for [DFSClient_NONMAPREDUCE_631280530_1] for 30 seconds. Will retry shortly ...
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 6972072 for <user>) is expired
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> 	at com.sun.proxy.$Proxy10.renewLease(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewLease(ClientNamenodeProtocolTranslatorPB.java:590)
> 	at sun.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy11.renewLease(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:892)
> 	at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:423)
> 	at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:448)
> 	at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
> 	at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:304)
> 	at java.lang.Thread.run(Thread.java:745)
> 20/03/18 13:24:29 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 6972072 for <user>) is expired
> {code}
>
> Now, when the application receives a new stream, it fails with the error -
> Driver log -
> {code:java}
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 6972072 for <user>) is expired
> {code}
>
> Below is the executor log. Nothing abnormal.
> {code:java}
> INFO CoarseGrainedExecutorBackend: Received tokens of 330 bytes
> INFO SparkHadoopUtil: Updating delegation tokens for current user.
> DEBUG SparkHadoopUtil: Adding/updating delegation tokens List(Kind: HBASE_AUTH_TOKEN, Service: <some id>, Ident: <some number>; null, Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:<host>, Ident: (HDFS_DELEGATION_TOKEN token <number> for <user>); HDFS_DELEGATION_TOKEN token <number> for <user>; Renewer: yarn; Issued: 3/10/20 3:58 PM; Max Date: 3/17/20 3:58 PM)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org