[jira] [Commented] (SPARK-31685) Spark structured streaming with Kafka fails with HDFS_DELEGATION_TOKEN expiration issue

2020-12-28 Thread Kent Yao (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255502#comment-17255502
 ] 

Kent Yao commented on SPARK-31685:
--

Thanks [~dongjoon] for the quick response.

This issue is not specific to Structured Streaming (SS); any long-running application may run into it. 
For example, in the [Kyuubi|https://github.com/yaooqinn/kyuubi] project we 
try to keep the Spark application alive for as long as users keep submitting 
SQL statements to the server. HadoopFSDelegationTokenProvider was YARN-specific 
in older Spark versions and has since moved to core. 

I took a quick look at this part of the code on the master branch. Based on my 
understanding of HDFS delegation tokens, the problem still seems to exist there.

> Spark structured streaming with Kafka fails with HDFS_DELEGATION_TOKEN 
> expiration issue
> ---
>
> Key: SPARK-31685
> URL: https://issues.apache.org/jira/browse/SPARK-31685
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
> Environment: spark-2.4.4-bin-hadoop2.7
>Reporter: Rajeev Kumar
>Priority: Major
>
> I am facing an issue with spark-2.4.4-bin-hadoop2.7. I am using Spark Structured 
> Streaming with Kafka, reading the stream from Kafka and saving it to HBase.
> I get this error on the driver after 24 hours.
>  
> {code:java}
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 6972072 for ) is expired
> at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
> at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
> at org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:130)
> at org.apache.hadoop.fs.FileContext$15.next(FileContext.java:1169)
> at org.apache.hadoop.fs.FileContext$15.next(FileContext.java:1165)
> at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
> at org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1171)
> at org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1630)
> at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
> ...
> {code}

[jira] [Commented] (SPARK-31685) Spark structured streaming with Kafka fails with HDFS_DELEGATION_TOKEN expiration issue

2020-12-28 Thread L. C. Hsieh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255462#comment-17255462
 ] 

L. C. Hsieh commented on SPARK-31685:
-

Is this specifically related to SS? The issue described in 
{{HadoopFSDelegationTokenProvider}} does not look SS-specific.

BTW, the tokens obtained in {{getTokenRenewalInterval}} seem to be used only 
to compute the renewal interval. Why would their expiration cause the 
application to fail? Spark should only use the tokens fetched first in 
{{obtainDelegationTokens}}.
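The reading above can be sketched as a small, self-contained model. This is an illustration under assumptions, not Spark's implementation: `Creds`, `fetchTokens`, `probeRenewalInterval` and `obtainTokens` are hypothetical stand-ins for Hadoop's `Credentials` and the provider's methods, the 24-hour interval is a placeholder, and the principal `user@EXAMPLE.COM` is made up. What it shows: if the renewal-interval probe fetches into a fresh local credentials object (as `val creds = new Credentials()` in the quoted 2.4 snippet suggests), those tokens never reach the credentials handed to the application.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Hadoop's Credentials:
// maps a token alias to the renewer the token was issued for.
final class Creds {
  val tokens: mutable.LinkedHashMap[String, String] = mutable.LinkedHashMap.empty
}

object ThrowawayProbeModel {
  private var counter = 0

  // Hypothetical stand-in for fetchDelegationTokens:
  // issues one new token for `renewer` and stores it in `creds`.
  def fetchTokens(renewer: String, creds: Creds): Unit = {
    counter += 1
    creds.tokens(s"HDFS_DELEGATION_TOKEN-$counter") = renewer
  }

  // Models the interval probe: tokens go into a *local* Creds that is
  // dropped as soon as this method returns.
  def probeRenewalInterval(principal: String): Option[Long] = {
    val local = new Creds
    fetchTokens(principal, local)
    Some(24L * 60 * 60 * 1000) // placeholder interval (24 h in ms), illustration only
  }

  // Models obtainDelegationTokens: only `shipped` reaches the application.
  def obtainTokens(shipped: Creds, rmRenewer: String, principal: String): Option[Long] = {
    fetchTokens(rmRenewer, shipped)
    probeRenewalInterval(principal)
  }
}
```

Calling `ThrowawayProbeModel.obtainTokens(shipped, "rm/host:port", "user@EXAMPLE.COM")` leaves exactly one token in `shipped`, the one whose renewer is `rm/host:port`. Whether the real provider behaves like this in every Spark version is precisely the open question in this thread.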


[jira] [Commented] (SPARK-31685) Spark structured streaming with Kafka fails with HDFS_DELEGATION_TOKEN expiration issue

2020-12-27 Thread Dongjoon Hyun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255446#comment-17255446
 ] 

Dongjoon Hyun commented on SPARK-31685:
---

Thank you for pinging me, [~Qin Yao].
cc [~viirya] since he is looking at streaming.


[jira] [Commented] (SPARK-31685) Spark structured streaming with Kafka fails with HDFS_DELEGATION_TOKEN expiration issue

2020-12-27 Thread Kent Yao (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255420#comment-17255420
 ] 

Kent Yao commented on SPARK-31685:
--

Hi [~rajeevkumar], does this issue still exist in the latest release (3.0.1) or 
on the master branch? If so, I think it should be fixed as soon as possible for 
the 2.4 LTS line and the upcoming 3.1.0. 

Stability for long-running applications is essential, and I suspect it is not 
that hard to fix. cc [~cloud_fan] [~hyukjin.kwon] [~dongjoon]


[jira] [Commented] (SPARK-31685) Spark structured streaming with Kafka fails with HDFS_DELEGATION_TOKEN expiration issue

2020-12-24 Thread Jim Huang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17254670#comment-17254670
 ] 

Jim Huang commented on SPARK-31685:
---

Thanks Rajeev for all the work you have done on this ticket so far.  

I am running into the same issue with the following stack and Spark version:
 * Hadoop 2.7.3
 * spark-2.4.5-bin-without-hadoop
 * yarn-client mode

The code path that triggers this bug is elusive and difficult to pinpoint: I 
have observed only a few occurrences, and each time the total wall-clock runtime 
of the Spark Structured Streaming job before the failure seemed random 
(hundreds of hours). 

Currently, I *believe* it may be related to another runtime event. The current 
hypothesis: when the original YARN node running one of the Spark executors 
fails for any reason (e.g. the YARN health-check script, the node being 
decommissioned, YARN container preemption), the YARN AM (ApplicationMaster) 
assigns and starts a new YARN container on another YARN node. The symptom 
reported by the YARN AM is that it tried to restart that particular Spark 
executor task within that container 3 times, failed each time with the exact 
error message reported here, and this caused the entire Spark job to fail. 
I believe this external event is another code path that eventually hits the 
code you are testing.  

 


[jira] [Commented] (SPARK-31685) Spark structured streaming with Kafka fails with HDFS_DELEGATION_TOKEN expiration issue

2020-05-12 Thread Rajeev Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17105309#comment-17105309
 ] 

Rajeev Kumar commented on SPARK-31685:
--

Another ticket is open for a similar issue:

https://issues.apache.org/jira/browse/SPARK-26385

I originally posted comments on that Jira and was asked to create a new one.


[jira] [Commented] (SPARK-31685) Spark structured streaming with Kafka fails with HDFS_DELEGATION_TOKEN expiration issue

2020-05-12 Thread Rajeev Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17105307#comment-17105307
 ] 

Rajeev Kumar commented on SPARK-31685:
--

I did some testing and found one issue in HadoopFSDelegationTokenProvider. I 
might be wrong, so please validate the analysis below.

Spark does not renew the token; rather, it creates a new token at the scheduled 
interval.

Spark needs two HDFS_DELEGATION_TOKENs: one for the resource manager and a 
second one for the application user.
{code:java}
val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds)
// Get the token renewal interval if it is not set. It will only be called once.
if (tokenRenewalInterval == null) {
  tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
}
{code}
On the first call to obtainDelegationTokens, it correctly creates TWO tokens.

The token for the resource manager is created by the fetchDelegationTokens method.

The token for the application user is created inside the getTokenRenewalInterval 
method.
{code:java}
private var tokenRenewalInterval: Option[Long] = null
{code}
{code:java}
sparkConf.get(PRINCIPAL).flatMap { renewer =>
  val creds = new Credentials()
  fetchDelegationTokens(renewer, filesystems, creds)
  // ...
}
{code}
But after 18 hours (or whatever the renewal period is), when the scheduled thread 
of AMCredentialRenewer tries to create HDFS_DELEGATION_TOKENs, it creates only one 
token (for the resource manager, as a result of the call to the 
fetchDelegationTokens method).

It does not create an HDFS_DELEGATION_TOKEN for the application user, because 
tokenRenewalInterval is no longer null at this point. Hence, once the 
HDFS_DELEGATION_TOKEN expires (typically after 24 hours), Spark fails to update 
the checkpoint directory and the job dies.

As part of my testing, I simply also called getTokenRenewalInterval in an else 
block, and the job runs fine: it did not die after 24 hours.
{code:java}
if (tokenRenewalInterval == null) {
  // Custom log added for testing
  logInfo("Token Renewal interval is null. Calling getTokenRenewalInterval "
    + getTokenRenewer(hadoopConf))
  tokenRenewalInterval =
    getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
} else {
  // Custom log added for testing
  logInfo("Token Renewal interval is NOT null. Calling getTokenRenewalInterval "
    + getTokenRenewer(hadoopConf))
  // Workaround: call it on every renewal as well; the returned interval is discarded
  getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
}
{code}
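The behavior described above can be reproduced with a toy model. This is a sketch under assumptions, not Spark code: `ProviderModel` and `RenewalLoopModel` are hypothetical names, nothing talks to HDFS, and it deliberately encodes Rajeev's reading that `getTokenRenewalInterval` is where the application-user token comes from (a point Hsieh questions in this thread), so it cannot decide which reading is correct. With `probeOnEveryCall = false` it shows the reported pattern (two tokens on the first refresh, one afterwards); with `true` it models the else-branch workaround.

```scala
// Hypothetical model of the refresh loop described above; not Spark's code.
final class ProviderModel(probeOnEveryCall: Boolean) {
  // Mirrors the quoted field: null means "interval not computed yet".
  private var tokenRenewalInterval: Option[Long] = null
  private var created = Vector.empty[String]

  def tokensCreated: Vector[String] = created

  // Stand-in for fetchDelegationTokens: records that one token was created.
  private def fetchDelegationTokens(renewer: String): Unit =
    created :+= s"HDFS_DELEGATION_TOKEN for $renewer"

  // Stand-in for getTokenRenewalInterval. ASSUMPTION (Rajeev's analysis):
  // this is where the application-user token gets created.
  private def getTokenRenewalInterval(): Option[Long] = {
    fetchDelegationTokens("application user")
    Some(24L * 60 * 60 * 1000) // placeholder interval, not derived from a real token
  }

  // One call per scheduled credential refresh; returns how many tokens it created.
  def obtainDelegationTokens(): Int = {
    val before = created.size
    fetchDelegationTokens("resource manager")
    if (tokenRenewalInterval == null) {
      tokenRenewalInterval = getTokenRenewalInterval()
    } else if (probeOnEveryCall) {
      getTokenRenewalInterval() // the workaround; result discarded
    }
    created.size - before
  }
}

object RenewalLoopModel {
  // Runs `calls` refreshes and reports how many tokens each one created.
  def tokensPerCall(probeOnEveryCall: Boolean, calls: Int): List[Int] = {
    val provider = new ProviderModel(probeOnEveryCall)
    List.fill(calls)(provider.obtainDelegationTokens())
  }
}
```

For four refreshes, `tokensPerCall(false, 4)` gives `List(2, 1, 1, 1)` and `tokensPerCall(true, 4)` gives `List(2, 2, 2, 2)`.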
Driver logs:
{code:java}
20/05/01 14:36:19 INFO HadoopFSDelegationTokenProvider: Token Renewal interval is null. Calling getTokenRenewalInterval rm/host:port
20/05/02 08:36:42 INFO HadoopFSDelegationTokenProvider: Token Renewal interval is NOT null. Calling getTokenRenewalInterval rm/host:port
20/05/03 02:37:00 INFO HadoopFSDelegationTokenProvider: Token Renewal interval is NOT null. Calling getTokenRenewalInterval rm/host:port
20/05/03 20:37:18 INFO HadoopFSDelegationTokenProvider: Token Renewal interval is NOT null. Calling getTokenRenewalInterval rm/host:port
{code}
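The spacing of these log lines can be checked directly: each refresh happens about 18 hours after the previous one, i.e. 0.75 of a 24-hour token lifetime, matching the "after 18 hours" mentioned above. The helper below (`LogSpacing` is a hypothetical name, not part of Spark) only parses the logged timestamps with `java.time` and measures the gaps; it assumes the `yy/MM/dd HH:mm:ss` layout shown in the logs and says nothing about how Spark computes the schedule.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object LogSpacing {
  // Driver-log timestamp layout, e.g. "20/05/01 14:36:19" (two-digit year, base 2000).
  private val fmt = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")

  // Gap between each pair of consecutive timestamps.
  def gaps(stamps: Seq[String]): Seq[Duration] = {
    val times = stamps.map(s => LocalDateTime.parse(s, fmt))
    times.zip(times.drop(1)).map { case (a, b) => Duration.between(a, b) }
  }
}
```

Applied to the four timestamps above, every gap is 18 hours plus a few seconds (64823 s, then 64818 s twice).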
