) {...}
在 2024-05-27 23:47:40,"casel.chen" 写道:
>Flink SQL消费kafka topic有办法限速么?场景是消费kafka
>topic数据写入下游mongodb,在业务高峰期时下游mongodb写入压力大,希望能够限速消费kafka,请问要如何实现?
Flink SQL消费kafka topic有办法限速么?场景是消费kafka
topic数据写入下游mongodb,在业务高峰期时下游mongodb写入压力大,希望能够限速消费kafka,请问要如何实现?
我们有使用阿里云商业版Hologres数据库,同时我们有自研的Flink实时计算平台,为了实现在Hologres上实时建仓,我们基于开源Apache
Flink 1.17.1结合阿里云maven仓库的ververica-connector-hologres包[1]和开源的holo
client[2]开发了hologres
connector,修复了一些jar依赖问题。目前我们已经在生产环境使用了一段时间,暂时没有发现问题,现在想将它贡献给社区。
请问:
1. 贡献Flink Hologres连接器是否合规?
2. 如果合规的话,PR应该提到哪个项目代码仓库?
3. 还是说要像
有人尝试这么实践过么?可以给一些建议么?谢谢!
在 2024-04-15 11:15:34,"casel.chen" 写道:
>我最近在调研Flink实时数仓数据质量保障,需要定期(每10/20/30分钟)跑批核对实时数仓产生的数据,传统方式是通过spark作业跑批,如Apache
>DolphinScheduler的数据质量模块。
>但这种方式的最大缺点是需要使用spark sql重写flink
>sql业务逻辑,难以确保二者一致性。所以我在考虑能否使用Flink流批一体特性,复用flink
&g
我最近在调研Flink实时数仓数据质量保障,需要定期(每10/20/30分钟)跑批核对实时数仓产生的数据,传统方式是通过spark作业跑批,如Apache
DolphinScheduler的数据质量模块。
但这种方式的最大缺点是需要使用spark sql重写flink sql业务逻辑,难以确保二者一致性。所以我在考虑能否使用Flink流批一体特性,复用flink
sql,只需要将数据源从cdc或kafka换成hologres或starrocks表,再新建跑批结果表,最后只需要比较相同时间段内实时结果表和跑批结果表的数据即可。不过有几点疑问:
1. 原实时flink
请问flink cdc对外有暴露一些监控metrics么?
我希望能够监控到使用flink cdc的实时作业当前未消费的binlog数据条数,类似于kafka topic消费积压监控。
想通过这个监控防止flink cdc实时作业消费慢而被套圈(最大binlog条数如何获取?)
我们最近在使用Flink 1.17.1开发flink sql作业维表关联使用复合主键时遇到FLINK-34170描述一样的问题,请问这个major
issue什么时候在哪个版本后能够修复呢?谢谢!
select xxx from kafka_table as kt
left join phoenix_table FORSYSTEM_TIMEASOFphoenix_table.proctime as pt
on kt.trans_id=pt.trans_id and pt.trans_date =
DATE_FORMAT(CURRENT_TIMESTAMP,'MMdd');
我使用注册kafka topic对应的schema到confluent schema registry时报错,想知道问题的原因是什么?如何fix?
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
Schema being registered is incompatible with an earlier schema for subject
"rtdp_test-test_schema-value", details:
正在运行的flink作业能够通过其对外暴露的web url获取到JobGraph信息么?
搜索了debezium源码但没有发现哪里有调用
SchemaRegistryClient.register方法的地方,请问它是如何注册schema到confluent schema registry的?
一个Flink DataStream 作业从mysql cdc消费处理后写入apache
doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink connector信息,包括连接字符串、数据库名、表名等?
场景:使用Flink实时生成指标写入Prometheus进行监控告警
网上搜索到 https://github.com/apache/flink-connector-prometheus 项目,但内容是空的
另外找到FLIP-312 是关于flink prometheus
connector的,https://cwiki.apache.org/confluence/display/FLINK/FLIP-312%3A+Prometheus+Sink+Connector
请问Flink官方有没有出flink prometheus connector?
flink sql作业从kafka消费mysql过来的canal
json消息,经过复杂处理后写入doris,请问如何统计doris表记录的端到端时延?mysql表有update_time字段代表业务更新记录时间。
doris系统可以在表schema新增一个更新时间列ingest_time,所以在doris表上可以通过ingest_time -
update_time算出端到端时延,但这种方法只能离线统计,有没有实时统计以方便实时监控的方法?
st!
>Xuyang
>
>
>
>
>
>在 2024-02-20 19:38:44,"Feng Jin" 写道:
>>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>>
>>Best,
>>Feng
>>
>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen wrote:
>>
>>&g
请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
public class XxxSinkFunction extends RichSinkFunction implements
CheckpointedFunction, CheckpointListener {
@Override
public synchronized void invoke(RowData rowData, Context context) throws
IOException {
我们在使用flink搭建实时数仓,想知道flink作业是如何做数据质量监控告警的?包括数据及时性、完整性、一致性、准确性等
调研了spark streaming有amazon deequ和apache
griffin框架来实现,想知道flink作业有没有类似的DQC框架?最好是对原有作业无侵入或者少侵入。
如果没有的话,实时数据质量这块一般是如何实现的呢?
如果每个生产作业都要单独配置一个DQC作业是不是代价太高了?有没有通过metrics暴露数据质量信息的呢?
下面是deequ使用的示例,检查每个微批数据是否满足规则要求。我们也有类似的数据质量检查需求
使用flink cdc 3.0
yaml作业进行mysql到doris整库同步时发现有数据倾斜发生,大的TM要处理180G数据,小的TM只有30G数据,上游有的大表流量很大,而小表几乎没有流量,有什么办法可以避免发生数据倾斜问题么?
t;
>[1] https://github.com/mongodb/mongo-kafka
>
>
>On 2024/01/22 02:57:38 "casel.chen" wrote:
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Flink CDC MongoDB connector 还是基于debezium实现的
>>
>>
>>
>>
Flink CDC MongoDB connector 还是基于debezium实现的
在 2024-01-22 10:14:32,"Jiabao Sun" 写道:
>Hi,
>
>可以参考 Flink CDC MongoDB connector 的实现。
>
>Best,
>Jiabao
>
>
>On 2024/01/22 02:06:37 "casel.chen" wrote:
>> 现有一种数据源不在debezium支持范围内,需要通
现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc
3.x自行开发,查了一下现有大部分flink cdc source
connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!
想知道oracle cdc connector不支持动态加表的原因是什么?可否自己扩展实现呢?
在 2024-01-19 11:53:49,"Jiabao Sun" 写道:
>Hi,
>
>Oracle CDC connector[1] 目前是不支持动态加表的。
>
>Best,
>Jiabao
>
>[1]
>https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/oracle-cdc.html
>
>
>On 2024/01/19
我想在Intellij Idea里面调试mysql-to-doris.yaml作业要如何操作呢?flink-cdc-cli模块需要依赖
flink-cdc-pipeline-connector-mysql 和 flink-cdc-pipeline-connector-doris 模块么?
业务表存量数据很大,如果不加限流直接使用flink cdc读取snapshot阶段数据的话会造成业务库压力,触发数据库告警,影响在线业务。
请问Flink CDC中如何在Snapshot阶段读取数据时进行限流?
我看到社区之前有人提议过,但issue一直是open状态
https://issues.apache.org/jira/browse/FLINK-18740
另外,我在flink最新master分支源码中有找到
GuavaFlinkConnectorRateLimiter,但没有找到调用它的例子,请问如何在flink作业中使用限流呢?
题比较复杂,可以参考一下源码和设计文档 [1]。
>
>Best,
>Jiabao
>
>[1]
>https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit
>
>> 2023年12月27日 22:14,casel.chen 写道:
>>
>> 看了infoq介绍flink cdc 3.0文章
>> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schem
看了infoq介绍flink cdc 3.0文章
https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema.
evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, d6
其中d代表数据变更,s代表schema变更
这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2
apshotSplit的情况,此时才会走到这段逻辑。
>
>Best,
>Hang
>
>
>key lou 于2023年12月20日周三 16:24写道:
>
>> 意思是当 有 binlog 就意味着 已经读完了 snapshot
>>
>> casel.chen 于2023年12月19日周二 16:45写道:
>>
>> > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢!
>> >
>>
我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢!
MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read
snapshot split”这一句话我不理解。
为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot split再读增量的binlog
split么?
private MySqlRecords pollSplitRecords() throws
补充一下,flink版本是 1.17.1
在 2023-12-01 15:49:48,"casel.chen" 写道:
>线上有一个flink sql作业,创建和更新时间列使用的是 TIMESTAMP(3)
>类型,没有配置'table.exec.sink.upsert-materialize'参数时是正常时间写入的`-MM-dd
>HH:mm:ss.SSS`格式,
>然后添加了'table.exec.sink.upsert-materialize'='NONE'参数后,
线上有一个flink sql作业,创建和更新时间列使用的是 TIMESTAMP(3)
类型,没有配置'table.exec.sink.upsert-materialize'参数时是正常时间写入的`-MM-dd
HH:mm:ss.SSS`格式,
然后添加了'table.exec.sink.upsert-materialize'='NONE'参数后,输出的时间格式变成了 `-MM-dd
HH:mm:ss.SS`。数据类型变成了TIMESTAMP(6),请问这是已知的issue么?
-U[2023-11-29T21:11:02.327, 2023-11-29,
社区Flink自带的那些json函数都没有解析一串json string返回一行或多行ROW的
在 2023-11-23 15:24:33,"junjie.m...@goupwith.com" 写道:
>可以看下JSON函数
>https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/functions/systemfunctions/#json-functions
>
>
>
>Junjie.M
>
ing>>
>)
>
>Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1
>as body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid,
>null,body_data[2]. field, body_data[2]. field2)] as result
>
>希望对你有帮助
>
>> 2023年11月22日 2
输入:
{
"uuid":"",
"body_data":
"[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
}
输出:
[
{
"uuid": "",
"body_data: null,
"body_data.fild1": "123”,
"body_data.fild2": "234"
},
{
"uuid": "",
"body_data": null,
"body_data.fild1":
描述详细一点么?看上去是符合一个lookup join里直接带上k1=v1 and k2=v2 and
>k3=v3的用法的。
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>在 2023-11-22 11:55:11,"casel.chen" 写道:
>>一行数据带了三个待lookup查询的key,分别是key1,key2和key3
>>
>>
>>id key1 key2 key3
>&
一行数据带了三个待lookup查询的key,分别是key1,key2和key3
id key1 key2 key3
想实现批量lookup查询返回一行数据 id value1 value2 value3
查了下目前包括jdbc connector在内的lookup都不支持批量查询,所以只能先将多列转成多行分别lookup再将多行转成多列,如下所示
id key1 key2 key3
先将多列转成多行
id key1
id key2
id key3
分别进行lookup join后得到
id value1
id value2
id value3
最后多行转多列返回一行数据
id
我有一个flink
sql作业过滤条件customer_id是需要根据用户配置来定的,类似于Apollo配置中心,是否可以通过定义一张配置维表来实现呢?设置TTL定期去获取最新配置。
create table customer_conf_tbl (
customer_id STRING
) with (
'connector' = 'apollo',
'其他属性'
);
select * from biz_table where customer_id in (select string_split(customer_id,
',') from
这个critical issue有人fix吗?我们线上使用flink 1.17.1版本有使用jdbc维表查询on带and过滤条件,发现and过滤条件不起作用
例如
select xxx from a left join b on a.id = b.id and b.type = 'xxx'
发现b.type='xxx'这个过滤条件不起作用
onfigurations
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2023-10-29 20:34:52,"casel.chen" 写道:
>>请问flink 1.18引入的CatalogStore是为了解决什么问题呢?为什么引入CatalogStore后才可以提供show create
>>catalog语法支持?
>>
>>
>>
>>
>>
>
想问一下目前flink 1.18.0的sql gateway只支持提交作业到本地运行吗?能否支持提交作业到yarn或k8s上运行呢?
flink 的 gateway 接口,所以你在启动 gateway
>的时候不用指定 endpoint 为 hiveserver2 类型,用 Flink 默认的 gateway endpoint 类型即可。
>
>casel.chen 于2023年10月29日周日 17:24写道:
>>
>> 1. 启动flink集群
>> bin/start-cluster.sh
>>
>>
>> 2. 启动sql gateway
>> bin/sql-gateway.sh start -Dsql-ga
alog,管理起来很麻烦,有这个特性会好很多。
>| |
> 回复的原邮件
>| 发件人 | Feng Jin |
>| 发送日期 | 2023年10月20日 13:18 |
>| 收件人 | |
>| 主题 | Re: flink sql不支持show create catalog 吗? |
>hi casel
>
>
>从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。
>
>
>Best,
>Feng
>
&
1. 启动flink集群
bin/start-cluster.sh
2. 启动sql gateway
bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2
3. 将flink-sql-jdbc-driver-bundle-1.18.0.jar放到apache-hive-3.1.2-bin/lib目录下
4. 到apache-hive-3.1.2-bin目录下启动beeline连接sql gateway,提示输入用户名和密码时直接按的回车
$ bin/beeline
SLF4J: Class path
场景:使用flink
sql将数据写入下游OLAP系统,如doris,遇到一些异常情况,比如字段值超长或者分区字段值为当前doris表不存在的分区(需要先人为创建)等等,当前写入这些脏数据会使得作业写入报错,进而导致作业失败。我们是希望能够将这些“脏”数据单独发到一个kafka
topic或者写入一个文件便于事后审查。这个目前有办法做到吗?
之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink
sql不支持show create catalog 。
而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?
目前想要通过Flink全量+增量消费Apache Paimon表需要分别起离线和增量消费两个作业,比较麻烦,而且无法无缝衔接,能否通过类似Flink
CDC消费mysql表的方式消费Apache Paimon表?
oint为HDFS上Savepoint的地址,同时配置savepoint/checkpoint目录为OSS。这样Flink启动的时候会从HDFS中的状态恢复,并将新的checkpoint保存在oss中。
>
>On Sun, Aug 6, 2023 at 10:03 PM casel.chen wrote:
>
>> flink on yarn作业checkpoint/savepoint保存在hdfs上面,现在想将其迁移到on
>> k8s上运行,使用的是对象存储oss,请问如何无感地进行作业状态迁移呢?使用的flink版本是1.15.2,谢谢!
flink on yarn作业checkpoint/savepoint保存在hdfs上面,现在想将其迁移到on
k8s上运行,使用的是对象存储oss,请问如何无感地进行作业状态迁移呢?使用的flink版本是1.15.2,谢谢!
我们要将当前在Hadoop Yarn上运行的flink
sql作业迁移到K8S上,状态存储介质要从HDFS更换到对象存储,以便作业能够从之前保存点恢复,升级对用户无感。
又因为flink作业状态文件内容中包含有绝对路径,所以不能通过物理直接复制文件的办法实现。
查了一下官网flink state processor api目前读取状态需要传参uid和flink状态类型,但问题是flink
sql作业的uid是自动生成的,状态类型我们也无法得知,请问有没有遍历目录下保存的所有状态并将其另存到另一个文件系统目录下的API ? 感觉state
processor
社区无人响应吗?
在 2023-07-15 12:19:46,"casel.chen" 写道:
>Flink社区能否考虑将ROWKIND做成元数据metadata,类似于kafka
>connector中的offset和partition等,用户可以使用这些ROWKIND
>metadata进行处理,例如对特定ROWKIND过滤或者答输出数据中添加ROWKIND字段
Flink社区能否考虑将ROWKIND做成元数据metadata,类似于kafka
connector中的offset和partition等,用户可以使用这些ROWKIND
metadata进行处理,例如对特定ROWKIND过滤或者答输出数据中添加ROWKIND字段
可以查看history server
在 2023-07-14 18:36:42,"阿华田" 写道:
>
>
>hello 各位大佬, flink on K8S ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了
>无法判断flink任务是正常Finished 还是异常失败了,各位大佬有什么建议吗
>| |
>阿华田
>|
>|
>a15733178...@163.com
>|
>签名由网易邮箱大师定制
>
Flink社区能否考虑将ROWKIND做成元数据metadata,类似于kafka connector中的offset,
partition,用户可以引用这些metadata进行过滤操作?
在 2023-07-10 23:39:00,"yh z" 写道:
>Hi, shi franke. 你可以尝试自己实现一个 DynamicTableSink,在里面添加参数 “sink.ignore-delete”。
>你可以参考 github 上的一些实现,例如 clickhouse:
mysql库中设置的是utf8mb4编码,单独sql查询mysql表没有出现中文乱码
使用flink datastream作业通过cdc消费mysql
binlog并写到下游doris表时遇到字符串长度超长问题,我们是按mysql表schema创建的doris
schema,就很奇怪为什么总是报字符串超长错误。于是将异常时的原始数据打印出来,才发现数据中只要包含了中文字符都会显示成乱码,要么都是???,要么都是其他莫名字符。
没有人遇到过这个问题吗?
在 2023-06-19 10:41:30,"casel.chen" 写道:
>Flink作业消费数据源来自mysql业务表,后者使用了`-00-00 00:00:00`这个dummy date来存时间,直接用Flink
>CDC消费的话会被解析成`1970-01-01 00:00:00` (mysql中是datetime类型)或者`1970-01-01 08:00:00`
>(mysql中是timestamp类型)。
>问题1:可否给个Flink CDC选项,遇到这种dummy时间转成NULL?
Flink作业消费数据源来自mysql业务表,后者使用了`-00-00 00:00:00`这个dummy date来存时间,直接用Flink
CDC消费的话会被解析成`1970-01-01 00:00:00` (mysql中是datetime类型)或者`1970-01-01 08:00:00`
(mysql中是timestamp类型)。
问题1:可否给个Flink CDC选项,遇到这种dummy时间转成NULL?存量query和增量消费binlog处理这种dummy时间结果一致么?
问题2:如果是mysql -> mysql同步场景,使用Flink
hi casel
>
>1. 可以考虑使用 Flink1.15, 使用精简的 operator name
>
>https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/#table-exec-simplify-operator-name-enabled
>
>2. Flink 也提供了 restful 接口直接获取瞬时的 metric,如果不需要历史的 metric
>
>https://nightlies.apache.org/flink
线上跑了200多个flink
sql作业,接了prometheus指标(prometheus定期来获取作业指标)监控后没跑一会儿就将prometheus内存打爆(开了64GB内存),查了一下是因为指标名称过长导致的。
flink
sql作业的指标名称一般是作业名称+算子名称组成的,而算子名称是由sql内容拼出来的,在select字段比较多或sql较复杂的情况下容易生成过长的名称,
请问这个问题有什么好的办法解决吗?
ightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#io
>
>On Mon, Jun 12, 2023 at 5:05 PM casel.chen wrote:
>
>> 想统计数据经过flink作业各个算子的延迟指标,目前社区开源版能实现吗?
>
>
>
>--
>Best,
>Hangxiang.
想统计数据经过flink作业各个算子的延迟指标,目前社区开源版能实现吗?
arker
>也会被阻塞,所以大体上还是可以反应出任务的延迟情况,如果想要准确的计算出端到端的延迟,可以在 消费 kafka 的时候获取一个 start time 时间戳
>在 sink 的时候获取一个 end time 时间戳,然后自定义一个 metric 把这个结果上报 基于这个 metric 做端到端的延迟监控。
>
>
>Best
>JasonLee
>
>
> 回复的原邮件
>| 发件人 | casel.chen |
>| 发送日期 | 2023年06月8日 16:39 |
>| 收件人 |
我想知道当前flink作业延迟了多久现在能通过什么指标可以获取到吗?想通过设置作业延迟告警来反馈作业健康状况,是否产生背压,是否需要增加资源等。
以mysql表实时同步到doris表为例:mysql binlog -> kafka -> flink -> doris
延迟指标包括:
1. 业务延迟:业务延迟=当前系统时间 - 当前系统处理的最后一条数据的事件时间(Event time)
例如:kafka消息写入doris的时间 - kafka消息数据本身产生时间(例如更新mysql记录的时间)
2. 数据滞留延迟:数据滞留时间=数据进入实时计算的时间 - 数据事件时间(Event
有没有Flink RocketMQ官方连接器? 需要自己开发吗?Flink生态组件网址(用户上传自己开发的连接器格式什么的)是什么?
用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?
mysql binlog 操作记录发到 kafka topic 中,消息格式是canal json,现通过flink
sql实时同步写入另一个mysql库。今天发现实时作业抛错说写入mysql时遇到duplicate key error,查了一下发现是kafka
topic中存在两条相同的消息,即相同主键且都是INSERT操作的消息。请问这种情况有什么办法可以避免作业出错吗?
查了官方文档说要在作业中添加参数 table.exec.source.cdc-events-duplicate
使用flink sql写mysql表数据到doris表,发现case
when语句判断交易类型使用了中文,写入后在doris查出是乱码,而mysql其他中文字段写入是正确的,想问一下这个sql中出现的乱码问题要解决?
t;可以考虑基于flink-kubernetes依赖下的KubernetesClusterDescriptor来启动任务,可以参考https://github.com/collabH/flink-deployer/blob/main/infrastructure/src/main/java/com/flink/plugins/inf/deployer/KubernetesClusterDeployer.java
>
>
>> 2023年5月15日 19:21,casel.chen 写道:
>>
>> 我们开发了一个实时计算平台提交flink
&g
我们开发了一个实时计算平台提交flink
sql作业到k8s上运行,发现每次提交作业都需要上传平台sql作业jar包flinksql.jar,因为这个jar包包含了平台用到的所有connector和format,所以flinksql.jar这个fat
请问Flink SQL CEP只能处理单流输入吗?网上看到的例子都是在同一个输入流中进行CEP处理,有没有双(多)输入流下使用CEP处理的例子?谢谢!
需求:业务端实现支付功能,需要通过第三方支付平台的交易数据采用Flink SQL来做一个实时对账,对于超过30分钟内未到达的第三方支付平台交易数据进行告警。
请问这个双流实时对帐场景使用Flink CEP SQL要如何实现?
网上找的例子都是基于单条流实现的,而上述场景会用到两条流,一个是PlatformPaymentStream,另一个是ThirdPartyPaymentStream。
线上使用flink sql消费kafka topic canal json格式数据,发现有一些数据中有的时间字段值为-00-00
00:00:00无法被解析,于是加了'canal-json.ignore-parse-errors = true'
参数,作业是能够正常运行了,但同时我们也希望知道哪些数据解析失败以便发给上游业务系统去自查。想问一下除了ignore外,有办法将这些parse
error数据输出到另外一个kafka topic吗?谢谢!
实际工作中会遇到kafka版本升级或者kafka扩容(横向或纵向),数据重平衡等情况,想问一下发生这些情况下对线上运行的flink作业会有什么影响?flink作业能感知topic分区发生变化吗?要如何应对以减少对flink作业消费端的影响?
我在尝试提交pyflink作业到k8s,按照这篇文章[1]介绍操作的,pyflink镜像文件[2],flink版本是1.15.2,执行wordcount
jar作业没遇到问题,而在提交pyflink作业时发现作业失败了,日志显示如下。我本地安装的python 3.7.9和pyflink镜像中的版本是一致的,
请问是不是pickle包版本有问题?
怎样查看当前pickle包版本号是多少?
期望用的pickle包版本号是多少?
如何将当前pickle包安装成期望的版本?
./bin/flink run \
-m localhost:8081 \
-py
多张oracle表变更同步到同一个kafka topic,现在实时flinlk作业需要消费其中一张oracle表,查了一下没看到类似canal json格式中
canal-json.database.include 和 canal-json.table.include 参数,只在available
metadata中看到 table 字段,这意味着我需要在select语句中按table字段进行过滤吗?
[1]
我的实时作业项目想解析sql获取到TableIdentifier做sql血缘,使用的版本是flink 1.15.2,同时引入了
flink-table-planner_2.12 和 flink-table-planner-loader 依赖,debug时发现
TableEnvironmentImpl create(EnvironmentSettings settings) 方法会调用
FactoryUtil.discoverFactory(classLoader, ExecutorFactory.class,
项目中引入 flink-sql-connector-oracle-cdc-2.3.0.jar
后启动过程中报如下异常,查了一下该jar下有oracle.xml.jaxp.JXDocumentBuilderFactory类,有什么办法解决么?
ERROR StatusLogger Caught javax.xml.parsers.ParserConfigurationException
setting feature http://xml.org/sax/features/external-general-entities to false
on
使用prometheus监控flink
sql作业,发现没一会儿工夫就将prometheus内存(30GB)占满了,查了一下是因为作业指标名称过长导致的,像flink sql作业这种operator
name和task name默认是根据sql内容拼装的,一旦sql出现的列名很多就会导致指标名称过长。
请问这种情况Flink社区有什么建议?prometheus抓取的时候能够过滤掉吗?只保留operator_id和task_id。
要是自己想将现有拼装名称修改成哈希值的话应该改哪个类呢?谢谢!
更正一下,监控flink的方式从pushgateway方式改成了直接prometheus定期来抓取,周期设置的是1分钟,之前用pushgateway方式也总是把pushgateway打挂,现在改成pull方式还是照样把prometheus打挂。flink作业侧有什么参数可以配置吗?prometheus侧能否配置只抓取那些grafana
dashboard展示需要的metrics?
在 2023-03-22 12:08:29,"casel.chen" 写道:
>我们通过pushgateway上报metrics到prometheu
我们通过pushgateway上报metrics到prometheus,设置的上报周期是30秒,整个实时平台有200多个作业,启了一台50GB的prometheus还是撑不住,指标保留1天,设置了指标在内存中驻留2小时后写入磁盘。最大的一个metric已经有37万条。请问有什么解决办法么?能选择哪些指标进行上报不?
Flink实时计算平台在k8s上以Application模式启动作业如何实时同步作业状态到平台?作业一旦crash失败就会被k8s回收到相关的pod,没法通过web
url去获取作业状态,有什么别的办法吗?通过metrics? 如果是的话具体是哪一个metric值呢?
检查过了,当前`state.checkpoints.num-retained`参数值是3
在 2023-03-21 20:05:35,"Shammon FY" 写道:
>Hi
>
>你可以检查一下checkpoint配置`state.checkpoints.num-retained`,是否保存的checkpoint数量太多了?
>
>Best,
>Shammon FY
>
>
>On Tue, Mar 21, 2023 at 11:55 AM casel.chen wrote:
>
>> 有
有一个flink cdc实现多表关联打宽的flink作业,作业状态达到20GB左右,远端状态存储用的是aliyun
oss。今天作业运行失败打算手动从checkpoint恢复时发现保存作业状态的checkpoint目录(share目录)无法通过浏览器打开,后来使用命令行list了一下该目录下的文件有多达上万个文件。该flink作业用的是rocksdb
state
backend并开启了增量checkpoint。请问有什么办法可以解决这个问题吗?share目录下这么多文件是因为增量checkpoint遗留下来的吗?
线上用prometheus监控几百个flink作业,使用的是pushgateway方式,设置采样作业metrics周期是30秒,prometheus服务本身给了将近50GB内存,还是会经常发生OOM,请问有什么调优办法吗?
站在业务角度,监控指标包括数据的一致性(不多不少)和 数据的时效性(同步延迟时长在合理区间)。这2块有什么工具和方案吗?
在 2023-03-17 15:23:30,"Shammon FY" 写道:
>Hi
>
>具体是要监控哪些信息?不同的信息会有不同的工具和方案,比如资源使用率、failover情况、同步数据延时等
>
>Best,
>Shammon FY
>
>
>On Fri, Mar 17, 2023 at 10:52 AM casel.chen wrote:
>
使用flink cdc消费mysql binlog遇到业务库刷数据瞬间cdc流量上涨打爆作业的问题有什么好的解决办法吗?
业务上利用flink作业做实时数据同步,请问实时数据同步对比监控有什么好的工具和方案吗?
实时同步链路:mysql -> kafka canal -> flink -> doris
欢迎大家提供思路
当前flink实时作业接的kafka canal json格式的cdc数据,mysql表会有新增和更新数据,但不会有物理删除。
如果直接多条cdc数据流实时关联会导致作业状态很大,请教:
1. 有没有什么办法可以减少作业状态?
2. cdc格式的retract流可以加去重变成append流吗?
3. 使用append流多流关联是不是能减少作业状态?
flink cdc connector计划支持hudi change data capture吗?
flink sql jdbc connector是否支持多流拼接?
业务上存在基于主键打宽场景,即多个流有相同的主键字段,除此之前其他字段没有重叠,现在想打成一张大宽表写入mysql/oracle这些关系数据库。
每条流更新大宽表的一部分字段。
flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。
随后在写目标数据库中加了一些日志后发现同一主键的变更记录(前后发生间隔时间很短)被发送到了不同的TaskManager处理,导致新数据被旧数据覆盖,造成漏数据现象。
请问:
1. cdc数据源关联维表后会被分散到不同TaskManager吗?什么情况下会发生?
2. 如何解决这个问题?是需要在写目标表之前加一层窗口去重[1]吗?
[1]
Hi,
>
>Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?", *checking out
>ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details about
>solution of disordering problems in KeyBy shuffling.
>
>Best,
>Shuo
>
>On Wed, Feb 22, 2023 at 10:23 AM casel.chen wro
On Sun, Feb 19, 2023 at 1:43 PM RS wrote:
>>
>> > Hi,
>> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>> >
>> >
>> > Thanks
>> >
>>
group by 主键,然后再执行insert into
>>
>>
>> Thanks
>>
>>
>>
>> 在 2023-02-17 15:56:51,"casel.chen" 写道:
>> >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>> join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>> >测试下来发现
日志中就是报这个 "type":"INIT_DDL" 不能识别呀,然后作业就退出了
在 2023-02-20 09:58:56,"Shengkai Fang" 写道:
>Hi. 能同时分享下复现这个 case 的sql 以及相关的报错栈吗?
>
>Best,
>Shengkai
>
>casel.chen 于2023年2月9日周四 12:03写道:
>
>> 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal
键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>
>
>Thanks
>
>
>
>在 2023-02-17 15:56:51,"casel.chen" 写道:
>>作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>>join,最后将打宽表写入mongodb。使用的是fli
作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
请问:
flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
我理解flink
目前应该是不支持,一个替代方案是利用concat函数将数组转成string作为输入,再在你的UDF中拆成数组进行处理。
在 2023-02-15 16:29:19,"723849736" <723849...@qq.com.INVALID> 写道:
>大家好,
>
>我在用flink sql的时候有一个场景,就是需要对数组中的某一列做变换,类似于spark sql中的tranform函数
>
>
>https://spark.apache.org/docs/latest/api/sql/index.html#transform
>
>
>目前flink
不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal json格式解析时直接忽略不识别的type,例如
例1:
我们flink sql作业跑在k8s上,但发现k8s集群整体资源使用率并不高,例如请求内存占总内存89.28%,但实际使用内存占总内存只有66.38%。
现在想排查出哪些作业过度申请资源,有什么办法或直接的metrics可以监控flink sql作业实现k8s资源使用率么?谢谢!
flink自带的web ui为什么不能提供一个触发生成保存点的按钮?
>
>所以要看用户想要什么,你们想给用户开放到哪个程度?
>至于是不是可以像flink sql kafka connector定义 `properties.*`
>,这个是具体实现的方式,现在都不清楚你要做什么,先确定目标,再考虑实现。
>
>
>Thanks
>
>在 2022-12-27 13:24:38,"casel.chen" 写道:
>>
>>
>>遇到用户添加自定义请求头Headers问题
>>
>>如果自定义Headers是和内容相关
nnector options中支持Map数据类型呢?
>options里面都是k=v的结构,v都是字符串,所以你要考虑的是如何用 字符串 描述 Map
>
>
>
>
>Thanks
>
>在 2022-12-17 10:20:29,"casel.chen" 写道:
>>我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector
>>options中支持Map数据类型呢?
共有 350 项搜索結果,以下是第 1 - 100 matches
Mail list logo