Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2732#discussion_r87102297
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java ---
    @@ -142,4 +209,66 @@ private ActorGateway getJobManager() throws JobRetrievalException {
                        throw new JobRetrievalException(jobID, "Couldn't retrieve leading JobManager.", e);
                }
        }
    +
    +   /**
    +    * Reconstructs the class loader by first requesting information about it at the JobManager
    +    * and then downloading missing jar files.
    +    * @param jobID id of job
    +    * @param jobManager gateway to the JobManager
    +    * @param config the flink configuration
    +    * @return A classloader that should behave like the original classloader
    +    * @throws JobRetrievalException if anything goes wrong
    +    */
    +   private static ClassLoader retrieveClassLoader(
    +           JobID jobID,
    +           ActorGateway jobManager,
    +           Configuration config)
    +           throws JobRetrievalException {
    +
    +           final Object jmAnswer;
    +           try {
    +                   jmAnswer = Await.result(
    +                           jobManager.ask(
    +                                   new JobManagerMessages.RequestClassloadingProps(jobID),
    +                                   AkkaUtils.getDefaultTimeoutAsFiniteDuration()),
    +                           AkkaUtils.getDefaultTimeoutAsFiniteDuration());
    +           } catch (Exception e) {
    +                   throw new JobRetrievalException(jobID, "Couldn't retrieve class loading properties from JobManager.", e);
    +           }
    +
    +           if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +                   JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +                   Option<String> jmHost = jobManager.actor().path().address().host();
    +                   String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +                   InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +                   final BlobCache blobClient = new BlobCache(serverAddress, config);
    +
    +                   final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
    +                   final List<URL> requiredClasspaths = props.requiredClasspaths();
    +
    +                   final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
    +
    +                   int pos = 0;
    +                   for (BlobKey blobKey : props.requiredJarFiles()) {
    +                           try {
    +                                   allURLs[pos++] = blobClient.getURL(blobKey);
    +                           } catch (Exception e) {
    +                                   blobClient.shutdown();
    +                                   throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey);
    --- End diff --
    
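    Exception `e` is swallowed here. Please pass it as the cause to `JobRetrievalException`, as the other `throw` statements in this method do, so the original failure is not lost.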
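
To illustrate the point about the swallowed exception, here is a minimal, standalone sketch of exception chaining. It does not use the Flink classes: `RetrievalException`, `download` and `fetch` are hypothetical stand-ins, and the only assumption carried over from the diff is that the wrapping exception has a constructor that accepts a cause.

```java
import java.io.IOException;

public class ExceptionChainingSketch {

    /** Hypothetical stand-in for JobRetrievalException; not the real Flink class. */
    static class RetrievalException extends Exception {
        RetrievalException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical download step that always fails, to exercise the catch block. */
    static String download(String key) throws IOException {
        throw new IOException("connection refused for " + key);
    }

    /** Wraps the low-level failure and keeps it as the cause instead of dropping it. */
    static String fetch(String key) throws RetrievalException {
        try {
            return download(key);
        } catch (IOException e) {
            // Passing e as the cause preserves the original stack trace.
            throw new RetrievalException("Failed to download BlobKey " + key, e);
        }
    }

    public static void main(String[] args) {
        try {
            fetch("abc123");
        } catch (RetrievalException ex) {
            System.out.println(ex.getMessage());
            System.out.println("cause: " + ex.getCause());
        }
    }
}
```

With the cause attached, whoever handles the exception can still see why the download failed, rather than only which key was involved.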

