est!
>Xuyang
>
>
>
>
>
>在 2023-11-01 14:21:04,"RS" 写道:
>>Hi
>>flink的sql gateway支持自定义的UDF吗?包括java和python的,有示例可以参考下吗?
elease-1.17/docs/dev/table/sql/jar/
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2023-11-01 14:21:04,"RS" 写道:
>>Hi
>>flink的sql gateway支持自定义的UDF吗?包括java和python的,有示例可以参考下吗?
Hi
flink的sql gateway支持自定义的UDF吗?包括java和python的,有示例可以参考下吗?
Hi,
提交到本地是flink配置文件里面配置的jobmanager的地址,所以肯定也是提交到K8S的吧
yarn的不太清楚。
在 2023-10-30 14:36:23,"casel.chen" 写道:
>想问一下目前flink 1.18.0的sql gateway只支持提交作业到本地运行吗?能否支持提交作业到yarn或k8s上运行呢?
Hi,
connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
Thanks
在 2023-02-17 15:56:51,"casel.chen" 写道:
>作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
eaders吗?例如 Content-Type 之类的,对应的flink sql kafka connector中的
>>> properties.group.id 和 properties.bootstrap.servers
>
>在 2022-12-26 11:12:57,"RS" 写道:
>>Hi,
>>
>>
>>> 遇到用户添加自定义请求头Headers问题
>>如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表
Hi,
我的也是on K8S,是session模式的,得看你的模式是什么
我的流作业失败了,配置了checkpoint,会自动重试,jm和tm都还在,可以直接看到作业异常信息。
Thanks
在 2022-12-22 23:18:27,"hjw" 写道:
>Flink On Native K8s
>模式下,如果流作业因异常失败了,作业的JobManager和TaskManager所在Pod都会消失,就无法查看作业日志。
>请问在K8s模式下,在查看日志方面有没有相关解决方案。
Hi,
试试 -C,--classpath,我都是用这个提交UDF的
另外邮件列表发图片是看不到了,别再发截图了
Thanks,
在 2022-12-13 18:13:47,"melin li" 写道:
类似: spark-submit 支持--jars,更灵活方便,
melin li 于2022年12月8日周四 11:09写道:
如果是作业依赖的jar,是可以打一个flat jar。有两种场景:
1、sql作业中,用户依赖某个connector jar,但平台不提供这个connector,需要用户上传,
2、自定义udf
Hi,
> 遇到用户添加自定义请求头Headers问题
如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述
如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了
> 如何在connector options中支持Map数据类型呢?
options里面都是k=v的结构,v都是字符串,所以你要考虑的是如何用 字符串 描述 Map
Thanks
在 2022-12-17 10:20:29,"casel.chen" 写道:
>我想开发一个Flink SQL
Hi,
我考虑过的几种方式:
1. 做成http服务,和flink分离,py模型本来跑起来也不快,做成http可以动态扩容,无状态的话
2. 用pyflink来跑任务,可以嵌python代码,就是任务启动非常慢,要复制虚拟环境,模型可以写成pandas的输入输出,这样模型也是可以独立开发的
3. Java调python的udf,py必须要能封装成函数,写udf毕竟麻烦
Thanks
在 2022-12-19 16:51:33,"kcz" <573693...@qq.com.INVALID> 写道:
>flink调用py模型时候,大是采取什么方案,直接跟flink集成嘛?
0x7ff010275740 (most recent call first):
Thanks
在 2022-11-23 09:25:04,"RS" 写道:
>Hi,
>我这边使用的python命令构建的,没有用conda,这个应该没有影响吧
>python3 -m venv jxy_venv
>
>
>我启动了一个单点的flink测试,本机启动,有python环境,测试是可以运行成功的
>
>
>
>Thanks
>
>
>
>
>
>在 2022-11
Hi,
Flink的Metric了解下,里面应该有作业的状态
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#reporter
配置不同的Metric方式,有的是拉取,有的是推送的机制,
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/
Thanks
在 2022-11-23 08:32:11,"casel.chen" 写道:
Hi,
我这边使用的python命令构建的,没有用conda,这个应该没有影响吧
python3 -m venv jxy_venv
我启动了一个单点的flink测试,本机启动,有python环境,测试是可以运行成功的
Thanks
在 2022-11-22 15:39:48,"Xingbo Huang" 写道:
>Hi RS,
>
>你是使用conda构建的venv吗,可以参考PyFlink 准备环境的文档[1]
>
>Best,
>Xingbo
>
>[1]
>htt
Hi,
Flink版本:1.15.1
A机器:在A机器上创建python虚拟环境,版本3.6.8,安装flink等python包,然后打包ZIP,jxy_venv.zip,上传到HDFS上
B机器:在B机器上,主机上没有Python环境,Flink运行在K8S的docker中,docker里面也没有python环境,
C机器:在C机器上,有flink的client,存在python环境,负责启动任务
启动命令:
./bin/flink run -Dexecution.runtime-mode=BATCH -d -m 192.168.1.2:8081 -n -py
Hi,
我的理解是后插入的维表数据,关联不到是正常现象,
如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据,
Thanks
在 2022-11-11 11:10:03,"Jason_H" 写道:
>
>
>hi,大家好
>我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下:
>kakfa输入:
>账号
> allanqinjy
>> |
>> |
>> allanqi...@163.com
>> |
>> 签名由网易邮箱大师定制
>>
>>
>> On 10/8/2022 21:00,RS wrote:
>> Hi,
>>
>>
>> 版本:Flink-1.15.1
>>
>>
>> 有个场景,从hdfs读文件再处理数据,batch mode,10个并发,使用Flink
>> SQL定义执行,source是
.2.0.pom
>把flink-connector-kafka-1.15.1.jar 去掉再试试?
>
>
>RS 于2022年10月8日周六 17:19写道:
>
>> Hi,
>> 报错如下:
>>
>>
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.ValidationExc
Hi,
版本:Flink-1.15.1
有个场景,从hdfs读文件再处理数据,batch mode,10个并发,使用Flink
SQL定义执行,source是connector=filesystem,format=raw,path=
执行任务的时候,有时候能成功,有时候失败了然后就一直失败,重启集群好像可以解决问题,这种情况如何是什么原因导致的?
集群的off-heap都是默认配置,
taskmanager.memory.task.off-heap.size=0
taskmanager.memory.framework.off-heap.size=128MB
are:
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.table.store.kafka.KafkaLogStoreFactory
org.apache.flink.table.store.shaded.streaming.connectors.kafka.table.KafkaDynamicTableFactory
Thanks
在 2022-10-08 13:38:20,"Shammon FY" 写道:
>Hi RS
>你
Hi,
版本:flink-1.15.1
使用table
store,需要在lib下放置flink-table-store-dist-0.2.0.jar,之前集群的lib下有一个flink-connector-kafka-1.15.1.jar,使用sql-client,定义kafka源表的时候,发现connector冲突了
是不是lib下有了flink-table-store-dist-0.2.0.jar,就不能有flink-connector-kafka-1.15.1.jar?
Thanks
Hi,
当前的SQL是不支持的,需要的话,可以自己实现一个connector或者UDF,把错误数据输出到其他地方
Thanks
在 2022-09-29 10:02:34,"Summer" 写道:
>
>你好,我想问一下,如果来源于Kakfka的一条数据出现错误,会导致任务执行失败,日志抛出该条错误数据。
>
>
>为保证任务执行,需要在*** WITH内加'value.json.ignore-parse-errors' = 'true',
>'value.json.fail-on-missing-field' = 'false'
>
>
>
>
Hi,
个人推荐方式二,
1. 部分场景下,有些异常可以自动恢复,任务异常会自动重启,继续运行
2. 告警通知到介入处理,如果是人来介入处理的话,20s通常时间不是问题,到分钟级都可以
3. failure之前调用某个hook去通知相关方,应该是要修改jobmanager的代码,具体就要请教大佬们了。
在 2022-09-30 13:50:56,"casel.chen" 写道:
>当flink作业失败时如何第一时间发通知告警到相关方?现有方式
>方式一:flink作业本身提供的rest
是后面新启动的任务全部都还是在这个路径下查找hive配置,导致异常
如果重启集群的话,同样的任务提交,不会报错,看起来是个概率事件,所以这个问题可能是什么原因导致的呢?
Thanks
在 2022-07-21 14:52:51,"RS" 写道:
>Hi,
>
>
>环境:
>flink-1.15.1 on K8S session集群
>hive3
>flink写hive任务,配置了定时提交分区
>
>
>现象:
>1. checkpoint是30s一次
>2. HDFS上有
r嘛?你select的表名是`origin_object_data_61`?
>在 2022-09-01 20:18:05,"RS" 写道:
>>Hi,
>>环境:
>>flink-1.15.1
>>TiDB-v6.1.0
>>
>>
>>现象:
>>Flink SQL> select count(*) from t1;
>>
>>[ERROR] Could not execute SQL statement. Reason:
&
Hi,
环境:
flink-1.15.1
TiDB-v6.1.0
现象:
Flink SQL> select count(*) from t1;
[ERROR] Could not execute SQL statement. Reason:
java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check
the manual that corresponds to your TiDB version for the right syntax to use
line 1 column
Hi,
1. lib下只需要一个flink-sql-connector-hive的包,不再需要flink-connector-hive和hive-exec
2. flink-sql-connector-hive包版本不对,需要和flink的1.13.3版本一致
先试试看,不行的话,就是代码的问题了。
Thanks
At 2022-08-12 10:31:35, "Summer" wrote:
>
>
>版本:1.13.3
>
>
>lib目录下:
>-rw-r--r-- 1 root root 7759243 Aug 12 10:12
Hi,
flink sql如何写es的nested数组数据?
原始示例数据:
{
"id": "123",
"field1":[
{
"k1":1
},
{
"k1":1,
"k2":2
},
{
"k3":"3"
}
]
}
filed1是一个数组,里面的原始是字典,字典内的字段名是动态的,我不知道里面有多少个key
Hi,
打算通过Flink查询HDFS中的数据,对查询实效性要求高,查询平均时延要求在秒级。
=
这种高实时性的要求,是不适合Presto或者Flink引擎的。
如果是数据量不大,查询逻辑不复杂,实时性要求高,建议数据同步到数据库中,使用数据库引擎来查询;
如果是数据量大,查询逻辑复杂,实时性要求不高,Flink或者Presto是可以的;
如果是数据量大,查询逻辑复杂,实时性要求高,那什么都拯救不了你
Thanks
在 2022-07-14 11:54:00,"barbzhang(张博)" 写道:
Hi,
版本:Flink-1.15.1
模式:单机
s1为kafka的source表,已定义,
g_name为s1表的字段,string类型
select `g_name` from s1;
这个运行正常,有数据输出
然后加上 IF 语句,则会报错:
Flink SQL> select if(`g_name` is null, 'no', `g_name`) from s1;
[ERROR] Could not execute SQL statement. Reason:
Hi,
如果是访问ES的话,Flink里面自带ES的connector,你可以直接使用,或者参考源码,source和sink接口都有对应的方法
资源是否在一个线程里面,这个取决与你代码逻辑,如果在不同的线程或者进程的话,设计上,就不要用同一个EsClientHolder,各个不同阶段各自去new和close对象,
Thanks
在 2022-07-12 12:35:31,"Bruce Zu" 写道:
> Flink team好,
> 我有一个很一般的问题,关于如何释放 Flink Job 中某个对象持有的资源。
>
> 我是 Flink
" 写道:
>Hi,
>
>有更详细的日志吗?看起来是类加载冲突的,需要明确下是哪个类冲突了
>
>Best,
>Weihua
>
>
>On Wed, Jul 6, 2022 at 1:53 PM RS wrote:
>
>> Hi,
>>
>>
>> 通过sql-client执行flink sql,connector选择filesystem,会出现如下报错
>> java.lang.ClassNotFoundException:
>> org.ap
le-planner_2.12-1.15.1.jar
>冲突了 去掉一个就可以了
>
>
>
>
>
>
>
>在 2022-07-11 19:45:04,"Weihua Hu" 写道:
>>Hi,
>>
>>有更详细的日志吗?看起来是类加载冲突的,需要明确下是哪个类冲突了
>>
>>Best,
>>Weihua
>>
>>
>>On We
Hi,
你可以在定义表ccc_test_20220630_2字段的时候,结构如果固定,可以指定字段类型为ARRAY+ROW吧,例如 abc
ARRAY>,如果里面是动态结构,可以定义为MAP
结构如果比较复杂,或者字段不明确,就自定义UDF解决。
Thanks
在 2022-06-30 15:02:55,"小昌同学" 写道:
各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了
Hi,
通过命令行的方式提交,可以捕获flink run的标准输出,里面包含job id,然后正则匹配或者字符串截取就可以提取到job id了
Thanks
在 2022-07-04 17:50:07,"sherlock zw" 写道:
>目前我需要去监控已经提交的flink任务,
>但是通过命令行方式提交的话拿不到任务id,只能通过INFO级别的日志过滤出来,但是我们的环境里面的日志界别是WARN,看不到任务id的日志输出,所以想问下除了命令行的方式提交任务还有其他方式吗,例如有和Spark类似的SparkLaunch一样的jar提交的方式吗?希望大佬指点下,谢谢。
Hi,
通过sql-client执行flink sql,connector选择filesystem,会出现如下报错
java.lang.ClassNotFoundException:
org.apache.flink.table.planner.delegation.ParserFactory
Flink SQL> CREATE TABLE t1 (
> a STRING,
> b INT
> )WITH(
> 'connector'='filesystem',
> 'path'='/tmp/qwe',
> 'format'='csv',
>
Hi,
请教下各位,Flink-1.15.0,消费Kafka发现下面个问题,offset提交失败的情况,有的任务应该是一直提交失败的,数据消费了,但是offset不变,这种情况如何处理?
现象如下:
1. 任务没有异常,
2. 数据能正常消费处理,不影响数据使用
3. 任务有配置checkpoint,几分钟一次,理论上执行checkpoint的时候会提交offset
4. 部分任务的从Kafka的offset提交失败,部分正常
WARN日志如下:
2022-06-27 01:07:42,725 INFO
e-1.14/docs/dev/python/dependency_management/#python-interpreter-of-client
>
>可以把这个参数-pyclientexec 也加上试试。
>
>On Tue, Jun 7, 2022 at 11:24 AM RS wrote:
>
>> Hi,
>>
>>
>> 环境:
>> - flink-1.14.3, 单机集群
>> - 服务器上默认python2,也存在python3.6.8
>> - /xxx/bin/python3
Hi,
环境:
- flink-1.14.3, 单机集群
- 服务器上默认python2,也存在python3.6.8
- /xxx/bin/python3是python3生成的虚拟环境
使用sql-client测试pyflink的udf,自定义了一个函数f1,/xxx/p.py
启动命令:
./bin/sql-client.sh -pyfs file:///xxx/p.py -pyexec /xxx/bin/python3
配置pyexec指定了使用的python为python3
执行命令报错,报错信息如下:
Flink SQL> create temporary
Hi,
每次checkpoint都会生成文件的,在一个checkpoint期间内,可以配置文件大小合并等,
不同checkpoint的生成的文件无法合并,所以文件数量最少是和checkpoint时间间隔相关的,
如果checkpoint时间间隔比较短,就需要自己去合并小文件了
在 2022-06-06 16:44:51,"谭家良" 写道:
>大家好,关于kafka数据消费到hive/filesystem(orc/parquet)我有个疑问。orc/parquet如何调整落地的文件大小?是根据checkpoint时间来的吗?在落地到hive/filesystem
>
es/pyflink/lib) 的那些 jar 包的版本。
>
>
>
>On Mon, May 23, 2022 at 4:22 PM RS wrote:
>
>> Hi,
>> 在Pycharm中,测试Pyflink示例代码,启动运行报错,代码为官方文档中的代码
>> 参考官方文档:
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table_api_tutorial/
&g
Hi,
在Pycharm中,测试Pyflink示例代码,启动运行报错,代码为官方文档中的代码
参考官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table_api_tutorial/
报错如下:
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.NoSuchMethodError:
Hi,
cancel的时候要加savepoint,然后启动的时候指定savepoint应该就不会丢数据了,直接cancel的话是可能丢数据的,
checkpoint的作用和你想到可能不一样,你再看看
Thx
在 2022-05-12 10:38:33,"徐战辉" 写道:
hello, 请教下,如何设置flink配置及作业参数,在取消作业重新部署、flink作业失败重跑情况下,保证不丢失数据。
目前有一份作业,开启checkpoint, cancel 后重新启动,发现数据会丢失1小部分。
1. flink.conf
Hi,
partition.discovery.interval.ms 这个是Flink connector
kafka里面加上的,KafkaSourceOptions里面定义的,
看下你的kafka-client的版本,官方的是 2.4.1,如果版本一样,那只能先忽略了。
在 2022-03-22 17:10:52,"Michael Ran" 写道:
>dear all :
> 目前用flink1.4 table api +kafka 的情况下,有各种警告,比如:
> The configuration
Hi,
你这个例子中,捕获到B的变更CDC,若最终结果表支持部分字段更新,就直接更新结果表就行,都不需要关联,
只要你的B的CDC处理 晚于 A流的join处理就行
如果一定要全部关联的话,ttl又不可行,那你这个数据量会无限增大,后面就无法关联了的,设计肯定得改
在 2022-03-22 09:01:30,"JianWen Huang" 写道:
>是的。其实我想到的也是将维度表和事实表都通过Cdc方式做成流,然后regular
Hi,
参考官方文档的话,目前应该是不支持的,kafka的connector, source只支持Unbounded Scan, 不支持Bounded Scan
NameVersionSourceSink
Apache Kafka0.10+Unbounded ScanStreaming Sink, Batch Sink
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/#supported-connectors
感谢,确定了下是压缩格式的问题,
原hive文件的压缩是SNAPPY压缩,使用Flink SQL合并小文件之后,默认不压缩,导致文件变大了。
Flink默认没有继承原文件的压缩算法。。。
在 2022-02-22 12:08:39,"junjie.m...@goupwith.com"
写道:
检查下数据格式和压缩格式是否和之前的不一致
原始邮件 ----
发件人: RS
日期: 2022年2月22日周二 09:35
收件人: user-zh@flink.apache.org
主 题: hive 进行 overwrite
Hi,
flink写hive任务,checkpoint周期配置的比较短,生成了很多小文件,一天一个目录,
然后我调用flink sql合并之前的数据,跑完之后,发现存储变大了,请教下这个是什么原因导致的?
合并之前是很多小part文件,overwrite之后文件减少了,但是存储变大了,从274MB变大成2.9GB了?
hive表table1的分区字段是`date`
insert overwrite aw_topic_compact select * from `table1` where
`date`='2022-02-21';
合并前:
514.0 M 1.5 G
1. 图片挂了,看不到,尽量用文字,或者用图床等工具
2. 启动任务有配置checkpoint吗?
在 2022-02-17 11:40:04,"董少杰" 写道:
flink读取csv文件建表,同时消费kafka数据建表,两张表join之后写入hdfs(hudi),读取csv数据的任务已经是finished状态,就会触发不了checkpoint,看有什么办法能让它正常触发checkpoint?
flink版本1.12.2。
谢谢!
| |
董少杰
|
|
eric21...@163.com
|
Hi,
请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10?
如果source是10的话,那还有7个线程就空闲了?
在 2022-01-11 11:10:41,"Caizhi Weng" 写道:
>Hi!
>
>可以设置 parallelism.default 为需要的并发数。
>
>Jeff 于2022年1月9日周日 19:44写道:
>
>> 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?
Hi,
感谢回复,我也测试过这类方法,
我给json format加了个参数,在序列化的时候,row里面去除null,但是这个要修改代码,单独更新flink-json的jar包了,后期维护可能会有问题
这种很适合写ES和写文件,不会有冗余的字段
如果社区能新增这个功能或者合并进去就方便了
在 2022-01-06 21:18:37,"Benchao Li" 写道:
>我们内部是给json format加了一个功能,允许不把null字段进行序列化。主要解决的也是es这个写入的场景。
>你们也可以试一下。
>
>RS 于2021年12月
>我不太熟悉 es,如果某一个字段不写的话,是会写入一个默认值吗?如果是的话,可以使用 coalesce 函数。coalesce(a, b, c,
>...) 会返回第一个非 null 的值,因此只要把默认值放在最后一个,如果前面都是 null 就会写默认值。
>
>RS 于2021年12月30日周四 17:06写道:
>
>> 有10~20个字段,这样一个个写,手都敲断了,还有其他的方式吗?或者如何开发代码适配到SQL?
>>
>>
>>
>>
>>
>> 在 2021-12
有10~20个字段,这样一个个写,手都敲断了,还有其他的方式吗?或者如何开发代码适配到SQL?
在 2021-12-30 11:36:21,"Xuyang" 写道:
>可以使用case when试一下
>在 2021-12-29 16:40:39,"RS" 写道:
>>Hi,
>>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?
>>
>>
>>比如:源数据有3个字段,a,b,c
>>inser
Hi,
使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?
比如:源数据有3个字段,a,b,c
insert into table2
select
a,b,c
from table1
当b=null的时候,只希望写入a和c
当c=null的时候,只希望写入a和b
我是直接监控kafka的lag,如果lag数值较大或持续上升,肯定就有延迟了。收到告警后,再查看下plan,有个busy指标,红色的节点就是有问题的
在 2021-12-23 08:36:33,"casel.chen" 写道:
>想问一下flink sql作业链路延迟监控如何实现?
>我们的flink
>sql作业基本上都是上游接kafka,下游sink到es/hbase/kafka/mongodb/redis/clickhouse/doris这些存储
只有一条SQL,只是数据量比较大,使用的BATCH模式。
SELECT price
FROM hive.data.data1
ORDER BY price DESC
在 2021-12-22 18:35:13,"刘建刚" 写道:
>你的SQL是怎么写的?两个独立的SQL吗?Flink中有个参数table.dml-sync
>,决定是否多条SQL语句顺序执行,默认是false,也就是多条语句是同时执行的。
>
>RS 于2021年12月22日周三 09:25写道:
>
>> 跑了10几个小时终于跑完了,测试
跑了10几个小时终于跑完了,测试发现BATCH模式下,只有Source把所有数据消费完,后面的SortLimit plan才会创建,和流模式不太一样
在 2021-12-21 20:06:08,"RS" 写道:
>slot资源是绝对充足的,你提到的资源还涉及到其他资源吗?
>
>
>
>
>
>在 2021-12-21 17:57:21,"刘建刚" 写道:
>>固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕
set table.dynamic-table-options.enabled=true;
sql-client的话这样配置,不要引号
在 2021-12-21 20:20:11,"Fei Han" 写道:
>@all:
>大家好!
> 我在实时读取hive的时候动态参数不生效,另外flink是否可以通过流读方式读取hive的普通表呢?
>版本如下:
>Flink版本1.13.3
> Hive版本hive2.1.1-CDH6.2.0
>设置的参数是set 'table.dynamic-table-options.enabled'='true'
>报错如下:
slot资源是绝对充足的,你提到的资源还涉及到其他资源吗?
在 2021-12-21 17:57:21,"刘建刚" 写道:
>固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕后才能运行SortLimit。
>
>RS 于2021年12月21日周二 16:53写道:
>
>> hi,
>>
>> 版本:flink1.14
>>
>> 模式:batch
>>
>> 测试场景:消费h
hi,
版本:flink1.14
模式:batch
测试场景:消费hive大量数据,计算某个字段的 top 10
使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。
请问下,SortLimit状态一直为CREATED是正常现象吗?
数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?
测试SQL:
SELECT price
FROM hive.data.data1
SELECT class_no, collect(info)
FROM (
SELECT class_no, ROW(student_no, name, age) AS info
FROM source_table
)
GROUP BY class_no;
从SQL层面想到比较接近的方法,但multiset无法转array
从你的需求描述看,mongodb目标表的这种班级设计平时可能不太需要,如果是为了查某个班所有的学生的话,在查询的时候加个where条件即可,没有必要把明细数据再放到一个数组里面
感觉可能是你定义表结构和实际使用方面的问题,可以换个角度思考下
在
试试 flink-conf.yam里面配置 env.hadoop.conf.dir: /xxx/hadoop
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/
在 2021-11-26 11:14:17,"王健" <13166339...@163.com> 写道:
>
>
>您好,ssh远程调用,/etc/profile配置是不起作用的呢
>
>
>| |
>王健
>|
>|
>13166339...@163.com
>|
>签名由网易邮箱大师定制
Hi,
环境:flink-1.14.0,单节点standalone
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/
参考官方文档,执行下面命令:
mkdir plugins/s3-fs-hadoop
cp opt/flink-s3-fs-hadoop-1.14.0.jar plugins/s3-fs-hadoop/
在flink-conf中配置了hadoop的路径(s3使用了hadoop的配置文件)
hi,
环境:
1. flink-1.12,版本可以升级
2. flink-conf中配置了env.hadoop.conf.dir,路径下有hdfs集群的core-site.xml和hdfs-site.xml,
state.backend保存在该HDFS上
3. flink的部署模式是K8S+session
需求:
需要从一个s3协议的分布式文件系统中读取文件,处理完写到mysql中
问题:
s3配置采用hadoop的配置方式,保存为一个新的core-site.xml文件,参考的
图片看不到的,尽量不要发图片,你可以复制文字出来并说明下,
在 2021-11-22 13:14:13,"zhiyuan su" 写道:
我使用的是上面的jar 包。从1.13的文档处获取的,但维标注flink 版本,我理解应该是flink1.13版本编译的。
这个是yaml文件,我直接在sql 客户端,通过DDL 的方式去编写的话,也是如下报错:
Caused by: java.util.ServiceConfigurationError:
org.apache.flink.table.factories.Factory:
1. 文件名是不带.zlib后缀的
2. ORC格式默认是配置了ZIP压缩的,并且开启的,你可以配置'orc.compress'='NONE'测试下,看下不压缩的大小,没有压缩的文件应该是更大的
在 2021-11-16 17:29:17,"yidan zhao" 写道:
>我看了下,默认不带.zlib之类的后缀,我加了也看不出来到底有没有压缩。
>其次,orc.compression官方介绍默认是zlib,貌似默认就有开启压缩?
>
>RS 于2021年11月15日周一 上午9:55写道:
>
>&g
官网里面有介绍这个,你是要这个吧
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/orc/
Orc format also supports table properties from Table properties. For example,
you can configure orc.compress=SNAPPY to enable snappy compression.
在 2021-11-11 12:37:31,"yidan zhao" 写道:
查看下client的日志,一般在flink的logs目录下
在 2021-11-12 20:59:59,"sky" 写道:
>我使用的事flink on yarn。在执行命令时: flink run -m yarn-cluster
>./examples/batch/WordCount.jar 结果却报错了:
>
>The program finished with the following exception:
>
看看任务并行度是多少,可能是并发太大导致的内存占用??
在 2021-11-04 15:52:14,"Asahi Lee" <978466...@qq.com.INVALID> 写道:
>hi!
>我通过flink sql,将mysql的一亿条数据传输到hive库中,通过yarn-application方式运行,结果配置16G的内存,执行失败!
1. metric指标每次都会清0的2. 数据对账的话, 可以将每次的统计数据按时间点保存起来, 然后查询时间范围的时候, 做sum求和来对账
在 2021-08-09 09:51:43,"Jimmy Zhang" 写道:
>您好,看到你们在用kafka相关metrics,我想咨询一个问题。你们是否遇见过在重启一个kafka sink
>job后,相关指标清零的情况?这样是不是就无法持续的进行数据想加?我们想做一个数据对账,查询不同时间段的输出量统计,这样可能中间归零就有问题,所以想咨询下,任何的回复都非常感谢!
>
>
>
>
>|
>Best,
>Jimmy
>|
>
T只是时间格式显示问题, 数据格式都是timestamp(3), 这个和T应该无关的
在 2021-08-16 13:45:12,"Geoff nie" 写道:
>谢谢你!第二个问题确实是我版本太低问题,我flink版本是1.12.1。
>第一个问题,是因为我通过flink写入iceberg
>表中,然后通过presto查询iceberg表,其他字段的表都可以查询,但是当写入的是含有TIMESTAMP 类型的表时,presto查询如下报错:
>
>Query failed (#20210816_020321_00011_wa8bs) in your-presto: Cannot
taskmanager-job-deployment.yaml 和 jobmanager-job.yaml 部署的时候只用启动一次服务,
后续启动实际job的时候,就占用slot,available slot就少了
当job执行完之后,slot资源就释放了,available的slot又恢复了,可以给下一次的job提供资源
如果你的slot用完了的话,那就是资源不够了,需要重新配置taskmanager-job-deployment.yaml
在 2020-11-20 15:20:58,"WeiXubin" <18925434...@163.com> 写道:
可以配置任务重启告警, flink任务挂掉之后会自动尝试重启
如果是固定任务数量的话, 还可以配置slot数量告警
在 2020-11-05 10:15:01,"bradyMk" 写道:
>请问各位大佬,我基于grafana+prometheus构建的Flink监控,现在想实现flink任务挂掉后,grafana就发出报警的功能,但是目前不知道该用什么指标去监控,我之前想监控flink_jobmanager_job_uptime这个指标,设置的监控规则是:max_over_time(flink_jobmanager_job_uptime[1m])
>-
Hi, 请教下
我尝试使用sql-client连接hive, hive正常, 使用beeline -u jdbc:hive2://x.x.x.x:1 可以正常连接
sql-client-defaults.yaml配置内容:
tables: []
functions: []
catalogs:
- name: myhive
type: hive
hive-conf-dir: /home/hive/flink-1.11.1/conf
default-database: default
execution:
planner: blink
type:
测试出来了, rowtime参数需要是最后一个参数, $(timeField).rowtime()
但是这个报错也太隐晦了吧 .
在 2020-08-30 14:54:15,"RS" 写道:
>Hi, 请教下
>
>
>启动任务的时候抛异常了, 但是没看懂报错原因, 麻烦各位大佬帮看下
>这里我是先创建了一个DataStreamSource, 然后配置转为view, 配置EventTime, 后面再用SQL DDL进行数据处理
>DataStreamSource sou
Hi, 请教下
启动任务的时候抛异常了, 但是没看懂报错原因, 麻烦各位大佬帮看下
这里我是先创建了一个DataStreamSource, 然后配置转为view, 配置EventTime, 后面再用SQL DDL进行数据处理
DataStreamSource source = env.addSource(consumer);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
tableEnv.createTemporaryView(table_name,
Hi,
你这个完全就是CEP的使用场景啊, 大于多少次, 大于一定数值组合起来判定事件,
1. 规则变更了, 重启任务就行, 规则都变了, 任务重启也没影响
2. CEP支持规则组合, 时间窗口
3. 最佳实践官网的介绍就很合适
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/cep.html
在 2020-08-06 10:26:19,"samuel@ubtrobot.com" 写道:
>由于需要实时告警功能,经调研,用flink
Hi
恩, 重新试了下, 这种是可以的, 前面是我操作错了, 谢谢~
Thx
在 2020-08-10 13:36:36,"Yang Wang" 写道:
>你是自己打了一个新的镜像,把flink-shaded-hadoop-2-uber-2.8.3-10.0.jar放到lib下面了吗
>如果是的话不应该有这样的问题
>
>Best,
>Yang
>
>RS 于2020年8月10日周一 下午12:04写道:
>
>> Hi,
>> 我下载了flink-shaded-hadoop-2-uber-
Hi,
我下载了flink-shaded-hadoop-2-uber-2.8.3-10.0.jar, 然后放到了lib下, 重启了集群, 但是启动任务还是会报错:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could
not find a file system implementation for scheme 'hdfs'. The scheme is not
directly supported by Flink and no Hadoop file system to
Hi,
找下这种的 flink-connector-jdbc_2.12-1.11.1.jar
在 2020-08-07 14:47:29,"lydata" 写道:
>flink-jdbc_2.11:1.11.1依赖 在 https://mvnrepository.com/ 找不到 ,是不是没有上传?
Hi,
Flink 1.11.1 想运行到K8S上面, 使用的镜像是flink:1.11.1-scala_2.12, 按照官网上面介绍的, 部署session
cluster, jobmanager和taskmanager都启动成功了
然后提交任务的时候会报错:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is not in the classpath/dependencies.
at
Hi,
我尝试消费SASL机制的Kafka集群
jaas.conf 文件内容:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin001"
password="123456";
};
执行命令如下:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/jaas.conf"
./bin/sql-client.sh embedded
CREATE TABLE t1
Hi,
版本:Flink-1.11.1
任务启动模式:standalone
Flink任务编译的jar的maven中包含了flink-avro,jar-with-dependencies编译的
org.apache.flink
flink-avro
1.11.1
编译出来的jar也包含了这个class
我看官网上说明 Flink has extensive built-in support for Apache Avro。感觉默认是支持avro的
1. 直接启动的话,会报错
s schema, ROW(totall_count, username,
>update_time) as payload
>FROM ...
>
>
>Btw, 我想问一下,为什么一定要用 kafka-jdbc-connect 去同步到 mysql 呢?个人觉得直接用 Flink SQL 同步到
>mysql 不是很方便么?
>
>Best,
>Jark
>
>
>On Mon, 27 Jul 2020 at 17:33, RS wrote:
>
>> hi,
>&g
Hi,
附近应该是收不到的,包括图片啥的
只能回复纯文本,贴代码,如果真的需要图片的话,可以上传到其他的网站上,然后给个连接跳转过去
在 2020-07-27 19:21:51,"air23" 写道:
我再上传一次
在2020年07月27日 18:55,Jark Wu 写道:
Hi,
你的附件好像没有上传。
On Mon, 27 Jul 2020 at 18:17, air23 wrote:
> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>
> private
, 用 ROW 函数封装 payload
>INSERT INTO output
>SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username,
>update_time) as payload
>FROM ...
>
>
>Btw, 我想问一下,为什么一定要用 kafka-jdbc-connect 去同步到 mysql 呢?个人觉得直接用 Flink SQL 同步到
>mysql 不是很方便么?
>
>Best,
>
hi,
kafka->Flink->kafka->mysql
Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。
使用kafka-connect是方便数据同时导出到其他存储
Flink定义输出表结构:
CREATE TABLE print_table \
(total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \
WITH (\
'connector' = 'kafka', \
'topic' = 'test_out',
link 的依赖也打在大包里呢?因为 Flink 本身的 classpath 里就已经有这些依赖了,这个大包作为 Flink
>的用户 jar 的话,并不需要把 Flink 的依赖也放进去。
>
>RS 于2020年7月24日周五 下午8:30写道:
>
>> hi,
>> 感谢回复,尝试了多次之后,发现应该不是依赖包的问题
>>
>>
>> 我项目中新增目录:resources/META-INF/services
>> 然后从Flink源码中复制了2个文件
>> org.apache.
伪代码发下看看?看下jdbc sink的配置,是不是支持删除记录,更新的时候旧记录被删除了
在 2020-07-27 11:33:31,"郑斌斌" 写道:
>hi all :
>
> 请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt
> from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。
>但问题是,每次JOB重启后,之前mysql
你看下INERT SQL的执行时长,看下是不是MySQL那边的瓶颈?比如写入的数据较大,索引创建比较慢等其他问题?
或者你手动模拟执行下SQL写数据对比下速度?
在 2020-07-25 10:20:35,"小学生" <201782...@qq.com> 写道:
>您好,谢谢您的解答,但是我测试了按您这个方式添加以后,每秒入mysql数据量变成了8条左右,提升还达不到需要。
t;只需要-sql和-json两个包就可以了
>
>
>| |
>JasonLee
>|
>|
>邮箱:17610775...@163.com
>|
>
>Signature is customized by Netease Mail Master
>
>On 07/24/2020 17:02, RS wrote:
>hi,
>Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
>编译的jar包是jar-with-dependencies的
>
邮件格式不对,我重新回复下
我这边是直接打成jar包扔到服务器上运行的,没有在IDEA运行过。
> flink run xxx
没有使用shade-plugin
maven build参数:
1.8
1.11.1
maven-compiler-plugin
${jdk.version}
${jdk.version}
序如果直接在idea里面运行是可以运行的么?
>
>如果可以在idea运行,但是打出来的jar包不能提交运行的话,很有可能跟SPI文件有关系。
>如果你用的是shade plugin,需要看下这个transformer[1]
>
>[1]
>https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer
>
>RS 于2020年7月24日周五 下午5:02写道:
>
>>
hi,
Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
编译的jar包是jar-with-dependencies的
代码片段:
public String ddlSql = String.format("CREATE TABLE %s (\n" +
" number BIGINT,\n" +
" msg STRING,\n" +
" username STRING,\n" +
" update_time
93 matches
Mail list logo