[
https://issues.apache.org/jira/browse/FLINK-4653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15522610#comment-15522610
]
ASF GitHub Bot commented on FLINK-4653:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2524#discussion_r80441549
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobClient/JobClientUtils.java
---
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobClient;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.util.Timeout;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * JobClientUtils is a utility for client.
+ * It offers the following methods:
+ * <ul>
+ * <li>{@link #startJobClientRpcService(Configuration)} Starts a rpc
service for client</li>
+ * <li>{@link #retrieveRunningJobResult(JobID, JobMasterGateway,
RpcService, LeaderRetrievalService, boolean, FiniteDuration, Configuration)}
+ * Attaches to a running Job using the JobID, and wait for its
job result</li>
+ * <li>{@link #awaitJobResult(JobInfoTracker, ClassLoader)} Awaits the
result of the job execution which jobInfoTracker listen for</li>
+ * <li>{@link #retrieveClassLoader(JobID, JobMasterGateway,
Configuration)} Reconstructs the class loader by first requesting information
about it at the JobMaster
+ * and then downloading missing jar files</li>
+ * </ul>
+ */
+public class JobClientUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JobClientUtils.class);
+
+
+ /**
+ * Starts a rpc service for client
+ *
+ * @param config the flink configuration
+ * @return
+ * @throws IOException
+ */
+ public static RpcService startJobClientRpcService(Configuration config)
+ throws IOException
+ {
+ LOG.info("Starting JobClientUtils rpc service");
+ Option<Tuple2<String, Object>> remoting = new Some<>(new
Tuple2<String, Object>("", 0));
+
+ // start a remote actor system to listen on an arbitrary port
+ ActorSystem system = AkkaUtils.createActorSystem(config,
remoting);
+ Address address = system.provider().getDefaultAddress();
+
+ String hostAddress = address.host().isDefined() ?
+
NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) :
+ "(unknown)";
+ int port = address.port().isDefined() ? ((Integer)
address.port().get()) : -1;
+ LOG.info("Started JobClientUtils actor system at " +
hostAddress + ':' + port);
+
+ Timeout timeout = new
Timeout(AkkaUtils.getClientTimeout(config));
+ return new AkkaRpcService(system, timeout);
+ }
+
+ /**
+ * Attaches to a running Job using the JobID, and wait for its job
result
+ *
+ * @param jobID id of job
+ * @param jobMasterGateway gateway to the JobMaster
+ * @param rpcService
+ * @param leaderRetrievalService leader retriever service of jobMaster
+ * @param sysoutLogUpdates whether status messages shall be
printed to sysout
+ * @param timeout register timeout
+ * @param configuration the flink configuration
+ * @return
+ * @throws JobExecutionException
+ */
+ public static JobExecutionResult retrieveRunningJobResult(
+ JobID jobID,
+ JobMasterGateway jobMasterGateway,
+ RpcService rpcService,
+ LeaderRetrievalService leaderRetrievalService,
+ boolean sysoutLogUpdates,
+ FiniteDuration timeout,
+ Configuration configuration) throws JobExecutionException
+ {
+
+ checkNotNull(jobID, "The jobID must not be null.");
+ checkNotNull(jobMasterGateway, "The jobMasterGateway must not
be null.");
+ checkNotNull(rpcService, "The rpcService must not be null.");
+ checkNotNull(leaderRetrievalService, "The
leaderRetrievalService must not be null.");
+ checkNotNull(timeout, "The timeout must not be null");
+ checkNotNull(configuration, "The configuration must not be
null");
+
+ JobInfoTracker jobInfoTracker = null;
+ try {
+ jobInfoTracker = new JobInfoTracker(rpcService,
leaderRetrievalService, jobID, sysoutLogUpdates);
+ jobInfoTracker.start();
+ registerClientAtJobMaster(jobID,
jobInfoTracker.getAddress(), jobMasterGateway, timeout);
+ ClassLoader classLoader = retrieveClassLoader(jobID,
jobMasterGateway, configuration);
+ return awaitJobResult(jobInfoTracker, classLoader);
+ } finally {
+ if (jobInfoTracker != null) {
+ jobInfoTracker.shutDown();
+ }
+ }
+ }
+
+ /**
+ * Awaits the result of the job execution which jobInfoTracker listen
for
+ *
+ * @param jobInfoTracker job info tracker
+ * @param classLoader classloader to parse the job result
+ * @return
+ * @throws JobExecutionException
+ */
+ public static JobExecutionResult awaitJobResult(JobInfoTracker
jobInfoTracker,
+ ClassLoader classLoader) throws JobExecutionException
+ {
+ try {
+ while (true) {
+ Future<JobExecutionResult>
jobExecutionResultFuture = jobInfoTracker.getJobExecutionResult(classLoader);
+ try {
+ JobExecutionResult jobExecutionResult =
Await.result(jobExecutionResultFuture, new
Timeout(AkkaUtils.INF_TIMEOUT()).duration());
+ return jobExecutionResult;
+ } catch (TimeoutException e) {
+ // ignore timeout exception, retry
--- End diff --
I'm not sure if that will cause an infinite loop in case of reoccurring
connection timeouts.
> Refactor JobClientActor to adapt to the new Rpc framework and new cluster
> management
> -------------------------------------------------------------------------------------
>
> Key: FLINK-4653
> URL: https://issues.apache.org/jira/browse/FLINK-4653
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Reporter: zhangjing
> Assignee: zhangjing
> Fix For: 1.2.0
>
>
> 1. Create an RpcEndpoint (temporarily named JobInfoTracker) and an
> RpcGateway (temporarily named JobInfoTrackerGateway) to replace the old
> JobClientActor.
> 2. Change the rpc message communication in JobClientActor to rpc method calls
> to fit the new rpc framework.
> 3. JobInfoTracker is responsible for waiting for the jobStateChange and
> jobResult until the job completes. But it is not responsible for submitting a new job
> because jobSubmission behavior is different in different cluster
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)