[ 
https://issues.apache.org/jira/browse/FLINK-22329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344497#comment-17344497
 ] 

Junfan Zhang commented on FLINK-22329:
--------------------------------------

Updated it; please review it again. Thanks [~lirui]

> Missing credentials in jobconf causes repeated authentication in Hive 
> datasource
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-22329
>                 URL: https://issues.apache.org/jira/browse/FLINK-22329
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>            Reporter: Junfan Zhang
>            Priority: Major
>              Labels: pull-request-available
>
> Related Flink code: 
> [https://github.com/apache/flink/blob/577113f0c339df844f2cc32b1d4a09d3da28085a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java#L107]
>  
> This {{getSplits}} method calls Hadoop's {{FileInputFormat#getSplits}} method. The related Hadoop code is 
> [here|https://github.com/apache/hadoop/blob/03cfc852791c14fad39db4e5b14104a276c08e59/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L426].
> A simplified excerpt is as follows:
> {code:java}
> // Hadoop FileInputFormat
> public InputSplit[] getSplits(JobConf job, int numSplits)
>   throws IOException {
>   StopWatch sw = new StopWatch().start();
>   FileStatus[] stats = listStatus(job);
>  
>   // ... (split computation elided)
> }
> protected FileStatus[] listStatus(JobConf job) throws IOException {
>   Path[] dirs = getInputPaths(job);
>   if (dirs.length == 0) {
>     throw new IOException("No input paths specified in job");
>   }
>   // get tokens for all the required FileSystems..
>   TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
> 
>   // Whether we need to recursive look into the directory structure
>   // ... (directory listing elided)
> }
> {code}
>  
> In the {{listStatus}} method, delegation tokens are obtained by calling 
> {{TokenCache.obtainTokensForNamenodes}}. However, this method skips requesting new 
> delegation tokens only when they are already present in the JobConf's credentials; 
> otherwise it authenticates again to fetch them.
> Therefore, it is necessary to inject the current UGI's credentials into the JobConf.
>  
> In addition, once Flink supports using delegation tokens directly without a keytab (see 
> [FLINK-21700|https://issues.apache.org/jira/browse/FLINK-21700]), 
> {{TokenCache.obtainTokensForNamenodes}} will fail without this patch, 
> because the JobConf does not contain the corresponding credentials.
>  
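To make the mechanism above concrete, here is a toy model (plain Java, no Hadoop dependency) of the skip-if-present rule the description attributes to {{TokenCache.obtainTokensForNamenodes}}. All class, method and service names in it ({{ToyTokenCacheDemo}}, {{ToyCredentials}}, {{ToyNameNode}}, {{hdfs://namenode:8020}}) are hypothetical stand-ins, not the Hadoop API, and it is only a sketch of the idea, not the patch itself. With the real API the injection step would presumably look something like {{jobConf.getCredentials().addAll(UserGroupInformation.getCurrentUser().getCredentials())}}, but that is an assumption about the shape of the fix.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of the token-reuse problem. All names are hypothetical stand-ins,
 * NOT the Hadoop API: ToyCredentials ~ Credentials, ToyNameNode ~ a NameNode
 * issuing delegation tokens, obtainTokensForNamenode ~ TokenCache.
 */
class ToyTokenCacheDemo {

    /** Stand-in for Hadoop Credentials: maps a service address to a token. */
    static class ToyCredentials {
        final Map<String, String> tokens = new HashMap<>();

        void addAll(ToyCredentials other) {
            tokens.putAll(other.tokens);
        }
    }

    /** Stand-in for a NameNode that issues a token only after authenticating the caller. */
    static class ToyNameNode {
        final String service;
        final boolean canAuthenticate; // false models "delegation tokens only, no keytab"
        int authentications = 0;

        ToyNameNode(String service, boolean canAuthenticate) {
            this.service = service;
            this.canAuthenticate = canAuthenticate;
        }

        String issueToken() {
            if (!canAuthenticate) {
                throw new IllegalStateException("cannot authenticate to " + service);
            }
            authentications++; // every newly issued token costs one authentication
            return "token-" + authentications;
        }
    }

    /** Skip-if-present rule: request a token only if none is cached for this service. */
    static void obtainTokensForNamenode(ToyCredentials creds, ToyNameNode nn) {
        if (!creds.tokens.containsKey(nn.service)) {
            creds.tokens.put(nn.service, nn.issueToken());
        }
    }

    /** One listStatus-style pass; returns how many times the NameNode had to authenticate. */
    static int run(ToyCredentials ugiCredentials, boolean injectUgiCredentials, ToyNameNode nn) {
        ToyCredentials jobConfCredentials = new ToyCredentials(); // JobConf starts without credentials
        if (injectUgiCredentials) {
            jobConfCredentials.addAll(ugiCredentials); // the proposed fix: merge current UGI credentials
        }
        obtainTokensForNamenode(jobConfCredentials, nn);
        return nn.authentications;
    }

    public static void main(String[] args) {
        String service = "hdfs://namenode:8020"; // hypothetical address
        ToyCredentials ugi = new ToyCredentials();
        ugi.tokens.put(service, "token-obtained-earlier"); // the UGI already holds a token

        System.out.println(run(ugi, false, new ToyNameNode(service, true))); // prints 1
        System.out.println(run(ugi, true, new ToyNameNode(service, true)));  // prints 0
        System.out.println(run(ugi, true, new ToyNameNode(service, false))); // prints 0
        try {
            run(ugi, false, new ToyNameNode(service, false));
        } catch (IllegalStateException e) {
            // no keytab and no cached token in the JobConf: token acquisition fails
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In the model, leaving the JobConf's credentials empty forces one extra authentication even though the UGI already holds a valid token, and in the token-only (no keytab) case that extra authentication cannot succeed at all; merging the UGI's credentials first lets the cached token be reused in both cases.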



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
