[ 
https://issues.apache.org/jira/browse/FLINK-22329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junfan Zhang updated FLINK-22329:
---------------------------------
    Description: 
Related Flink code: 
[https://github.com/apache/flink/blob/577113f0c339df844f2cc32b1d4a09d3da28085a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java#L107]

 

This {{getSplits}} method calls Hadoop's {{FileInputFormat#getSplits}}. The relevant Hadoop code, simplified, is as follows:
{code:java}
// Hadoop FileInputFormat
public InputSplit[] getSplits(JobConf job, int numSplits)
  throws IOException {
  StopWatch sw = new StopWatch().start();
  FileStatus[] stats = listStatus(job);

  // ...
}

protected FileStatus[] listStatus(JobConf job) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

  // Whether we need to recursive look into the directory structure
  // ...
}
{code}
 

In the {{listStatus}} method, delegation tokens are obtained by calling {{TokenCache.obtainTokensForNamenodes}}. However, this method only skips fetching a new delegation token when a matching token is already present in the credentials it is given; because the jobconf built here carries no credentials, it authenticates against the NameNode again on every call.

So it is necessary to inject the current UGI's credentials into the jobconf, as sketched below.
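
A minimal sketch of that injection, assuming Hadoop's {{UserGroupInformation}} and {{Credentials}} APIs; the helper class name and its placement are illustrative, not the actual patch:
{code:java}
import java.io.IOException;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

// Hypothetical helper: copies the delegation tokens already held by the
// current UGI into the JobConf, so TokenCache.obtainTokensForNamenodes can
// reuse them instead of authenticating against the NameNode again.
public final class JobConfCredentialsInjector {

    private JobConfCredentialsInjector() {}

    public static void injectCurrentUgiCredentials(JobConf jobConf) throws IOException {
        Credentials ugiCredentials =
                UserGroupInformation.getCurrentUser().getCredentials();
        // addAll merges the UGI's tokens and secret keys into the jobconf's
        // credentials, which is what listStatus later passes to TokenCache.
        jobConf.getCredentials().addAll(ugiCredentials);
    }
}
{code}
Calling this before {{FileInputFormat#getSplits}} lets {{listStatus}} find the existing tokens instead of requesting new ones.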

 

Besides, once Flink supports delegation tokens directly without a keytab ([see FLINK-21700|https://issues.apache.org/jira/browse/FLINK-21700]), {{TokenCache.obtainTokensForNamenodes}} will fail outright without this patch: with no Kerberos login available, new tokens cannot be fetched, and the jobconf holds no corresponding credentials to reuse.
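
For illustration, a small standalone sketch (assumed class name, not part of any patch) that prints the tokens held by the current UGI; in a keytab-less deployment these are the only credentials available, so they must be propagated into the jobconf rather than fetched anew:
{code:java}
import java.io.IOException;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;

public final class UgiTokenDump {
    public static void main(String[] args) throws IOException {
        // Each token pairs a kind (e.g. HDFS_DELEGATION_TOKEN) with the
        // service it authenticates to; without a keytab there is no Kerberos
        // TGT to fetch replacements, so these tokens must reach the jobconf.
        for (Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {
            System.out.println(token.getKind() + " -> " + token.getService());
        }
    }
}
{code}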

> Missing credentials in jobconf causes repeated authentication in Hive 
> datasource
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-22329
>                 URL: https://issues.apache.org/jira/browse/FLINK-22329
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>            Reporter: Junfan Zhang
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
