Re:Re:flink的sql gateway支持自定义的UDF吗?

2023-11-19 文章 RS
est! >Xuyang > > > > > >在 2023-11-01 14:21:04,"RS" 写道: >>Hi >>flink的sql gateway支持自定义的UDF吗?包括java和python的,有示例可以参考下吗?

Re:Re:flink的sql gateway支持自定义的UDF吗?

2023-11-19 文章 RS
elease-1.17/docs/dev/table/sql/jar/ > >-- > >Best! >Xuyang > > > > > >在 2023-11-01 14:21:04,"RS" 写道: >>Hi >>flink的sql gateway支持自定义的UDF吗?包括java和python的,有示例可以参考下吗?

flink的sql gateway支持自定义的UDF吗?

2023-11-01 文章 RS
Hi flink的sql gateway支持自定义的UDF吗?包括java和python的,有示例可以参考下吗?

Re:flink 1.18.0的sql gateway支持提交作业到k8s上运行吗?

2023-11-01 文章 RS
Hi, 提交到本地是flink配置文件里面配置的jobmanager的地址,所以肯定也是提交到K8S的吧 yarn的不太清楚。 在 2023-10-30 14:36:23,"casel.chen" 写道: >想问一下目前flink 1.18.0的sql gateway只支持提交作业到本地运行吗?能否支持提交作业到yarn或k8s上运行呢?

Re:[急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-18 文章 RS
Hi, connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into Thanks 在 2023-02-17 15:56:51,"casel.chen" 写道: >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner >join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。

Re:Re:Re:flink sql connector options如何支持Map数据类型?

2022-12-27 文章 RS
eaders吗?例如 Content-Type 之类的,对应的flink sql kafka connector中的 >>> properties.group.id 和 properties.bootstrap.servers > >在 2022-12-26 11:12:57,"RS" 写道: >>Hi, >> >> >>> 遇到用户添加自定义请求头Headers问题 >>如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表

Re:如何查看Flink on Native K8s模式下失败作业的日志

2022-12-25 文章 RS
Hi, 我的也是on K8S,是session模式的,得看你的模式是什么 我的流作业失败了,配置了checkpoint,会自动重试,jm和tm都还在,可以直接看到作业异常信息。 Thanks 在 2022-12-22 23:18:27,"hjw" 写道: >Flink On Native K8s >模式下,如果流作业因异常失败了,作业的JobManager和TaskManager所在Pod都会消失,就无法查看作业日志。 >请问在K8s模式下,在查看日志方面有没有相关解决方案。

Re:Re: 提交任务不能指定第三方jar

2022-12-25 文章 RS
Hi, 试试 -C,--classpath,我都是用这个提交UDF的 另外邮件列表发图片是看不到了,别再发截图了 Thanks, 在 2022-12-13 18:13:47,"melin li" 写道: 类似: spark-submit 支持--jars,更灵活方便, melin li 于2022年12月8日周四 11:09写道: 如果是作业依赖的jar,是可以打一个flat jar。有两种场景: 1、sql作业中,用户依赖某个connector jar,但平台不提供这个connector,需要用户上传, 2、自定义udf

Re:flink sql connector options如何支持Map数据类型?

2022-12-25 文章 RS
Hi, > 遇到用户添加自定义请求头Headers问题 如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述 如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了 > 如何在connector options中支持Map数据类型呢? options里面都是k=v的结构,v都是字符串,所以你要考虑的是如何用 字符串 描述 Map Thanks 在 2022-12-17 10:20:29,"casel.chen" 写道: >我想开发一个Flink SQL

Re:flink如何调用py模型

2022-12-25 文章 RS
Hi, 我考虑过的几种方式: 1. 做成http服务,和flink分离,py模型本来跑起来也不快,做成http可以动态扩容,无状态的话 2. 用pyflink来跑任务,可以嵌python代码,就是任务启动非常慢,要复制虚拟环境,模型可以写成pandas的输入输出,这样模型也是可以独立开发的 3. Java调python的udf,py必须要能封装成函数,写udf毕竟麻烦 Thanks 在 2022-12-19 16:51:33,"kcz" <573693...@qq.com.INVALID> 写道: >flink调用py模型时候,大是采取什么方案,直接跟flink集成嘛?

Re:Re:Re: Flink启动python任务异常:undefined symbol: _Py_LegacyLocaleDetected

2022-11-22 文章 RS
0x7ff010275740 (most recent call first): Thanks 在 2022-11-23 09:25:04,"RS" 写道: >Hi, >我这边使用的python命令构建的,没有用conda,这个应该没有影响吧 >python3 -m venv jxy_venv > > >我启动了一个单点的flink测试,本机启动,有python环境,测试是可以运行成功的 > > > >Thanks > > > > > >在 2022-11

Re:flink作业提交运行后如何监听作业状态发生变化?

2022-11-22 文章 RS
Hi, Flink的Metric了解下,里面应该有作业的状态 https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#reporter 配置不同的Metric方式,有的是拉取,有的是推送的机制, https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/ Thanks 在 2022-11-23 08:32:11,"casel.chen" 写道:

Re:Re: Flink启动python任务异常:undefined symbol: _Py_LegacyLocaleDetected

2022-11-22 文章 RS
Hi, 我这边使用的python命令构建的,没有用conda,这个应该没有影响吧 python3 -m venv jxy_venv 我启动了一个单点的flink测试,本机启动,有python环境,测试是可以运行成功的 Thanks 在 2022-11-22 15:39:48,"Xingbo Huang" 写道: >Hi RS, > >你是使用conda构建的venv吗,可以参考PyFlink 准备环境的文档[1] > >Best, >Xingbo > >[1] >htt

Flink启动python任务异常:undefined symbol: _Py_LegacyLocaleDetected

2022-11-21 文章 RS
Hi, Flink版本:1.15.1 A机器:在A机器上创建python虚拟环境,版本3.6.8,安装flink等python包,然后打包ZIP,jxy_venv.zip,上传到HDFS上 B机器:在B机器上,主机上没有Python环境,Flink运行在K8S的docker中,docker里面也没有python环境, C机器:在C机器上,有flink的client,存在python环境,负责启动任务 启动命令: ./bin/flink run -Dexecution.runtime-mode=BATCH -d -m 192.168.1.2:8081 -n -py

Re:flinksql join

2022-11-14 文章 RS
Hi, 我的理解是后插入的维表数据,关联不到是正常现象, 如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据, Thanks 在 2022-11-11 11:10:03,"Jason_H" 写道: > > >hi,大家好 >我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下: >kakfa输入: >账号

Re:Re: OutOfMemoryError: Direct buffer memory

2022-10-10 文章 RS
> allanqinjy >> | >> | >> allanqi...@163.com >> | >> 签名由网易邮箱大师定制 >> >> >> On 10/8/2022 21:00,RS wrote: >> Hi, >> >> >> 版本:Flink-1.15.1 >> >> >> 有个场景,从hdfs读文件再处理数据,batch mode,10个并发,使用Flink >> SQL定义执行,source是

Re:Re: Re: table store 和connector-kafka包冲突吗?

2022-10-10 文章 RS
.2.0.pom >把flink-connector-kafka-1.15.1.jar 去掉再试试? > > >RS 于2022年10月8日周六 17:19写道: > >> Hi, >> 报错如下: >> >> >> [ERROR] Could not execute SQL statement. Reason: >> org.apache.flink.table.api.ValidationExc

OutOfMemoryError: Direct buffer memory

2022-10-08 文章 RS
Hi, 版本:Flink-1.15.1 有个场景,从hdfs读文件再处理数据,batch mode,10个并发,使用Flink SQL定义执行,source是connector=filesystem,format=raw,path= 执行任务的时候,有时候能成功,有时候失败了然后就一直失败,重启集群好像可以解决问题,这种情况如何是什么原因导致的? 集群的off-heap都是默认配置, taskmanager.memory.task.off-heap.size=0 taskmanager.memory.framework.off-heap.size=128MB

Re:Re: table store 和connector-kafka包冲突吗?

2022-10-08 文章 RS
are: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory org.apache.flink.table.store.kafka.KafkaLogStoreFactory org.apache.flink.table.store.shaded.streaming.connectors.kafka.table.KafkaDynamicTableFactory Thanks 在 2022-10-08 13:38:20,"Shammon FY" 写道: >Hi RS >你

table store 和connector-kafka包冲突吗?

2022-10-07 文章 RS
Hi, 版本:flink-1.15.1 使用table store,需要在lib下放置flink-table-store-dist-0.2.0.jar,之前集群的lib下有一个flink-connector-kafka-1.15.1.jar,使用sql-client,定义kafka源表的时候,发现connector冲突了 是不是lib下有了flink-table-store-dist-0.2.0.jar,就不能有flink-connector-kafka-1.15.1.jar? Thanks

Re:如何处理Flink KafkaSource的异常的数据

2022-10-05 文章 RS
Hi, 当前的SQL是不支持的,需要的话,可以自己实现一个connector或者UDF,把错误数据输出到其他地方 Thanks 在 2022-09-29 10:02:34,"Summer" 写道: > >你好,我想问一下,如果来源于Kakfka的一条数据出现错误,会导致任务执行失败,日志抛出该条错误数据。 > > >为保证任务执行,需要在*** WITH内加'value.json.ignore-parse-errors' = 'true', >'value.json.fail-on-missing-field' = 'false' > > > >

Re:如何实现flink作业失败实时通知告警?

2022-09-30 文章 RS
Hi, 个人推荐方式二, 1. 部分场景下,有些异常可以自动恢复,任务异常会自动重启,继续运行 2. 告警通知到介入处理,如果是人来介入处理的话,20s通常时间不是问题,到分钟级都可以 3. failure之前调用某个hook去通知相关方,应该是要修改jobmanager的代码,具体就要请教大佬们了。 在 2022-09-30 13:50:56,"casel.chen" 写道: >当flink作业失败时如何第一时间发通知告警到相关方?现有方式 >方式一:flink作业本身提供的rest

Re:Hive提交分区异常,Caused by: java.io.FileNotFoundException: /tmp/... (No such file or directory)

2022-09-30 文章 RS
是后面新启动的任务全部都还是在这个路径下查找hive配置,导致异常 如果重启集群的话,同样的任务提交,不会报错,看起来是个概率事件,所以这个问题可能是什么原因导致的呢? Thanks 在 2022-07-21 14:52:51,"RS" 写道: >Hi, > > >环境: >flink-1.15.1 on K8S session集群 >hive3 >flink写hive任务,配置了定时提交分区 > > >现象: >1. checkpoint是30s一次 >2. HDFS上有

Re:Re:Flink1.15.1读取Tidb6,sql-client执行select count异常 java.sql.SQLSyntaxErrorException

2022-09-01 文章 RS
r嘛?你select的表名是`origin_object_data_61`? >在 2022-09-01 20:18:05,"RS" 写道: >>Hi, >>环境: >>flink-1.15.1 >>TiDB-v6.1.0 >> >> >>现象: >>Flink SQL> select count(*) from t1; >> >>[ERROR] Could not execute SQL statement. Reason: &

Flink1.15.1读取Tidb6,sql-client执行select count异常 java.sql.SQLSyntaxErrorException

2022-09-01 文章 RS
Hi, 环境: flink-1.15.1 TiDB-v6.1.0 现象: Flink SQL> select count(*) from t1; [ERROR] Could not execute SQL statement. Reason: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column

Re:Client启动有异常: SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.

2022-08-12 文章 RS
Hi, 1. lib下只需要一个flink-sql-connector-hive的包,不再需要flink-connector-hive和hive-exec 2. flink-sql-connector-hive包版本不对,需要和flink的1.13.3版本一致 先试试看,不行的话,就是代码的问题了。 Thanks At 2022-08-12 10:31:35, "Summer" wrote: > > >版本:1.13.3 > > >lib目录下: >-rw-r--r-- 1 root root 7759243 Aug 12 10:12

Flink SQL 如何描述 ES(ElasticSearch)的nested字段类型?

2022-07-30 文章 RS
Hi, flink sql如何写es的nested数组数据? 原始示例数据: { "id": "123", "field1":[ { "k1":1 }, { "k1":1, "k2":2 }, { "k3":"3" } ] } filed1是一个数组,里面的原始是字典,字典内的字段名是动态的,我不知道里面有多少个key

Re:咨询 Flink 在 OLAP、即席查询场景下的应用问题

2022-07-14 文章 RS
Hi, 打算通过Flink查询HDFS中的数据,对查询实效性要求高,查询平均时延要求在秒级。 = 这种高实时性的要求,是不适合Presto或者Flink引擎的。 如果是数据量不大,查询逻辑不复杂,实时性要求高,建议数据同步到数据库中,使用数据库引擎来查询; 如果是数据量大,查询逻辑复杂,实时性要求不高,Flink或者Presto是可以的; 如果是数据量大,查询逻辑复杂,实时性要求高,那什么都拯救不了你 Thanks 在 2022-07-14 11:54:00,"barbzhang(张博)" 写道:

IF IS NULL 函数运行报错:NullPointerException

2022-07-13 文章 RS
Hi, 版本:Flink-1.15.1 模式:单机 s1为kafka的source表,已定义, g_name为s1表的字段,string类型 select `g_name` from s1; 这个运行正常,有数据输出 然后加上 IF 语句,则会报错: Flink SQL> select if(`g_name` is null, 'no', `g_name`) from s1; [ERROR] Could not execute SQL statement. Reason:

Re:请教:关于如何释放 Flink Job 中某个对象持有的资源

2022-07-12 文章 RS
Hi, 如果是访问ES的话,Flink里面自带ES的connector,你可以直接使用,或者参考源码,source和sink接口都有对应的方法 资源是否在一个线程里面,这个取决与你代码逻辑,如果在不同的线程或者进程的话,设计上,就不要用同一个EsClientHolder,各个不同阶段各自去new和close对象, Thanks 在 2022-07-12 12:35:31,"Bruce Zu" 写道: > Flink team好, > 我有一个很一般的问题,关于如何释放 Flink Job 中某个对象持有的资源。 > > 我是 Flink

Re:Re: filesystem java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory

2022-07-11 文章 RS
" 写道: >Hi, > >有更详细的日志吗?看起来是类加载冲突的,需要明确下是哪个类冲突了 > >Best, >Weihua > > >On Wed, Jul 6, 2022 at 1:53 PM RS wrote: > >> Hi, >> >> >> 通过sql-client执行flink sql,connector选择filesystem,会出现如下报错 >> java.lang.ClassNotFoundException: >> org.ap

Re:Re:Re: filesystem java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory

2022-07-11 文章 RS
le-planner_2.12-1.15.1.jar >冲突了 去掉一个就可以了 > > > > > > > >在 2022-07-11 19:45:04,"Weihua Hu" 写道: >>Hi, >> >>有更详细的日志吗?看起来是类加载冲突的,需要明确下是哪个类冲突了 >> >>Best, >>Weihua >> >> >>On We

Re:flink sql解析kafka数据

2022-07-06 文章 RS
Hi, 你可以在定义表ccc_test_20220630_2字段的时候,结构如果固定,可以指定字段类型为ARRAY+ROW吧,例如 abc ARRAY>,如果里面是动态结构,可以定义为MAP 结构如果比较复杂,或者字段不明确,就自定义UDF解决。 Thanks 在 2022-06-30 15:02:55,"小昌同学" 写道: 各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值 我百度找到了

Re:请教下flink的提交方式

2022-07-06 文章 RS
Hi, 通过命令行的方式提交,可以捕获flink run的标准输出,里面包含job id,然后正则匹配或者字符串截取就可以提取到job id了 Thanks 在 2022-07-04 17:50:07,"sherlock zw" 写道: >目前我需要去监控已经提交的flink任务, >但是通过命令行方式提交的话拿不到任务id,只能通过INFO级别的日志过滤出来,但是我们的环境里面的日志界别是WARN,看不到任务id的日志输出,所以想问下除了命令行的方式提交任务还有其他方式吗,例如有和Spark类似的SparkLaunch一样的jar提交的方式吗?希望大佬指点下,谢谢。

filesystem java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory

2022-07-05 文章 RS
Hi, 通过sql-client执行flink sql,connector选择filesystem,会出现如下报错 java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory Flink SQL> CREATE TABLE t1 ( > a STRING, > b INT > )WITH( > 'connector'='filesystem', > 'path'='/tmp/qwe', > 'format'='csv', >

Flink-1.15.0 消费kafka提交offset失败?

2022-06-26 文章 RS
Hi, 请教下各位,Flink-1.15.0,消费Kafka发现下面个问题,offset提交失败的情况,有的任务应该是一直提交失败的,数据消费了,但是offset不变,这种情况如何处理? 现象如下: 1. 任务没有异常, 2. 数据能正常消费处理,不影响数据使用 3. 任务有配置checkpoint,几分钟一次,理论上执行checkpoint的时候会提交offset 4. 部分任务的从Kafka的offset提交失败,部分正常 WARN日志如下: 2022-06-27 01:07:42,725 INFO

Re:Re: sql-client pyexec参数生效疑问

2022-06-08 文章 RS
e-1.14/docs/dev/python/dependency_management/#python-interpreter-of-client > >可以把这个参数-pyclientexec 也加上试试。 > >On Tue, Jun 7, 2022 at 11:24 AM RS wrote: > >> Hi, >> >> >> 环境: >> - flink-1.14.3, 单机集群 >> - 服务器上默认python2,也存在python3.6.8 >> - /xxx/bin/python3

sql-client pyexec参数生效疑问

2022-06-06 文章 RS
Hi, 环境: - flink-1.14.3, 单机集群 - 服务器上默认python2,也存在python3.6.8 - /xxx/bin/python3是python3生成的虚拟环境 使用sql-client测试pyflink的udf,自定义了一个函数f1,/xxx/p.py 启动命令: ./bin/sql-client.sh -pyfs file:///xxx/p.py -pyexec /xxx/bin/python3 配置pyexec指定了使用的python为python3 执行命令报错,报错信息如下: Flink SQL> create temporary

Re:kafka数据落地到Hive/Filesystem(orc/parquet格式)的疑问

2022-06-06 文章 RS
Hi, 每次checkpoint都会生成文件的,在一个checkpoint期间内,可以配置文件大小合并等, 不同checkpoint的生成的文件无法合并,所以文件数量最少是和checkpoint时间间隔相关的, 如果checkpoint时间间隔比较短,就需要自己去合并小文件了 在 2022-06-06 16:44:51,"谭家良" 写道: >大家好,关于kafka数据消费到hive/filesystem(orc/parquet)我有个疑问。orc/parquet如何调整落地的文件大小?是根据checkpoint时间来的吗?在落地到hive/filesystem >

Re:Re: pyflink报错:Java gateway process exited before sending its port number

2022-05-23 文章 RS
es/pyflink/lib) 的那些 jar 包的版本。 > > > >On Mon, May 23, 2022 at 4:22 PM RS wrote: > >> Hi, >> 在Pycharm中,测试Pyflink示例代码,启动运行报错,代码为官方文档中的代码 >> 参考官方文档: >> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table_api_tutorial/ &g

pyflink报错:Java gateway process exited before sending its port number

2022-05-23 文章 RS
Hi, 在Pycharm中,测试Pyflink示例代码,启动运行报错,代码为官方文档中的代码 参考官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table_api_tutorial/ 报错如下: Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError:

Re:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

2022-05-16 文章 RS
Hi, cancel的时候要加savepoint,然后启动的时候指定savepoint应该就不会丢数据了,直接cancel的话是可能丢数据的, checkpoint的作用和你想到可能不一样,你再看看 Thx 在 2022-05-12 10:38:33,"徐战辉" 写道: hello, 请教下,如何设置flink配置及作业参数,在取消作业重新部署、flink作业失败重跑情况下,保证不丢失数据。 目前有一份作业,开启checkpoint, cancel 后重新启动,发现数据会丢失1小部分。 1. flink.conf

Re:flink-sql,对于kafka 的一些额外参数如何配置

2022-03-22 文章 RS
Hi, partition.discovery.interval.ms 这个是Flink connector kafka里面加上的,KafkaSourceOptions里面定义的, 看下你的kafka-client的版本,官方的是 2.4.1,如果版本一样,那只能先忽略了。 在 2022-03-22 17:10:52,"Michael Ran" 写道: >dear all : > 目前用flink1.4 table api +kafka 的情况下,有各种警告,比如: > The configuration

Re:Re: Re: 维度表变化,关联结果全部更新在Flink SQL该如何实现

2022-03-21 文章 RS
Hi, 你这个例子中,捕获到B的变更CDC,若最终结果表支持部分字段更新,就直接更新结果表就行,都不需要关联, 只要你的B的CDC处理 晚于 A流的join处理就行 如果一定要全部关联的话,ttl又不可行,那你这个数据量会无限增大,后面就无法关联了的,设计肯定得改 在 2022-03-22 09:01:30,"JianWen Huang" 写道: >是的。其实我想到的也是将维度表和事实表都通过Cdc方式做成流,然后regular

Re:Table Api Connectors kafka bounded

2022-03-21 文章 RS
Hi, 参考官方文档的话,目前应该是不支持的,kafka的connector, source只支持Unbounded Scan, 不支持Bounded Scan NameVersionSourceSink Apache Kafka0.10+Unbounded ScanStreaming Sink, Batch Sink https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/#supported-connectors

Re:回复:hive 进行 overwrite 合并数据后文件变大?

2022-02-24 文章 RS
感谢,确定了下是压缩格式的问题, 原hive文件的压缩是SNAPPY压缩,使用Flink SQL合并小文件之后,默认不压缩,导致文件变大了。 Flink默认没有继承原文件的压缩算法。。。 在 2022-02-22 12:08:39,"‪junjie.m...@goupwith.com‬" 写道: 检查下数据格式和压缩格式是否和之前的不一致 原始邮件 ---- 发件人: RS 日期: 2022年2月22日周二 09:35 收件人: user-zh@flink.apache.org 主 题: hive 进行 overwrite

hive 进行 overwrite 合并数据后文件变大?

2022-02-21 文章 RS
Hi, flink写hive任务,checkpoint周期配置的比较短,生成了很多小文件,一天一个目录, 然后我调用flink sql合并之前的数据,跑完之后,发现存储变大了,请教下这个是什么原因导致的? 合并之前是很多小part文件,overwrite之后文件减少了,但是存储变大了,从274MB变大成2.9GB了? hive表table1的分区字段是`date` insert overwrite aw_topic_compact select * from `table1` where `date`='2022-02-21'; 合并前: 514.0 M 1.5 G

Re:flink 不触发checkpoint

2022-02-18 文章 RS
1. 图片挂了,看不到,尽量用文字,或者用图床等工具 2. 启动任务有配置checkpoint吗? 在 2022-02-17 11:40:04,"董少杰" 写道: flink读取csv文件建表,同时消费kafka数据建表,两张表join之后写入hdfs(hudi),读取csv数据的任务已经是finished状态,就会触发不了checkpoint,看有什么办法能让它正常触发checkpoint? flink版本1.12.2。 谢谢! | | 董少杰 | | eric21...@163.com |

Re:Re: flink sql 如何提高下游并发度?

2022-01-11 文章 RS
Hi, 请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10? 如果source是10的话,那还有7个线程就空闲了? 在 2022-01-11 11:10:41,"Caizhi Weng" 写道: >Hi! > >可以设置 parallelism.default 为需要的并发数。 > >Jeff 于2022年1月9日周日 19:44写道: > >> 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?

Re:Re: 咨询个Flink SQL的问题,如何去除null的字段

2022-01-06 文章 RS
Hi, 感谢回复,我也测试过这类方法, 我给json format加了个参数,在序列化的时候,row里面去除null,但是这个要修改代码,单独更新flink-json的jar包了,后期维护可能会有问题 这种很适合写ES和写文件,不会有冗余的字段 如果社区能新增这个功能或者合并进去就方便了 在 2022-01-06 21:18:37,"Benchao Li" 写道: >我们内部是给json format加了一个功能,允许不把null字段进行序列化。主要解决的也是es这个写入的场景。 >你们也可以试一下。 > >RS 于2021年12月

Re:Re: Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-30 文章 RS
>我不太熟悉 es,如果某一个字段不写的话,是会写入一个默认值吗?如果是的话,可以使用 coalesce 函数。coalesce(a, b, c, >...) 会返回第一个非 null 的值,因此只要把默认值放在最后一个,如果前面都是 null 就会写默认值。 > >RS 于2021年12月30日周四 17:06写道: > >> 有10~20个字段,这样一个个写,手都敲断了,还有其他的方式吗?或者如何开发代码适配到SQL? >> >> >> >> >> >> 在 2021-12

Re:Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-30 文章 RS
有10~20个字段,这样一个个写,手都敲断了,还有其他的方式吗?或者如何开发代码适配到SQL? 在 2021-12-30 11:36:21,"Xuyang" 写道: >可以使用case when试一下 >在 2021-12-29 16:40:39,"RS" 写道: >>Hi, >>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢? >> >> >>比如:源数据有3个字段,a,b,c >>inser

咨询个Flink SQL的问题,如何去除null的字段

2021-12-29 文章 RS
Hi, 使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢? 比如:源数据有3个字段,a,b,c insert into table2 select a,b,c from table1 当b=null的时候,只希望写入a和c 当c=null的时候,只希望写入a和b

Re:请教flink sql作业链路延迟监控如何实现

2021-12-22 文章 RS
我是直接监控kafka的lag,如果lag数值较大或持续上升,肯定就有延迟了。收到告警后,再查看下plan,有个busy指标,红色的节点就是有问题的 在 2021-12-23 08:36:33,"casel.chen" 写道: >想问一下flink sql作业链路延迟监控如何实现? >我们的flink >sql作业基本上都是上游接kafka,下游sink到es/hbase/kafka/mongodb/redis/clickhouse/doris这些存储

Re:Re: Re:Re: batch模式下任务plan的状态一直为CREATED

2021-12-22 文章 RS
只有一条SQL,只是数据量比较大,使用的BATCH模式。 SELECT price FROM hive.data.data1 ORDER BY price DESC 在 2021-12-22 18:35:13,"刘建刚" 写道: >你的SQL是怎么写的?两个独立的SQL吗?Flink中有个参数table.dml-sync >,决定是否多条SQL语句顺序执行,默认是false,也就是多条语句是同时执行的。 > >RS 于2021年12月22日周三 09:25写道: > >> 跑了10几个小时终于跑完了,测试

Re:Re:Re: batch模式下任务plan的状态一直为CREATED

2021-12-21 文章 RS
跑了10几个小时终于跑完了,测试发现BATCH模式下,只有Source把所有数据消费完,后面的SortLimit plan才会创建,和流模式不太一样 在 2021-12-21 20:06:08,"RS" 写道: >slot资源是绝对充足的,你提到的资源还涉及到其他资源吗? > > > > > >在 2021-12-21 17:57:21,"刘建刚" 写道: >>固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕

Re:实时读取hive参数不生效

2021-12-21 文章 RS
set table.dynamic-table-options.enabled=true; sql-client的话这样配置,不要引号 在 2021-12-21 20:20:11,"Fei Han" 写道: >@all: >大家好! > 我在实时读取hive的时候动态参数不生效,另外flink是否可以通过流读方式读取hive的普通表呢? >版本如下: >Flink版本1.13.3 > Hive版本hive2.1.1-CDH6.2.0 >设置的参数是set 'table.dynamic-table-options.enabled'='true' >报错如下:

Re:Re: batch模式下任务plan的状态一直为CREATED

2021-12-21 文章 RS
slot资源是绝对充足的,你提到的资源还涉及到其他资源吗? 在 2021-12-21 17:57:21,"刘建刚" 写道: >固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕后才能运行SortLimit。 > >RS 于2021年12月21日周二 16:53写道: > >> hi, >> >> 版本:flink1.14 >> >> 模式:batch >> >> 测试场景:消费h

batch模式下任务plan的状态一直为CREATED

2021-12-21 文章 RS
hi, 版本:flink1.14 模式:batch 测试场景:消费hive大量数据,计算某个字段的 top 10 使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。 请问下,SortLimit状态一直为CREATED是正常现象吗? 数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗? 测试SQL: SELECT price FROM hive.data.data1

Re:Re:Re: flink sql collect函数使用问题

2021-12-03 文章 RS
SELECT class_no, collect(info) FROM ( SELECT class_no, ROW(student_no, name, age) AS info FROM source_table ) GROUP BY class_no; 从SQL层面想到比较接近的方法,但multiset无法转array 从你的需求描述看,mongodb目标表的这种班级设计平时可能不太需要,如果是为了查某个班所有的学生的话,在查询的时候加个where条件即可,没有必要把明细数据再放到一个数组里面 感觉可能是你定义表结构和实际使用方面的问题,可以换个角度思考下 在

Re:回复: flink远程调用时环境变量问题

2021-11-29 文章 RS
试试 flink-conf.yam里面配置 env.hadoop.conf.dir: /xxx/hadoop https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/ 在 2021-11-26 11:14:17,"王健" <13166339...@163.com> 写道: > > >您好,ssh远程调用,/etc/profile配置是不起作用的呢 > > >| | >王健 >| >| >13166339...@163.com >| >签名由网易邮箱大师定制

关于flink plugins目录不生效的疑问

2021-11-22 文章 RS
Hi, 环境:flink-1.14.0,单节点standalone https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/ 参考官方文档,执行下面命令: mkdir plugins/s3-fs-hadoop cp opt/flink-s3-fs-hadoop-1.14.0.jar plugins/s3-fs-hadoop/ 在flink-conf中配置了hadoop的路径(s3使用了hadoop的配置文件)

flink1.12 请教下如何配置多hadoop参数,s3使用问题

2021-11-22 文章 RS
hi, 环境: 1. flink-1.12,版本可以升级 2. flink-conf中配置了env.hadoop.conf.dir,路径下有hdfs集群的core-site.xml和hdfs-site.xml, state.backend保存在该HDFS上 3. flink的部署模式是K8S+session 需求: 需要从一个s3协议的分布式文件系统中读取文件,处理完写到mysql中 问题: s3配置采用hadoop的配置方式,保存为一个新的core-site.xml文件,参考的

Re:回复:flink1.13.1 sql client connect hivecatalog 报错

2021-11-21 文章 RS
图片看不到的,尽量不要发图片,你可以复制文字出来并说明下, 在 2021-11-22 13:14:13,"zhiyuan su" 写道: 我使用的是上面的jar 包。从1.13的文档处获取的,但维标注flink 版本,我理解应该是flink1.13版本编译的。 这个是yaml文件,我直接在sql 客户端,通过DDL 的方式去编写的话,也是如下报错: Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory:

Re:Re: flinksql 写 hive ,orc格式,应该支持下压缩。

2021-11-17 文章 RS
1. 文件名是不带.zlib后缀的 2. ORC格式默认是配置了ZIP压缩的,并且开启的,你可以配置'orc.compress'='NONE'测试下,看下不压缩的大小,没有压缩的文件应该是更大的 在 2021-11-16 17:29:17,"yidan zhao" 写道: >我看了下,默认不带.zlib之类的后缀,我加了也看不出来到底有没有压缩。 >其次,orc.compression官方介绍默认是zlib,貌似默认就有开启压缩? > >RS 于2021年11月15日周一 上午9:55写道: > >&g

Re:flinksql 写 hive ,orc格式,应该支持下压缩。

2021-11-14 文章 RS
官网里面有介绍这个,你是要这个吧 https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/orc/ Orc format also supports table properties from Table properties. For example, you can configure orc.compress=SNAPPY to enable snappy compression. 在 2021-11-11 12:37:31,"yidan zhao" 写道:

Re:请求帮助

2021-11-14 文章 RS
查看下client的日志,一般在flink的logs目录下 在 2021-11-12 20:59:59,"sky" 写道: >我使用的事flink on yarn。在执行命令时: flink run -m yarn-cluster >./examples/batch/WordCount.jar 结果却报错了: > >The program finished with the following exception: >

Re:flink 1.13.1 通过yarn-application运行批应用,处理mysql源一亿条数据到hive,发现需要配置16G+的Taskmangaer内存

2021-11-04 文章 RS
看看任务并行度是多少,可能是并发太大导致的内存占用?? 在 2021-11-04 15:52:14,"Asahi Lee" <978466...@qq.com.INVALID> 写道: >hi! >我通过flink sql,将mysql的一亿条数据传输到hive库中,通过yarn-application方式运行,结果配置16G的内存,执行失败!

Re:回复:如何监控kafka延迟

2021-08-17 文章 RS
1. metric指标每次都会清0的2. 数据对账的话, 可以将每次的统计数据按时间点保存起来, 然后查询时间范围的时候, 做sum求和来对账 在 2021-08-09 09:51:43,"Jimmy Zhang" 写道: >您好,看到你们在用kafka相关metrics,我想咨询一个问题。你们是否遇见过在重启一个kafka sink >job后,相关指标清零的情况?这样是不是就无法持续的进行数据想加?我们想做一个数据对账,查询不同时间段的输出量统计,这样可能中间归零就有问题,所以想咨询下,任何的回复都非常感谢! > > > > >| >Best, >Jimmy >| >

Re:Re:Re: 请教下Flink时间戳问题

2021-08-16 文章 RS
T只是时间格式显示问题, 数据格式都是timestamp(3), 这个和T应该无关的 在 2021-08-16 13:45:12,"Geoff nie" 写道: >谢谢你!第二个问题确实是我版本太低问题,我flink版本是1.12.1。 >第一个问题,是因为我通过flink写入iceberg >表中,然后通过presto查询iceberg表,其他字段的表都可以查询,但是当写入的是含有TIMESTAMP 类型的表时,presto查询如下报错: > >Query failed (#20210816_020321_00011_wa8bs) in your-presto: Cannot

Re:关于 Flink on K8S Deploy Job Cluster 部署问题

2020-11-21 文章 RS
taskmanager-job-deployment.yaml 和 jobmanager-job.yaml 部署的时候只用启动一次服务, 后续启动实际job的时候,就占用slot,available slot就少了 当job执行完之后,slot资源就释放了,available的slot又恢复了,可以给下一次的job提供资源 如果你的slot用完了的话,那就是资源不够了,需要重新配置taskmanager-job-deployment.yaml 在 2020-11-20 15:20:58,"WeiXubin" <18925434...@163.com> 写道:

Re:关于flink任务挂掉报警的监控指标选择

2020-11-05 文章 RS
可以配置任务重启告警, flink任务挂掉之后会自动尝试重启 如果是固定任务数量的话, 还可以配置slot数量告警 在 2020-11-05 10:15:01,"bradyMk" 写道: >请问各位大佬,我基于grafana+prometheus构建的Flink监控,现在想实现flink任务挂掉后,grafana就发出报警的功能,但是目前不知道该用什么指标去监控,我之前想监控flink_jobmanager_job_uptime这个指标,设置的监控规则是:max_over_time(flink_jobmanager_job_uptime[1m]) >-

sql-client 连接hive报错 TTransportException

2020-10-26 文章 RS
Hi, 请教下 我尝试使用sql-client连接hive, hive正常, 使用beeline -u jdbc:hive2://x.x.x.x:1 可以正常连接 sql-client-defaults.yaml配置内容: tables: [] functions: [] catalogs: - name: myhive type: hive hive-conf-dir: /home/hive/flink-1.11.1/conf default-database: default execution: planner: blink type:

Re:启动任务异常, Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1

2020-08-30 文章 RS
测试出来了, rowtime参数需要是最后一个参数, $(timeField).rowtime() 但是这个报错也太隐晦了吧 . 在 2020-08-30 14:54:15,"RS" 写道: >Hi, 请教下 > > >启动任务的时候抛异常了, 但是没看懂报错原因, 麻烦各位大佬帮看下 >这里我是先创建了一个DataStreamSource, 然后配置转为view, 配置EventTime, 后面再用SQL DDL进行数据处理 >DataStreamSource sou

启动任务异常, Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1

2020-08-30 文章 RS
Hi, 请教下 启动任务的时候抛异常了, 但是没看懂报错原因, 麻烦各位大佬帮看下 这里我是先创建了一个DataStreamSource, 然后配置转为view, 配置EventTime, 后面再用SQL DDL进行数据处理 DataStreamSource source = env.addSource(consumer); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); tableEnv.createTemporaryView(table_name,

Re:请教:用flink实现实时告警的功能

2020-08-11 文章 RS
Hi, 你这个完全就是CEP的使用场景啊, 大于多少次, 大于一定数值组合起来判定事件, 1. 规则变更了, 重启任务就行, 规则都变了, 任务重启也没影响 2. CEP支持规则组合, 时间窗口 3. 最佳实践官网的介绍就很合适 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/cep.html 在 2020-08-06 10:26:19,"samuel@ubtrobot.com" 写道: >由于需要实时告警功能,经调研,用flink

Re:Re: Re: Flink 1.11.1 on k8s 如何配置hadoop

2020-08-10 文章 RS
Hi 恩, 重新试了下, 这种是可以的, 前面是我操作错了, 谢谢~ Thx 在 2020-08-10 13:36:36,"Yang Wang" 写道: >你是自己打了一个新的镜像,把flink-shaded-hadoop-2-uber-2.8.3-10.0.jar放到lib下面了吗 >如果是的话不应该有这样的问题 > >Best, >Yang > >RS 于2020年8月10日周一 下午12:04写道: > >> Hi, >> 我下载了flink-shaded-hadoop-2-uber-

Re:Re: Flink 1.11.1 on k8s 如何配置hadoop

2020-08-09 文章 RS
Hi, 我下载了flink-shaded-hadoop-2-uber-2.8.3-10.0.jar, 然后放到了lib下, 重启了集群, 但是启动任务还是会报错: Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to

Re:flink-jdbc_2.11:1.11.1依赖 找不到

2020-08-07 文章 RS
Hi, 找下这种的 flink-connector-jdbc_2.12-1.11.1.jar 在 2020-08-07 14:47:29,"lydata" 写道: >flink-jdbc_2.11:1.11.1依赖 在 https://mvnrepository.com/ 找不到 ,是不是没有上传?

Flink 1.11.1 on k8s 如何配置hadoop

2020-08-06 文章 RS
Hi, Flink 1.11.1 想运行到K8S上面, 使用的镜像是flink:1.11.1-scala_2.12, 按照官网上面介绍的, 部署session cluster, jobmanager和taskmanager都启动成功了 然后提交任务的时候会报错: Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies. at

Flink 1.11.1 消费带SASL的Kafka报错: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config

2020-08-02 文章 RS
Hi, 我尝试消费SASL机制的Kafka集群 jaas.conf 文件内容: KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin001" password="123456"; }; 执行命令如下: export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/jaas.conf" ./bin/sql-client.sh embedded CREATE TABLE t1

kafka avro格式sink报错,NoClassDefFoundError: Could not initialize class org.apache.avro.SchemaBuilder

2020-07-27 文章 RS
Hi, 版本:Flink-1.11.1 任务启动模式:standalone Flink任务编译的jar的maven中包含了flink-avro,jar-with-dependencies编译的 org.apache.flink flink-avro 1.11.1 编译出来的jar也包含了这个class 我看官网上说明 Flink has extensive built-in support for Apache Avro。感觉默认是支持avro的 1. 直接启动的话,会报错

Re:Re: kafka-connect json格式适配问题?

2020-07-27 文章 RS
s schema, ROW(totall_count, username, >update_time) as payload >FROM ... > > >Btw, 我想问一下,为什么一定要用 kafka-jdbc-connect 去同步到 mysql 呢?个人觉得直接用 Flink SQL 同步到 >mysql 不是很方便么? > >Best, >Jark > > >On Mon, 27 Jul 2020 at 17:33, RS wrote: > >> hi, >&g

Re:回复:解析kafka的mysql binlog问题

2020-07-27 文章 RS
Hi, 附近应该是收不到的,包括图片啥的 只能回复纯文本,贴代码,如果真的需要图片的话,可以上传到其他的网站上,然后给个连接跳转过去 在 2020-07-27 19:21:51,"air23" 写道: 我再上传一次 在2020年07月27日 18:55,Jark Wu 写道: Hi, 你的附件好像没有上传。 On Mon, 27 Jul 2020 at 18:17, air23 wrote: > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* > > private

Re:Re: kafka-connect json格式适配问题?

2020-07-27 文章 RS
, 用 ROW 函数封装 payload >INSERT INTO output >SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username, >update_time) as payload >FROM ... > > >Btw, 我想问一下,为什么一定要用 kafka-jdbc-connect 去同步到 mysql 呢?个人觉得直接用 Flink SQL 同步到 >mysql 不是很方便么? > >Best, >

kafka-connect json格式适配问题?

2020-07-27 文章 RS
hi, kafka->Flink->kafka->mysql Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。 使用kafka-connect是方便数据同时导出到其他存储 Flink定义输出表结构: CREATE TABLE print_table \ (total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \ WITH (\ 'connector' = 'kafka', \ 'topic' = 'test_out',

Re:Re: Re: Could not find any factory for identifier 'kafka'

2020-07-27 文章 RS
link 的依赖也打在大包里呢?因为 Flink 本身的 classpath 里就已经有这些依赖了,这个大包作为 Flink >的用户 jar 的话,并不需要把 Flink 的依赖也放进去。 > >RS 于2020年7月24日周五 下午8:30写道: > >> hi, >> 感谢回复,尝试了多次之后,发现应该不是依赖包的问题 >> >> >> 我项目中新增目录:resources/META-INF/services >> 然后从Flink源码中复制了2个文件 >> org.apache.

Re:flink 聚合 job 重启问题

2020-07-27 文章 RS
伪代码发下看看?看下jdbc sink的配置,是不是支持删除记录,更新的时候旧记录被删除了 在 2020-07-27 11:33:31,"郑斌斌" 写道: >hi all : > > 请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt > from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。 >但问题是,每次JOB重启后,之前mysql

Re:Re: flink1.11查询结果每秒入库到mysql数量很少

2020-07-24 文章 RS
你看下INERT SQL的执行时长,看下是不是MySQL那边的瓶颈?比如写入的数据较大,索引创建比较慢等其他问题? 或者你手动模拟执行下SQL写数据对比下速度? 在 2020-07-25 10:20:35,"小学生" <201782...@qq.com> 写道: >您好,谢谢您的解答,但是我测试了按您这个方式添加以后,每秒入mysql数据量变成了8条左右,提升还达不到需要。

Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 文章 RS
t;只需要-sql和-json两个包就可以了 > > >| | >JasonLee >| >| >邮箱:17610775...@163.com >| > >Signature is customized by Netease Mail Master > >On 07/24/2020 17:02, RS wrote: >hi, >Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了 >编译的jar包是jar-with-dependencies的 >

Re:Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 文章 RS
邮件格式不对,我重新回复下 我这边是直接打成jar包扔到服务器上运行的,没有在IDEA运行过。 > flink run xxx 没有使用shade-plugin maven build参数: 1.8 1.11.1 maven-compiler-plugin ${jdk.version} ${jdk.version}

Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 文章 RS
序如果直接在idea里面运行是可以运行的么? > >如果可以在idea运行,但是打出来的jar包不能提交运行的话,很有可能跟SPI文件有关系。 >如果你用的是shade plugin,需要看下这个transformer[1] > >[1] >https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer > >RS 于2020年7月24日周五 下午5:02写道: > >>

Could not find any factory for identifier 'kafka'

2020-07-24 文章 RS
hi, Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了 编译的jar包是jar-with-dependencies的 代码片段: public String ddlSql = String.format("CREATE TABLE %s (\n" + " number BIGINT,\n" + " msg STRING,\n" + " username STRING,\n" + " update_time