应该是可以的。另外在老版本的 Kafka connector 上,曾经也实现过限速逻辑 [1],可以参考下。这个需求我觉得还比较通用,可以提一个 JIRA。
[1] https://issues.apache.org/jira/browse/FLINK-11501
Best,
Zhanghao Chen
From: casel.chen
Sent: Tuesday, May 28, 2024 22:00
To: user-zh@flink.apache.org
Subject: Re:Flink SQL消费kafka
查了下Flink源码,当前DataGeneratorSource有添加RateLimiterStrategy参数,但KafkaSource没有该参数,可以像DataGeneratorSource那样来实现限速么?
public DataGeneratorSource(
GeneratorFunction generatorFunction,
long count,
RateLimiterStrategy rateLimiterStrategy,
TypeInformation typeInfo) {...}
Hi, 如果在中间添加了op,或者修改了处理逻辑,那么代表拓扑图会变,那么基于拓扑序所确定的uid也会变,从状态恢复就可能失败。具体可以参考[1]
目前table api应该是没有开放自定义uid的能力,可以在jira[2]上新建一个feature的jira,然后在dev邮件里发起讨论下。
[1]
有人尝试这么实践过么?可以给一些建议么?谢谢!
在 2024-04-15 11:15:34,"casel.chen" 写道:
>我最近在调研Flink实时数仓数据质量保障,需要定期(每10/20/30分钟)跑批核对实时数仓产生的数据,传统方式是通过spark作业跑批,如Apache
>DolphinScheduler的数据质量模块。
>但这种方式的最大缺点是需要使用spark sql重写flink
>sql业务逻辑,难以确保二者一致性。所以我在考虑能否使用Flink流批一体特性,复用flink
Hi iasiuide,
感谢提问. 先来回答最后一个问题
关联维表的限制条件有的会作为关联条件,有的不作为关联条件吗? 这种有什么规律吗?
>
Lookup join 的 on condition 会在优化过程中经过一系列改写, 这里只简要对影响 lookup 和 where 的几处进行说明.
1. logical 阶段, FlinkFilterJoinRule 会将 on 条件 split 为针对单边的 (左表/右表) 和针对双边的.
**针对单边的 filter 会被尽量 pushdown 到 join 节点之前** (这意味着有可能会额外生成一个 Filter 节点);
退订
在 2024-03-13 15:25:27,"chenyu_opensource" 写道:
>您好:
> flink将数据写入kafka【kafka为sink】,当kafka
> topic分区数【设置的60】小于设置的并行度【设置的300】时,task是轮询写入这些分区吗,是否会影响写入效率?【是否存在遍历时的耗时情况】。
> 此时,如果扩大topic的分区数【添加至200,或者直接到300】,写入的效率是否会有明显的提升?
>
> 是否有相关的源码可以查看。
>期待回复,祝好,谢谢!
>
>
>
Hi, 你的图挂了,可以用图床或者直接贴SQL
--
Best!
Xuyang
在 2024-03-08 10:54:19,"iasiuide" 写道:
下面的sql片段中
ods_ymfz_prod_sys_divide_order 为kafka source表
dim_ymfz_prod_sys_trans_log 为mysql为表
dim_ptfz_ymfz_merchant_info 为mysql为表
flink web ui界面的执行计划片段如下:
图片可能加载不出来,下面是图片中的sql片段
..
END AS trans_type,
a.div_fee_amt,
a.ts
FROM
ods_ymfz_prod_sys_divide_order a
LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time
AS b ON a.bg_rel_trans_id = b.bg_rel_trans_id
AND
Hi,
Flink SQL中可以用Group Window[1]的方式来读完cdc数据后加窗口。
可以具体描述一下“一直不生效”的现象和SQL么?
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-agg/#selecting-group-window-start-and-end-timestamps-1
--
Best!
Xuyang
在 2024-01-17
我记得flink低版本有这个bug,会错误的删除某一个checkpoint的,你这个版本太老了,可以升级到新版本。
The following is the content of the forwarded email
From:"吴先生" <15951914...@163.com>
To:user-zh
Date:2024-01-10 17:54:42
Subject:flink-checkpoint 问题
Hi,
需要精准控制异常数据的话,就不太推荐flink sql了。
考虑使用DataStream将异常数据用侧流输出[1],再做补偿。
Best,
Jiabao
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/side_output/
On 2023/12/06 08:45:20 Xuyang wrote:
> Hi,
> 目前flink sql主动收集脏数据的行为。有下面两种可行的办法:
> 1.
Hi,
目前flink sql主动收集脏数据的行为。有下面两种可行的办法:
1. 如果知道脏数据是什么格式,那么将脏数据打个标,不走正常的处理逻辑,只收集,然后由一个UDAF来负责在达到一定的量的时候cancen。
2. 如果不知道脏数据是什么格式,可以在处理数据的那一个节点上使用UDX来处理正常的数据和脏数据,同时统计脏数据的数量,在达到一定上限的时候抛异常。
但是这里在udx里抛异常应该只会导致作业fo,无法让作业达到失败的状态。
要想让作业达到失败的状态,如果在source端就可以识别到脏数据的话,需要魔改下source
补充一下,flink版本是 1.17.1
在 2023-12-01 15:49:48,"casel.chen" 写道:
>线上有一个flink sql作业,创建和更新时间列使用的是 TIMESTAMP(3)
>类型,没有配置'table.exec.sink.upsert-materialize'参数时是正常时间写入的`-MM-dd
>HH:mm:ss.SSS`格式,
>然后添加了'table.exec.sink.upsert-materialize'='NONE'参数后,输出的时间格式变成了 `-MM-dd
Hi, casel.
可以对“批量lookup join”再描述详细一点么?看上去是符合一个lookup join里直接带上k1=v1 and k2=v2 and
k3=v3的用法的。
--
Best!
Xuyang
在 2023-11-22 11:55:11,"casel.chen" 写道:
>一行数据带了三个待lookup查询的key,分别是key1,key2和key3
>
>
>id key1 key2 key3
>想实现批量lookup查询返回一行数据 id value1 value2 value3
>
>
>查了下目前包括jdbc
Hi,
是否可以将这个”配置维表“换成流表,利用flink cdc,改动这个配置表的时候,监听字段cdc变化,同时下游上流join呢?
--
Best!
Xuyang
在 2023-11-20 19:24:47,"casel.chen" 写道:
>我有一个flink
>sql作业过滤条件customer_id是需要根据用户配置来定的,类似于Apollo配置中心,是否可以通过定义一张配置维表来实现呢?设置TTL定期去获取最新配置。
>
>
>create table customer_conf_tbl (
> customer_id
Hi, 你的图挂了,可以贴一下图床链接或者直接贴一下代码。
--
Best!
Xuyang
在 2023-11-15 09:39:22,"刘聪聪" 写道:
Flink 1.17.1 遇到 DECIMAL(10, 0)类型字段,直接无法运行,我用强转都不行,还是报数组越界,去除 DECIMAL(10,
0)类型字段,sql运行都正常。
Hi,
看了下发现这个jira下面已经有人在尝试复现但是没有成功。
如果可以的话,可以在jira下面留言回复一起多提供一些可以复现的case,帮助assigner复现这个问题,从而更快的定位+修复。
--
Best!
Xuyang
在 2023-11-07 15:59:53,"casel.chen" 写道:
>这个critical issue有人fix吗?我们线上使用flink 1.17.1版本有使用jdbc维表查询on带and过滤条件,发现and过滤条件不起作用
>
>
>例如
>select xxx from a left join b on
Hi,
你指的是sql gateway上 ADD JAR这种方式来上传自定义UDF的jar包[1]么?
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/jar/
--
Best!
Xuyang
在 2023-11-01 14:21:04,"RS" 写道:
>Hi
>flink的sql gateway支持自定义的UDF吗?包括java和python的,有示例可以参考下吗?
Hi,
提交到本地是flink配置文件里面配置的jobmanager的地址,所以肯定也是提交到K8S的吧
yarn的不太清楚。
在 2023-10-30 14:36:23,"casel.chen" 写道:
>想问一下目前flink 1.18.0的sql gateway只支持提交作业到本地运行吗?能否支持提交作业到yarn或k8s上运行呢?
Flink SQL目前对于脏数据没有类似side output的机制来输出,这个需求用自定义connector应该可以实现。
--
Best!
Xuyang
在 2023-10-29 10:23:38,"casel.chen" 写道:
>场景:使用flink
>sql将数据写入下游OLAP系统,如doris,遇到一些异常情况,比如字段值超长或者分区字段值为当前doris表不存在的分区(需要先人为创建)等等,当前写入这些脏数据会使得作业写入报错,进而导致作业失败。我们是希望能够将这些“脏”数据单独发到一个kafka
>
可以查看history server
在 2023-07-14 18:36:42,"阿华田" 写道:
>
>
>hello 各位大佬, flink on K8S ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了
>无法判断flink任务是正常Finished 还是异常失败了,各位大佬有什么建议吗
>| |
>阿华田
>|
>|
>a15733178...@163.com
>|
>签名由网易邮箱大师定制
>
你好,整个程序有反压吗
在 2023-07-10 15:32:44,"jiaot...@mail.jj.cn" 写道:
>Hello,
> 我定义了一个pattern (a->b->c->d->e->f->g)在10分钟内匹配,通过在WebUI上查看任务很快在cep节点
> busy(max)100%,我发现通过增加cep节点的并发度并不能解决问题,且checkpoint随着时间的推移状态大小越来越大,数据应该存在大量堆积。数据源同时消费4个kafka
> topic
>
没有人遇到过这个问题吗?
在 2023-06-19 10:41:30,"casel.chen" 写道:
>Flink作业消费数据源来自mysql业务表,后者使用了`-00-00 00:00:00`这个dummy date来存时间,直接用Flink
>CDC消费的话会被解析成`1970-01-01 00:00:00` (mysql中是datetime类型)或者`1970-01-01 08:00:00`
>(mysql中是timestamp类型)。
>问题1:可否给个Flink
hi,
jdbc创建链接是在SimpleJdbcConnectionProvider这个类中实现的,至于真正创建链接,则是由DriverManager来处理。
关于连接数,则是跟你的并行度有关。
在 2023-05-30 13:55:57,"小昌同学" 写道:
>各位老师,请教一下关于flink jdbcsink 连接数的问题;
>我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀;
>谢谢各位老师的指导
>
>|
可以在算子后面调用.name()方法指定名称,方法参数就是算子名称。
比如需sink的流为stream
stream.sinkTo(Sink算子).name("sink-name")
--
Best,
Hjw
在 2023-04-14 16:26:35,"小昌同学" 写道:
>我将流式数据输出到mysql,查看flink 自带的web ui界面,有一个sink节点显示为Sink: Unnamed ,这个针对sink节点可以命名嘛
>
>
>| |
>小昌同学
>|
>|
>ccc0606fight...@163.com
>|
Hi
可以通过设置 pipeline.operator-chaining = false 来实现。
Best
JasonLee
Replied Message
| From | 小昌同学 |
| Date | 03/3/2023 15:50 |
| To | user-zh |
| Subject | flink sql |
各位大佬,请教一下如何使用flink sql实现DataStreaming的disableOperatorChaining功能
| |
小昌同学
|
|
ccc0606fight...@163.com
|
这种情况下有两种方式可以处理
1> 注册表-使用join方式直接拼接成大宽表写入
2> 每个任务-直接写入下游数据 ,每个任务只更新自己的字段即可(因为主键相同)
在 2023-03-02 20:59:59,"casel.chen" 写道:
>flink sql jdbc connector是否支持多流拼接?
>业务上存在基于主键打宽场景,即多个流有相同的主键字段,除此之前其他字段没有重叠,现在想打成一张大宽表写入mysql/oracle这些关系数据库。
>每条流更新大宽表的一部分字段。
hi,
这个问题是因为经过窗口算子后StreamRecord中指定的时间时间戳被改成了window.maxTimestamp(),可以查看[1]中WindowOperator或EvictingWindowOperator中的emitWindowContents方法。
如果想要更改时间戳,可以实现一个ProcessFuncton
TimestampedCollector collector = (TimestampedCollector) out;
collector.setAbsoluteTimestamp( );
collector.collect(value);
目前应该是不支持,一个替代方案是利用concat函数将数组转成string作为输入,再在你的UDF中拆成数组进行处理。
在 2023-02-15 16:29:19,"723849736" <723849...@qq.com.INVALID> 写道:
>大家好,
>
>我在用flink sql的时候有一个场景,就是需要对数组中的某一列做变换,类似于spark sql中的tranform函数
>
>
>https://spark.apache.org/docs/latest/api/sql/index.html#transform
>
>
>目前flink
你好,可以dump下内存分析
在 2023-02-16 10:05:19,"Fei Han" 写道:
>@all
>大家好!我的Flink 版本是1.14.5。CDC版本是2.2.1。在on yarn 运行一段时间后会出现如下报错:
>org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id
>container_e506_1673750933366_49579_01_02(hdp-server-010.yigongpin.com:8041)
> is
Hi,
> 遇到用户添加自定义请求头Headers问题
如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述
如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了
> 如何在connector options中支持Map数据类型呢?
options里面都是k=v的结构,v都是字符串,所以你要考虑的是如何用 字符串 描述 Map
Thanks
在 2022-12-17 10:20:29,"casel.chen" 写道:
>我想开发一个Flink SQL
Hi,
我考虑过的几种方式:
1. 做成http服务,和flink分离,py模型本来跑起来也不快,做成http可以动态扩容,无状态的话
2. 用pyflink来跑任务,可以嵌python代码,就是任务启动非常慢,要复制虚拟环境,模型可以写成pandas的输入输出,这样模型也是可以独立开发的
3. Java调python的udf,py必须要能封装成函数,写udf毕竟麻烦
Thanks
在 2022-12-19 16:51:33,"kcz" <573693...@qq.com.INVALID> 写道:
>flink调用py模型时候,大是采取什么方案,直接跟flink集成嘛?
flink on native kubernetes如何使用 affinity 配置软互斥?即同一个作业的不同pod分布在不同的节点node上
在 2022-12-05 19:51:02,"casel.chen" 写道:
>我司flink作业运行在k8s集群上,日前发现有一些k8s集群节点的网络io在某些时间段超过了告警阈值180MB/s,最多达到430MB/s,最少的只有4MB/s,导致新作业无法部署到网络负载高的节点上,哪怕cpu和内存还有很多剩余。
是查询的时候报错,建表是成功的。
select * from postgres_cdc_test
错误:
[ERROR] Could not execute SQL statement. Reason:
org.postgresql.util.PSQLException: ERROR: syntax error
在 2022-12-02 10:09:37,"bmw" 写道:
HI flink postgresql CDC flink1.12 ,postgresql:9.6.21 报错:
CREATE TABLE
Hi,
Flink的Metric了解下,里面应该有作业的状态
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#reporter
配置不同的Metric方式,有的是拉取,有的是推送的机制,
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/
Thanks
在 2022-11-23 08:32:11,"casel.chen" 写道:
Hi, 目前已经有相关的Flip来尝试在Flink SQL中单独设置source和sink的并行度[1],但是目前source
并没有实现。如果实在需要的话,可能需要现在自己的本地poc一下自行build
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces
在 2022-11-04 14:10:24,"杨扬" 写道:
各位好!
你好. 但是下一次cp发起之时, kafka transaction 已经超时失败了, sink端precommit之前,写入到kafka的数据,
是不是就丢失了?
发件人: Xuyang
发送时间: 2022年11月1日 23:08
收件人: user-zh@flink.apache.org
主题: Re:flink exactly once
写kafka,如果checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?
Hi, 应该会等到下一次做
Hi, 应该会等到下一次做cp的时候再提交
在 2022-11-01 17:13:22,"郑 致远" 写道:
>大佬们好.
>flink exactly once 写kafka,如果flink
>checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?
>kafka的transaction因为超时,abort后, 会导致abort之前写kafka的数据,丢失吗?
Hi,你可以参考cdc社区中支持flink
1.15的issue[1]和pr[2],着急的话,可以尝试先cp这个pr到本地分支[1]https://github.com/ververica/flink-cdc-connectors/issues/1363[2]https://github.com/ververica/flink-cdc-connectors/pull/1504
在 2022-10-11 11:01:25,"casel.chen" 写道:
>当前flinlk cdc master分支的snapshot版本最高支持到flink 1.14.4,尝试使用flink
Hi, 目前应该是不行的
在 2022-09-26 23:27:05,"casel.chen" 写道:
>flink cdc能否同步DDL语句以实现schema同步? 例如create table , create index, truncate table等
Hi,你可以看下HiveDynamicTableFactory的实现,作为connector需要实现factoryIdentifier来表明在with参数中使用的‘connector’=‘xxx’,但这个类由于仅支持在hive
catalog中使用,所以没有实现(还有一些其他的方法,如options可以透传with中的其他参数)。
如果你需要HiveDynamicTableFactory的功能,我感觉可以通过copy出一个新的connector类,参照其他正常connector的方式裁剪和实现一些必要的方法,然后mvn打包一下
--
Best!
Xuyang
Hi, 看上去这种情况只能使用inner join来实现,state很大的话有考虑过用FsStateBackend或者RocksDB
StateBackend来存储state么?
--
Best!
Xuyang
在 2022-09-17 10:59:16,"casel.chen" 写道:
>请教一个flink实现实时双流驱动join问题:
>
>
>order cdc流字段:order_id, order_status, order_time, user_id (order_id是主键)
>user cdc流字段:user_id,
Hi,可以向社区jira[1]报告一下这个bug,详细记录下flink版本、错误信息等。
可以通过手动修改flink源码下hbase connector pom文件,并重新编译打包的方式来快速fix这个bug。
[1] https://issues.apache.org/jira/projects/FLINK/issues
--
Best!
Xuyang
在 2022-09-16 09:34:02,"junjie.m...@goupwith.com" 写道:
Hi,
看起来像是这几个项目中的版本并不适配,导致com.google.common.base.Preconditions这个类版本冲突导致的,可以尝试下将这个包在flink和hudi中shade一下试试
--
Best!
Xuyang
At 2022-09-14 09:27:45, "Summer" wrote:
>
>版本:Flink1.13.3、Hudi0.10.1、Hive3.1.2、Hadoop3.2.1
>
>
>编译:Hudi:mvn clean package -DskipITs
Hi,主要是pull模型:下游主动拉取上游的数据。可以在下游的消费能力达到极限时,通过反压机制,让上游减少生产的数据。
--
Best!
Xuyang
在 2022-09-09 19:04:27,"郑 致远" 写道:
>各位大佬好
>请教下,
>flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 这么设计的考虑是啥呢?
Hi, 可以类似这样写 “.filter($("a").isGreater(10)) "。 更多的使用方法可以参考[1]
[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java
--
Best!
Xuyang
在 2022-09-05 20:53:03,"小昌同学"
Hi,看起来这个报错是用于输出信息的文件找不到了,可以尝试加一下这个配置再试一下“taskmanager.log.path”,找一下导致tasks超时的根本原因。
还可以试一下用火焰图或jstack查看一下那几个tasks超时的时候是卡在哪个方法上。
--
Best!
Xuyang
Hi,看起来这个报错是用于输出信息的文件找不到了,可以尝试加一下这个配置再试一下“taskmanager.log.path”,找一下导致tasks超时的根本原因。还可以试一下用火焰图或jstack查看一下那几个tasks超时的时候是卡在哪个方法上。
在
看了下关于资源比例的参数是1.15版本才有的,1.13是没有的
内存的配置参数是 kubernetes.taskmanager.memory.limit-factor
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#kubernetes-taskmanager-memory-limit-factor
On 08/28/2022 11:55,casel.chen wrote:
Hi, 请问你的需求是 “debezium数据”- flink -“canal ”么?
如果是这样的话,可以用UDF[1]来尝试下。[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/
在 2022-08-21 10:49:29,"casel.chen" 写道:
>flink cdc如何将捕获变更记录转成canal json格式输出到下游kafka?
>flink cdc获取的是debezium格式记录(用的是
Hi,
现在好像没有count相关的下沉逻辑,目前应该只实现了关于filter、limit、partition、projection等的source下沉。具体可以参考下[1]等等
[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
--
Best!
您好,我这边使用Gzip压缩后在hdfs是可读的,只不过是用了1.15 FileSink的文件合并功能之后,几个压缩文件被合并后生成的新文件读不了了。
在 2022-08-04 10:48:02,"Wenhao Xiao" 写道:
大佬们好,有用过1.15 FileSink DataStream api的合并小文件功能吗,我这里写文件用gz格式压缩,发现合并后的文件读不了。
Format
您好,您这种情况我试了一下,确实不可读,具体原因可能还要看hdfs的支持(flink的Gzip压缩出去的文件hdfs是否认可,这个待确认),不过我这边使用lzo压缩,hdfs是可读的,可参考以下内容
// create the stream with kafka source, test_topic must return Student!
val kafkaStream: DataStream[Student] = env
.addSource(kafkaConsumer)
//
可以尝试使用个参数:
kubernetes.taskmanager.cpu.limit-factor
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#kubernetes-taskmanager-cpu-limit-factor
On 08/5/2022 09:24,casel.chen wrote:
我通过flink native kubernetes部署flink
Hi,
你可以在定义表ccc_test_20220630_2字段的时候,结构如果固定,可以指定字段类型为ARRAY+ROW吧,例如 abc
ARRAY>,如果里面是动态结构,可以定义为MAP
结构如果比较复杂,或者字段不明确,就自定义UDF解决。
Thanks
在 2022-06-30 15:02:55,"小昌同学" 写道:
各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了
Hi, 目前我在flink
master上没找到这个参数'json.infer-schema.flatten-nested-columns.enable'='true'。
你可以试一下在source读完整数据,然后通过UDF手动展开潜逃类型。
在 2022-06-30 15:02:55,"小昌同学" 写道:
各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了
Hi??
??JMTM??logdatastream??closedelay
?? 2022-06-16 15:10:07??"??" <757434...@qq.com.INVALID> ??
>flink 1.10.1 flinkui
可以把堆栈的日志打印出来看看
在 2022-06-10 18:15:53,"Summer" 写道:
使用 FinkUI 上传 Flink 任务 Jar 时,任务启动失败。 这时候JVM Metaspace就会异常增加。 这是什么原因?
Hi,
请问下你找的.out文件是tm的还是jm的?tm的out文件应该是有内容的才对。
--
Best!
Xuyang
在 2022-06-08 16:49:20,"陈卓宇" <2572805...@qq.com.INVALID> 写道:
>您好:
>向flink集群提交的sql:
>CREATE TABLE datagen (
>f_sequence INT,
>f_random INT,
>f_random_str STRING,
>ts AS localtimestamp,
>WATERMARK
configmap 如下
sql-test--jobmanager-leader
sql-test-resourcemanager-leader
sql-test-restserver-leader
sql-test-dispatcher-leader
在 2022-06-08 15:42:52,"json" <18042304...@163.com> 写道:
flink1.13.6 on k8s application 模式,设置HA
high-availability:
补充一下图片
https://s2.loli.net/2022/06/02/C5it4rPFgmJlopZ.png
https://s2.loli.net/2022/06/02/3ri2HIv1RsAawBW.png
https://s2.loli.net/2022/06/02/efVWPvXCYFwhgTp.png
https://s2.loli.net/2022/06/02/9UptbNaWvs7xwXC.png
在 2022-06-02 11:38:24,"lxk" 写道:
各位,请教个问题
Hi,请问在UI界面这些数据都是空的吗?可以贴一下具体的代码和UI界面截图吗?会不会是由于算子chain在一起了导致输入/输出数据是0呢?
--
Best!
Xuyang
在 2022-05-19 17:52:20,"yidan zhao" 写道:
>如题,主要表现是web
>ui部分监控,比如watermark,每个节点的数据之类不展示。看chrome的network视图可以发现请求返回状态码都是200,但是数据是空的。
>
>以watermarks请求为例:
Hi??FlinkjaninoJanino[1]??org.codehaus.janino.source_debugging.enable=true??org.codehaus.janino.source_debugging.dir=mypathdebug[1]
http://janino-compiler.github.io/janino/#debugging
?? 2022-04-25 17:04:30??"zhiyezou" <1530130...@qq.com.INVALID>
退订
在 2022-04-14 22:44:48,"顺其自然" <712677...@qq.com.INVALID> 写道:
>我的flink 1.12.1 sql clinet
>使用flink-sql-connector-elasticsearch7,代码里使用的flink-connector-elasticsearch7,然后在同一个flink上运行,报如下错误:
>
>Caused by: java.lang.reflect.InvocationTargetException
>at
The deployment 'cert-manager-webhook' shows
Failed to pull image "quay.io/jetstack/cert-manager-webhook:v1.7.1": rpc error:
code = Unknown desc = Error response from daemon: Get "https://quay.io/v2/":
net/http: TLS handshake timeout
在 2022-04-14 15:40:51,"casel.chen" 写道:
图片挂了,如果要发图片,可以将图片上传到共享图床,然后把链接贴在邮件里;
或者是把异常信息直接贴在邮件内容里
--
Best regards,
Mang Zhang
在 2022-04-07 16:25:12,"su wenwen" 写道:
hi,all.想问大家下,是否有遇到过这个问题,flink 1.12 的版本
在线上运行的flink sql 作业,总是在凌晨报错如下:
blobserver 我理解是传输二进制jar 包,从hdfs 到 本地工作目录。但没发现其他环节出现问题,对任务数据未产生影响。。
我们是直接使用云存储,像阿里云的oss,没有再搭建hadoop集群。如果flink on
k8s的确需要访问hadoop的话,是需要打包hadoop发行包在镜像里面的,配置好core-site.xml, hdfs-site.xml等
在 2022-03-30 12:01:54,"yidan zhao" 写道:
>如题,是需要打包hadoop client到镜像中吗。
Hi,
partition.discovery.interval.ms 这个是Flink connector
kafka里面加上的,KafkaSourceOptions里面定义的,
看下你的kafka-client的版本,官方的是 2.4.1,如果版本一样,那只能先忽略了。
在 2022-03-22 17:10:52,"Michael Ran" 写道:
>dear all :
> 目前用flink1.4 table api +kafka 的情况下,有各种警告,比如:
> The configuration
在Flink里面,你如果 use 了 HiveCatalog,那么暂时不能很好的使用非hive connector以外的表;
我理解你现在想要做的是,将flink 表的数据写入到一个hive table里
HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String)
null, "2.1.1");
tableEnv.registerCatalog("devHive", hiveCatalog);
// 去掉这部分,还使用flink默认的catalog
Hi
一般是卡在最后一步从JM写checkpoint meta上面了,建议使用jstack等工具检查一下JM的cpu栈,看问题出在哪里。
祝好
唐云
From: Sun.Zhu <17626017...@163.com>
Sent: Tuesday, March 8, 2022 14:12
To: user-zh@flink.apache.org
Subject: Re:flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败
图挂了
https://post
图挂了
https://postimg.cc/Z9XdxwSk
在 2022-03-08 14:05:39,"Sun.Zhu" <17626017...@163.com> 写道:
hi all,
flink 1.13.2,将checkpoint 写到S3但是一直成功不了,一直显示IN_PROGRESS,直到超时失败,有大佬遇到过吗?
目前sql任务的提交类似与jar的提交,在代码中写flinksql,打包成jar后使用flink pre-job-cluster模式提交
在 2022-02-22 18:40:45,"shanwu...@rockontrol.com" 写道:
>Hi:
>flink Per-Job-Cluster 模式支持提交sql任务吗?没有看到相关的资料。如果支持,可以发下文档地址吗?
>版本:flink-1.13.5
>
>
>
>shanwu...@rockontrol.com
1. 图片挂了,看不到,尽量用文字,或者用图床等工具
2. 启动任务有配置checkpoint吗?
在 2022-02-17 11:40:04,"董少杰" 写道:
flink读取csv文件建表,同时消费kafka数据建表,两张表join之后写入hdfs(hudi),读取csv数据的任务已经是finished状态,就会触发不了checkpoint,看有什么办法能让它正常触发checkpoint?
flink版本1.12.2。
谢谢!
| |
董少杰
|
|
eric21...@163.com
|
jdbc 连接 mysql 的driver 记得默认就是AutoCommit。phoenix不太清楚
在 2022-02-15 13:25:07,"casel.chen" 写道:
>最近在扩展flink sql jdbc
>connector以支持phoenix数据库,测试debug的时候发现数据能够通过PhoenixStatement.executeBatch()写入,但因为没有提交事务,所以其他人看不到。
贴失败原因吧,这个看不出来
在 2022-01-13 09:37:59,"Fei Han" 写道:
>
>@all:
>Flink mysql cdc凌晨同步报错,流任务都失败了。报错如下:
>
>org.apache.flink.runtime.JobException: Recovery is suppressed by
>FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3,
>backoffTimeMS=3)
>
>at
没测试过,如果kafka 确定自身会丢掉超时事务消息的前提下,比如10分钟超时丢消息。1.flink
发送消息A,进入第一阶段。2.flink 等待kafka 消息一阶段 ack信息 3.flink
收到ack消息,发送二阶段确认消息,并进行chk异常:
这个时候flink第二阶段消息确认,发送失败(同时flink应用因为各种原因挂了,超过10分钟)3.1 10分钟后,kakfa
丢弃事务超时的消息3.2 flink 重启,重新提交二阶段的事务id (但是由于kakfa
你好,
图片无法显示,建议使用外部图床上传,或是贴文字在邮件里面。
Best regards,
Xianxun
On 12/30/2021 11:53,Liu Join wrote:
使用flinkCDC2.1.1读取MySQL数据,一段时间后报错
从 Windows 版邮件发送
eventInfo_eventTime 我猜测是 BIGINT 类型的吧?
order by | range 需要用到 timestamp 类型,需要用计算列转换一下
At 2021-12-24 16:38:00, "Pinjie Huang"
wrote:
>我的原SQL:
>CREATE TABLE consumer_session_created
>(
>consumer ROW (consumerUuid STRING),
>clientIp STRING,
>deviceId STRING,
>eventInfo ROW <
CPU不会触发驱逐的,只有内存的request/limit不一样可能会发生这样的事情
Best,
Yang
casel.chen 于2021年12月23日周四 17:18写道:
> cpu request和limit不同会有什么影响吗?会不会pod竞争不过被kill掉?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-12-20 11:36:02,"Jeff" 写道:
> >升级版本没有用的,我用的是flink 1.13.2也遇到这个问题,原因是它request与limit相同,所以后来我改了它的源代码,你可以参考一下:
>
clickhouse 还提供 Decimal64 Decimal128 ,我也想知道38 这个是什么数据库的标准吗?
在 2021-12-23 19:58:24,"Ada Wong" 写道:
>最大精度为38,这个是有什么说法吗,为什么不是1000。如果我需要更高精度的DECIMAL我改怎么做?例如我需要DECIMAL(50, 18)
升级版本没有用的,我用的是flink
1.13.2也遇到这个问题,原因是它request与limit相同,所以后来我改了它的源代码,你可以参考一下:https://github.com/jeff-zou/flink.git
,我主要是改了KubernetesUtils.java这个类,利用external resource传入参数来替换request
在 2021-12-18 09:15:06,"casel.chen" 写道:
>所用flink版本是1.12.5,部署作业到native k8s设置的不管是
图片挂掉了。
可以外链到图床或者直接贴一下关键的原始提示信息。
在 2021-12-11 16:34:41,"Jeff" 写道:
根据官方建议的maven打包命令: mvm install -Dfast -DskipTests -Dscalla-2.12 -T 1C
,但我在本地编译打包总是卡在flink-table-runtim-blink这里,也没有错误提示,如下图:
请问有什么处理方法么?
图片挂掉了,可以放到图床或者附带一些原提示信息。
在 2021-12-11 11:19:51,"Jeff" 写道:
根据官方建议的maven打包命令: mvm install -Dfast -DskipTests -Dscalla-2.12 -T 1C
,但我在本地编译打包总是卡在flink-table-runtim-blink这里,也没有错误提示,如下图:
请问有什么处理方法么?
jdbc scan
?? 2021-12-02 14:40:06??"" ??
>
Hi, 俊超.
如果你指的是数据流必须在接受到一个或者多个ddl数据流才能够继续解析的话,那么你可以在ddl流到达算子之前,将数据流存入liststate,当接收到ddl类型的数据流元素后,先解析或处理
liststate中的数据,而后继续处理当前与后续的来自数据流的元素。
也可以使用上述方式达到 ‘使用广播流的方式来提前加载mysql表结构的变化’ 的逻辑效果。
看看任务并行度是多少,可能是并发太大导致的内存占用??
在 2021-11-04 15:52:14,"Asahi Lee" <978466...@qq.com.INVALID> 写道:
>hi!
>我通过flink sql,将mysql的一亿条数据传输到hive库中,通过yarn-application方式运行,结果配置16G的内存,执行失败!
Hi,
lei-tian.
基于你的描述,我推测(flink-1.10+)会存在这几种可能。
1. 使用了
flink的yarn-session模式,然后进行了相应的作业提交。这种情况表现为,作业最终完成后,yarn中对应flink集群的taskmanager
container能够被释放掉,但是只是保留了Jobmanager组件的容器没有释放。在 flink的yarn-session模式 的部署方式中,这是正常的。
2. 不论是在flink yarn 的per-job部署模式或者yarn-session部署模式中,如果负责hbase source/sink的
应该和OSS没关系吧,毕竟只是个存储。
我们CPU 你先看看消耗在哪个线程或者方法类呗
在 2021-10-08 16:34:47,"Lei Wang" 写道:
flink 程序以 RocksDB 作为 stateBackend, aliyun OSS 作为 checkpoint 数据最终的物理位置。
我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。
这个可能的原因是什么?会跟 OSS 有关吗?
谢谢,
王磊
插入 tidb 接收binlog,再次写入
在 2021-10-14 15:59:54,"WuKong" 写道:
>Hi:
> 目前遇到一个问题,我想在一个Job下 ,有两个SQL 分步都是 读取同一个Source Kafka 数据, 一个是插入Tidb 落数据,
> 另一个是写入下游Kafka, 目前想控制 先插入DB 然后再写入下游Kafka, 请问有什么方案可以实现这种方式?
>
>
>
>---
>Best,
>WuKong
升级到1.13用pod template吧,这之前的版本没有官方支持的方式
在 2021-09-17 16:43:53,"casel.chen" 写道:
>为了监控TM OOM情况发生,我们在启动作业的时候添加了如下参数
>-Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError
>-XX:HeapDumpPath=/var/log/oom.bin"
>想在OOM发生的时候能生成HeapDumpFile,以便事后分析。
>但是因为OOM时TM所在的pod会被销毁,因此想挂载一个网络盘持久化HeapDumpFile。
你这个,重点报错原因是这个吧:
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
/flink/flink-cdc-demo/13714486ceb74d650ba104df7b202920/chk-1/_metadata could
only be written to 0 of the 1 minReplication nodes. There are 3 datanode(s)
running and 3 node(s) are excluded in this operation.
可以阅读
StreamingJobGraphGenerator.isChainable()方法来详细了解chain的规则,具体到AsyncWaitOperator,应该与FLINK-13063
有关
在 2021-09-02 14:05:52,"lpengdr...@163.com" 写道:
>Hi:
>
>请教下Flink的operator-chain的机制,
把容器的日志采集下来不就行了么,K8s下ELK采集容器日志的方案很成熟啊,基本上官方Helm Charts默认安装就能全采集下来。
在 2021-08-23 11:37:54,"casel.chen" 写道:
>flink 1.12.1版本,作业通过flink run命令提交,运行在native
>k8s上,有个问题:作业日志要如何持久化下来?因为发现作业重启后,pod销毁,落在pod本地的日志也看不到了,不知道出错的root
>cause。有没有办法将作业日志持久化下来?比如存到ELK或阿里云oss上面。另外,我们使用的是阿里云
这不很清楚么,连 192.168.0.32:9866 超时啊
在 2021-08-20 18:13:10,"杨帅统" 写道:
>// 开启checkpoint
>env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
>
>env.getCheckpointConfig().setCheckpointStorage("hdfs://test.gl.cdh.node1:8020/flink/flink-cdc-demo");
>System.setProperty("HADOOP_USER_NAME",
batch 和数量小点呗 ~。~
在 2021-08-12 10:09:21,"周瑞" 写道:
您好,Flink Hive 当上游的Kafka数据量特别大的时候,发现checkpoint一直无法完成,5分钟后报错了。请问这个问题要怎么解决
你好,请问下 这个问题解决了么?我现在也同样遇到这样的问题
在 2020-08-25 15:29:09,"amen...@163.com" 写道:
>hi, everyone
>
>当我把jar包都上传至hdfs时,使用如下命令进行application mode提交,
>
>./bin/flink run-application -t yarn-application
>-Dyarn.provided.lib.dirs="hdfs:///user/flink/lib" -c
>com.yui.flink.demo.Kafka2Mysql
java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError:
org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils缺jar
在 2021-07-14 09:39:53,"xie_guo...@163.com" 写道:
>您好,有关flinkSQL时态表左关联时遇到了问题。
>具体场景:
>
>
??
?? 2021-07-13 17:31:19??"" <1510603...@qq.com.INVALID> ??
>Hi All??
>
>
> ??Flink
>checkpoint??2min??
通过任务进行隔离引用呗。你们美团已经是k8s了吧?
在 2021-07-05 14:06:53,"silence" 写道:
>请教大家目前flink sql有没有办法做到依赖隔离
>比如connector,format,udf(这个最重要)等,
>很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
要么内存增大,或者并行增大,要么窗口改小,同时保留数据时间减少
在 2021-07-15 10:23:25,"Hui Wang" <463329...@qq.com.INVALID> 写道:
>flink大窗口缓存数据量过大导致jvm频烦full gc,并且处理速度极低,最终OOM,该如何调优
这是k8s运维的问题,k8s应该有自己的日志收集机制,问一下你的运维,让他们把你的workload日志往oss也写一份,并带上各种必要的meta信息(比如pod信息、宿主信息等等)。
在 2021-07-12 10:08:31,"casel.chen" 写道:
>我们使用k8s运行flink作业,作业日志存储在容器中,一旦作业挂了容器销毁了就没法获取出问题的日志,有什么办法可以将日志保存到oss上么?通过配置启动history
> server吗?
共有 257 项搜索結果,以下是第 1 - 100 matches
Mail list logo