Re:回复: Flink 维表延迟join

2021-06-07 文章 casel.chen
双流interval join是否可行呢? 在 2021-06-07 16:35:10,"Jason Lee" 写道: > > >我么生产环境同样遇到这种问题,因为上有流数据到了,但是维表数据未更新导致丢失部分数据,请问大家现在有好的解决方案去解决Flink SQL >维表延迟Join的问题了吗? > > >有解决方案的小伙伴能分享下嘛? >| | >JasonLee >| >| >jasonlee1...@163.com >| >签名由网易邮箱大师定制 > > >在2021年02月25日 14:40,Suhan 写道:

Re:Re:使用mysql-cdc 的scan.startup.mode = specific-offset的读取模式,运行一段时间后,报错

2021-06-07 文章 casel.chen
这个问题很严重啊,生产线上可不敢这么用,丢失部分数据是不能接受的。社区什么时候能支持 GTID 呢?官方网档上有写么? 在 2021-06-07 18:40:50,"董建" <62...@163.com> 写道: > > > >我也遇到了这种情况,可能是你们的db做了主从切换。 >因为binlog每台服务器的pos都不一样。 >mysql5.6以后支持了GTID的同步方式,这个是全局唯一的。但是目前mysql-cdc貌似还不支持。 >我目前的解决方案是出错后从最后的位置开始消费,可能会丢失一部分数据。 > > > > > > > > > > > > > >

flink sql cdc作数据同步作业数太多

2021-06-06 文章 casel.chen
flink sql cdc作数据同步,因为是基于库+表级别的,表数量太多导致作业数太多。请问能否用flink sql cdc基于库级别同步?这样作业数量会少很多。

回撤流的窗口统计

2021-06-05 文章 casel.chen
上游是binlog cdc消费获取的回撤流,现要使用flink sql统计分析该回撤流上每5分钟的sum值,不能使用常规tumble window是吗?只能使用group by ts配合state TTL进行? 另外,问一下flink sql的state TTL只能是全局设置吗?能够通过在sql hint上添加从而可以细粒度控制吗?

flink sql调整算子并行度的方法有哪些?

2021-06-05 文章 casel.chen
flink sql调整算子并行度的方法有哪些?通过 sql hint 可以调整吗?

flink postgres jdbc catalog是只读的吗?

2021-06-02 文章 casel.chen
flink postgres jdbc catalog是只读的吗?能写的catalog 除了Hive Catalog还有哪些?社区什么时候会有Mysql JDBC Catalog呢?

flink sql作业表定义部分字段问题

2021-06-02 文章 casel.chen
有一个flink sql mysql-cdc作业,从kafka消费canal-json格式的binlog数据,打到下游存储,遇到一个问题:上游源表字段是会动态添加的,而我的sql table定义是静态的,如果上游schema发生变化,我的作业就会失败。在flink sql中是否可以针对目标数据源只定义用到的字段?如果可以的话应该怎么实现?现在是会抛类似下面的error。意思是原本包含43列的数据表,在我的DDL中只定义了其中的4列。有哪些格式是支持定义部分字段的呢? 21/06/02 18:54:22 [Source:

怎么避免flink sql cdc作业重启后重新从头开始消费binlog?

2021-06-02 文章 casel.chen
我有一个如下flink sql cdc作业,设置了'scan.startup.mode' = 'latest-offset'。但在作业重启后发现它又从头开始消费binlog,导致sink下游数据库频繁报duplicate key error,有什么办法可以避免吗? CREATE TABLE `mysql_source` ( `id` STRING, `acct_id` STRING, `acct_name` STRING, `acct_type` STRING, `acct_bal` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH (

rocksdb状态后端最多保留checkpoints问题

2021-05-27 文章 casel.chen
作业中使用了增量式的rocksdb状态后端,请问如果再设置了最多保留checkpoints个数的话,会不会造成rocksdb state恢复失败?例如,假设完整的state恢复需要最近10个chk,但因为设置了最多保留checkpoints个数为5的话,状态是不是恢复不了了?

如何根据flink sql解析出作业的血缘关系?

2021-05-27 文章 casel.chen
如何根据flink sql解析出作业的血缘关系?找到类似这样的血缘关系:source table A --> lookup table B --> sink table C

Re:Re: flink sql cdc并行度问题

2021-05-27 文章 casel.chen
我的作业是用flink sql消费mysql cdc binlog并实时同步到mongodb。如果只开一个并行度的话,mongodb的写入速度可能追不上mysql的写入。所以我需要在sink端开大并行度。 我不清楚用sql怎么写keyBy,是不是要group by pk呢?我原来的想法是在MongoDBSinkFunction中开一个线程池,每个线程对应下游sink的一个并行度,每个线程带一个queue,MongoDBSinkFunction根据数据PK往对应的queue发数据,每个消费者线程从自己的queue pull数据再进行批量插入。不知道这样可不可行?

flink状态查看工具

2021-05-25 文章 casel.chen
我有一个flink sql写的数据实时同步作业,从mysql binlog cdc消费发到mongodb,仅此而已,没有lookup,也没有join。 查看checkpoint页显示状态有17MB,checkpoint耗时要2s。 想知道为什么状态会如此之大,有没有状态查看工具看看里面到底存了什么信息?

Re:flink sql写mysql中文乱码问题

2021-05-25 文章 casel.chen
LE’F%')]) DataStreamScan(id=[1], fields=[count, word]) DataStreamScan(id=[2], fields=[count, word]) == Physical Execution Plan == Stage 1 : Data Source content : collect elements with CollectionInputFormat 在 2021-05-25 10:40:46,"casel.chen" 写道: >数据库字符编码设置如下 > >

flink sql cdc并行度问题

2021-05-24 文章 casel.chen
flink sql作业:消费mysql binlog将数据同步到 mongodb 问题: 1. mysql-cdc connector只能设置成一个并行度吗? 2. 可以增大mongodb的sink并行度吗?可以的话,要如何设置?它保证主键相同的记录会发到同一个分区sink吗?

Re:Re:Re:Re:flink sql写mysql中文乱码问题

2021-05-24 文章 casel.chen
jdbc:mysql://host:3306/datav_test?useUnicode=true=utf8 本地运行flink sql 作业插入中文是正常显示的,一部署到测试服务器跑就会出现中文乱码。有何修复建议?谢谢! 在 2021-05-19 17:52:01,"Michael Ran" 写道: > > > >数据库的字段字符编码 > > > > > > > > > > > > > > >在 2021-05-18 18:19:31,

flink sql支持Common Table Expression (CTE)吗?

2021-05-21 文章 casel.chen
flink sql支持Common Table Expression (CTE)吗?是不是可以通过 create temporary view xxx 来实现?CTE和temporary view的区别是什么? 例如 with toronto_ppl as ( SELECT DISTINCT name FROM population WHERE country = "Canada" AND city = "Toronto" ) , avg_female_salary as ( SELECT AVG(salary) as

flink sql支持创建临时函数吗?

2021-05-21 文章 casel.chen
如下 CREATE TEMPORARY FUNCTION get_seniority(tenure INT64) AS ( CASE WHEN tenure < 1 THEN "analyst" WHEN tenure BETWEEN 1 and 3 THEN "associate" WHEN tenure BETWEEN 3 and 5 THEN "senior" WHEN tenure > 5 THEN "vp" ELSE "n/a" END ); SELECT name ,

flink sql运行在阿里云k8s用oss作为checkpoint存储介质出错

2021-05-21 文章 casel.chen
flink sql运行在阿里云k8s用oss作为checkpoint存储介质,在作业启动过程中出错,请问这个NoSuchKey是指什么?flink在获取checkpoint作restore吗? 2021-05-21 10:56:10,278 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, kafka_source]], fields=[id,

Re:Re:flink sql写mysql中文乱码问题

2021-05-18 文章 casel.chen
'username' = 'mysqluser', 'password' = 'mysqluser', >'table-name' = 'jdbc_sink') >在 2021-05-18 11:55:46,"casel.chen" 写道: >>我的flink sql作业如下 >> >> >>SELECT >>product_name, >>window_start, >>window_end, >>CAST(SUM(trans_amt)ASDECIMAL(24,2)

flink sql写mysql中文乱码问题

2021-05-17 文章 casel.chen
我的flink sql作业如下 SELECT product_name, window_start, window_end, CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt, CAST(COUNT(order_no)ASBIGINT) trans_cnt, -- LOCALTIMESTAMP AS insert_time, '微支付事业部'AS bus_name FROM( mysql sink表的定义如下 CREATE TABLE XXX ( ) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT

flink sql源表定义字段列表问题

2021-05-17 文章 casel.chen
采用flink sql定义源表时,哪些connector支持提供部分字段,哪些connector必须要提供全量字段呢? 这边常用的connector主要有kafka, jdbc, clickhouse和mysql-cdc。 cdc是不是必须要提供对应表的全量字段呢?如果上游mysql业务表新增了字段,flink sql作业会不会出错? kafka表定义是否支持部分字段?

Re:flink sql怎样将change log stream转换成append log stream?

2021-05-17 文章 casel.chen
没有人知道吗? 在 2021-05-13 17:20:15,"casel.chen" 写道: flink sql怎样将change log stream转换成append log stream? 通过cdc接入了change log stream,后面就不能使用窗口聚合了,像Tumble和Hop, Session窗口。只能通过state ttl + group by timestamp这种方式聚合。 问一下有没有办法将change log stream转换成append log stream,从而可以使用上述窗口聚合了呢?谢谢!

Re:flink sql如何修改执行计划?

2021-05-17 文章 casel.chen
没有人知道吗? 在 2021-05-13 08:19:24,"casel.chen" 写道: >flink sql如何修改执行计划?例如,修改上下游算子不同的并行度,或者将算子链人为打断等等。 >我知道如何获取flink sql执行计划,但要怎么人为干预这个执行计划呢?还请大佬解答一下,谢谢!

flink sql怎样将change log stream转换成append log stream?

2021-05-13 文章 casel.chen
flink sql怎样将change log stream转换成append log stream? 通过cdc接入了change log stream,后面就不能使用窗口聚合了,像Tumble和Hop, Session窗口。只能通过state ttl + group by timestamp这种方式聚合。 问一下有没有办法将change log stream转换成append log stream,从而可以使用上述窗口聚合了呢?谢谢!

flink sql如何修改执行计划?

2021-05-12 文章 casel.chen
flink sql如何修改执行计划?例如,修改上下游算子不同的并行度,或者将算子链人为打断等等。 我知道如何获取flink sql执行计划,但要怎么人为干预这个执行计划呢?还请大佬解答一下,谢谢!

flink on native kubernetes要如何修改Logging配置?

2021-05-07 文章 casel.chen
我用native kubernetes方式部署flink session cluster,想修改某个包下的日志级别,如果直接修改configmap下的log4j-console.properties再重新部署是能生效的,但是通过命令行 (./bin/kubernetes-session.sh -Dkubernetes.cluster-id=xxx) 起flink session cluster会将之前的修改冲掉,有什么办法可以保留下之前的修改吗?是否有命令行启动参数可以指定自定义的logging配置?谢谢!

请问在native kubernetes上如何运行Flink History Server?

2021-05-07 文章 casel.chen
请问在native kubernetes上如何运行Flink History Server? 有没有相应的文档?

flink run命令是否支持读取远程文件系统中的jar文件?

2021-04-22 文章 casel.chen
flink run是否支持读取远程文件系统,例如oss://或hdfs://路径下的jar文件?看源码是需要构建PakcagedProgram,而它的构造函数中有一个File jarFile参数。不知是否能够从oss路径或hdfs路径构建出File对象。

Re:回复:flink sql cdc发到kafka消息表名信息缺失问题

2021-04-22 文章 casel.chen
我的疑问正是flink cdc集成debezium后为何会把原始信息弄丢失了?直接采用原生的debezium或者canal同步数据固然可以。但如果flink cdc直接能发出来的话不就可以节省这些组件和运维么?flink cdc设计的初衷也是如此。 在 2021-04-22 11:01:22,"飞翔" 写道: 既然这样,为何要用flink去同步信息,把信息的原始信息都丢失了。你可以直接采用原生的debezium或者canal同步数据,发送kafka, 比如canal的样例,虽然after

flink sql cdc发到kafka消息表名信息缺失问题

2021-04-21 文章 casel.chen
最近有个需求是用flink对接mysql binlog获取变更记录发到下游kafka(类似于canal server或debezium功能),下游kafka消息需要有 before, after, op_type, ts, database, table 这些字段信息。我试了如下脚本发现出来的kafka消息只有data和op_type信息,其他信息都获取不到。追踪到上游debezium(flink cdc是基于debezium实现的)发出来的record本身就只带data和op_type信息,问一下有没有别的办法获取到变更原始记录呢? CREATE TABLE

Re:回复: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

2021-04-21 文章 casel.chen
>发送时间:2021年4月21日(星期三) 下午2:16 >收件人:"user-zh" >主题:Re: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗? > > > >Hi casel. >flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。 > >https://github.com/ververica/flink-cdc-connectors/blob/master/README.md > >ca

flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

2021-04-20 文章 casel.chen
目标是用flink作业实现类似canal server的功能 CREATE TABLE `binlog_table` ( `id` INT, `name` STRING, `sys_id` STRING, `sequence` INT, `filter`

Re:如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming update and delete changes

2021-04-19 文章 casel.chen
我也遇到同样的问题 GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, mcsp_pay_log, ... 按时间窗口聚合不支持上游是canal-json格式的cdc表的情况么?我的业务表其实是一张日志表,怎样用flink sql将retract table转成append table?

Re:Re: 如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming update and delete changes

2021-04-19 文章 casel.chen
flink window doesn't support retract stream 的话有什么workaround办法吗?常见的场景有 业务表cdc -> kakfa -> flink按时间窗口聚合 如果业务表是只会insert的日志表,该如何将retract table转换成普通table? GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog,

GroupWindowAggregate doesn't support consuming update and delete changes

2021-04-19 文章 casel.chen
使用 flink sql 1.12.1时遇到三个问题: 1. GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, mcsp_pay_log, ... 按时间窗口聚合不支持上游是canal-json格式的cdc表的情况么?我的业务表其实是一张日志表,怎样用flink sql将retract table转成append

flink run是否支持读取oss://或hdfs://路径下的jar文件?

2021-04-13 文章 casel.chen
flink run是否支持读取oss://或hdfs://路径下的jar文件?看源码是需要构建PakcagedProgram,而它的构造函数中有一个File jarFile参数。不知是否能够从oss路径或hdfs路径构建出File对象。 private PackagedProgram( @Nullable File jarFile, List classpaths, @Nullable String entryPointClassName, Configuration configuration, SavepointRestoreSettings

flink on native kubernetes application Rest服务暴露问题

2021-04-07 文章 casel.chen
我在阿里云k8s上部署flink on native kubernetes application,默认用的服务暴露类型是 LoadBalancer,启动后会在公网暴露rest url。运维管理人员不允许这样,说是只能使用固定预先申请的几个SLB,但我在flink官网没有找到有参数设置LoadBalancerIP,这样情况要怎么实现?

flink kubernetes application频繁重启TaskManager问题

2021-04-04 文章 casel.chen
最近试用flink kubernetes application时发现TM不断申请再终止,而且设置的LoadBalancer类型的Rest服务一直没有ready,查看不到flink web ui,k8s日志如下,这是什么原因?是因为我申请的资源太小么? = 启动参数 "kubernetes.jobmanager.cpu": "0.1", "kubernetes.taskmanager.cpu": "0.1", "taskmanager.numberOfTaskSlots": "1", "jobmanager.memory.process.size":

Flink on Native K8S模式下如何配置StandBy做高可用?

2021-03-25 文章 casel.chen
Flink on K8S Standalone模式下可以通过yaml启多个JM,但是在Native K8S模式下要如果做呢?有文档资料介绍吗?谢谢!

请问有flink + hudi或iceberg + aliyun oss的示例吗?

2021-03-15 文章 casel.chen
请问有flink + hudi或iceberg + aliyun oss的示例吗?谢谢!

flink cdc遇到数据源大事务怎么处理?

2021-03-10 文章 casel.chen
flink cdc对接上游的mysql或pg业务库时遇到业务库大批量修数或schema变更是怎么处理的? 会不会瞬间产生很多changelog records打爆flink应用?如果会的话应该要如何避免呢?谢谢!

flink sql sink多数据源问题

2021-03-10 文章 casel.chen
请教一下flink sql多条数据sink用 statement set 语句时, 1. 如果其中一条sink条发生背压或故障,会影响其他sink流吗? 2. 在flink sql cdc 消费同一张mysql表sink进多种数据源场景下,例如 mysql -> fink cdc -> mongodb & polardb 建议是启多个作业分别etl,还是分两段 mysql -> flink cdc -> kafka -> flink -> mongodb & polardb ... 呢?关系数据库端接入同时多个cdc会不会影响性能?

flink sql中如何使用异步io关联维表?

2021-03-03 文章 casel.chen
flink sql中如何使用异步io关联维表?官网文档有介绍么?

flink on k8s日志时间戳时区问题

2021-02-18 文章 casel.chen
目前是UTC时区的,怎样才能设置成当地的东8区呢?谢谢! 2021-02-19 01:34:21,259 INFO akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started 2021-02-19 01:34:22,155 INFO akka.remote.Remoting [] - Starting remoting 2021-02-19 01:34:21,259 INFO

flink k8s高可用如何使用oss作为high-availability.storageDir?

2021-02-17 文章 casel.chen
如题,在k8s环境下不想使用hdfs作为high-availability.storageDir,有没有办法直接使用oss呢?checkpoint和savepoint已经能够使用oss了。

Flink standalone on k8s HA异常

2021-02-08 文章 casel.chen
我试着答k8s上部署flink standalone集群,做HA之前集群是能够正常work的,在做HA的时候发现在configmap中添加了如下两个HA配置后JM就会抛异常,这是为什么? high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory high-availability.storageDir: oss:///odps-prd/rtdp/flink/recovery 2021-02-09 00:03:04,421 ERROR

flink 1.12.0 k8s session部署异常

2021-02-07 文章 casel.chen
在k8s上部署sesson模式的flink集群遇到jobmanager报如下错误,请问这是什么原因造成的?要如何fix? 2021-02-07 08:21:41,873 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 . 2021-02-07

Re:flink yarn application提交作业问题

2021-01-19 文章 casel.chen
./bin/flink run-application -t yarn-application \ -Dyarn.provided.lib.dirs="hdfs://localhost:9000/flinkLib" \ hdfs://localhost:9000/flinkJobs/TopSpeedWindowing.jar 这种命令执行方式是可以执行的。 在 2021-01-20 10:21:32,"casel.chen" 写道: >使用了如下命令来提交flink作业到yarn上运行,结果出

flink yarn application提交作业问题

2021-01-19 文章 casel.chen
使用了如下命令来提交flink作业到yarn上运行,结果出错。如果job jar路径改成本地的就没有问题。我已经将 flink-oss-fs-hadoop-1.12.0.jar 放到flink lib目录下面,并且在flink.conf配置文件中设置好了oss参数。试问,这种作业jar在远端的分布式文件系统flink难道不支持吗? ./bin/flink run-application -t yarn-application \ -Dyarn.provided.lib.dirs="oss://odps-prd/rtdp/flinkLib" \

flink作业版本管理实现方案探讨

2021-01-19 文章 casel.chen
为支持作业上线评审和遇到问题进行版本回滚,实时计算平台中的flink作业版本管理采用哪种方案比较好? 一种声音是用数据库来存,另一种声音是使用git来管理:一个作业对应一个repo,里面有作业sql或jar文件,作业参数和配置文件等。 用数据库存的话作业文件比较割裂,像文本文件可以直接存表,但像jar包的话得存分布式文件系统,同时在数据表中记录文件id。我个人更倾向于用git,不知道这里会不会有什么坑?还请做过的朋友给个建议,谢谢!

flink作业版本管理实现方案

2021-01-19 文章 casel.chen
为支持作业上线评审和遇到问题进行版本回滚,实时计算平台中的flink作业版本管理采用哪种方案比较好? 一种声音是用数据库来存,另一种声音是使用git来管理:一个作业对应一个repo,里面有作业sql或jar文件,作业参数和配置文件等。

flink yarn application 提交任务出错

2021-01-19 文章 casel.chen
今天尝试使用yarn application模式(带yarn.provided.lib.dirs参数),将$FLINK_LIB目录下的jar包上传到了hdfs,结果报了如下的错,是少了哪个jar包或配置文件吗? org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster at

<    1   2   3   4