PyFlink Survey

2021-04-20 Posted by Dian Fu
Hi all,

[Fill out the survey and enter a draw to win a T-shirt!] It's here! To better serve PyFlink users and help them bring PyFlink into production, the Apache
Flink Chinese community is planning a series of PyFlink documents and reference materials, so that PyFlink users have access to more high-quality learning resources!

To that end, we are launching this survey to find out what you are interested in. Please take part; it will help us put together better PyFlink learning materials.

PS: Everyone who completes the survey enters a draw for a custom Flink polo shirt! Winners will be drawn at 12:00 noon on April 30.

https://survey.aliyun.com/apps/zhiliao/s_92SCyLB 


Regards,
Dian

Re: How to extract the count value from a Flink SQL query

2021-04-20 Posted by guoyb
You can take a look at this demo:


https://github.com/bingoguo93/flink-1.12-sql-demo/blob/main/src/main/java/org/example/mysql/tableQueryMysql.java



--- Original message ---
From: "张锴"

How to extract the count value from a Flink SQL query

2021-04-20 Posted by 张锴
I am using Flink 1.12.2.
A question: how do I extract the count value from a Flink SQL query?
An example first. In Spark,

val total: Int = hiveContext.sql("select count(*) from a").collect()(0)(0).toString.toInt

pulls the count value out. If I run the query with Flink SQL instead, how do I get the result of the query out?
1. Can Flink SQL do this at all?
2. If so, how should it be written?
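A minimal sketch of one way to do this in Flink 1.12, assuming a bounded input (here a stand-in datagen table in place of table a) so that the batch query can finish; TableResult.collect() then yields the single result row:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class CountValueExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Stand-in bounded table so the job can terminate; replace with
        // your own registration of table `a`.
        tableEnv.executeSql(
                "CREATE TEMPORARY TABLE a (x INT) WITH ("
                        + "'connector' = 'datagen', 'number-of-rows' = '10')");

        TableResult result = tableEnv.sqlQuery("SELECT COUNT(*) FROM a").execute();
        long total = 0L;
        try (CloseableIterator<Row> it = result.collect()) {
            if (it.hasNext()) {
                // COUNT(*) is a BIGINT, which arrives as a Java Long
                total = (Long) it.next().getField(0);
            }
        }
        System.out.println("total = " + total);
    }
}
```

On an unbounded streaming query the iterator would instead keep delivering retraction updates, so in streaming mode you would have to read until the stream ends or take the latest row.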


Re: Flink 1.11: the -C option does not upload the UDF jar

2021-04-20 Posted by 范 佳兴
-C,--classpath  Adds a URL to each user code
  classloader  on all nodes in the
  cluster. The paths must specify a
  protocol (e.g. file://) and be
  accessible on all nodes (e.g. by means
  of a NFS share). You can use this
  option multiple times for specifying
  more than one URL. The protocol must
  be supported by the {@link
  java.net.URLClassLoader}.

The jars specified with -C must sit at a location that the URLClassLoader can access on every node.



On 2021/4/19 at 10:22 PM, "todd" wrote:

Command executed: flink  run   \
-m yarn-cluster \
-C file:////flink-demo-1.0.jar \
x

The job graph builds fine on the client side, but on YARN the UDF class cannot be found. From what I can see, the JAR was never uploaded to the classpath.



--
Sent from: http://apache-flink.147419.n8.nabble.com/




Flink job log warning: This does not compromise Flink's checkpoint integrity.

2021-04-20 Posted by guoxb__...@sina.com
Hi all,

In my Flink job logs I keep seeing a warning:

```
2021-04-21 09:13:07,218 WARN  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher [] - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
```

This is causing problems with the job's Kafka consumption. Has anyone run into it? Any advice on how to troubleshoot would be much appreciated.

Flink version: 1.11
Checkpoint interval set in the program: 30s
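
For reference, a minimal sketch of the setup described above, assuming a FlinkKafkaConsumer source (broker address, group and topic are placeholders). With checkpointing enabled, the consumer commits offsets back to Kafka only when a checkpoint completes, which is why a commit that outlasts the 30s interval is skipped in favor of the next checkpoint's offsets:

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CheckpointedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(30_000); // 30s interval, as in the report

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "my-group");             // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // With checkpointing on, offsets go back to Kafka on checkpoint
        // completion (the default). Flink restores from the checkpointed
        // offsets, not Kafka's, hence "does not compromise Flink's
        // checkpoint integrity" in the warning.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("checkpointed-kafka-job");
    }
}
```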



guoxb__...@sina.com




Application application_1618931441017_0004

2021-04-20 Posted by tanggen...@163.com
Hello, I have hit some problems while submitting Flink jobs to a YARN cluster and hope someone can help.
I deployed a three-node Hadoop cluster whose two worker nodes are 4c/24G each; yarn-site configures 8 vcores and 20G of usable memory per node, i.e. 16 vcores and 40G in total. I have submitted two jobs to YARN, each allocated 3 vcores and 6G of memory, so 6 vcores and 12G are in use, which the Hadoop web UI also shows. But when I submit a third job, it fails to deploy with no obvious error in the logs, even though the cluster clearly has resources to spare. I cannot tell where the problem lies; any guidance would be much appreciated.
Attachment 1 (console output):
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not deploy Yarn job cluster.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
    at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
    at com.hfhj.akb.report.order.PlatformOrderStream.main(PlatformOrderStream.java:81)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 11 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1618931441017_0004 failed 2 times in previous 1 milliseconds (global limit =4; local limit is =2) due to AM Container for appattempt_1618931441017_0004_03 exited with exitCode: 1
Failing this attempt.Diagnostics: [2021-04-20 23:34:16.067]Exception from container-launch.
Container id: container_1618931441017_0004_03_01
Exit code: 1

[2021-04-20 23:34:16.069]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :

For more detailed output, check the application tracking page: http://master107:8088/cluster/app/application_1618931441017_0004 Then click on links to logs of each attempt.
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1618931441017_0004
    at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1021)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:524)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424)
    ... 22 more
Attachment 2 (Hadoop logs):
2021-04-20 23:31:40,293 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Stopping resource-monitoring for container_1618931441017_0004_01_01
2021-04-20 23:31:40,293 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got event CONTAINER_STOP for appId application_1618931441017_0004
2021-04-20 23:31:41,297 INFO org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Removed completed containers from NM context: [container_1618931441017_0004_01_01]
2021-04-20 23:34:12,558 INFO SecurityLogger.org.apache.hadoop.ipc.Server: Auth successful for appattempt_1618931441017_0004_03 (auth:SIMPLE)
2021-04-20 23:34:12,565 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:

How to extract the count value from a Flink SQL query

2021-04-20 Posted by 张锴
Flink version 1.12.2.
A question: how do I extract the count value from a Flink SQL query?
For example: tableEnv.sqlQuery("select count(*) from a")
How do I take this count value out and return it?


Kafka fetch rate in Flink streaming and batch scenarios: how many records per fetch? Is it dynamic or configurable?

2021-04-20 Posted by 李一飞
Kafka fetch rate in Flink streaming and batch scenarios: how many records are pulled per fetch? Is it dynamic, or can it be configured?
Ideally, answer for the streaming and batch cases separately. Thanks!
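
One relevant detail, offered as a sketch rather than a definitive answer: the Flink Kafka connector forwards standard Kafka consumer properties to the underlying KafkaConsumer, so the per-poll batch size is governed by Kafka's own settings rather than a fixed Flink-side count (broker, group and topic below are placeholders):

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FetchTuningExample {
    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "my-group");             // placeholder
        // Standard Kafka consumer settings, passed through as-is:
        props.setProperty("max.poll.records", "500");  // cap on records per poll
        props.setProperty("fetch.min.bytes", "1");     // broker replies once this much data is ready
        props.setProperty("fetch.max.wait.ms", "500"); // ...or after this long
        return new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    }
}
```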

Does Flink SQL not support sending MySQL CDC to Kafka in canal-json format?

2021-04-20 Posted by casel.chen
The goal is to use a Flink job to provide functionality similar to a canal server.


CREATE TABLE `binlog_table` (
  `id` INT,
  `name` STRING,
  `sys_id` STRING,
  `sequence` INT,
  `filter` STRING,
  `tag` STRING,
  `remark` STRING,
  `create_date` TIMESTAMP,
  `update_date` TIMESTAMP,
  `reserve` STRING,
  `sys_name` STRING,
  `metric_seq` INT,
  `advanced_function` STRING,
  `value_type` STRING,
  `value_field` STRING,
  `status` INT,
  `syn_date` TIMESTAMP,
  `confirmer` STRING,
  `confirm_time` TIMESTAMP,
  `index_explain` STRING,
  `field_name` STRING,
  `tag_values` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '${mysql.hostname}',
  'port' = '3306',
  'username' = '${mysql.username}',
  'password' = '${mysql.password}',
  'database-name' = '${mysql.database}',
  'table-name' = '${mysql.table}'
);




CREATE TABLE `kafka_sink` (
  `id` INT,
  `name` STRING,
  `sys_id` STRING,
  `sequence` INT,
  `filter` STRING,
  `tag` STRING,
  `remark` STRING,
  `create_date` TIMESTAMP,
  `update_date` TIMESTAMP,
  `reserve` STRING,
  `sys_name` STRING,
  `metric_seq` INT,
  `advanced_function` STRING,
  `value_type` STRING,
  `value_field` STRING,
  `status` INT,
  `syn_date` TIMESTAMP,
  `confirmer` STRING,
  `confirm_time` TIMESTAMP,
  `index_explain` STRING,
  `field_name` STRING,
  `tag_values` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = '${topic}',
  'properties.bootstrap.servers' = '${bootstrap.servers}',
  'format' = 'canal-json'
);




INSERT INTO `kafka_sink`
SELECT * FROM `binlog_table`;

The output looks like this:


{
  "data": [
    {
      "id": 3,
      "name": "自动付款接口BuyETC金额",
      "sys_id": "0184",
      "sequence": 2,
      "filter": "(a=1)",
      "tag": "MerId(商户号)",
      "remark": "O",
      "create_date": "2020-11-02 15:01:31",
      "update_date": "2021-04-07 09:23:59",
      "reserve": "",
      "sys_name": "NHL",
      "metric_seq": 0,
      "advanced_function": "",
      "value_type": "sum",
      "value_field": "value",
      "status": 1,
      "syn_date": "2021-01-28 19:31:36",
      "confirmer": null,
      "confirm_time": null,
      "index_explain": "aa",
      "field_name": null,
      "tag_values": null
    }
  ],
  "type": "INSERT"
}
This is not the standard canal JSON format: real canal output also carries fields such as database, table, ts and old. Switching to the upsert-kafka connector did not work either:



CREATE TABLE `kafka_sink` (
  `id` INT,
  `name` STRING,
  `sys_id` STRING,
  `sequence` INT,
  `filter` STRING,
  `tag` STRING,
  `remark` STRING,
  `create_date` TIMESTAMP,
  `update_date` TIMESTAMP,
  `reserve` STRING,
  `sys_name` STRING,
  `metric_seq` INT,
  `advanced_function` STRING,
  `value_type` STRING,
  `value_field` STRING,
  `status` INT,
  `syn_date` TIMESTAMP,
  `confirmer` STRING,
  `confirm_time` TIMESTAMP,
  `index_explain` STRING,
  `field_name` STRING,
  `tag_values` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '${topic}',
  'properties.bootstrap.servers' = '${bootstrap.servers}',
  'key.format' = 'json',
  'value.format' = 'json'
);


The resulting data looks like this:



{
  "id": 9330,
  "name": "发展商户商户进件页面点击提交按钮00010017",
  "sys_id": "0226",
  "sequence": 3607,
  "filter": null,
  "tag": "",
  "remark": null,
  "create_date": "2021-04-06 12:27:30",
  "update_date": "2021-04-06 12:27:30",
  "reserve": null,
  "sys_name": "STAR",
  "metric_seq": 0,
  "advanced_function": null,
  "value_type": "count",
  "value_field": "value",
  "status": 1,
  "syn_date": "2021-04-07 16:47:59",
  "confirmer": null,
  "confirm_time": null,
  "index_explain": "发展商户商户进件页面点击提交按钮00010017",
  "field_name": null,
  "tag_values": null
}




Re: Coexistence of flink-sql-connector-elasticsearch6_2.11_1.10.0 and flink-connector-elasticsearch6_2.11_1.10.0

2021-04-20 Posted by Leonard Xu
Hi,
If it is a pure SQL job, flink-sql-connector-elasticsearch6_2.11_1.10.0 on its own is enough; if it is a pure DataStream job, flink-connector-elasticsearch6_2.11_1.10.0 on its own is enough.
If you need both jars, there are two options:
1. Build your own jar that bundles the dependencies of both.
2. Same idea as 1, but shade flink-connector-elasticsearch6_2.11_1.10.0.
I have not actually built such a jar myself; give it a try.

Best,


> On Apr 20, 2021, at 14:13, william <712677...@qq.com> wrote:
> 
> Hi, I hit the same problem. How did you solve it? Thanks!
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Flink 1.11: the -C option does not upload the UDF jar

2021-04-20 Posted by Qishang
Hi todd.
-C does not upload the jar at the given path; the URL is only added to the classpath of the cluster, so the same jar must already exist at that path on every node that runs the job.
Alternatively, host the UDF jar on an internal repository or an OSS-style service and load it over http:
-C "http://host:port/xxx.jar"

Hope this helps.
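
For illustration, a complete invocation along those lines; the repository host, jar path and main class are placeholders:

```
flink run \
  -m yarn-cluster \
  -C "http://repo.example.com:8081/jars/flink-demo-udf-1.0.jar" \
  -c com.example.Main \
  ./flink-demo-1.0.jar
```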


todd wrote on Mon, Apr 19, 2021 at 10:22 PM:

> Command executed: flink  run   \
> -m yarn-cluster \
> -C file:////flink-demo-1.0.jar \
> x
>
> The job graph builds fine on the client side, but on YARN the UDF class cannot be found. From what I can see, the JAR was never uploaded to the classpath.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink-kafka-connector Consumer configuration warning

2021-04-20 Posted by 范 佳兴
The flink.partition-discovery.interval-millis setting does take effect in Flink: the Flink Kafka connector fetches the Kafka topic's partition metadata at the configured interval. See the createAndStartDiscoveryLoop method in FlinkKafkaConsumerBase for the implementation.

19:38:37,557 WARN  org.apache.kafka.clients.consumer.ConsumerConfig [] - The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.

This WARN is emitted by the Kafka client: it received the property but does not recognize it. The property is not meant for Kafka itself; it simply gets passed along when the connector creates a KafkaConsumer instance to fetch the partitions. The warning comes from the config.logUnused() call inside the KafkaConsumer constructor.
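
A minimal sketch of setting the property correctly (note the String value; broker, group and topic are placeholders):

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

public class PartitionDiscoveryExample {
    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "my-group");             // placeholder
        // Flink-only setting: look for new partitions every 30 seconds.
        // It is also forwarded to the KafkaConsumer, which does not
        // recognize it and therefore logs the harmless WARN above.
        props.setProperty(
                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");
        return new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    }
}
```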


On 2021/4/18 at 7:45 PM, "lp" <973182...@qq.com> wrote:

In an otherwise normal Flink 1.12 program, I get the following warning:

19:38:37,557 WARN  org.apache.kafka.clients.consumer.ConsumerConfig [] - The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.

I have this line of configuration:

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");



According to the official documentation at https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#topic-discovery:
By default, partition discovery is disabled. To enable it, set a non-negative value for flink.partition-discovery.interval-millis in the provided properties config, representing the discovery interval in milliseconds.


So the configuration above should be legal; why, then, is this warning raised?



--
Sent from: http://apache-flink.147419.n8.nabble.com/




Re: Flink 1.12.2 sql-cli error writing to Hive: is_generic

2021-04-20 Posted by Rui Li
With partition-time, partition commits are triggered by comparing the watermark against the timestamp extracted from the partition values, so your source also needs to define a watermark.
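
A minimal sketch of the pieces that need to line up, modeled on the Flink 1.12 Hive streaming-write documentation; table names, columns and connector options are placeholders:

```
-- The source must define a watermark for partition-time commits to fire
SET table.sql-dialect=default;
CREATE TABLE kafka_src (
  user_id STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',                            -- placeholder
  'properties.bootstrap.servers' = 'broker:9092',  -- placeholder
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Hive sink: a partition is committed once the watermark passes its time
SET table.sql-dialect=hive;
CREATE TABLE hive_sink (
  user_id STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

INSERT INTO hive_sink
SELECT user_id, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') FROM kafka_src;
```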

On Fri, Apr 16, 2021 at 9:32 AM HunterXHunter <1356469...@qq.com> wrote:

> But with process-time, data does come out; with partition-time I have so far never managed to write any data.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Re: Flink 1.12.1 sink to ES7 hits an Invalid lambda deserialization problem

2021-04-20 Posted by william
Hi, when I use flink-sql-connector-elasticsearch7 and flink-connector-elasticsearch7 at the same time, I get the Invalid lambda deserialization error. Do you have a solution? Thanks!



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Coexistence of flink-sql-connector-elasticsearch6_2.11_1.10.0 and flink-connector-elasticsearch6_2.11_1.10.0

2021-04-20 Posted by william
Hi, I hit the same problem. How did you solve it? Thanks!



--
Sent from: http://apache-flink.147419.n8.nabble.com/

(no subject)

2021-04-20 Posted by 小屁孩
Unsubscribe

Unsubscribe

2021-04-20 Posted by hongkong_tang
Unsubscribe