目前Application模式确实不能支持已经生成好的JobGraph运行,我能想到一个work around的办法是就先写一个user
jar直接把JobGraph提交到local的集群里面

就像下面这样

public class JobGraphRunner {

    private static final Logger LOG =
LoggerFactory.getLogger(JobGraphRunner.class);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        final String restServerAddress = "http://localhost:8081";;
        LOG.info("Creating RestClusterClient({})", restServerAddress);

        Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
        try (ClusterClient<String> clusterClient =
                new RestClusterClient<>(
                        flinkConfig,
                        flinkConfig.toMap().get("kubernetes.cluster-id"),
                        (c, e) -> new
StandaloneClientHAServices(restServerAddress))) {
            final String jobGraphPath = params.get("jobgraph");
            Preconditions.checkNotNull(jobGraphPath, "--jobgraph
should be configured.");

            LOG.info("Loading jobgraph from {}", jobGraphPath);
            FileInputStream fileInputStream = new FileInputStream(jobGraphPath);
            ObjectInputStream objectInputStream = new
ObjectInputStream(fileInputStream);
            JobGraph jobGraph = (JobGraph) objectInputStream.readObject();
            objectInputStream.close();

            final JobID jobID = clusterClient.submitJob(jobGraph).get();
            LOG.info("Job {} is submitted successfully", jobID);
        }
    }
}


Best,
Yang

吕宴全 <1365976...@qq.com.invalid> 于2022年4月24日周日 14:45写道:

> 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application
> mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:
>
>
> CREATE TABLE T (
> id INT
> &nbsp;) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> insert这条语句我希望在Application模式下执行,但是我不能只把当前的这条SQL作为参数执行,
> 因为这可能会缺少上下文信息(catalog)。因此我需要记录之前的一些SQL,并在每个任务都将之前的SQL一起作为参数,这会比较麻烦。
> 另一方面,在table的Executor里其实可以构建好jobgraph,这样将jobgraph作为参数提交就不需要传递上下文SQL了,但是k8s中已经没有per
> job这种模式的实现了。
>
>
> 我对这种场景下提交离线任务感到困惑,希望大家能给我一些建议,或者说我的理解是不是存在问题的?

回复