Prabhu Joseph created SPARK-36328:
-------------------------------------

             Summary: HadoopRDD#getPartitions fetches FileSystem Delegation Token for every partition
                 Key: SPARK-36328
                 URL: https://issues.apache.org/jira/browse/SPARK-36328
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.1.2
            Reporter: Prabhu Joseph
Spark creates a separate JobConf for every RDD (every hive table partition) in HadoopRDD#getPartitions.

{code}
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
{code}

Hadoop FileSystem fetches the FileSystem Delegation Token and stores it in the Credentials that are part of the JobConf. On subsequent calls it reuses the token from the Credentials if one already exists.

{code}
if (serviceName != null) { // fs has token, grab it
  final Text service = new Text(serviceName);
  Token<?> token = credentials.getToken(service);
  if (token == null) {
    token = getDelegationToken(renewer);
    if (token != null) {
      tokens.add(token);
      credentials.addToken(service, token);
    }
  }
}
{code}

But since the Spark Job creates a new JobConf (and therefore new, empty Credentials) for every hive table partition, the token is never reused and is fetched again for every partition. This slows down the query, as fetching each delegation token requires a round trip to the KDC and an SSL handshake on secure clusters.

*Improvement:* Spark can copy the credentials from the previous JobConf into the new JobConf, reusing the FileSystem Delegation Token in the same way that the user Credentials are added into the JobConf after construction.
{code}
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
{code}

*Repro*

{code}
beeline> create table parttable (key char(1), value int) partitioned by (p int);
insert into table parttable partition(p=100) values ('d', 1), ('e', 2), ('f', 3);
insert into table parttable partition(p=200) values ('d', 1), ('e', 2), ('f', 3);
insert into table parttable partition(p=300) values ('d', 1), ('e', 2), ('f', 3);

spark-sql> select value, count(*) from parttable group by value;
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
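The Hadoop reuse check quoted above only helps when the same Credentials are seen across calls. A minimal, self-contained Java sketch models why sharing credentials across the per-partition JobConfs would cut the fetches down to one; all names here (TokenReuseSketch, obtainToken, and the string map standing in for Hadoop's Credentials) are hypothetical illustrations, not Spark or Hadoop APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the token-reuse pattern described in this issue.
public class TokenReuseSketch {
    // Counts the expensive operations (KDC round trip + SSL handshake on secure clusters).
    static int fetchCount = 0;

    // Stands in for the expensive delegation-token fetch from the NameNode.
    static String getDelegationToken(String service) {
        fetchCount++;
        return "token-for-" + service;
    }

    // Mirrors the Hadoop check quoted above: reuse the token if the
    // credentials already hold one for this service, else fetch and cache it.
    static String obtainToken(Map<String, String> credentials, String service) {
        String token = credentials.get(service);
        if (token == null) {
            token = getDelegationToken(service);
            credentials.put(service, token);
        }
        return token;
    }

    public static void main(String[] args) {
        // Simulate getPartitions handling three hive table partitions that all
        // read from the same FileSystem, but with ONE shared credentials map.
        Map<String, String> sharedCredentials = new HashMap<>();
        for (int p = 0; p < 3; p++) {
            obtainToken(sharedCredentials, "hdfs://nn:8020");
        }
        System.out.println(fetchCount); // prints 1: the token was fetched once and reused
    }
}
```

With a fresh map per iteration instead (the current new-JobConf-per-partition behavior), the same loop would perform three fetches, one per partition, which is the slowdown this issue reports.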