[ 
https://issues.apache.org/jira/browse/FLINK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256814#comment-17256814
 ] 

JieFang.He edited comment on FLINK-19067 at 12/31/20, 3:39 AM:
---------------------------------------------------------------

I think the reason is that the jobFiles are uploaded to the dispatcher node, 
but the task gets the jobFiles from the resource_manager node. So in HA mode, 
the dispatcher and the resource_manager need to be on the same node.

When the job is submitted to the rest endpoint, the rest endpoint then submits 
it to the dispatcher, and the jobFiles are then put on the dispatcher node.

Here is the log
{code:java}
2020-12-31 00:42:29,255 DEBUG [BLOB connection for /127.0.0.1:43576] [FileSystemBlobStore]: Copying from /tmp/blobStore-5fe6b952-fece-4590-b637-04f19d8121dd/job_22ea0e6f01a4d7667aa1077e9bffe759/blob_p-b8b09e59290d54dbc0bfd5c7672d424e7f2c5178-3c4b3b70d11b7ca1164a615928023109 to /data2/zdh/flink/storageDir/default/blob/job_22ea0e6f01a4d7667aa1077e9bffe759/blob_p-b8b09e59290d54dbc0bfd5c7672d424e7f2c5178-3c4b3b70d11b7ca1164a615928023109.
{code}
and here is the code in JobSubmitHandler
{code:java}
CompletableFuture<Acknowledge> jobSubmissionFuture = 
finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, 
timeout));
{code}
The gateway is defined in DefaultDispatcherResourceManagerComponentFactory
{code:java}
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = 
new RpcGatewayRetriever<>(
   rpcService,
   DispatcherGateway.class,
   DispatcherId::fromUuid,
   10,
   Time.milliseconds(50L));
{code}
{code:java}
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
   configuration,
   dispatcherGatewayRetriever,
   resourceManagerGatewayRetriever,
   blobServer,
   executor,
   metricFetcher,
   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
   fatalErrorHandler);
{code}
 

But the Task gets the jobFiles from the resource_manager

The code of BlobClient.downloadFromBlobServer
{code:java}
static void downloadFromBlobServer(
      @Nullable JobID jobId,
      BlobKey blobKey,
      File localJarFile,
      InetSocketAddress serverAddress,
      Configuration blobClientConfig,
      int numFetchRetries) throws IOException {
    ......
      catch (Throwable t) {
         String message = "Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverAddress +
            " and store it under " + localJarFile.getAbsolutePath();
{code}
The serverAddress is defined in TaskExecutor
{code:java}
final InetSocketAddress blobServerAddress = new InetSocketAddress(
   clusterInformation.getBlobServerHostname(),
   clusterInformation.getBlobServerPort());

blobCacheService.setBlobServerAddress(blobServerAddress);
{code}
The clusterInformation is passed in by ResourceManagerRegistrationListener
{code:java}
if (resourceManagerConnection == connection) {
   try {
      establishResourceManagerConnection(
         resourceManagerGateway,
         resourceManagerId,
         taskExecutorRegistrationId,
         clusterInformation);
{code}
This is where ResourceManagerRegistrationListener is used
{code:java}
resourceManagerConnection =
   new TaskExecutorToResourceManagerConnection(
      log,
      getRpcService(),
      taskManagerConfiguration.getRetryingRegistrationConfiguration(),
      resourceManagerAddress.getAddress(),
      resourceManagerAddress.getResourceManagerId(),
      getMainThreadExecutor(),
      new ResourceManagerRegistrationListener(),
      taskExecutorRegistration);
{code}
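
To make the mismatch described above easier to see, here is a minimal sketch. It is not Flink code: the class BlobLookupSketch, the nested LocalBlobStore, and the method canFetch are hypothetical names made up for illustration. It only assumes that each node keeps its blobs in its own local directory, so a file uploaded on the dispatcher node is not visible when it is looked up on the resource_manager node.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Toy model (not Flink code): two nodes, each with its own local blob directory. */
public class BlobLookupSketch {

    /** Hypothetical stand-in for a node-local blob storage directory. */
    static final class LocalBlobStore {
        private final Path dir;

        LocalBlobStore(String nodeName) throws IOException {
            // Each node gets a separate temporary directory, i.e. no shared storage.
            this.dir = Files.createTempDirectory("blobStore-" + nodeName + "-");
        }

        void put(String blobKey, byte[] content) throws IOException {
            Files.write(dir.resolve(blobKey), content);
        }

        byte[] get(String blobKey) throws IOException {
            Path file = dir.resolve(blobKey);
            if (!Files.exists(file)) {
                throw new FileNotFoundException(file + " (No such file or directory)");
            }
            return Files.readAllBytes(file);
        }
    }

    /** Uploads a blob on uploadNode and reports whether fetchNode can read it back. */
    static boolean canFetch(LocalBlobStore uploadNode, LocalBlobStore fetchNode)
            throws IOException {
        String key = "blob_p-example"; // illustrative key, not a real blob key
        uploadNode.put(key, new byte[] {1, 2, 3});
        try {
            return fetchNode.get(key).length == 3;
        } catch (FileNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        LocalBlobStore dispatcherNode = new LocalBlobStore("dispatcher");
        LocalBlobStore resourceManagerNode = new LocalBlobStore("resourcemanager");

        // Upload and fetch on the same node: the file is found.
        System.out.println("same node: " + canFetch(dispatcherNode, dispatcherNode));
        // Upload on the dispatcher node, fetch on the resource_manager node: not found.
        System.out.println("different nodes: " + canFetch(dispatcherNode, resourceManagerNode));
    }
}
```

Running main prints "same node: true" and "different nodes: false", which mirrors the FileNotFoundException from the issue when the two leaders are on different nodes and the storage directory is local to each node.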
 

 


> FileNotFoundException when run flink examples
> ---------------------------------------------
>
>                 Key: FLINK-19067
>                 URL: https://issues.apache.org/jira/browse/FLINK-19067
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.11.1
>            Reporter: JieFang.He
>            Priority: Major
>         Attachments: flink-jobmanager-deployer-hejiefang01.log, 
> flink-jobmanager-deployer-hejiefang02.log, 
> flink-taskmanager-deployer-hejiefang01.log, 
> flink-taskmanager-deployer-hejiefang02.log
>
>
> 1. When running examples/batch/WordCount.jar, it fails with the exception:
> Caused by: java.io.FileNotFoundException: /data2/flink/storageDir/default/blob/job_d29414828f614d5466e239be4d3889ac/blob_p-a2ebe1c5aa160595f214b4bd0f39d80e42ee2e93-f458f1c12dc023e78d25f191de1d7c4b (No such file or directory)
>  at java.io.FileInputStream.open0(Native Method)
>  at java.io.FileInputStream.open(FileInputStream.java:195)
>  at java.io.FileInputStream.<init>(FileInputStream.java:138)
>  at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>  at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:143)
>  at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:105)
>  at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:87)
>  at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:501)
>  at org.apache.flink.runtime.blob.BlobServerConnection.get(BlobServerConnection.java:231)
>  at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:117)
>  
>  



