[ 
https://issues.apache.org/jira/browse/FLINK-32976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17759670#comment-17759670
 ] 

Feng Jin edited comment on FLINK-32976 at 8/28/23 5:41 PM:
-----------------------------------------------------------

[~martijnvisser]  Sorry for the not clearly description. I have updated the 
reproducible steps.   

 

The corresponding code for  is as follows:

https://github.com/hackergin/flink/blob/78136133fbec4ca145dec66d4bc0c324c8e16d82/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java#L173
{code:java}
//代码占位符

if 
(flinkConfiguration.getString(DeploymentOptions.TARGET).toLowerCase().contains("yarn"))
 {
    LOG.debug("Running on YARN, trying to add staging directory to file systems 
to access");
    String yarnStagingDirectory =
            flinkConfiguration.getString("yarn.staging-directory", "");
    if (!StringUtils.isBlank(yarnStagingDirectory)) {
        LOG.debug(
                "Adding staging directory to file systems to access {}",
                yarnStagingDirectory);
        result.add(new 
Path(yarnStagingDirectory).getFileSystem(hadoopConfiguration));
        LOG.debug("Staging directory added to file systems to access 
successfully");
    } else {
        LOG.debug(
                "Staging directory is not set or empty so not added to file 
systems to access");
    }
} {code}
When starting the standalone cluster, since the TARGET parameter is not set, 
there is no check for the existence of TARGET, resulting in a null pointer 
error.

 

Therefore, we can easily fix this issue by adding a check. 

 

Flink stand alone cluster with Hadoop configuration is not a common practice. 
Most Flink jobs are either run on YARN or on Kubernetes. In these two modes, 
the TARGET parameter is correctly set, so this issue does not occur.

 


was (Author: hackergin):
[~martijnvisser]  Sorry for the not clearly description. I have updated the 
reproducible steps.   

 

The corresponding code for  is as follows:

 
{code:java}
//代码占位符

if 
(flinkConfiguration.getString(DeploymentOptions.TARGET).toLowerCase().contains("yarn"))
 {
    LOG.debug("Running on YARN, trying to add staging directory to file systems 
to access");
    String yarnStagingDirectory =
            flinkConfiguration.getString("yarn.staging-directory", "");
    if (!StringUtils.isBlank(yarnStagingDirectory)) {
        LOG.debug(
                "Adding staging directory to file systems to access {}",
                yarnStagingDirectory);
        result.add(new 
Path(yarnStagingDirectory).getFileSystem(hadoopConfiguration));
        LOG.debug("Staging directory added to file systems to access 
successfully");
    } else {
        LOG.debug(
                "Staging directory is not set or empty so not added to file 
systems to access");
    }
} {code}
When starting the standalone cluster, since the TARGET parameter is not set, 
there is no check for the existence of TARGET, resulting in a null pointer 
error.

 

Therefore, we can easily fix this issue by adding a check. 

 

Flink stand alone cluster with Hadoop configuration is not a common practice. 
Most Flink jobs are either run on YARN or on Kubernetes. In these two modes, 
the TARGET parameter is correctly set, so this issue does not occur.

 

> NullPointException when starting flink cluster
> ----------------------------------------------
>
>                 Key: FLINK-32976
>                 URL: https://issues.apache.org/jira/browse/FLINK-32976
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration
>    Affects Versions: 1.17.1
>            Reporter: Feng Jin
>            Priority: Major
>
> It can be reproduced when starting flink cluster with hadoop configuration. 
>  
> {code:java}
> //代码占位符
> // Set up hadoop conf , hadoop classpath
> // start jobManager
> ./jobmanager.sh start-foreground {code}
>  
> The error message as follows: 
>  
> {code:java}
> //代码占位符
> Caused by: java.ang.NullPointerException
> at org.apache.flink. runtime. 
> security.token.hadoop.HadoopFSDelegationTokenProvider.getFileSystemsToAccess(HadoopFSDelegationTokenProvider.java:173)~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.security.token.hadoop.HadoopFSDelegationTokenProvidertionTokens$1(HadoopFSDelegationTokenProvider.java:113)
>  ~[flink-dist-1.17.1.jar:1.17.1
> at java.security.AccessController.doprivileged(Native Method)~[?:1.8.0 281]
> at javax.security.auth.Subject.doAs(Subject.java:422)~[?:1.8.0 281]
> at org. apache.hadoop . 
> security.UserGroupInformation.doAs(UserGroupInformation. java:1876) 
> ~flink-shacd-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar:3.1.1.7.2.1.0-327-9.0]
> at org. apache.flink. runtime.security.token .hadoop 
> .HadoopFSDelegationTokenProvider.obtainDelegationTcens(HadoopFSDelegationTokenProvider.java:108)~flink-dist-1.17.1.jar:1.17.1]
> at org.apache.flink. runtime. security.token.DefaultDelegationTokenManager . 
> lambda$obtainDel
> SAndGetNextRenewal$1(DefaultDelegationTokenManager .java:264)~ 
> flink-dist-1.17.1.jar:1.17.1]
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) 
> ~?:1.8.0 281
> at 
> java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1628)~[?:1.8.0
>  281]at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)~?:1.8.0 
> 281]
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> ~?:1.8.0 281at 
> java,util.stream.Reduce0ps$Reduce0p.evaluateSequential(Reduce0ps.java:708)~?:1.8.0
>  281]
> at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)~[?:1.8.0
>  281]at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:479) 
> ~?:1.8.0 281
> at java.util.stream.ReferencePipeline.min(ReferencePipeline.java:520)~?:1.8.0 
> 281at org. apache. flink. runtime. 
> security.token.DefaultDelegationTokenManager 
> .obtainDelegationTokensAndGeNextRenewal(DefaultDelegationTokenManager 
> .java:286)~[flink-dist-1.17.1.jar:1.17.1
> at org.apache. flink.runtime. security.token.DefaultDelegationTokenManager. 
> obtainDelegationTokens(DefaltDelegationTokenManager.java:242)~[flink-dist-1.17.1.jar:1.17.1]
> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializes@) 
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.entrypoint.clusterEntrypoint.nk-dist-1.17.1.jar:1.17.1]
> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint:232) 
> ~[flink-dist-1.17.1.jar:1.17.1]
> at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8. 281]
> at javax.security.auth.Subject.doAs(Subject.java:422)~?:1.8.0 281]
> at org. apache.hadoop . security.UserGroupInformation. doAs 
> (UserGroupInformation. 
> java:1876)~[flink-shadd-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar:3.1.1.7.2.1.0-327-9.0]
> at org.apache.flink.runtime.security. contexts 
> .HadoopSecurityContext.runSecured(HadoopSecurijava:41) 
> ~[flink-dist-1.17.1.jar:1.17.1
> at org. apache.flink. runtime. entrypoint. ClusterEntrypoint . 
> startCluster(clusterEntrypoint. java:229)link-dist-1.17.1.jar:1.17.1]...2 
> more{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to