This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push: new b57fef9 [SPARK-27301][DSTREAM] Shorten the FileSystem cached life cycle to the cleanup method inner scope b57fef9 is described below commit b57fef9f0cb29fb4bc4bb905a84fecb3a1ed5ddd Author: Kent Yao <yaooq...@hotmail.com> AuthorDate: Sat Mar 30 02:35:49 2019 -0500 [SPARK-27301][DSTREAM] Shorten the FileSystem cached life cycle to the cleanup method inner scope ## What changes were proposed in this pull request? The cached FileSystem's token will expire if no tokens explicitly are add into it. ```scala 19/03/28 13:40:16 INFO storage.BlockManager: Removing RDD 83189 19/03/28 13:40:16 INFO rdd.MapPartitionsRDD: Removing RDD 82860 from persistence list 19/03/28 13:40:16 INFO spark.ContextCleaner: Cleaned shuffle 6005 19/03/28 13:40:16 INFO storage.BlockManager: Removing RDD 82860 19/03/28 13:40:16 INFO scheduler.ReceivedBlockTracker: Deleting batches: 19/03/28 13:40:16 INFO scheduler.InputInfoTracker: remove old batch metadata: 1553750250000 ms 19/03/28 13:40:17 WARN security.UserGroupInformation: PriviledgedActionException as:ursHADOOP.HZ.NETEASE.COM (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800 19/03/28 13:40:17 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800 19/03/28 13:40:17 WARN security.UserGroupInformation: PriviledgedActionException as:ursHADOOP.HZ.NETEASE.COM (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800 19/03/28 13:40:17 WARN hdfs.LeaseRenewer: Failed to renew lease for [DFSClient_NONMAPREDUCE_-1396157959_1] for 53 seconds. Will retry shortly ... org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800 at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy11.renewLease(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewLease(ClientNamenodeProtocolTranslatorPB.java:571) at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy12.renewLease(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:878) at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:417) at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:442) at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71) at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:298) at java.lang.Thread.run(Thread.java:748) ``` This PR shorten the FileSystem cached life cycle to the cleanup method inner scope in case of token expiry. ## How was this patch tested? existing ut Closes #24235 from yaooqinn/SPARK-27301. Authored-by: Kent Yao <yaooq...@hotmail.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> (cherry picked from commit f4c73b7c685b901dd69950e4929c65e3b8dd3a55) Signed-off-by: Sean Owen <sean.o...@databricks.com> --- .../org/apache/spark/streaming/dstream/DStreamCheckpointData.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index e73837e..ebfaa83 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -39,7 +39,6 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T]) // in that batch's checkpoint data @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] - @transient private var fileSystem: FileSystem = null protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] /** @@ -80,6 +79,7 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T]) // even after master fails, as the checkpoint data of `time` does not refer to those files val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) logDebug("Files to delete:\n" + filesToDelete.mkString(",")) + var fileSystem: FileSystem = null filesToDelete.foreach { case (time, file) => try { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org