Re: FlinkSQL 对接k8s的提交问题

2022-04-25 Thread LuNing Wang
SQL Client的Application模式现在还不支持,方案在设计中。
https://issues.apache.org/jira/browse/FLINK-26541

吕宴全 <1365976...@qq.com.invalid> 于2022年4月24日周日 14:45写道:

> 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application
> mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:
>
>
> CREATE TABLE T (
> id INT
>  ) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> insert这条语句我希望在Application模式下执行,但是我不能只把当前的这条SQL作为参数执行,
> 因为这可能会缺少上下文信息(catalog)。因此我需要记录之前的一些SQL,并在每个任务都将之前的SQL一起作为参数,这会比较麻烦。
> 另一方面,在table的Executor里其实可以构建好jobgraph,这样将jobgraph作为参数提交就不需要传递上下文SQL了,但是k8s中已经没有per
> job这种模式的实现了。
>
>
> 我对这种场景下提交离线任务感到困惑,希望大家能给我一些建议,或者说我的理解是不是存在问题的?


Re: FlinkSQL 对接k8s的提交问题

2022-04-25 Thread Yang Wang
目前Application模式确实不能支持已经生成好的JobGraph运行,我能想到一个work around的办法是就先写一个user
jar直接把JobGraph提交到local的集群里面

就像下面这样

public class JobGraphRunner {

private static final Logger LOG =
LoggerFactory.getLogger(JobGraphRunner.class);

public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);

final String restServerAddress = "http://localhost:8081";;
LOG.info("Creating RestClusterClient({})", restServerAddress);

Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
try (ClusterClient clusterClient =
new RestClusterClient<>(
flinkConfig,
flinkConfig.toMap().get("kubernetes.cluster-id"),
(c, e) -> new
StandaloneClientHAServices(restServerAddress))) {
final String jobGraphPath = params.get("jobgraph");
Preconditions.checkNotNull(jobGraphPath, "--jobgraph
should be configured.");

LOG.info("Loading jobgraph from {}", jobGraphPath);
FileInputStream fileInputStream = new FileInputStream(jobGraphPath);
ObjectInputStream objectInputStream = new
ObjectInputStream(fileInputStream);
JobGraph jobGraph = (JobGraph) objectInputStream.readObject();
objectInputStream.close();

final JobID jobID = clusterClient.submitJob(jobGraph).get();
LOG.info("Job {} is submitted successfully", jobID);
}
}
}


Best,
Yang

吕宴全 <1365976...@qq.com.invalid> 于2022年4月24日周日 14:45写道:

> 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application
> mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:
>
>
> CREATE TABLE T (
> id INT
>  ) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> insert这条语句我希望在Application模式下执行,但是我不能只把当前的这条SQL作为参数执行,
> 因为这可能会缺少上下文信息(catalog)。因此我需要记录之前的一些SQL,并在每个任务都将之前的SQL一起作为参数,这会比较麻烦。
> 另一方面,在table的Executor里其实可以构建好jobgraph,这样将jobgraph作为参数提交就不需要传递上下文SQL了,但是k8s中已经没有per
> job这种模式的实现了。
>
>
> 我对这种场景下提交离线任务感到困惑,希望大家能给我一些建议,或者说我的理解是不是存在问题的?


flink ????????k8s????????jar??????????

2022-04-25 Thread ????????
flink??kubernetes session  
jar
??!

?????? FlinkSQL ????k8s??????????

2022-04-25 Thread ??????
?? dlink ?? FlinkSQL??
https://github.com/DataLinkDC/dlink


--  --
??: 
   "user-zh"

https://issues.apache.org/jira/browse/FLINK-26541

?? <1365976...@qq.com.invalid> ??2022??4??24?? 14:45??

> 
??KyuubiFlinkSQL??k8s(Application
> mode)??(Session 
mode)Application??jar??jobgraph??SQL??
>
>
> CREATE TABLE T (
> id INT
>  ) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> 
insertApplicationSQL,
> 
??(catalog)??SQLSQL
> 
table??Executorjobgraphjobgraph??SQLk8s??per
> job??
>
>
> 


Flink SQL??????Java code????debug

2022-04-25 Thread zhiyezou

Ideadebug??

Re: flink 任务对接k8s的第三方jar包管理问题

2022-04-25 Thread zns
https://blog.csdn.net/ifenggege/article/details/113731793 

这个yarn的供参考

> 2022年4月25日 16:50,天道酬勤 <1262420...@qq.com.INVALID> 写道:
> 
> 我的flink是通过kubernetes session 模式部署 
> ,在提交任务的时候希望可以动态指定第三方jar包来运行自己的任务,目前在官网中未找到可用的配置项。
> 希望大家能给我一些建议!



Re: flink 任务对接k8s的第三方jar包管理问题

2022-04-25 Thread Yang Wang
* 使用flink run命令来提交任务到running的Session集群的话,只能是本地的jar

* 也可以使用rest接口来提交,先上传到JobManager端[1],然后运行上传的jar[2]

* 最后可以尝试一下flink-kubernetes-operator项目,目前Session job是支持远程jar的[3],项目还在不断完善

[1].
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/#jars-upload
[2].
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/#jars-jarid-run
[3].
https://github.com/apache/flink-kubernetes-operator/blob/main/e2e-tests/data/sessionjob-cr.yaml

Best,
Yang

天道酬勤 <1262420...@qq.com.invalid> 于2022年4月25日周一 16:51写道:

> 我的flink是通过kubernetes session 模式部署
> ,在提交任务的时候希望可以动态指定第三方jar包来运行自己的任务,目前在官网中未找到可用的配置项。
> 希望大家能给我一些建议!


flink ????????k8s????????jar??????????

2022-04-25 Thread ????????
flink??kubernetes session 
??jarjar??flink/libjarjar??flink/lib??,
> ??!

????helm??Flink Kubernetes Operator ????

2022-04-25 Thread ??????
??
[streamx@localhost ~]$ helm install flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator
WARNING: Kubernetes configuration file is group-readable. This is insecure. 
Location: /home/streamx/.kube/config
WARNING: Kubernetes configuration file is world-readable. This is insecure. 
Location: /home/streamx/.kube/config
Error: INSTALLATION FAILED: unable to build kubernetes objects from release 
manifest: [unable to recognize "": no matches for kind "Certificate" in version 
"cert-manager.io/v1", unable to recognize "": no matches for kind "Issuer" in 
version "cert-manager.io/v1"]



??


 

Re: 安装helm的Flink Kubernetes Operator 失败

2022-04-25 Thread Biao Geng
Hi,
报错看着是没有找到cert-manager,你有参考官网的QuickStart
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/
 先运行 kubectl create -f
https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
吗? 或者不需要开启webhoook的话,也可以helm install flink-kubernetes-operator
flink-operator-repo/flink-kubernetes-operator --set webhook.create=false
跳过安装webhook及其依赖的cert-manager.

Best,
Biao Geng


陈卓宇 <2572805...@qq.com.invalid> 于2022年4月25日周一 19:46写道:

> 报错:
> [streamx@localhost ~]$ helm install flink-kubernetes-operator
> flink-operator-repo/flink-kubernetes-operator
> WARNING: Kubernetes configuration file is group-readable. This is
> insecure. Location: /home/streamx/.kube/config
> WARNING: Kubernetes configuration file is world-readable. This is
> insecure. Location: /home/streamx/.kube/config
> Error: INSTALLATION FAILED: unable to build kubernetes objects from
> release manifest: [unable to recognize "": no matches for kind
> "Certificate" in version "cert-manager.io/v1", unable to recognize "": no
> matches for kind "Issuer" in version "cert-manager.io/v1"]
>
> 求大佬教解决办法
>
> 陈卓宇
>
>
>  


????io????????????

2022-04-25 Thread San
flink1.12 
??iohbase??hbase 
 2000qps   flink??io??capacity??100?? 
 ??

Re: 关于Flink1.15文档,有一些小疑惑求助

2022-04-25 Thread 林影
谢谢回复。
原来是这样,我原来还以为intermediate savepoints
指代的是checkpoint呢,我们这边的Flink平台正在做从ckp的状态恢复所以才有此一问。

Jiangang Liu  于2022年4月22日周五 19:52写道:

> intermediate
>
> savepoints指的是非stop-with-savepoint,也就是不停止作业的情况下做savepoint。我的感觉是,这个时候的savepoint不会commit(比如sink写出到外部系统),如果作业失败会从最近一次的checkpoint恢复。如果恰好有一个作业从savepoint恢复,两个作业同时跑,可能会造成结果的重复或者不一致,这个时候最好丢弃掉sink(换uid)。对于只有一个作业运行的情况,比如停了作业再恢复,是不会有问题的。
>
> 林影  于2022年4月22日周五 17:05写道:
>
> > 在Flink 官网savepoint的页面中,出现下面一段话
> > Starting from Flink 1.15 intermediate savepoints (savepoints other than
> > created with stop-with-savepoint
> > <
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint
> > >)
> > are not used for recovery and do not commit any side effects.
> >
> > intermediate savepoints具体指的是什么呢?
> >
>


关于使用SQL Client提交执行目标为yarn-per-job的作业时无日志文件(jobmanager.log)的求助

2022-04-25 Thread ruiyun wan
Flink版本:1.13
问题描述:使用sql-client.sh启动yarn-per-job(execution.target =
yarn-per-job)时,在YARN集群侧生成的launch_container.sh中,启动org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint的参数中无-Dlog.file和-Dlog4j.configuration属性参数,导致没有jobmanager.log日志文件,如何设置才能影响Yarn生成的launch_container.sh包含上述参数。