双流interval join是否可行呢?
在 2021-06-07 16:35:10,"Jason Lee" 写道:
>
>
>我么生产环境同样遇到这种问题,因为上有流数据到了,但是维表数据未更新导致丢失部分数据,请问大家现在有好的解决方案去解决Flink SQL
>维表延迟Join的问题了吗?
>
>
>有解决方案的小伙伴能分享下嘛?
>| |
>JasonLee
>|
>|
>jasonlee1...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2021年02月25日 14:40,Suhan 写道:
这个问题很严重啊,生产线上可不敢这么用,丢失部分数据是不能接受的。社区什么时候能支持 GTID 呢?官方网档上有写么?
在 2021-06-07 18:40:50,"董建" <62...@163.com> 写道:
>
>
>
>我也遇到了这种情况,可能是你们的db做了主从切换。
>因为binlog每台服务器的pos都不一样。
>mysql5.6以后支持了GTID的同步方式,这个是全局唯一的。但是目前mysql-cdc貌似还不支持。
>我目前的解决方案是出错后从最后的位置开始消费,可能会丢失一部分数据。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
flink sql cdc作数据同步,因为是基于库+表级别的,表数量太多导致作业数太多。请问能否用flink sql
cdc基于库级别同步?这样作业数量会少很多。
上游是binlog cdc消费获取的回撤流,现要使用flink sql统计分析该回撤流上每5分钟的sum值,不能使用常规tumble
window是吗?只能使用group by ts配合state TTL进行?
另外,问一下flink sql的state TTL只能是全局设置吗?能够通过在sql hint上添加从而可以细粒度控制吗?
flink sql调整算子并行度的方法有哪些?通过 sql hint 可以调整吗?
flink postgres jdbc catalog是只读的吗?能写的catalog 除了Hive Catalog还有哪些?社区什么时候会有Mysql
JDBC Catalog呢?
有一个flink sql
mysql-cdc作业,从kafka消费canal-json格式的binlog数据,打到下游存储,遇到一个问题:上游源表字段是会动态添加的,而我的sql
table定义是静态的,如果上游schema发生变化,我的作业就会失败。在flink
sql中是否可以针对目标数据源只定义用到的字段?如果可以的话应该怎么实现?现在是会抛类似下面的error。意思是原本包含43列的数据表,在我的DDL中只定义了其中的4列。有哪些格式是支持定义部分字段的呢?
21/06/02 18:54:22 [Source:
我有一个如下flink sql cdc作业,设置了'scan.startup.mode' =
'latest-offset'。但在作业重启后发现它又从头开始消费binlog,导致sink下游数据库频繁报duplicate key
error,有什么办法可以避免吗?
CREATE TABLE `mysql_source` (
`id` STRING,
`acct_id` STRING,
`acct_name` STRING,
`acct_type` STRING,
`acct_bal` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
作业中使用了增量式的rocksdb状态后端,请问如果再设置了最多保留checkpoints个数的话,会不会造成rocksdb
state恢复失败?例如,假设完整的state恢复需要最近10个chk,但因为设置了最多保留checkpoints个数为5的话,状态是不是恢复不了了?
如何根据flink sql解析出作业的血缘关系?找到类似这样的血缘关系:source table A --> lookup table B --> sink
table C
我的作业是用flink sql消费mysql cdc
binlog并实时同步到mongodb。如果只开一个并行度的话,mongodb的写入速度可能追不上mysql的写入。所以我需要在sink端开大并行度。
我不清楚用sql怎么写keyBy,是不是要group by
pk呢?我原来的想法是在MongoDBSinkFunction中开一个线程池,每个线程对应下游sink的一个并行度,每个线程带一个queue,MongoDBSinkFunction根据数据PK往对应的queue发数据,每个消费者线程从自己的queue
pull数据再进行批量插入。不知道这样可不可行?
我有一个flink sql写的数据实时同步作业,从mysql binlog cdc消费发到mongodb,仅此而已,没有lookup,也没有join。
查看checkpoint页显示状态有17MB,checkpoint耗时要2s。
想知道为什么状态会如此之大,有没有状态查看工具看看里面到底存了什么信息?
LE’F%')])
DataStreamScan(id=[1], fields=[count, word]) DataStreamScan(id=[2],
fields=[count, word])
== Physical Execution Plan == Stage 1 : Data Source content : collect elements
with CollectionInputFormat
在 2021-05-25 10:40:46,"casel.chen" 写道:
>数据库字符编码设置如下
>
>
flink sql作业:消费mysql binlog将数据同步到 mongodb
问题:
1. mysql-cdc connector只能设置成一个并行度吗?
2. 可以增大mongodb的sink并行度吗?可以的话,要如何设置?它保证主键相同的记录会发到同一个分区sink吗?
jdbc:mysql://host:3306/datav_test?useUnicode=true=utf8
本地运行flink sql 作业插入中文是正常显示的,一部署到测试服务器跑就会出现中文乱码。有何修复建议?谢谢!
在 2021-05-19 17:52:01,"Michael Ran" 写道:
>
>
>
>数据库的字段字符编码
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-05-18 18:19:31,
flink sql支持Common Table Expression (CTE)吗?是不是可以通过 create temporary view xxx
来实现?CTE和temporary view的区别是什么?
例如
with toronto_ppl as (
SELECT DISTINCT name
FROM population
WHERE country = "Canada"
AND city = "Toronto"
)
, avg_female_salary as (
SELECT AVG(salary) as
如下
CREATE TEMPORARY FUNCTION get_seniority(tenure INT64) AS (
CASE WHEN tenure < 1 THEN "analyst"
WHEN tenure BETWEEN 1 and 3 THEN "associate"
WHEN tenure BETWEEN 3 and 5 THEN "senior"
WHEN tenure > 5 THEN "vp"
ELSE "n/a"
END
);
SELECT name
,
flink
sql运行在阿里云k8s用oss作为checkpoint存储介质,在作业启动过程中出错,请问这个NoSuchKey是指什么?flink在获取checkpoint作restore吗?
2021-05-21 10:56:10,278 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
TableSourceScan(table=[[default_catalog, default_database, kafka_source]],
fields=[id,
'username' = 'mysqluser', 'password' = 'mysqluser',
>'table-name' = 'jdbc_sink')
>在 2021-05-18 11:55:46,"casel.chen" 写道:
>>我的flink sql作业如下
>>
>>
>>SELECT
>>product_name,
>>window_start,
>>window_end,
>>CAST(SUM(trans_amt)ASDECIMAL(24,2)
我的flink sql作业如下
SELECT
product_name,
window_start,
window_end,
CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt,
CAST(COUNT(order_no)ASBIGINT) trans_cnt,
-- LOCALTIMESTAMP AS insert_time,
'微支付事业部'AS bus_name
FROM(
mysql sink表的定义如下
CREATE TABLE XXX (
) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT
采用flink sql定义源表时,哪些connector支持提供部分字段,哪些connector必须要提供全量字段呢?
这边常用的connector主要有kafka, jdbc, clickhouse和mysql-cdc。
cdc是不是必须要提供对应表的全量字段呢?如果上游mysql业务表新增了字段,flink sql作业会不会出错?
kafka表定义是否支持部分字段?
没有人知道吗?
在 2021-05-13 17:20:15,"casel.chen" 写道:
flink sql怎样将change log stream转换成append log stream?
通过cdc接入了change log stream,后面就不能使用窗口聚合了,像Tumble和Hop, Session窗口。只能通过state ttl +
group by timestamp这种方式聚合。
问一下有没有办法将change log stream转换成append log stream,从而可以使用上述窗口聚合了呢?谢谢!
没有人知道吗?
在 2021-05-13 08:19:24,"casel.chen" 写道:
>flink sql如何修改执行计划?例如,修改上下游算子不同的并行度,或者将算子链人为打断等等。
>我知道如何获取flink sql执行计划,但要怎么人为干预这个执行计划呢?还请大佬解答一下,谢谢!
flink sql怎样将change log stream转换成append log stream?
通过cdc接入了change log stream,后面就不能使用窗口聚合了,像Tumble和Hop, Session窗口。只能通过state ttl +
group by timestamp这种方式聚合。
问一下有没有办法将change log stream转换成append log stream,从而可以使用上述窗口聚合了呢?谢谢!
flink sql如何修改执行计划?例如,修改上下游算子不同的并行度,或者将算子链人为打断等等。
我知道如何获取flink sql执行计划,但要怎么人为干预这个执行计划呢?还请大佬解答一下,谢谢!
我用native kubernetes方式部署flink session
cluster,想修改某个包下的日志级别,如果直接修改configmap下的log4j-console.properties再重新部署是能生效的,但是通过命令行
(./bin/kubernetes-session.sh -Dkubernetes.cluster-id=xxx) 起flink session
cluster会将之前的修改冲掉,有什么办法可以保留下之前的修改吗?是否有命令行启动参数可以指定自定义的logging配置?谢谢!
请问在native kubernetes上如何运行Flink History Server? 有没有相应的文档?
flink
run是否支持读取远程文件系统,例如oss://或hdfs://路径下的jar文件?看源码是需要构建PakcagedProgram,而它的构造函数中有一个File
jarFile参数。不知是否能够从oss路径或hdfs路径构建出File对象。
我的疑问正是flink cdc集成debezium后为何会把原始信息弄丢失了?直接采用原生的debezium或者canal同步数据固然可以。但如果flink
cdc直接能发出来的话不就可以节省这些组件和运维么?flink cdc设计的初衷也是如此。
在 2021-04-22 11:01:22,"飞翔" 写道:
既然这样,为何要用flink去同步信息,把信息的原始信息都丢失了。你可以直接采用原生的debezium或者canal同步数据,发送kafka,
比如canal的样例,虽然after
最近有个需求是用flink对接mysql binlog获取变更记录发到下游kafka(类似于canal
server或debezium功能),下游kafka消息需要有 before, after, op_type, ts, database, table
这些字段信息。我试了如下脚本发现出来的kafka消息只有data和op_type信息,其他信息都获取不到。追踪到上游debezium(flink
cdc是基于debezium实现的)发出来的record本身就只带data和op_type信息,问一下有没有别的办法获取到变更原始记录呢?
CREATE TABLE
>发送时间:2021年4月21日(星期三) 下午2:16
>收件人:"user-zh"
>主题:Re: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?
>
>
>
>Hi casel.
>flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。
>
>https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
>
>ca
目标是用flink作业实现类似canal server的功能
CREATE TABLE `binlog_table` (
`id` INT,
`name` STRING,
`sys_id` STRING,
`sequence` INT,
`filter`
我也遇到同样的问题
GroupWindowAggregate doesn't support consuming update and delete changes which
is produced by node TableSourceScan(table=[[default_catalog, default_database,
mcsp_pay_log, ...
按时间窗口聚合不支持上游是canal-json格式的cdc表的情况么?我的业务表其实是一张日志表,怎样用flink sql将retract
table转成append table?
flink window doesn't support retract stream 的话有什么workaround办法吗?常见的场景有 业务表cdc ->
kakfa -> flink按时间窗口聚合
如果业务表是只会insert的日志表,该如何将retract table转换成普通table?
GroupWindowAggregate doesn't support consuming update and delete changes which
is produced by node TableSourceScan(table=[[default_catalog,
使用 flink sql 1.12.1时遇到三个问题:
1. GroupWindowAggregate doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[default_catalog,
default_database, mcsp_pay_log, ...
按时间窗口聚合不支持上游是canal-json格式的cdc表的情况么?我的业务表其实是一张日志表,怎样用flink sql将retract
table转成append
flink run是否支持读取oss://或hdfs://路径下的jar文件?看源码是需要构建PakcagedProgram,而它的构造函数中有一个File
jarFile参数。不知是否能够从oss路径或hdfs路径构建出File对象。
private PackagedProgram(
@Nullable File jarFile,
List classpaths,
@Nullable String entryPointClassName,
Configuration configuration,
SavepointRestoreSettings
我在阿里云k8s上部署flink on native kubernetes application,默认用的服务暴露类型是
LoadBalancer,启动后会在公网暴露rest
url。运维管理人员不允许这样,说是只能使用固定预先申请的几个SLB,但我在flink官网没有找到有参数设置LoadBalancerIP,这样情况要怎么实现?
最近试用flink kubernetes
application时发现TM不断申请再终止,而且设置的LoadBalancer类型的Rest服务一直没有ready,查看不到flink web
ui,k8s日志如下,这是什么原因?是因为我申请的资源太小么?
= 启动参数
"kubernetes.jobmanager.cpu": "0.1",
"kubernetes.taskmanager.cpu": "0.1",
"taskmanager.numberOfTaskSlots": "1",
"jobmanager.memory.process.size":
Flink on K8S Standalone模式下可以通过yaml启多个JM,但是在Native K8S模式下要如果做呢?有文档资料介绍吗?谢谢!
请问有flink + hudi或iceberg + aliyun oss的示例吗?谢谢!
flink cdc对接上游的mysql或pg业务库时遇到业务库大批量修数或schema变更是怎么处理的?
会不会瞬间产生很多changelog records打爆flink应用?如果会的话应该要如何避免呢?谢谢!
请教一下flink sql多条数据sink用 statement set 语句时,
1. 如果其中一条sink条发生背压或故障,会影响其他sink流吗?
2. 在flink sql cdc 消费同一张mysql表sink进多种数据源场景下,例如 mysql -> fink cdc -> mongodb &
polardb 建议是启多个作业分别etl,还是分两段 mysql -> flink cdc -> kafka -> flink -> mongodb &
polardb ... 呢?关系数据库端接入同时多个cdc会不会影响性能?
flink sql中如何使用异步io关联维表?官网文档有介绍么?
目前是UTC时区的,怎样才能设置成当地的东8区呢?谢谢!
2021-02-19 01:34:21,259 INFO akka.event.slf4j.Slf4jLogger
[] - Slf4jLogger started
2021-02-19 01:34:22,155 INFO akka.remote.Remoting
[] - Starting remoting
2021-02-19 01:34:21,259 INFO
如题,在k8s环境下不想使用hdfs作为high-availability.storageDir,有没有办法直接使用oss呢?checkpoint和savepoint已经能够使用oss了。
我试着答k8s上部署flink
standalone集群,做HA之前集群是能够正常work的,在做HA的时候发现在configmap中添加了如下两个HA配置后JM就会抛异常,这是为什么?
high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: oss:///odps-prd/rtdp/flink/recovery
2021-02-09 00:03:04,421 ERROR
在k8s上部署sesson模式的flink集群遇到jobmanager报如下错误,请问这是什么原因造成的?要如何fix?
2021-02-07 08:21:41,873 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService
[] - Starting RPC endpoint for
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/rpc/dispatcher_1 .
2021-02-07
./bin/flink run-application -t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://localhost:9000/flinkLib" \
hdfs://localhost:9000/flinkJobs/TopSpeedWindowing.jar
这种命令执行方式是可以执行的。
在 2021-01-20 10:21:32,"casel.chen" 写道:
>使用了如下命令来提交flink作业到yarn上运行,结果出
使用了如下命令来提交flink作业到yarn上运行,结果出错。如果job jar路径改成本地的就没有问题。我已经将
flink-oss-fs-hadoop-1.12.0.jar 放到flink
lib目录下面,并且在flink.conf配置文件中设置好了oss参数。试问,这种作业jar在远端的分布式文件系统flink难道不支持吗?
./bin/flink run-application -t yarn-application \
-Dyarn.provided.lib.dirs="oss://odps-prd/rtdp/flinkLib" \
为支持作业上线评审和遇到问题进行版本回滚,实时计算平台中的flink作业版本管理采用哪种方案比较好?
一种声音是用数据库来存,另一种声音是使用git来管理:一个作业对应一个repo,里面有作业sql或jar文件,作业参数和配置文件等。
用数据库存的话作业文件比较割裂,像文本文件可以直接存表,但像jar包的话得存分布式文件系统,同时在数据表中记录文件id。我个人更倾向于用git,不知道这里会不会有什么坑?还请做过的朋友给个建议,谢谢!
为支持作业上线评审和遇到问题进行版本回滚,实时计算平台中的flink作业版本管理采用哪种方案比较好?
一种声音是用数据库来存,另一种声音是使用git来管理:一个作业对应一个repo,里面有作业sql或jar文件,作业参数和配置文件等。
今天尝试使用yarn
application模式(带yarn.provided.lib.dirs参数),将$FLINK_LIB目录下的jar包上传到了hdfs,结果报了如下的错,是少了哪个jar包或配置文件吗?
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy
Yarn Application Cluster
at
共有 352 项搜索結果,以下是第 301 - 352 matches
Mail list logo