;
>
> 1. 请问这个select语句要怎么写?
> select name, collect(color) as colors from source_table group by
> tumble(ts, interval '5' seconds)
> 这里collect(color)返回的是multiset类型,怎样转成Array类型呢?
>
>
> 2. 如果array元素很多,我只想取其中N个,该怎么写flink sql?
>
> 3, 若取出现次数最多的前N个,又该怎么写flink sql?
> select na
source_table group by tumble(ts,
interval '5' seconds)
这里collect(color)返回的是multiset类型,怎样转成Array类型呢?
2. 如果array元素很多,我只想取其中N个,该怎么写flink sql?
3, 若取出现次数最多的前N个,又该怎么写flink sql?
select name, collect(color) as colors from (
select name, color from (
select *, ROW_NUMBER() OVER (PARTITION BY name ORDER
如果只是数据同步作业,例如从kafka消费将数据存入下游db,这种弱“状态”作业能跨版本兼容么?
在 2021-08-11 16:54:56,"Leonard Xu" 写道:
>这里的SQL是指DDL还是DML,通常 DML都是兼容的,且一般不会有不兼容的升级,
>DDL 语法 各家 SQL 方言都有自己的语法,这个比较灵活,FLINK SQL 的DDL 各个版本稍有不同,但 Flink SQL 新版本都是兼容老的
>DDL的,
>只是新版本上的DDL语法如果提供了更丰富的功能,那么老版本的DDL则不能提供
source_table group by tumble(ts,
interval '5' seconds)
这里collect(color)返回的是multiset类型,怎样转成Array类型呢?
2. 如果array元素很多,我只想取其中N个,该怎么写flink sql?
3, 若取出现次数最多的前N个,又该怎么写flink sql?
select name, collect(color) as colors from (
select name, color from (
select *, ROW_NUMBER() OVER (PARTITION BY name ORDER
这里的SQL是指DDL还是DML,通常 DML都是兼容的,且一般不会有不兼容的升级,
DDL 语法 各家 SQL 方言都有自己的语法,这个比较灵活,FLINK SQL 的DDL 各个版本稍有不同,但 Flink SQL 新版本都是兼容老的
DDL的,
只是新版本上的DDL语法如果提供了更丰富的功能,那么老版本的DDL则不能提供 。
所以我理解你关心的兼容性问题是不存在的,但请注意如果你的SQL作业是有状态的,需要带状态升级,这些状态都是跨版本不兼容的。
祝好,
Leonard
> 在 2021年8月10日,11:44,Jason Lee 写道:
>
&
各位大佬好,
请教一个问题,我们最近打算升级Flink 版本,想问一下升级之后的已有任务的SQL会兼容到新版本吗?
比如我升级到1.13,那我1.10的SQL语法能被兼容吗?
感恩
| |
Chuang Li
|
|
jasonlee1...@163.com
|
签名由网易邮箱大师定制
2021年8月10日周二 上午9:35写道:
> 同行们,大家好,
>
> 请教一个问题,现在有3个kafka的topic:device consumer order
> 想用Flink-sql计算出多个聚合指标,比如:过去12小时,每个deviceid下的订单量。
> 我是这么设计的:
>
> 1. 先通过 create table with(...='kafka') ... 注册出 table1 table2 table3
> ,指定事件时间、water mark
> 2. 进行3张表的关联:
>
>
同行们,大家好,
请教一个问题,现在有3个kafka的topic:device consumer order
想用Flink-sql计算出多个聚合指标,比如:过去12小时,每个deviceid下的订单量。
我是这么设计的:
1. 先通过 create table with(...='kafka') ... 注册出 table1 table2 table3
,指定事件时间、water mark
2. 进行3张表的关联:
create temporary view wide_table as (
select ***
from
请教各位大佬一个问题,
使用flink sql sink数据到hbase (flink版本 1.13.1 hbase版本2.2.6)
提交任务后,一直报错误是 java.lang.RuntimeException: hbase-default.xml file seems to be for
an older version of HBase (2.2.3), this version is 2.2.6
已经在连接器参数里面配置了 'properties.hbase.defaults.for.version.skip'='true',
hbase-default.xml也配置了跳
始邮件--
> 发件人:
> "user-zh"
>
> 发送时间:2021年8月4日(星期三) 下午4:44
> 收件人:"user-zh"
&g
??lookup??on??key,??
??batch
----
??:
Hi!
我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal
join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join
carlc 于2021年8月4日周三 下午3:57写道:
>
感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。
create view v_bl_user_count as (
select user_id, count(1)
from mysql_user_blacklist
group by user_id
);
select t1.`user_id`
, t1.`event_type`
, t1.`current_ts`
from kafka_user_event t1
left join v_bl_user_count FOR SYSTEM_TIME AS OF
Hi!
这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
table join 了。
carlc 于2021年8月4日周三 上午10:41写道:
> 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
>
> -- 模拟需求(有点牵强...):
> -- 过滤 kafka_user_event 中
场景:实时统计用户访问日志数据,求一分钟内访问事件发生次数超过5次的用户,其不同source_ip出现次数最多前3个的事件
源表数据
user_name, source_ip, ts
张三, 100, 00:08
张三, 104, 00:12
张三, 100, 00:15
张三, 101, 00:35
张三, 100, 00:38
张三, 102, 00:40
张三, 102, 00:45
张三, 101, 00:47
张三, 100, 00:55
张三, 100, 01:15
李四, 200, 01:17
李四, 200, 01:19
李四, 200, 01:27
王五,
请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
-- 模拟需求(有点牵强...):
-- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist 统计对应
user_id 在维表中的次数 -> 即: 在维表上做聚合操作
-- 1. 创建user_blacklist表
CREATE TABLE `user_blacklist` (
`user_id` bigint(20) NOT NULL,
`create_time` datetime
t;> 需求:table t 有三个字段(a,b,c)
>> 我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变,
>>
>> 例如mysql 支持:
>> insert into t(a,b) select 1,2 on duplicate key update b=2;
>> 主键重复的时候只更新字段b,字段c的值不变。
>> 但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 。
>> 我在sql-client测试了一下:inser
ink/flink-docs-master/docs/dev/table/sql/insert/#syntax
Ye Chen 於 2021年8月2日 週一 下午4:08寫道:
> 你好,我们用的1.11版本。
>
> 需求:table t 有三个字段(a,b,c)
> 我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变,
>
> 例如mysql 支持:
> insert into t(a,b) select 1,2 on duplicate key update b=2;
> 主键重复的时候只更新字段b,字段c的值不变。
>
你好,我们用的1.11版本。
需求:table t 有三个字段(a,b,c)
我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变,
例如mysql 支持:
insert into t(a,b) select 1,2 on duplicate key update b=2;
主键重复的时候只更新字段b,字段c的值不变。
但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 。
我在sql-client测试了一下:insert into t(a,b) select 1,2 on duplicate key
用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726
不行的话可以在ddl中限制列的数量
--
发件人:Ye Chen
发送时间:2021年8月2日(星期一) 11:37
收件人:user-zh ; silence
主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
你好,我试了一下,如
需求:现有table t 三个字段
CREATE TABLE t (
abigint,
bbigint,
cbigint,
PRIMARY KEY (a) NOT ENFORCED
) WITH (
...
);
我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,
例如mysql 支持:insert into t(a,b) select 1,2 on duplicate key update b=2;
主键重复的时候只更新字段b,字段c的值不变。但是flink sql 目前只支持全字段更新:insert into t
bigint,
> PRIMARY KEY (a) NOT ENFORCED
> ) WITH (
> ...
> );
> 我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,
> 例如mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key
> update b='4';主键重复的时候只更新字段b,字段c的值不变。
> 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
&
时候只更新字段b,字段c的值不变。
我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理?
在 2021-08-02 10:47:55,"silence" 写道:
>如果只想更新部分字段的话可以试下
>insert into t(a,b) select a,b from x
>
>
>--
(a,b,c) select '1','2','3' on duplicate key update
b='4';主键重复的时候只更新字段b,字段c的值不变。
我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
请问这种部分字段更新的场景 使用flink sql应该怎么处理?
在 2021-08-02 10:08:28,"silence" 写道:
>你在你的sink ddl定义了主键会自动的按主键进行upsert的
>参考https://ci.apache.org/proj
如果只想更新部分字段的话可以试下
insert into t(a,b) select a,b from x
--
发件人:Ye Chen
发送时间:2021年7月30日(星期五) 17:57
收件人:user-zh
主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
现有table
CREATE TABLE t (
abigint,
b
你在你的sink ddl定义了主键会自动的按主键进行upsert的
参考https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#idempotent-writes
--
发件人:Ye Chen
发送时间:2021年7月30日(星期五) 17:57
收件人:user-zh
主 题:场景题:Flink SQL 不支持 INSERT
一下也不支持 on duplicate key update,会报错。
请问这种部分字段更新的场景 使用flink sql应该怎么处理?
突,
>>目前初步想法是每个udf绑定一个jarpath,使用独立的classloader进行加载,避免和主jar以及其他udf之间的依赖冲突
>>--
>>发件人:Michael Ran
>>发送时间:2021年7月22日(星期四) 20:07
>>收件人:user-zh ; silence
>>主 题:Re:flink sql 依赖隔离
>>
>>通过任务进行隔离引用呗。你们美团已经是k8s了吧?
>>在 2021-07-05 14:06:53,"
就是单独引用的啊,但任务逻辑比较复杂时会同时混用多个udf这个是没法避免的啊
--
发件人:Michael Ran
发送时间:2021年7月23日(星期五) 17:42
收件人:user-zh ; silence
主 题:Re:回复:flink sql 依赖隔离
建议上传的时候单独放,提交任务的时候 拉下来单独引用
在 2021-07-23 11:01:59,"silence" 写道:
>
>这边目前主要还是yarn,
,避免和主jar以及其他udf之间的依赖冲突
>--
>发件人:Michael Ran
>发送时间:2021年7月22日(星期四) 20:07
>收件人:user-zh ; silence
>主 题:Re:flink sql 依赖隔离
>
>通过任务进行隔离引用呗。你们美团已经是k8s了吧?
>在 2021-07-05 14:06:53,"silence" 写道:
>>请教大家目前flink sql有没有办法做到依赖隔离
>>比如connector,format,udf(这个最重要)等,
>
主 题:Re:flink sql 依赖隔离
通过任务进行隔离引用呗。你们美团已经是k8s了吧?
在 2021-07-05 14:06:53,"silence" 写道:
>请教大家目前flink sql有没有办法做到依赖隔离
>比如connector,format,udf(这个最重要)等,
>很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
写道:
>
>> 通过任务进行隔离引用呗。你们美团已经是k8s了吧?
>> 在 2021-07-05 14:06:53,"silence" 写道:
>> >请教大家目前flink sql有没有办法做到依赖隔离
>> >比如connector,format,udf(这个最重要)等,
>> >很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>> >目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
>>
>
>
>--
>Best Regards
>
>Jeff Zhang
Zeppelin 支持依赖的动态加载
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2
Michael Ran 于2021年7月22日周四 下午8:07写道:
> 通过任务进行隔离引用呗。你们美团已经是k8s了吧?
> 在 2021-07-05 14:06:53,"silence" 写道:
> >请教大家目前flink sql有没有办法做到依赖隔离
> >比如
> at
> org.apache.hadoop.hive.conf.HiveConf.clinit(HiveConf.java:146)
> ... 24 more
> 在 2021-07-16 16:10:10,"Caizhi Weng" 写道:
> >Hi!
> >
> >理论上可行,可以尝试一下。但要注意 -C 指定的路径必须是所有节点都能访问到,如果指定的是一个本地路径,那么所有节点的本地路径下都要有相应的
> >connector jar。
> >
> >niko 于2021年7月16日周五 下午3:21写道:
> >
> >> 能否使用命令行的 -C 命令 加载 flink sql 的 connector?
>
16日周五 下午3:21写道:
>
>> 能否使用命令行的 -C 命令 加载 flink sql 的 connector?
Hi!
理论上可行,可以尝试一下。但要注意 -C 指定的路径必须是所有节点都能访问到,如果指定的是一个本地路径,那么所有节点的本地路径下都要有相应的
connector jar。
niko 于2021年7月16日周五 下午3:21写道:
> 能否使用命令行的 -C 命令 加载 flink sql 的 connector?
能否使用命令行的 -C 命令 加载 flink sql 的 connector?
flink sql的不确定性优化相比于批处理看起来是比较少的,另外我们使用的版本(flink-1.8)的实现并没有统计信息,因此我们在尝试使用
Hep Planner来提高编译速度。不知道是否会导致其它问题,比如语义变化等
Caizhi Weng 于2021年7月14日周三 上午10:08写道:
> Hi!
>
> Hep planner 是一个 rule based 的 planner,较多用于确定性的优化上。Volcano planner 是一个 cost
> based 的 planner,多用于不确定性的优化(例如 join 方式的选择,build 端
Hi!
Hep planner 是一个 rule based 的 planner,较多用于确定性的优化上。Volcano planner 是一个 cost
based 的 planner,多用于不确定性的优化(例如 join 方式的选择,build 端的选择等),需要靠统计信息等进行决策。目前 Flink
两者均有应用。
terry Huang 于2021年7月13日周二 下午7:31写道:
> 大佬们好,目前Flink sql使用calcite 的Volcano
>
> Planner进行逻辑计划优化,但是我们的实践下来觉得编译时间有点长,我们准备使用HepPl
大佬们好,目前Flink sql使用calcite 的Volcano
Planner进行逻辑计划优化,但是我们的实践下来觉得编译时间有点长,我们准备使用HepPlanner来做优化。请问,这么做是否会带来致命问题或者flink
sql 使用Volcano planner的原因是什么呢
Hi, Jingsong.
最新的类型推导相对于之前版本的类型推导更加严格,对schema的非空限制校验也更加细致。
在之前提到的例子中使用基本类型做UDF参数,
表示跟UDF中参数相关的列必须非空,而在创建视图时,每个类型默认的非空限制为false,因此出现了之前描述的问题。
祝好。
Best Roc.
在 2021-06-29 11:02:55,"Jingsong Li" 写道:
>Hi,
>
>你可以创建个JIRA,让Timo看看,UDAF引入了新的类型推导,可能有问题
>
>Best,
>Jingsong
>
>On
场景:mysql数据实时同步到mongodb. 上游mysql binlog日志发到一个kafka topic,
不保证同一个主键的记录发到相同的partition,为了保证下游sink
mongodb同一主键的所有记录按序保存,所以需要按主键keyby。然后下游再批量写入mongodb。
问题:flink sql有办法解决上述问题?如果可以的话,要怎么写?
create table person_source (
id BIGINT PRIMARY KEY NOT FORCED,
name STRING,
age BIGINT
flink-sql有没有类似hive里distribute by的功能,数据行根据某个字段hash到不同的
task中
/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
flink-sql 查询kafka
kafka版本2.4 connector版本flink-sql-connector-kafka_2.11-1.11.2.jar
请求 这是什么原因 是 connector的版本问题有?
Hi~
你需要的应该是flink sql里提供的catalog功能
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#postgres-database-as-a-catalog
目前PostgresCatalog实现了jdbc catalog,MysqlCatalog没有支持,有一些资料可以参考实现:
https://vendanner.github.io/2020/11/25/Flink-SQL-%E4%B9%8B
Hi,
请问目前的 Flink SQL 在创建source表的时候支持自动拉取所有的表列信息并解析吗?
谢谢。
Best, Roc.
没用放在lib下,是启动时通过-C动态添加udf jar,一个sql作业可能会用到很多udf,可能是不同的用户写的,所以经常会出现依赖冲突
--
发件人:yzhhui
发送时间:2021年7月5日(星期一) 14:09
收件人:user-zh@flink.apache.org ; silence
抄 送:user-zh
主 题:回复:flink sql 依赖隔离
提交任务的时候提交自己的jar就好了,这个不要放公共lib下 就OK
在2021年
请教大家目前flink sql有没有办法做到依赖隔离
比如connector,format,udf(这个最重要)等,
很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
15:36,silence 写道:
可参考
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout
--
发件人:杨光跃
发送时间:2021年6月30日(星期三) 10:54
收件人:user-zh@flink.apache.org
主 题:flink sql 空闲数据源场景如何配置
--
发件人:杨光跃
发送时间:2021年6月30日(星期三) 10:54
收件人:user-zh@flink.apache.org
主 题:flink sql 空闲数据源场景如何配置
在代码中可以通过 .withIdleness(Duration.ofMinutes(1)); 指定空闲数据源也触发水印,那么sql中怎么表示呢
| |
杨光跃
|
|
yangguangyuem...@163.com
|
签名由网易邮箱大师定制
可参考
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout
--
发件人:杨光跃
发送时间:2021年6月30日(星期三) 10:54
收件人:user-zh@flink.apache.org
主 题:flink sql 空闲数据源场景如何配置
在代码中可以通过
在代码中可以通过 .withIdleness(Duration.ofMinutes(1)); 指定空闲数据源也触发水印,那么sql中怎么表示呢
| |
杨光跃
|
|
yangguangyuem...@163.com
|
签名由网易邮箱大师定制
Hi,
你可以创建个JIRA,让Timo看看,UDAF引入了新的类型推导,可能有问题
Best,
Jingsong
On Tue, Jun 29, 2021 at 7:10 AM Roc Marshal wrote:
>
>
> Hi, All.
>
>
> 请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家:
>
>
> 版本: 1.13.1
> 运行模式: IDE-application
> ---
> about udf define...
Hi, All.
请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家:
版本: 1.13.1
运行模式: IDE-application
---
about udf define...
public static class UDFAggregateFunction extends AggregateFunction {
//返回最终结果
@Override
public
Hi, All.
请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家:
版本: 1.13.1
运行模式: IDE-application
---
about udf define...
public static class UDFAggregateFunction extends AggregateFunction {
//返回最终结果
@Override
public
Hi,
貌似是jar包冲突了,我再确认确认。
在 2021/6/28 下午2:33,“王刚” 写入:
注意:此封邮件来自于公司外部,请注意信息安全!
Attention: This email comes from outside of the company, please pay
attention to the information security!
把flink parquet包放在flink客户端lib包下试试呢
把flink parquet包放在flink客户端lib包下试试呢
原始邮件
发件人: Wei JI10 季伟
收件人: user-zh@flink.apache.org
发送时间: 2021年6月28日(周一) 14:14
主题: Re: flink sql报错: Could not find any format factory for identifier 'parquet'
in the classpath
您好,
我没有设置scope,我看jar包中是有org/apache/flink/formats/parquet/这个目录的...
在 2021
您好,
我没有设置scope,我看jar包中是有org/apache/flink/formats/parquet/这个目录的...
在 2021/6/28 下午12:47,“zhisheng” 写入:
注意:此封邮件来自于公司外部,请注意信息安全!
Attention: This email comes from outside of the company, please pay
attention to the information security!
看下你引入的 jar 包是咋引入的,scope 设置的是 provided 吧?
看下你引入的 jar 包是咋引入的,scope 设置的是 provided 吧?
Wei JI10 季伟 于2021年6月28日周一 下午12:19写道:
> 您好,
> 版本都是1.12.3的,有其他方式能够定位到这个问题出在哪么?
>
> 在 2021/6/28 上午11:59,“Jingsong Li” 写入:
>
> 注意:此封邮件来自于公司外部,请注意信息安全!
> Attention: This email comes from outside of the company, please pay
> attention to the
您好,
版本都是1.12.3的,有其他方式能够定位到这个问题出在哪么?
在 2021/6/28 上午11:59,“Jingsong Li” 写入:
注意:此封邮件来自于公司外部,请注意信息安全!
Attention: This email comes from outside of the company, please pay
attention to the information security!
Hi, 你的版本check下?集群和flink-parquet是同一个版本吗?
BEST,
Jingsong
Hi, 你的版本check下?集群和flink-parquet是同一个版本吗?
BEST,
Jingsong
On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟
wrote:
> 您好,
> 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。
>
>
--
Best, Jingsong Lee
您好,
不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。
使用的是 sql client 测试的 sql 吗?如果是的话,记得在 flink lib 目录下添加 flink-sql-parquet jar
包,然后重启集群和 sql client
Wei JI10 季伟 于2021年6月28日周一 上午9:35写道:
> 您好,
> 添加的parquet 依赖如下,不知道全不全
>
> org.apache.flink
> flink-parquet_${scala.binary.version}
> ${flink.version}
>
>
您好,
添加的parquet 依赖如下,不知道全不全
org.apache.flink
flink-parquet_${scala.binary.version}
${flink.version}
org.apache.parquet
parquet-avro
1.10.1
parquet 相关依赖增加了吗?
Zhiwen Sun
On Sun, Jun 27, 2021 at 3:57 PM Wei JI10 季伟
wrote:
> Hi:
> 在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any format factory for identifier 'parquet' in t
Hi:
在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息
Caused by: org.apache.flink.table.api.ValidationException: Could not find any
format factory for identifier 'parquet' in the classpath.
at
org.apache.flink.table.filesystem.FileSystemTableSource
Hello,
Flink sql cdc 还不支持获取元数据, 获取元数据的业务场景通常是怎么样的呢?
祝好,
Leonard
> 在 2021年6月23日,08:21,casel.chen 写道:
>
> flink sql cdc如何获取元数据?像数据库名,表名,操作类型,binlog产生时间等。
>
>
> create table xxx_tbl (
> k_op varchar, -- 操作类型
> k_database varchar, -- 数据库名
> k_table varc
flink sql cdc如何获取元数据?像数据库名,表名,操作类型,binlog产生时间等。
create table xxx_tbl (
k_op varchar, -- 操作类型
k_database varchar, -- 数据库名
k_table varchar, -- 表名
k_ts. BIGINT, -- binlog产生时间
idBIGINT,
name. varchar
) with (
'connector' = 'mysql-cdc',
.
'meta.fields-prefix' = 'k_'
)
thub.com/DataLinkDC/dlink
>
>
>
>
>----
>??: "todd": 2021??6??16??(??) 5:48
>??: "user-zh": Re: flink sql??
>
>
>
>?
hi
先执行一下 export HADOOP_CLASSPATH=`hadoop classpath` 就可以了
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
社区最近重新设计了 mysql-cdc 的实现,可以支持全量阶段并发读取、checkpoint,移除全局锁依赖。
可以关注 GitHub 仓库的动态 https://github.com/ververica/flink-cdc-connectors。
7月的 meetup 上也会分享相关设计和实现,敬请期待。
Best,
Jark
On Thu, 17 Jun 2021 at 09:34, casel.chen wrote:
> Flink CDC什么时候能够支持修改并行度,进行细粒度的资源控制?目前我也遇到flink sql
> cdc写mysql遇到数据同步跟不上数
请问该问题有解决吗?我使用FLINK yarn-per-job方式提交到yarn集群也出现了这个错误
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Flink CDC什么时候能够支持修改并行度,进行细粒度的资源控制?目前我也遇到flink sql
cdc写mysql遇到数据同步跟不上数据写入速度问题,何时能支持像mysql并行复制这种机制呢?
在 2021-06-16 17:27:14,"Leonard Xu" 写道:
>看起来和 Flink-CDC 关系不大,看异常栈是 ES 侧抛出的异常 version_conflict_engine_exception,
>可以查下这个异常,看下是不是有写(其他作业/业务 也在写同步表)冲突。
>
>祝好,
&
FlinkSql WebIDE??
FlinkSQLSQL??SqlCli??
https://github.com/DataLinkDC/dlink
----
??:
补充一种使用Flink api提交方式,参考:https://github.com/todd5167/flink-spark-submiter。
任务提交、状态获取继承统一的接口,上层服务在引用时,通过spi的方式进行加载即可。
缺点:
- 需要对Flink client源码、类加载机制有了解。
优点:
- 良好的外部集成
- 不需要额外部署服务
--
Sent from: http://apache-flink.147419.n8.nabble.com/
看起来和 Flink-CDC 关系不大,看异常栈是 ES 侧抛出的异常 version_conflict_engine_exception,
可以查下这个异常,看下是不是有写(其他作业/业务 也在写同步表)冲突。
祝好,
Leonard
> 在 2021年6月16日,17:05,mokaful <649713...@qq.com> 写道:
>
> 相同问题,请问有处理方式吗
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
相同问题,请问有处理方式吗
--
Sent from: http://apache-flink.147419.n8.nabble.com/
https://flink-packages.org/packages/streaming-flink-dynamodb-connector
----
??:
"user-zh"
flink sql cdc发到kafka,显示下游写kafka并行度只有1,有没有办法提高并行度呢?
显示job-parallelism, table.exec.resource.default-parallelism, parallelism.default
都是 24,但execution graph显示parallelism还是1,我设置了pipeline.operator-chaining=false
Hey 社区的同行们好,
请问 Flink 1.10以及以后的版本会支持通过Flink SQL 读写 DynamoDB么?有对应的connector么?
谢谢。
t;
>> 在 2021-06-13 07:21:46,"Jeff Zhang" 写道:
>> >另外一个选择是flink on zeppelin,可以调用flink on zeppelin的rest api,把zeppelin当做是flink
>> >job server, zeppelin天然支持flink 1.10之后的所有版本。钉钉群:32803524
>> >
>> >casel.chen 于2021年6月12日周六 下午5:56写道:
>> >
>&g
所有版本。钉钉群:32803524
> >
> >casel.chen 于2021年6月12日周六 下午5:56写道:
> >
> >> 需求背景:
> >> 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink
> >> SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink
> >> SQL作业采用的是1.13开发的。
>
请问 flink sql cdc 场景下如何增大下游sink端并行度?
我试了修改default.parallism=2参数,并且将operator chain参数设置成false,并没有效果。
而后,我将作业分成两步:首先 源mysql cdc sink到 upsert kafka,再从 upsert kafka sink到
目标mysql。是想通过kafka partition增大sink并行度
初步测试效果是可以的,kafka建了3个partitions,每个partitions都按主键hash分配到数据,下游并行度跟partitions个数对齐。
以下是作业内容
有版本。钉钉群:32803524
>
>casel.chen 于2021年6月12日周六 下午5:56写道:
>
>> 需求背景:
>> 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink
>> SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink
>> SQL作业采用的是1.13开发的。
>>
>>
>> 而让平台支持不同Flink版本,我能想
另外一个选择是flink on zeppelin,可以调用flink on zeppelin的rest api,把zeppelin当做是flink
job server, zeppelin天然支持flink 1.10之后的所有版本。钉钉群:32803524
casel.chen 于2021年6月12日周六 下午5:56写道:
> 需求背景:
> 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink
> SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业
需求背景:
因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink
SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink SQL作业采用的是1.13开发的。
而让平台支持不同Flink版本,我能想到有三种实现方案:
1. 平台直接调用 flink run 或 flink run-application 提交作业
优点:实现简单,每个flink版本都会带这个shell脚本
缺点:受限于脚本提供的功能,像语法检查、格式化sql、获取sql执行
temporary join
--
Sent from: http://apache-flink.147419.n8.nabble.com/
有例子吗?或者相关资料连接也行
在 2021-06-11 12:40:10,"chenchencc" <1353637...@qq.com> 写道:
>使用事件时间就可以延时
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
使用事件时间就可以延时
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我们之前试过用 session window 来实现,只要流数据有唯一键就可以按唯一键开 session window
来把整个流(而不是关联不上的部分数据)延迟,能凑合用
--
Sent from: http://apache-flink.147419.n8.nabble.com/
针对现在flink sql cdc下游并行度无法修改问题,是否可以分两步实现?谢谢!
1. flink sql cdc发到下游kafka,通过 upsert kafka connector,以debezium或canal格式,kafka
topic开多个分区
2. 再从kafka消费,通过flink sql同步到最终mysql库
在 2021-06-08 19:49:40,"Leonard Xu" 写道:
>试着回答下这两个问题。
>
>> flink 1.12的jdbc connector不支持 sink.
flink sql
cdc写入kafka,期望kafka消息带上数据库database,表table,变更时间和变更标记+I/-U/+U/-D这几个特殊字段,目前来看是做不到的,对吗?
延迟join主要是为了解决维表数据后于事实表数据到达问题。java代码可以实现,那flink sql这块能否通过sql hint解决呢?有没有示例?
c:mysql://host:3306/datav_test?useUnicode=true=utf8
本地运行flink sql 作业插入中文是正常显示的,一部署到测试服务器跑就会出现中文乱码。有何修复建议?谢谢!
在 2021-05-19 17:52:01,"Michael Ran" 写道:
数据库的字段字符编码
在 2021-05-18 18:19:31,"casel.chen" 写道:
我的URL连接串已经使用了 useUnicode=truecharacterEncoding
试着回答下这两个问题。
> flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc
> connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决?
是的,关键问题是cdc connector为了保证数据一致性只能单并发,所以作业也只能单并发。这个需要cdc
connector支持多并发读取,下游sink自然就能解决。
> flink 1.13的jdbc connector新增
flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc
connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决?
flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink
task呢?SQL正确的写法是什么?
你好,我也遇到这个问题,flink 1.12.2 sql,想问下
1.有什么方式能本地物理上删除那些ttl过期的数据吗
2.有什么方式能checkpoint时候删除ttl过期的数据吗?让checkpoint数据不再继续增长?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
一种方法就是借助 Flink SQL Parser,解析你的 SQL,然后获取到不同的 SQL node,
然后每个 SQL Node 都有对应的类型,以及 connector 后面的 with 参数,你需要自己在
写代码判定一下即可。本质是通过解析 SQL,来获取血缘关系。
Best,
LakeShen
casel.chen 于2021年6月8日周二 上午12:05写道:
> 如何获取flink sql的血缘关系?如:表A -> 表B。有代码示例吗?谢谢!
各位好,我在flink 1.13中使用flink sql 在一次修改代码后的重启任务中,报以下错误:
For heap backends, the new state serializer
(org.apache.flink.api.common.typeutils.base.MapSerializer@a5b17bdb) must not
be incompatible with the old state serializer
(org.apache.flink.api.common.typeutils.base.MapSerializer@e5a9c6d8).
我更改了sql中
如何获取flink sql的血缘关系?如:表A -> 表B。有代码示例吗?谢谢!
共有 1922 项搜索結果,以下是第 401 - 500 matches
Mail list logo