Prabhu Joseph created SPARK-36328:
-------------------------------------

             Summary: HadoopRDD#getPartitions fetches FileSystem Delegation Token for every partition
                 Key: SPARK-36328
                 URL: https://issues.apache.org/jira/browse/SPARK-36328
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.1.2
            Reporter: Prabhu Joseph


The Spark job creates a separate JobConf for every RDD (one per Hive table partition) in HadoopRDD#getPartitions.

{code}
  override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
{code}

Hadoop FileSystem fetches a FileSystem Delegation Token and stores it in the Credentials object carried by the JobConf. On subsequent calls it reuses the token from the Credentials if one already exists.

{code}
    if (serviceName != null) { // fs has token, grab it
      final Text service = new Text(serviceName);
      Token<?> token = credentials.getToken(service);
      if (token == null) {
        token = getDelegationToken(renewer);
        if (token != null) {
          tokens.add(token);
          credentials.addToken(service, token);
        }
      }
    }
{code}

But since the Spark job creates a new JobConf (and therefore a new Credentials) for every Hive table partition, the token is not reused and is fetched again for every partition. This slows down the query, because fetching each delegation token requires a KDC round trip and an SSL handshake on secure clusters.
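
To make the effect concrete, here is a minimal sketch (not Spark source; the helper and the renewer string are placeholders) of the per-partition pattern: each partition builds a fresh JobConf whose empty Credentials make FileSystem#addDelegationTokens fetch a brand-new token instead of reusing the earlier one.

{code}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf

// Hypothetical helper, for illustration only: called once per Hive table partition.
def obtainTokensForPartition(partitionPath: Path): Unit = {
  val jobConf = new JobConf()                // fresh JobConf => fresh, empty Credentials
  val fs = partitionPath.getFileSystem(jobConf)
  // credentials.getToken(service) misses, so a new delegation token is requested
  // (KDC + SSL handshake on a secure cluster) rather than reusing the previous one
  fs.addDelegationTokens("yarn", jobConf.getCredentials)
}
{code}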

*Improvement:*

Spark can copy the credentials from the previous JobConf into the new JobConf so the FileSystem Delegation Token is reused, similar to how the user credentials are added into the JobConf right after construction.

{code}
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
{code}
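
A rough sketch of the proposed direction (the helper below is hypothetical, not the actual patch): merge the tokens accumulated by the previous JobConf back into the new JobConf and into the current user's credentials, so that the existing SparkHadoopUtil.get.addCredentials(jobConf) call finds them for the next partition and skips the fetch.

{code}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation

// Hypothetical helper, for illustration only: not part of the actual change.
def shareTokens(previous: JobConf, next: JobConf): Unit = {
  // copy the delegation tokens the previous JobConf already obtained
  next.getCredentials.addAll(previous.getCredentials)
  // publishing them to the logged-in user lets the existing
  // SparkHadoopUtil.get.addCredentials(jobConf) call pick them up as well
  UserGroupInformation.getCurrentUser.addCredentials(previous.getCredentials)
}
{code}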



*Repro*

{code}
beeline>
create table parttable (key char(1), value int) partitioned by (p int);
insert into table parttable partition(p=100) values ('d', 1), ('e', 2), ('f', 3);
insert into table parttable partition(p=200) values ('d', 1), ('e', 2), ('f', 3);
insert into table parttable partition(p=300) values ('d', 1), ('e', 2), ('f', 3);

spark-sql>
select value, count(*) from parttable group by value;
{code}


