Re: flink sql读kafka元数据问题

2021-01-12 Thread JasonLee
hi 你写入数据的时候设置 headers 了吗 没设置的话当然是空的了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql读kafka元数据问题

2021-01-13 Thread Jark Wu
kafka 读 key fields: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#key-fields On Wed, 13 Jan 2021 at 15:18, JasonLee <17610775...@163.com> wrote: > hi > > 你写入数据的时候设置 headers 了吗 没设置的话当然是空的了 > > > > - > Best Wishes > JasonLee > -- > Sent from: http://apac

转发:flink-sql字段类型问题

2021-01-14 Thread 郝文强
| | 郝文强 | | 18846086...@163.com | 签名由网易邮箱大师定制 - 转发邮件信息 - 发件人: 郝文强 <18846086...@163.com> 发送日期: 2021年01月14日 17:23 发送至: d...@flink.apache.org 主题: 转发:flink-sql字段类型问题 | | 郝文强 | | 18846086...@163.com | 签名由网易邮箱大师定制 - 转发邮件信息 - 发件人: 郝文强 <18846086..

Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 Thread Kurt Young
- NOW() - CURRENT_DATE - CURRENT_TIME - CURRENT_TIMESTAMP Before the changes, as I am writing this reply, the local time here is *2021-01-21 12:03:35 (Beijing time, UTC+8)*. And I tried these 5 functions in sql client, and got: *Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP,

Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 Thread Jark Wu
ng this tricky topic. At present, there > are many Flink jobs in our production environment that are used to count > day-level reports (eg: count PV/UV ).  > > > If use the default Flink SQL,  the window time range of the > statistics is incorrect, then the statistical resul

Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-21 Thread Leonard Xu
> Before the changes, as I am writing this reply, the local time here is > 2021-01-21 12:03:35 (Beijing time, UTC+8). > And I tried these 5 functions in sql client, and got: > > Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DAT

Flink sql 1.12写入hive报metastore失败

2021-01-26 Thread gimlee
使用flink sql 1.12写入hive,未提交到yarn上成功,错误信息如下: 2021-01-26 20:44:23.133 [main] INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Trying to connect to metastore with URI thrift://hdcom02.prd.com:9083 2021-01-26 20:44:23.133 [main] INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient

Re: Flink SQL temporal table join with Hive 报错

2021-02-07 Thread Rui Li
你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么? On Mon, Feb 8, 2021 at 10:33 AM macia kk wrote: > Currently the join key in Temporal Table Join can not be empty. > > 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错 > > [image: image.png] > -- Best regards! Rui Li

Re: Flink SQL temporal table join with Hive 报错

2021-02-07 Thread macia kk
图就是哪个报错 建表语句如下,表示公共表,我也没有改的权限. CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT 'country', `currency` string COMMENT 'currency', `exchange_rate` decimal(25,10) COMMENT 'exchange rate') PARTITIONED BY (`grass_date` date COMMENT 'partition key, -MM-dd') ROW FORMAT SERDE 'org.

Re: Flink SQL temporal table join with Hive 报错

2021-02-08 Thread Rui Li
Hi, 那join的语句是怎么写的呢? On Mon, Feb 8, 2021 at 2:45 PM macia kk wrote: > 图就是哪个报错 > > 建表语句如下,表示公共表,我也没有改的权限. > > CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT > 'country', `currency` string COMMENT 'currency', `exchange_rate` > decimal(25,10) COMMENT 'exchange rate') > PARTITIO

Re: Flink SQL temporal table join with Hive 报错

2021-02-08 Thread macia kk
SELECT *FROM ( SELECT tt.* FROM input_tabe_01 tt FULL OUTER JOIN input_tabe_02 mt ON (mt.transaction_sn = tt.reference_id) and tt.create_time >= mt.create_time + INTERVAL '5' MINUTES and tt.create_time <= mt.create_tim

Re: Flink SQL temporal table join with Hive 报错

2021-02-09 Thread Leonard Xu
Hi, macia > 在 2021年2月9日,10:40,macia kk 写道: > > SELECT *FROM >( >SELECT tt.* >FROM >input_tabe_01 tt >FULL OUTER JOIN input_tabe_02 mt >ON (mt.transaction_sn = tt.reference_id) >and tt.create_time >= mt.create_time + INTERVAL '

Re: Flink SQL temporal table join with Hive 报错

2021-02-10 Thread macia kk
Hi, Leonard 我们的业务变得越来越复杂,所以现在需要 Join Hive 维表的情况非常普遍。现在维表分三种情况 一,维表没有分区,没有 primary key 这时候 `'streaming-source.partition.include' = 'latest',因为没有 parition,所以 latest 应该加载的就是全部的数据。 二,维表有有分区,每个分区仅仅包含当天的数据,没有 primary key 这种情况因为要 Join 全部的数据,所以还是需要设置 'streaming-source.partition.include' = 'all'

Re: Flink SQL temporal table join with Hive 报错

2021-02-19 Thread Leonard Xu
> > 二,维表有有分区,每个分区仅仅包含当天的数据,没有 primary key > > 这种情况因为要 Join 全部的数据,所以还是需要设置 'streaming-source.partition.include' = > 'all',但是还是因为没有 primary Key,所以无法 run。 > > 现在就是针对第二种情况,因为Hive的维度表不是我维护的,很多人都在用,所以不能修改去加上 primary key,无法进行 join. 第二种情况,hive表不是streaming读的,相当于是一张静态表,每次都是加载最新的全量,所以配置如下参数即可 'stre

Re: Flink SQL 写入Hive问题请教

2021-02-22 Thread eriendeng
你这没有把dialect set成hive吧,走到了else分支。default dialect是需要指定connector的,参考文档的kafka到hive代码 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: Flink SQL 写入Hive问题请教

2021-02-22 Thread 邮件帮助中心
我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert into时创建Hive表时提示没有连接器的配置 Table options are: 'is_generic'='false' 'partition.time-extractor.timestamp-pattern'='$dt $hr' 'sink.partition-commit.delay'='0S' 'sink.partition-commit.policy.kind'='metastore,success-file' 'sink.par

Re: Re: Flink SQL 应用情况请教

2021-02-27 Thread xg...@126.com
1503,61,15811,1614405166858 1504,61,15813,1614405333871 1505,61,15814,1614405544862 1506,61,15814,1614405673863 就这几条数据,并行度设置为1 发件人: yinghua...@163.com 发送时间: 2021-02-27 14:23 收件人: user-zh 主题: Re: Flink SQL 应用情况请教 这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其

Re:Re: Re: Flink SQL 应用情况请教

2021-02-27 Thread 邮件帮助中心
gt; > >发件人: yinghua...@163.com >发送时间: 2021-02-27 14:23 >收件人: user-zh >主题: Re: Flink SQL 应用情况请教 >这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No > Data,sink显示的是No Watermark >我的SQL语句如下: >CREATE TABLE t_stock_match_p_1( > id

flink sql没有jar包如何恢复?

2021-03-04 Thread huayuan
如题 官方的恢复是flink run -s path xxx.jar 那么flink sql没有jar包如何恢复呢 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); Configuration configuration = bsTableEnv.getConfig().getConfiguration(); configuration.setString("execution.savepoint.path","xxx")貌

flink sql没有jar包如何恢复

2021-03-04 Thread huayuan
如题 官方的恢复是flink run -s path xxx.jar 那么flink sql没有jar包如何恢复呢 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); Configuration configuration = bsTableEnv.getConfig().getConfiguration(); configuration.setString("execution.savepoint.path","xxx")貌

【flink sql-client 读写 Kerberos认证的hive】

2021-03-07 Thread 861277...@qq.com
环境: flink1.12.1  hive2.1.0 CDH6.2.0 【问题描述】  在没开启Kerberos认证时,可以正常读写hive表    开启Kerberos认证后,  启动时可以正常读取到hive metastore的元数据信息,读写不了表。 【sql-client.sh embedded】 Flink SQL> show tables; dimension_table dimension_table1 test Flink SQL> select * from test; [ERROR] Could not execute SQL sta

回复:【flink sql group by 时间窗口】

2021-03-17 Thread guoyb
这个问题换种写法解决了,从MySQL数据库表里取时间戳字段再转timestamp,可以实现滚动窗口,没报错。 从MySQL表里直接取datetime类型,jdbc表flink设置timestamp类型,会报错,直接取source的时间类型字段是不是转换有点问题。 ---原始邮件--- 发件人: "guoyb"<861277...@qq.com> 发送时间: 2021年3月17日(周三) 下午5:43 收件人: "user-zh"

flink sql如何修改执行计划?

2021-05-12 Thread casel.chen
flink sql如何修改执行计划?例如,修改上下游算子不同的并行度,或者将算子链人为打断等等。 我知道如何获取flink sql执行计划,但要怎么人为干预这个执行计划呢?还请大佬解答一下,谢谢!

flink sql写mysql中文乱码问题

2021-05-17 Thread casel.chen
我的flink sql作业如下 SELECT product_name, window_start, window_end, CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt, CAST(COUNT(order_no)ASBIGINT) trans_cnt, -- LOCALTIMESTAMP AS insert_time, '微支付事业部'AS bus_name FROM( mysql sink表的定义如下 CREATE TABLE XXX ( ) Engine=InnoDB AUTO_INCREMENT=

Re: flink sql支持Common Table Expression (CTE)吗?

2021-05-23 Thread Jingsong Li
支持。 如果只是在单个sql中复用expression,和temporary view基本一样,区别不大。 在某些优化路径上不同,一般没有实质影响。 Best, Jingsong On Fri, May 21, 2021 at 11:32 PM casel.chen wrote: > flink sql支持Common Table Expression (CTE)吗?是不是可以通过 create temporary view > xxx 来实现?CTE和temporary view的区别是什么? > 例如 > > > w

Re: Re:Re: flink sql cdc并行度问题

2021-05-28 Thread Zorro
如果你是自己实现MongoDB sink的话,你描述的方法看起来是可行的,不过这种实现方式相对比较复杂。 sql keyby可以考虑使用flink提供的 Deduplication 功能。这样的话MongoDB sink就可以开多个并行度,而不用考虑不同key的顺序问题了 -- Sent from: http://apache-flink.147419.n8.nabble.

Re: 回复:Flink sql的state ttl设置

2021-05-28 Thread chenchencc
想问下state ttl能针对单表设置吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复:Flink sql的state ttl设置

2021-05-31 Thread LakeShen
或许你可以参考这个: [image: image.png] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/ Best, LakeShen chenchencc <1353637...@qq.com> 于2021年5月28日周五 下午4:30写道: > 想问下state ttl能针对单表设置吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

Re: flink sql cdc数据同步至mysql

2021-06-08 Thread Leonard Xu
试着回答下这两个问题。 > flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc > connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决? 是的,关键问题是cdc connector为了保证数据一致性只能单并发,所以作业也只能单并发。这个需要cdc connector支持多并发读取,下游sink自然就能解决。 > flink 1.13的jdbc connector新增 sin

Re:Re: flink sql cdc数据同步至mysql

2021-06-10 Thread casel.chen
针对现在flink sql cdc下游并行度无法修改问题,是否可以分两步实现?谢谢! 1. flink sql cdc发到下游kafka,通过 upsert kafka connector,以debezium或canal格式,kafka topic开多个分区 2. 再从kafka消费,通过flink sql同步到最终mysql库 在 2021-06-08 19:49:40,"Leonard Xu" 写道: >试着回答下这两个问题。 > >> flink 1.12的jdbc connector不支持 sink.

Re:Re: flink sql cdc数据同步至mysql

2021-06-12 Thread casel.chen
请问 flink sql cdc 场景下如何增大下游sink端并行度? 我试了修改default.parallism=2参数,并且将operator chain参数设置成false,并没有效果。 而后,我将作业分成两步:首先 源mysql cdc sink到 upsert kafka,再从 upsert kafka sink到 目标mysql。是想通过kafka partition增大sink并行度 初步测试效果是可以的,kafka建了3个partitions,每个partitions都按主键hash分配到数据,下游并行度跟partitions个数对齐。 以下是作业内容

flink sql cdc如何获取元数据

2021-06-22 Thread casel.chen
flink sql cdc如何获取元数据?像数据库名,表名,操作类型,binlog产生时间等。 create table xxx_tbl ( k_op varchar, -- 操作类型 k_database varchar, -- 数据库名 k_table varchar, -- 表名 k_ts. BIGINT, -- binlog产生时间 idBIGINT, name. varchar ) with ( 'connector' = 'mysql-cdc', . 'meta.fields-prefix' = 'k_' )

回复:回复:flink sql 依赖隔离

2021-07-25 Thread silence
就是单独引用的啊,但任务逻辑比较复杂时会同时混用多个udf这个是没法避免的啊 -- 发件人:Michael Ran 发送时间:2021年7月23日(星期五) 17:42 收件人:user-zh ; silence 主 题:Re:回复:flink sql 依赖隔离 建议上传的时候单独放,提交任务的时候 拉下来单独引用 在 2021-07-23 11:01:59,"silence" 写道: > >这边目前主要还是yarn,

Flink sql 维表聚合问题请教

2021-08-03 Thread carlc
请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~ -- 模拟需求(有点牵强...): -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作 -- 1. 创建user_blacklist表 CREATE TABLE `user_blacklist` ( `user_id` bigint(20) NOT NULL, `create_time` datetime N

Re: flink sql job 提交流程问题

2021-08-14 Thread Peihui He
补充: 这个问题在ha的情况下非常突出,因为和hdfs的交互式线性的,当文件达到几百的时候,特别慢 Peihui He 于2021年8月15日周日 上午11:18写道: > Hi all: > > 在使用zeppelin提交sql的过程中总是发现超时现象,通过定位发现有如下问题: > 1、blob client 和blob server 通信时采用单客户端通行,当有比较多的文件时,比如100个,这个耗时挺大的 > > 2、blob server 虽然有blob.fetch.num-concurrent 进行并发控制,但是blob > server在moveTempFileToStor

Flink SQL是否支持Count Window函数?

2021-09-17 Thread casel.chen
今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time window,问一下官方是否打算sql支持count window呢? 如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢!

回复: flink sql建表异常问题

2021-11-16 Thread aiden
provided 您好 我将flink sql建表程序提交到k8s执行时抛出如下异常,网上提示需要引入flink-table-planner-blink等依赖,但实际pom文件中已经引入,请问这个该如何解决,附部分pom文件,flink版本为1.13。谢谢。

flink sql太多insert into语句问题

2021-11-30 Thread casel.chen
我们有一个场景需要从一张kafka源表写很多不同告警规则到同一张告警目标表。规则数量大概有300多个,采用纯flink sql实现。 方案一是采用创建视图,将不同规则union all起来,再将视图插入目标表,发现算子都chain成了一个,因为flink sql算子的名称是flink sql内容,所以算子名称长度超过限制而失败。因而转向方案二 方案二是一条规则对应一条insert into语句,生成graph图会发现fan out特别多。这次没有算子名称超长问题,但作业起动会特别慢。考虑到后续规则还会进行修改,添加或删除。这样慢启动无法接受。 想问一下,这种场景最适合的做法是什

Re: Flink sql jdbc Partitioned Scan timestamp不生效

2021-11-30 Thread Caizhi Weng
Hi! scan.partition.lower-bound 和 scan.partition.upper-bound 都是一个 long 值(而不是一个 timestamp 字符串的形式)。它们将会转换成 where between and 的 SQL 语句通过 jdbc 获取数据。可以检查一下配置项的格式和值的范围是否符合期望。 天下五帝东 于2021年12月1日周三 上午9:23写道: > Hi: > 我在使用flink sql jdbc connector测试partitioned scan功能,发现指定 > scan.partition

Re: Flink sql jdbc Partitioned Scan timestamp不生效

2021-11-30 Thread 天下五帝东
年12月1日周三 上午9:23写道: > >> Hi: >> 我在使用flink sql jdbc connector测试partitioned scan功能,发现指定 >> scan.partition.column 为timestamp类型时,scan.partition.lower-bound >> >> 和scan.partition.upper-bound指定具体的值后,没有读取到相关数据,哪位大佬帮忙解答下 >> >> 谢谢 >> >>

Re: flink sql collect函数使用问题

2021-12-01 Thread cyril cui
af里acc为个list,merge的时候合并,输出的时候 list拼成string即可 casel.chen 于2021年12月2日周四 上午9:46写道: > 使用场景如下,将kafka源表通过flink sql处理成mongodb汇表存入。按照班级进行group > by,输出对应班级所有的学生数据集合。请问用flink sql自带的collect函数能实现吗?如果能的话要怎么写sql? > 如果不能的话要怎么写UDAF,有例子参考吗?谢谢! > > kafka源表: > 班级 学号 姓名 年龄 >

Re:Re: flink sql collect函数使用问题

2021-12-02 Thread casel.chen
可我要的最终结果不是string,最好是通用的Row类型,这样的话下次聚合其他维度就不用重复开发UDF了。 类似我这样的需求应该其他人也会遇到吧? 功能:collect出一个Multiset即map,key是数据本身,value是数据出现的次数,可以按出现次数排序等。 输出可以是去重或不去重的Array(按出现次数排序或不排序),也可以就是map本身 目前collect函数可以输出一个Multiset即map,但要怎么按value即出现次数排序并只输出排序后的keyset,用flink sql要怎么写呢? 在 2021-12-02 09

Re:Re:Re: flink sql collect函数使用问题

2021-12-03 Thread RS
怎么按value即出现次数排序并只输出排序后的keyset,用flink sql要怎么写呢? > > > > > > > > > > > > > > >在 2021-12-02 09:58:28,"cyril cui" 写道: >>af里acc为个list,merge的时候合并,输出的时候 list拼成string即可 >> >>casel.chen 于2021年12月2日周四 上午9:46写道: >> >>> 使用场景如下

Re:Re:Re:Re: flink sql collect函数使用问题

2021-12-05 Thread casel.chen
需求应该其他人也会遇到吧? >>功能:collect出一个Multiset即map,key是数据本身,value是数据出现的次数,可以按出现次数排序等。 >> 输出可以是去重或不去重的Array(按出现次数排序或不排序),也可以就是map本身 >> >> >>目前collect函数可以输出一个Multiset即map,但要怎么按value即出现次数排序并只输出排序后的keyset,用flink sql要怎么写呢? >> >> >> >> >> >&

flink sql回撤流sink优化问题

2021-12-22 Thread casel.chen
flink sql中aggregate without window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? 例如有下面binlog cdc购买数据(订单购买金额会更新): orderid. categorydt

flink sql jdbc sink事务提交问题

2022-02-14 Thread casel.chen
最近在扩展flink sql jdbc connector以支持phoenix数据库,测试debug的时候发现数据能够通过PhoenixStatement.executeBatch()写入,但因为没有提交事务,所以其他人看不到。 源码中PhoenixPreparedStatement.execute()方法会调用executeMutation(statement)方法,继而判断connection.getAutoCommit()与否来执行connection.commit()方法。完了回到PhoenixStatement.executeBatch()执行flushIfNecessary

Re: flink-connector和flink-sql-connector的区别

2022-04-21 Thread Shengkai Fang
hi sql jar 往往是 shade 了相关的依赖,而 普通的 jar 则不带有相关的依赖。正如名字所说,在 table api/sql 的情况下建议使用 sql jar,datastream 建议使用 普通的jar。 Best, Shengkai weishishuo...@163.com 于2022年4月21日周四 16:52写道: > > cdc项目中每种connector都分成flink-connector-xxx和flink-sql-connector-xxx,比如flink-connector-mysql-cdc和flink-sql-connector

Re: How can I set job parameter in flink sql

2022-05-11 Thread yuxia
Hi, AFAK, you can't get the parameter setted via Flink SQL client in udf. If you still want to get the parameters in your udf, you can use the following code to set the parameter: env = StreamExecutionEnvironment.getExecutionEnvironment parameter = new HashMap(); parameter

Re:Re: How can I set job parameter in flink sql

2022-05-11 Thread wang
Ok, got it. Thanks so much! Regards, Hunk -- 发自我的网易邮箱手机智能版 在 2022-05-11 16:46:14,yuxia 写道: Hi, AFAK, you can't get the parameter setted via Flink SQL client in udf. If you still want to get the parameters in your udf, you can use the following code to set the parameter:

flink sql cdc 2.2.1消费mysql binlog异常

2022-10-09 Thread casel.chen
flink sql cdc 2.2.1消费mysql binlog遇到如下异常,有谁遇到过?发现作业自己做了重试后过去了,想知道异常的root cause是什么?手动重起了作业重新消费后还是会出现。 Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at

flink sql作业无缝升级问题

2022-11-28 Thread casel.chen
线上有一个流量较大的flink sql作业需要升级添加业务字段,当前主要是kafka (canal) 多表关联写入 mongodb 数据同步场景,除了source offset外无其他状态,如何让用户对升级无感呢? 常规的停止作业再启动作业至少要几十秒,会造成消息积压告警,有没有可能先启新作业待运行平稳后再停止老作业?kafka group使用同一个,作业启动从group-offsets开始可以吗?另外,如果是有大状态作业又要如何无缝升级?

flink sql是否支持延迟lookup join?

2022-12-06 Thread casel.chen
维表流数据晚于主表流数据到达甚至可能到达不了,所以想设置个5分钟等待窗口,关联上正常处理,关联不上发到另一个kafka topic,这种场景使用flink sql要如何实现?

Whether Flink SQL window operations support "Allow Lateness and SideOutput"?

2023-02-20 Thread wang
Hi dear engineers, One question as title: Whether Flink SQL window operations support "Allow Lateness and SideOutput"? Just as supported in Datastream api (allowedLateness and sideOutputLateData) like: SingleOutputStreamOperator<>sumStream = dataStream.ke

flink sql的codegen导致metaspace OOM疑问

2023-03-29 Thread tanjialiang
Hi all, 我有一个通过flink kubernetes operator定时提交到同一个session作业(底层是将flink sql转JobGraph的逻辑下推到了JobManager执行),当他跑了一段时间后,JobManager报了metaspace OOM. 经过排查后发现是flink sql codegen生成的代码类有一个自增ID,这些类在使用完后不会释放。 疑问: 1. flink sql codegen做这样的一个自增ID有什么特殊意义吗? 2. java中通过类加载器加载的类有什么办法可以释放?

Re: flink sql不支持show create catalog 吗?

2023-10-19 Thread Feng Jin
hi casel 从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。 Best, Feng On Fri, Oct 20, 2023 at 11:55 AM casel.chen wrote: > 之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink > sql不支持show create catalog 。 > 而据我所知doris是支持show create c

Re:回复: 关于Flink SQL DISTINCT问题

2019-09-04 Thread lvwenyuan
对,肯定是按照窗口去重的。我就想问下,窗口去重时,所采用的方式 在 2019-09-04 14:38:29,"athlon...@gmail.com" 写道: >在窗口内去重吧,不可能无限保留去重数据的 > > > >athlon...@gmail.com > >发件人: lvwenyuan >发送时间: 2019-09-04 14:28 >收件人: user-zh >主题: 关于Flink SQL DISTINCT问题 >各位大佬好: > 我想问下,关于flink sql的实时去重

Re: 回复: 关于Flink SQL DISTINCT问题

2019-09-04 Thread JingsongLee
一般是按时间(比如天)来group by,state配置了超时过期的时间。 基本的去重方式就是靠state(比如RocksDbState)。 有mini-batch来减少对state的访问。 如果有倾斜,那是解倾斜问题的话题了。 Best, Jingsong Lee -- From:lvwenyuan Send Time:2019年9月4日(星期三) 15:11 To:user-zh Subject:Re:回复: 关于Flink SQL

回复: Re: Flink SQL :Unknown or invalid SQL statement.

2019-10-08 Thread pengchengl...@163.com
你好,可以参考https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sqlClient.html pengchengl...@163.com 发件人: Henry 发送时间: 2019-10-09 10:44 收件人: user-zh 主题: Re:Re: Flink SQL :Unknown or invalid SQL statement. 哈,非常非常感谢啦。 在 yaml 里定义使用表的哪些内容呢?有木有相关资料呢? 木有搜到呢。谢谢了。 在 2019

flink sql confluent schema avro topic注册成表

2019-12-17 Thread 陈帅
flink sql是否能够支持将confluent schema registry注册的一个avro数据格式 的topic注册成一张table?

Flink Sql Join, how to clear the sql join state?

2020-01-15 Thread LakeShen
Hi community,now I am use flink sql inner join in my code,I saw the flink document, the flink sql inner join will keep both sides of the join input in Flink’s state forever. As result , the hdfs files size are so big , is there any way to clear the sql join state? Thanks to your reply.

Re: 回复: 关于Flink SQL DISTINCT问题

2020-01-18 Thread LakeShen
Best, > Jingsong Lee > > > -- > From:lvwenyuan > Send Time:2019年9月4日(星期三) 15:11 > To:user-zh > Subject:Re:回复: 关于Flink SQL DISTINCT问题 > > 对,肯定是按照窗口去重的。我就想问下,窗口去重时,所采用的方式 > 在 2019-09-04 14:38:29,"athlon...@gmail.com" 写道: > >在窗口内去重吧,不可能无限保留去重数据的 > > > &g

回复: Flink SQL 1.10中ROW_NUMBER的使用

2020-04-06 Thread 111
Hi, 这里不太理解,我是想取每个用户在某个窗口内的topn分类,正常在batch模式下的语法是: row_number() over (PARTITION BY member_id ORDER BY c) AS rn 得到的就是每个用户下category按照对应数量的排序结果。 如果我这里使用HOP_PROCTIME得到了time属性, row_number() over (PARTITION BY member_id ORDER BY time) AS rn 这样的结果貌似是每个用户按照滑动时间窗口排序,并不是每个滑动窗口下按照c来排序。 Best, Xinghalo

回复: Flink SQL 1.10中ROW_NUMBER的使用

2020-04-06 Thread 111
Hi, 原来如此,是搭配使用的。 不过加上条件后,提示无法进行剪枝 Caused by: org.apache.flink.table.api.TableException: This calc has no useful projection and no filter. It should be removed by CalcRemoveRule. at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:176) at or

回复: Flink SQL 1.10中ROW_NUMBER的使用

2020-04-06 Thread 111
Hi, 好的,没问题。 Best, xinghalo | | xinghalo | | xingh...@163.com | 签名由网易邮箱大师定制 在2020年04月7日 14:28,Jark Wu 写道: Hi Xinghalo, 看起来是个 codegen bug, 能帮忙在 JIRA 中建一个 issue 么?最好能附上您的例子。 Best, Jark On Tue, 7 Apr 2020 at 14:22, 111 wrote: Hi, 原来如此,是搭配使用的。 不过加上条件后,提示无法进行剪枝 Caused by: org.apache.flink.ta

回复: Flink SQL 1.10中ROW_NUMBER的使用

2020-04-06 Thread 111
Hi, 已经提交到JIRA,https://issues.apache.org/jira/browse/FLINK-17022 Best, Xinghalo

回复: Re: flink sql string char 不兼容?

2020-04-21 Thread 王双利
hit声明的是varchar,现在是,'false' 编译的时候认为是char(4) ,导致类型不匹配 王双利 发件人: Leonard Xu 发送时间: 2020-04-21 18:29 收件人: user-zh 主题: Re: flink sql string char 不兼容? Hi Sink 对应的字段(taskid\hit)需要声明成varchar, 现在不支持varchar类型写入char(n) 祝好, Leonard Xu > 在 2020年4月21日,18:20,王双利 写道: > > 下面的sql 执行的时候

回复: Re: flink sql string char 不兼容?

2020-04-21 Thread 王双利
我这边用kafka的AppendStream没什么问题, 改的是支持Retract模式的,KafkaTableSinkBase继承的是RetractStreamTableSink 基本是按照下面的链接的地址改的 https://www.cnblogs.com/Springmoon-venn/p/12652845.html 王双利 发件人: Leonard Xu 发送时间: 2020-04-22 09:03 收件人: user-zh 主题: Re: flink sql string char 不兼容? Hi, 王双利 我试了下1.10.0的版本,没能复现你的异常

flink sql 处理时间 时区问题

2020-05-01 Thread hb
``` 代码 val env = StreamExecutionEnvironment.getExecutionEnvironment val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) val t2 = env.fromElemen

Re: flink sql 写 hive分区表失败

2020-05-27 Thread Leonard Xu
ive分区表: > CREATE TABLE `dwd.t1`( > `id` bigint, > `name` string) > PARTITIONED BY ( > `p_year` int, > `p_month` int) > > > CREATE TABLE `dwd.t1_copy`( > `id` bigint, > `name` string) > PARTITIONED BY ( > `p_year` int, > `p_month` int) > > >

Re:Re: flink sql 写 hive分区表失败

2020-05-27 Thread Zhou Zach
doAs(UserGroupInformation.java:1875) >> >> at >> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >> >> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) >> >> >> >> &g

Re: flink sql 写 hive分区表失败

2020-05-27 Thread Leonard Xu
; at >>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) >>> >>> at >>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) >>> >>> at java.security.AccessController.doPrivileged(Native

Re:Re: flink sql 写 hive分区表失败

2020-05-28 Thread Zhou Zach
) >>>> >>>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) >>>> >>>> at >>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) >>>> >>>> at org.apach

Re: flink sql 窗口场景的问题

2020-06-02 Thread Leonard Xu
我觉得既然你想要事实输出数据,那就没必要用窗口,用了时间窗口,那就是按照窗口大小来统计了,不能同时两个都实现吧,把这两种场景分开分别实现,或者你要想得到每分钟的聚合结果,就缩小窗口大小。 > > > ---原始邮件--- > 发件人: "steven chen" 发送时间: 2020年6月3日(周三) 凌晨2:49 > 收件人: "user-zh@flink.apache.org" 主题: flink sql 窗口场景的问题 > > > hi : >   我现在遇到有这样一个场景,我们

Re: Flink SQL 子查询优化问题

2020-06-04 Thread godfrey he
> Hi all > > > 版本说明: > Flink 版本:1.10 > Planner:old planner / blink planner > > > > 程序说明(Flink SQL): > > source:每隔一秒输出Tuple2.of(1, "{\"name\": \"a\"}"); > > > > > query:select a.id, a.name, a.n

Re: Flink SQL 子查询优化问题

2020-06-04 Thread godfrey he
chao Yang > > > > > > -- 原始邮件 -- > 发件人: godfrey he 发送时间: 2020年6月4日 21:17 > 收件人: user-zh 主题: 回复:Flink SQL 子查询优化问题 > > > > hi Yichao, > > 目前 planner 会 try best 的将两个相邻的 project 节点合并在一起,除非两个project被分开。 > 就像你上面的那种做法。但是加一个group by的执行代价比较高。 &g

FLINK SQL文档示例是否正确

2020-06-10 Thread 张韩
问题: 文档(https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#scan-projection-and-filter)使用'%'求余运算,在SQL解析报错: org.apache.calcite.sql.parser.SqlParseException: Percent remainder '%' is not allowed under the current SQL conformance level MYSQL conformance 支持'%'运算,使用

Re: flink sql bigint cannot be cast to mysql Long

2020-06-10 Thread Leonard Xu
Hi, 用的 flink 版本是多少? 数据库的字段确定是 bigint 类型吗? > Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast > to java.lang.Long java.math.BigInteger 的范围比 java.lang.Long的范围大很多,是不能cast的,应该是你数据类型对应错误了,可以把mysql 表的schema贴下吗? 祝好, Leonard Xu > 在 2020年6月11日,13:22,Zhou Zach 写道: > > SLF4

Re:Re: flink sql bigint cannot be cast to mysql Long

2020-06-10 Thread Zhou Zach
;, `counts` bigint(20) NOT NULL DEFAULT '0' COMMENT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT=''; 数据库的字段是 bigint 类型,总有场景在mysql的字段设置为bigint吧,如果mysql的字段为bigint,那在创建flink sql时,用什么类型合适呢

Re: flink sql bigint cannot be cast to mysql Long

2020-06-10 Thread Leonard Xu
Hi, JDBC connector 之前不支持 unsigned 类型,unsigned 会比signed 类型更长。 bigint(20) unsigned(range is 0 to 18446744073709551615) 超过了 bigint (range is -9223372036854775808 to 9223372036854775807)的长度。 最新的代码已经修复这个问题了[1],你可以等1.11发布后试用,或者编译下最新的代码,flink 中对应表 声明decimal(20, 0)处理。 祝好, Leonard Xu [1] https://i

Re:Re: flink sql bigint cannot be cast to mysql Long

2020-06-10 Thread Zhou Zach
3ku 在 2020-06-11 14:10:53,"Leonard Xu" 写道: >Hi, > >JDBC connector 之前不支持 unsigned 类型,unsigned 会比signed 类型更长。 >bigint(20) unsigned(range is 0 to 18446744073709551615) 超过了 bigint (range is >-9223372036854775808 to 9223372036854775807)的长度。 > > >最新的代码已经修复这个问题了[1],你可以等1.11发布后试用,或者编译下

回复: FLINK SQL DDL写入hbase问题

2020-06-11 Thread 酷酷的浑蛋
您是说将那几个jar都放到flink/lib下吗? 在2020年06月11日 14:39,Leonard Xu 写道: Hi 你服务器上是否配置了hadoop_classpath? 建议hbase在试用时 用 hadoop_classpath + flink-hbase jar,不然依赖问题会比较麻烦。 祝好 Leonard Xu 在 2020年6月11日,14:24,酷酷的浑蛋 写道: 在使用flink sql ddl语句向hbase中写的时候报如下错误: java.lang.NoClassDefFoundError: org/apache/hadoop

Re: flink sql字段重命名问题

2020-06-12 Thread Benchao Li
直接用SELECT XX AS YY就等价于Table API的renameColumns了吧。 naisili Yuan 于2020年6月12日周五 下午6:23写道: > Hi all > 想请教下,flink使用sql的时候,字段重命名如何解决呢?看到了table api中的renameColumes接口,flink > sql的没有看到明确的接口 > 我自己试了一下在创建表的sql语句中就加入name_alias AS > name,发现可行,但是这样引入的字段,会不会影响效率,希望能获得解答,谢谢! >

Re: flink sql字段重命名问题

2020-06-12 Thread godfrey he
k使用sql的时候,字段重命名如何解决呢?看到了table api中的renameColumes接口,flink > > sql的没有看到明确的接口 > > 我自己试了一下在创建表的sql语句中就加入name_alias AS > > name,发现可行,但是这样引入的字段,会不会影响效率,希望能获得解答,谢谢! > > >

Re: flink sql字段重命名问题

2020-06-12 Thread naisili Yuan
ameColumns了吧。 >> >> naisili Yuan 于2020年6月12日周五 下午6:23写道: >> >>> Hi all >>> 想请教下,flink使用sql的时候,字段重命名如何解决呢?看到了table api中的renameColumes接口,flink >>> sql的没有看到明确的接口 >>>我自己试了一下在创建表的sql语句中就加入name_alias AS >>> name,发现可行,但是这样引入的字段,会不会影响效率,希望能获得解答,谢谢! >>> >>

flink sql DDL支持 Temporal Table 定义吗

2020-06-14 Thread Zhou Zach
ll be specified if any of-- them is specified. Cache is not enabled as default.'connector.lookup.max-retries'='3',-- optional, max retry times if lookup database failed 是不是说在flink sql JDBC Connector DDL中,加上这三个配置项,那么创建的表就是Temporal Table,可以在temporary join 中使用?

flink sql job 提交到yarn上报错

2020-06-15 Thread Zhou Zach
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint. at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCl

flink sql read hbase sink mysql data type not match

2020-06-16 Thread Zhou Zach
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field types of query result and registered TableSink default_catalog.default_database.user_age do not match. Query schema: [rowkey: STRING, cf: ROW<`age` INT>] Sink schema: [rowkey: STRING, age: INT] at

Re: flink sql 窗口场景的问题

2020-06-18 Thread john
3日 02:56,steven chen 写道: > hi : > 我现在遇到有这样一个场景,我们需要实时去统计5分和30分的粒度,flink sql 窗口使用了处理时间滑动窗口方式 > 但是都是只有5分结束的时候才能把聚合结果输出,这个不满足我们需求,有没有方式可以直接实时输出结果,比如18:02 的统计+1+1 > 都能直接落在18:00-18:05的窗口上,并每次+1都能实时输出,而不是等到窗口结束才sink 到mysql .30分钟我同样

Re: flink sql 窗口场景的问题

2020-06-18 Thread Leonard Xu
Hi, 窗口输出可以加emit策略,在watermark未触发时提前输出window的中间结果,不过社区目前标注的是experimental的功能,生产环境中应谨慎使用。 table.exec.emit.early-fire.enabled table.exec.emit.early-fire.delay 可以参考[1]。 Best Leonard Xu [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/fli

Re: flink sql cli 读取 hbase表报错

2020-06-28 Thread Leonard Xu
Hello, 这应该是一个已知bug[1],原因是Configuration是不可序列化的,HbaseRowInputFormat中没有正确处理,导致用户DDL中的zk配置无法传递。 在flink1.11和1.12上已经修复。如果是1.10.x版本中,可以将HBase 的配置文件(hbase-default.xml、 hbase-site.xml) 添加到 classpath下,也可以把 HBase 的配置文件添加到HADOOP_CLASSPATH(flnk启动脚本会检查HADOOP_CLASSPATH环境变量并加载),两种方式Flink集群和SQL Client都能加载到Hbase的

flink SQL如何将秒转换为timestamp

2020-06-30 Thread zilong xiao
有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导 TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd hh:mm:ss'))

Re: flink sql if 函数使用问题

2020-06-30 Thread Benchao Li
看报错,应该是你的IF的后面两个参数的类型不同吧。这里应该让后面两个参数的类型也相同的,要不然IF函数的返回值类型就不好确定了。 kcz <573693...@qq.com> 于2020年7月1日周三 上午11:03写道: > flink-1.10.1 blink_planner > if使用时候限制了返回的数据类型吗? > Cannot apply 'IF' to arguments of type 'IF( 'IF( 我想创建DDL时候,因为字段可能有空,所以如果为空了我想设置一个默认值,但是报错提示是只支持返回数据类型。 -- Best, Benchao Li

flink sql ddl连接kafka,flink sql使用事件时间无法正常触发时间窗口

2020-11-13 Thread 李世钰
flink版本 flink1.11 flink sql连接kafka create table kafka_table ( log_id  string, event_time bigint, process_time as PROCTIME(), ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)), watermark for ts as ts - interval '1' second ) with (  'connector' = 'kafka&

Re: flink sql ddl连接kafka,flink sql使用事件时间无法正常触发时间窗口

2020-11-14 Thread Jark Wu
如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发: 1. 保证所有 partition 都有数据。 2. 且每个 partition 数据的 event time 都在前进 3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s = 11s 以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。 Best, Jark On Sat, 14 Nov 2020 at 15:11, 李世钰 wrote: > flink版本 flink1.11 > > >

flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 Thread Zhou Zach
代码在flink 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGrap

Re: 回复: Flink SQL处理Array型的JSON

2020-07-21 Thread wxpcc
如果不等待最新版本的话也可以这样 将 纯数组的数据作为字符串 从source消费,增加自定义的json解析函数,判断 isArray 之后 遍历进行 collect if (Objects.nonNull(str)) { if (isArray) { JsonNode node = objectMapper.readTree(str); if (node.isArray()) { Iterator nodeIterator = n

Re: 回复: Flink SQL处理Array型的JSON

2020-07-21 Thread wxpcc
补充: 最终查询为 SELECT t.* FROM kafka_source, LATERAL TABLE( fromJson(data) ) as t -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FLINK SQL view的数据复用问题

2020-08-04 Thread godfrey he
blink planner支持将多sink的query优化成尽量复用重复计算部分。 1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务 kandy.wang 于2020年8月4日周二 下午5:20写道: > FLINK SQL view相关问题: > create view order_source > > as > > select order_id, order_goods_id, user_id,... > > from ( > >

Re:Re: FLINK SQL view的数据复用问题

2020-08-04 Thread kandy.wang
尽量复用重复计算部分。 >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务 > >kandy.wang 于2020年8月4日周二 下午5:20写道: > >> FLINK SQL view相关问题: >> create view order_source >> >> as >> >> select order_id, order_goods_id, user_id,... >> >

flink sql 添加全局唯一id 字段

2020-08-06 Thread drewfranklin
Hello all . 想请教下。flink sql 想添加一个全局唯一的自增数据,有什么好的方法吗?

<    3   4   5   6   7   8   9   10   11   12   >