Currently, Application mode indeed does not support running an already-generated JobGraph. One workaround I can think of is to write a user jar that submits the JobGraph directly to the local cluster.
Like the following:

import java.io.FileInputStream;
import java.io.ObjectInputStream;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobGraphRunner {

    private static final Logger LOG = LoggerFactory.getLogger(JobGraphRunner.class);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String restServerAddress = "http://localhost:8081";
        LOG.info("Creating RestClusterClient({})", restServerAddress);

        Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
        try (ClusterClient<String> clusterClient =
                new RestClusterClient<>(
                        flinkConfig,
                        flinkConfig.toMap().get("kubernetes.cluster-id"),
                        (c, e) -> new StandaloneClientHAServices(restServerAddress))) {

            final String jobGraphPath = params.get("jobgraph");
            Preconditions.checkNotNull(jobGraphPath, "--jobgraph should be configured.");
            LOG.info("Loading jobgraph from {}", jobGraphPath);

            // Deserialize the JobGraph that was written out with plain Java serialization.
            JobGraph jobGraph;
            try (ObjectInputStream objectInputStream =
                    new ObjectInputStream(new FileInputStream(jobGraphPath))) {
                jobGraph = (JobGraph) objectInputStream.readObject();
            }

            final JobID jobID = clusterClient.submitJob(jobGraph).get();
            LOG.info("Job {} is submitted successfully", jobID);
        }
    }
}

Best,
Yang

吕宴全 <1365976...@qq.com.invalid> wrote on Sunday, April 24, 2022 at 14:45:
> I plan to use Kyuubi with Flink SQL and run the jobs in a Kubernetes
> environment. The workload includes batch jobs (Application mode) and
> ad-hoc queries (Session mode). In Application mode, the JobGraph is
> built from the jar. An example of the SQL:
>
> CREATE TABLE T (
>   id INT
> ) WITH (
>   'connector.type' = 'filesystem',
>   'connector.path' = 'file:///tmp/tmp.csv',
>   'format.type' = 'csv',
>   'format.derive-schema' = 'true'
> );
>
> insert into T values(1);
>
> I want this INSERT statement to run in Application mode, but I cannot
> simply pass this single statement as the program argument, because it
> may lack context (e.g. the catalog). So I would have to record the
> earlier SQL statements and pass them along with every job, which is
> cumbersome.
> On the other hand, the table Executor can already build the JobGraph,
> so submitting the JobGraph itself as the argument would avoid shipping
> the context SQL; but the per-job mode implementation no longer exists
> on Kubernetes.
>
> I am confused about how to submit batch jobs in this scenario. I would
> appreciate any advice, or a correction if my understanding is wrong.
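For completeness, the producer side of the workaround above is just the mirror image of the reader: write the (Serializable) JobGraph to a file with plain Java serialization, and let JobGraphRunner read it back. A minimal, self-contained sketch of that round-trip, where Payload is a hypothetical stand-in for org.apache.flink.runtime.jobgraph.JobGraph so the snippet runs without Flink on the classpath:

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

public class SerializationRoundTrip {

    // Hypothetical stand-in for JobGraph; the real class is also Serializable.
    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobName;
        Payload(String jobName) { this.jobName = jobName; }
    }

    // Producer side: serialize the payload to a file.
    static void write(Path path, Payload payload) throws IOException {
        try (ObjectOutputStream out =
                new ObjectOutputStream(Files.newOutputStream(path))) {
            out.writeObject(payload);
        }
    }

    // Consumer side: deserialize it back, as JobGraphRunner does.
    static Payload read(Path path) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ObjectInputStream(Files.newInputStream(path))) {
            return (Payload) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("jobgraph", ".bin");
        write(file, new Payload("insert-into-T"));
        Payload restored = read(file);
        System.out.println(restored.jobName); // prints "insert-into-T"
        Files.deleteIfExists(file);
    }
}
```

Note that plain Java serialization ties the file to the Flink version that wrote it, so the writer and JobGraphRunner should run against the same Flink jars.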