[ 
https://issues.apache.org/jira/browse/SPARK-36328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395218#comment-17395218
 ] 

Apache Spark commented on SPARK-36328:
--------------------------------------

User 'Shockang' has created a pull request for this issue:
https://github.com/apache/spark/pull/33674

> HadoopRDD#getPartitions fetches FileSystem Delegation Token for every partition
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-36328
>                 URL: https://issues.apache.org/jira/browse/SPARK-36328
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.1.2
>            Reporter: Prabhu Joseph
>            Priority: Major
>
> A Spark job creates a separate JobConf for every RDD (one per Hive table 
> partition) in HadoopRDD#getPartitions.
> {code}
>   override def getPartitions: Array[Partition] = {
>     val jobConf = getJobConf()
>     // add the credentials here as this can be called before SparkContext initialized
>     SparkHadoopUtil.get.addCredentials(jobConf)
> {code}
> The Hadoop FileSystem fetches a FileSystem delegation token and stores it in 
> the Credentials object that is part of the JobConf. On subsequent requests, it 
> reuses the token from the Credentials if one already exists.
> {code}
>     if (serviceName != null) { // fs has token, grab it
>       final Text service = new Text(serviceName);
>       Token<?> token = credentials.getToken(service);
>       if (token == null) {
>         token = getDelegationToken(renewer);
>         if (token != null) {
>           tokens.add(token);
>           credentials.addToken(service, token);
>         }
>       }
>     }
> {code}
> However, because the Spark job creates a new JobConf (and therefore a new 
> Credentials object) for every Hive table partition, the token is never reused 
> and is fetched again for every partition. This slows down the query, because 
> each delegation token fetch has to go through the KDC and an SSL handshake on 
> secure clusters.
> *Improvement:*
> Spark can add the credentials from the previous JobConf to the new JobConf so 
> that the FileSystem delegation token is reused, similar to how the user 
> credentials are added to the JobConf after construction.
> {code}
>     val jobConf = getJobConf()
>     // add the credentials here as this can be called before SparkContext initialized
>     SparkHadoopUtil.get.addCredentials(jobConf)
> {code}
> *Repro:*
> {code}
> beeline>
> create table parttable (key char(1), value int) partitioned by (p int);
> insert into table parttable partition(p=100) values ('d', 1), ('e', 2), ('f', 3);
> insert into table parttable partition(p=200) values ('d', 1), ('e', 2), ('f', 3);
> insert into table parttable partition(p=300) values ('d', 1), ('e', 2), ('f', 3);
> spark-sql>
> select value, count(*) from parttable group by value;
> {code}
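
The effect described in the issue can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `Credentials`, `FakeFileSystem`, `collectToken`, `countFetches`, and the service name are hypothetical stand-ins written for this example, not the real Hadoop or Spark classes, and the code is not taken from the pull request. It mimics the quoted "look up the token, else fetch it" logic and counts how many fetches happen when each partition gets fresh credentials, with and without carrying earlier tokens over.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified model of delegation-token caching. None of these classes are the
 * real Hadoop or Spark types; they only mimic the "look up, else fetch" logic.
 */
class TokenReuseSketch {

    /** Stand-in for Hadoop's Credentials: a map from service name to token. */
    static final class Credentials {
        private final Map<String, String> tokens = new HashMap<>();

        String getToken(String service) { return tokens.get(service); }

        void addToken(String service, String token) { tokens.put(service, token); }

        /** Copy every token from another Credentials into this one. */
        void addAll(Credentials other) { tokens.putAll(other.tokens); }
    }

    /** Stand-in for a secure file system that counts expensive token fetches. */
    static final class FakeFileSystem {
        private final String serviceName;
        private int fetchCount = 0;

        FakeFileSystem(String serviceName) { this.serviceName = serviceName; }

        /** Mirrors the quoted logic: reuse a cached token, else fetch a new one. */
        void collectToken(Credentials credentials) {
            if (credentials.getToken(serviceName) == null) {
                fetchCount++; // models the costly KDC / SSL round trip
                credentials.addToken(serviceName, "token-" + fetchCount);
            }
        }

        int fetchCount() { return fetchCount; }
    }

    /**
     * Lists the given number of table partitions, each with its own fresh
     * Credentials (as with a new JobConf per partition). When carryOver is
     * true, tokens gathered so far are copied into each fresh Credentials first.
     */
    static int countFetches(int partitions, boolean carryOver) {
        FakeFileSystem fs = new FakeFileSystem("hdfs://example-nameservice");
        Credentials previous = new Credentials();
        for (int i = 0; i < partitions; i++) {
            Credentials fresh = new Credentials(); // new JobConf => new Credentials
            if (carryOver) {
                fresh.addAll(previous);
            }
            fs.collectToken(fresh);
            previous = fresh;
        }
        return fs.fetchCount();
    }

    public static void main(String[] args) {
        System.out.println("without carry-over: " + countFetches(3, false));
        System.out.println("with carry-over:    " + countFetches(3, true));
    }
}
```

With three partitions, as in the repro above, the model performs three fetches without carry-over and one fetch with it. In a real cluster the saving would depend on how many partitions share the same file system service.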



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
