[ https://issues.apache.org/jira/browse/FLINK-4150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387744#comment-15387744 ]

ASF GitHub Bot commented on FLINK-4150:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2256#discussion_r71711908
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---
    @@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception {
                }
        }
     
    +   /**
    +    * Tests that the persisted job is not removed from the job graph store
    +    * after the postStop method of the JobManager. Furthermore, it checks
    +    * that BLOBs of the JobGraph are recovered properly and cleaned up after
    +    * the job finishes.
    +    */
    +   @Test
    +   public void testBlobRecoveryAfterLostJobManager() throws Exception {
    +           FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +           FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
    +           Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
    +           Configuration flinkConfiguration = new Configuration();
    +           UUID leaderSessionID = UUID.randomUUID();
    +           UUID newLeaderSessionID = UUID.randomUUID();
    +           int slots = 2;
    +           ActorRef archiveRef = null;
    +           ActorRef jobManagerRef = null;
    +           ActorRef taskManagerRef = null;
    +
    +           String haStoragePath = temporaryFolder.newFolder().toString();
    +
    +           flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
    +           flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, haStoragePath);
    +           flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
    +
    +           try {
    +                   MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
    +                   TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
    +                   TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
    +
    +                   archiveRef = system.actorOf(Props.create(
    +                                   MemoryArchivist.class,
    +                                   10), "archive");
    +
    +                   jobManagerRef = createJobManagerActor(
    +                                   "jobmanager-0",
    +                                   flinkConfiguration,
    +                                   myLeaderElectionService,
    +                                   mySubmittedJobGraphStore,
    +                                   3600000,
    +                                   timeout,
    +                                   jobRecoveryTimeout, archiveRef);
    +
    +                   ActorGateway jobManager = new AkkaActorGateway(jobManagerRef, leaderSessionID);
    +
    +                   taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +                                   flinkConfiguration,
    +                                   ResourceID.generate(),
    +                                   system,
    +                                   "localhost",
    +                                   Option.apply("taskmanager"),
    +                                   Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
    +                                   true,
    +                                   TestingTaskManager.class);
    +
    +                   ActorGateway tmGateway = new AkkaActorGateway(taskManagerRef, leaderSessionID);
    +
    +                   Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +                   Await.ready(tmAlive, deadline.timeLeft());
    +
    +                   JobVertex sourceJobVertex = new JobVertex("Source");
    +                   sourceJobVertex.setInvokableClass(BlockingInvokable.class);
    +                   sourceJobVertex.setParallelism(slots);
    +
    +                   JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
    +
    +                   // Upload fake JAR file to first JobManager
    +                   File jarFile = temporaryFolder.newFile();
    +                   ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile));
    +                   out.close();
    +
    +                   jobGraph.addJar(new Path(jarFile.toURI()));
    +                   JobClient.uploadJarFiles(jobGraph, jobManager, deadline.timeLeft());
    +
    +                   Future<Object> isLeader = jobManager.ask(
    +                                   TestingJobManagerMessages.getNotifyWhenLeader(),
    +                                   deadline.timeLeft());
    +
    +                   Future<Object> isConnectedToJobManager = tmGateway.ask(
    +                                   new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +                                   deadline.timeLeft());
    +
    +                   // tell jobManager that he's the leader
    +                   myLeaderElectionService.isLeader(leaderSessionID);
    +                   // tell taskManager who's the leader
    +                   myLeaderRetrievalService.notifyListener(jobManager.path(), leaderSessionID);
    +
    +                   Await.ready(isLeader, deadline.timeLeft());
    +                   Await.ready(isConnectedToJobManager, deadline.timeLeft());
    +
    +                   // submit blocking job
    +                   Future<Object> jobSubmitted = jobManager.ask(
    +                                   new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
    +                                   deadline.timeLeft());
    +
    +                   Await.ready(jobSubmitted, deadline.timeLeft());
    +
    +                   // Wait for running
    +                   Future<Object> jobRunning = jobManager.ask(
    +                                   new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
    +                                   deadline.timeLeft());
    +
    +                   Await.ready(jobRunning, deadline.timeLeft());
    +
    +                   // terminate the job manager
    +                   jobManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
    +
    +                   Future<Boolean> terminatedFuture = Patterns.gracefulStop(jobManagerRef, deadline.timeLeft());
    +                   Boolean terminated = Await.result(terminatedFuture, deadline.timeLeft());
    +                   assertTrue("Failed to stop job manager", terminated);
    +
    +                   // job stays in the submitted job graph store
    +                   assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +                   // start new job manager
    +                   myLeaderElectionService.reset();
    +
    +                   jobManagerRef = createJobManagerActor(
    +                                   "jobmanager-1",
    +                                   flinkConfiguration,
    +                                   myLeaderElectionService,
    +                                   mySubmittedJobGraphStore,
    +                                   500,
    +                                   timeout,
    +                                   jobRecoveryTimeout,
    +                                   archiveRef);
    +
    +                   jobManager = new AkkaActorGateway(jobManagerRef, newLeaderSessionID);
    +
    +                   Future<Object> isAlive = jobManager.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +                   isLeader = jobManager.ask(
    +                                   TestingJobManagerMessages.getNotifyWhenLeader(),
    +                                   deadline.timeLeft());
    +
    +                   isConnectedToJobManager = tmGateway.ask(
    +                                   new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +                                   deadline.timeLeft());
    +
    +                   Await.ready(isAlive, deadline.timeLeft());
    +
    +                   // tell new jobManager that he's the leader
    +                   myLeaderElectionService.isLeader(newLeaderSessionID);
    +                   // tell taskManager who's the leader
    +                   myLeaderRetrievalService.notifyListener(jobManager.path(), newLeaderSessionID);
    +
    +                   Await.ready(isLeader, deadline.timeLeft());
    +                   Await.ready(isConnectedToJobManager, deadline.timeLeft());
    +
    +                   jobRunning = jobManager.ask(
    +                                   new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
    +                                   deadline.timeLeft());
    +
    +                   // wait that the job is recovered and reaches state RUNNING
    +                   Await.ready(jobRunning, deadline.timeLeft());
    +
    +                   Future<Object> jobFinished = jobManager.ask(
    +                                   new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
    +                                   deadline.timeLeft());
    +
    +                   BlockingInvokable.unblock();
    +
    +                   // wait til the job has finished
    +                   Await.ready(jobFinished, deadline.timeLeft());
    +
    +                   // check that the job has been removed from the submitted job graph store
    +                   assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +                   // Check that the BLOB store files are removed
    +                   File rootPath = new File(haStoragePath);
    +
    +                   boolean cleanedUpFiles = false;
    +                   while (deadline.hasTimeLeft()) {
    +                           if (listFiles(rootPath).isEmpty()) {
    --- End diff --
    
    Yes, that is true. We will, for example, have empty folders like 
    `<root>/blob/cache` in this test. I've added a method that tries to delete 
    the parent directory when deleting a BLOB (same as what we are currently 
    doing in `AbstractFileStateHandle`). I will adjust this check so that it 
    verifies that the directory is empty.
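    
    For illustration, here is a minimal sketch of what such a best-effort 
    parent-directory cleanup could look like, using `org.apache.flink.core.fs.FileSystem`; 
    the method name and its exact location are assumptions for this sketch, not 
    the actual change in the PR:
    
    {code}
    // Hypothetical helper (not the actual patch): after deleting a BLOB, try to remove
    // the now-empty parent directory so folders like <root>/blob/cache do not linger.
    private static void deleteBlobAndParentIfEmpty(FileSystem fs, Path blobPath) throws IOException {
    	// delete the BLOB itself; failures here should still surface to the caller
    	fs.delete(blobPath, false);
    
    	Path parent = blobPath.getParent();
    	if (parent != null) {
    		try {
    			FileStatus[] remaining = fs.listStatus(parent);
    			if (remaining == null || remaining.length == 0) {
    				// best-effort: a concurrent upload may legitimately re-create the directory
    				fs.delete(parent, false);
    			}
    		} catch (IOException ignored) {
    			// cleaning up the parent directory must never fail the BLOB deletion
    		}
    	}
    }
    {code}
    
    With that in place, the check in the test can wait until the recovery root 
    contains no entries at all (files or directories) instead of only no regular files.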


> Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-4150
>                 URL: https://issues.apache.org/jira/browse/FLINK-4150
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission
>            Reporter: Stefan Richter
>            Assignee: Ufuk Celebi
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> Submitting a job in Yarn with HA can lead to the following exception:
> {code}
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load 
> user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
> ClassLoader info: URL ClassLoader:
>     file: 
> '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0'
>  (invalid JAR: zip file is empty)
> Class not resolvable through given classloader.
>       at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> Some job information, including the Blob ids, is stored in Zookeeper. The 
> actual Blobs are stored in a dedicated BlobStore if the recovery mode is set 
> to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the 
> cluster is shut down, the path for the BlobStore is deleted. When the cluster 
> is then restarted, recovering jobs cannot be restored because their Blob ids 
> stored in Zookeeper now point to deleted files.
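
To make the failure mode concrete, here is a rough sketch of the HA configuration under which this happens, using the same config constants as the test above (the HDFS path is only illustrative):

{code}
// ZooKeeper HA setup relevant to the BlobStore (Flink 1.1-era config keys).
// ZooKeeper only keeps the blob keys; the JARs themselves live under the recovery path,
// so wiping that path on cluster shutdown leaves dangling keys on recovery.
Configuration config = new Configuration();
config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "hdfs:///flink/recovery");
{code}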



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
