flink中是仍然存在这个问题。
>
>
>
>
> -- 原始邮件 --
> 发件人:
> "user-zh"
> <
> libenc...@apache.org>;
> 发送时间: 2024年5月20日(星期一) 中午12:51
> 收件人
> 发送时间: 2024年5月20日(星期一) 上午10:32
> 收件人: "user-zh"
> 主题: Re: flinksql 经过优化后,group by字段少了
>
>
>
> 看起来像是因为 "dt = cast(CURRENT_DATE as string)" 推导 dt 这个字段是个常量,进而被优化掉了。
>
> 将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛
看起来像是因为 "dt = cast(CURRENT_DATE as string)" 推导 dt 这个字段是个常量,进而被优化掉了。
将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛?
℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月19日周日 01:01写道:
>
> create view tmp_view as
> SELECT
> dt, -- 2
> uid, -- 0
> uname, -- 1
> uage -
hi, 东树
隐藏sql中的敏感信息,这个需要外部的大数据平台来做。
比如:StreamPark 的变量管理,可以提前维护好配置信息,编写sql时引用配置,由平台提交至flink时解析sql并替换变量。
Best,
Zhongqiang Gong
杨东树 于2024年3月10日周日 21:50写道:
> 各位好,
>考虑到数据库用户、密码安全性问题,使用FlinkSQL connector
> jdbc时,请问如何对数据库的用户密码进行加密/隐藏呢。例如如下常用sql中的password:
> CREATE TABLE wo
1. 目前 JDBC connector 本身不支持加密, 我理解可以在提交 SQL 给 SQL 文本来做加解密的操作,或者做一些变量替换来隐藏密码。
2. 可以考虑提前创建好 jdbc catalog,从而避免编写 DDL 暴露密码。
Best,
Feng
On Sun, Mar 10, 2024 at 9:50 PM 杨东树 wrote:
> 各位好,
>考虑到数据库用户、密码安全性问题,使用FlinkSQL connector
> jdbc时,请问如何对数据库的用户密码进行加密/隐藏呢。例如如下常用sql中的password:
>
各位好,
考虑到数据库用户、密码安全性问题,使用FlinkSQL connector
jdbc时,请问如何对数据库的用户密码进行加密/隐藏呢。例如如下常用sql中的password:
CREATE TABLE wordcount_sink (
word String,
cnt BIGINT,
primary key (word) not enforced
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localho
flink版本1.18
场景如下:
A表字段:
id,update_time(date格式)
一条数据:
1,2023-01-12
现在我需要保留update_time+1年,大于当前日。
简单地写一个sql:
select
id,update_time
from A
where TIMESTAMPADD(YEAR,1,update_time) > CURRENT_DATE;
结果:
在2024年1月11日这一天,where条件达成,这条数据不会被过滤掉;
在2024年1月12日,sql并不会触发计算来过滤掉此条数据。
在真实的场景中,update_time跨度很多年
式写入一个OLAP系统(譬如doris/ck),读时再聚合(需要一个稳定可靠的外部存储)
你这边用flink做滑动窗口的计算会遇到这样的问题吗?是否还有其他更好解决办法?
十分期待你的反馈
best,
tanjialiang.
回复的原邮件
| 发件人 | Shammon FY |
| 发送日期 | 2023年5月29日 09:08 |
| 收件人 | |
| 主题 | Re: FlinkSQL大窗口小步长的滑动窗口解决方案 |
Hi,
这是窗口触发后发送的数据量过大吗?调大资源,加大窗口计算的并发度是否可以缓解这个问题?
Best,
Shammon FY
Hi,
这是窗口触发后发送的数据量过大吗?调大资源,加大窗口计算的并发度是否可以缓解这个问题?
Best,
Shammon FY
On Fri, May 26, 2023 at 2:03 PM tanjialiang wrote:
> Hi, all.
> 我在使用FlinkSQL的window tvf滑动窗口时遇到一些问题。
> 滑动步长为5分钟,窗口为24小时,group by
> user_id的滑动窗口,当任务挂掉了或者从kafka的earliest-offset消费,checkpoint很难成功。
> 因为从earliest开始消费,数据很
Hi, all.
我在使用FlinkSQL的window tvf滑动窗口时遇到一些问题。
滑动步长为5分钟,窗口为24小时,group by
user_id的滑动窗口,当任务挂掉了或者从kafka的earliest-offset消费,checkpoint很难成功。
因为从earliest开始消费,数据很快就会堆满缓冲区产生背压,这时这一批数据可能会触发N次窗口计算往下游发,每次触发的操作成本是(用户基数 * 24 *
60 / 5),checkpoint barrier可能会一直卡住。
这时候有什么办法可以破局吗?
best,
tanjialiang.
从 plan 上看起来在 sink 节点这里因为推导不出 upsert key 加上了 SinkUpsertMaterializer[1],
这里会按照 sink 表定义的主键进行 keyby shuffle[2], 只能保证最终一致性.
另外你的操作描述中 schema 为三列, 但 DDL 是四列, 且格式乱了.
一些可能的建议如下
1. 如果上游数据有主键并且也是 rowid 的话, 建议在 Flink source 表上声明 PK, 避免额外生成 materializer
节点; 同时注意在声明 Flink source 表时不要带上 metadata 列 (比如 op), 这会
Hi
如果没有现成的系统函数,你可以写个自定义udf来实现
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/
Best,
Shammon
On Mon, Mar 6, 2023 at 7:46 PM 唐世伟 wrote:
>
> 我们需要对两个时间相减,精度为毫秒。但是无论是TIMESTAMPDIFF函数,还是先转成UNIX_TIMESTAMP,都只支持秒的精度。请问还有其他方法吗?
我们需要对两个时间相减,精度为毫秒。但是无论是TIMESTAMPDIFF函数,还是先转成UNIX_TIMESTAMP,都只支持秒的精度。请问还有其他方法吗?
刚做了一下测试
目前假定有3行数据需要同步(全量):
| 编号 |
电话
|
座机
|
| 1 |
1311313
|
123
|
| 2 |
1311313
|
456
|
| 3 |
1311313
|
789
|
这个时候我修改第四行数据的两个字段(增量):
| 1
|
电话
|
座机
|
| 1 |
1311313
|
123
|
| 2 |
1311313
|
456
|
| 3 |
13113133110
|
888
|
修改完后我删除字段2这个时候去mysql看结果2是正确被删除,且无新增的(操作正确).
然后我继续删除数据3
hi 早上好
我将flink升级到了1.16.1的版本去执行kafka同步到mysql的任务,发现还是存在一样的问题,我本机执行了explain的执行过程给的输出如下
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称,
手机, 座机])
+- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"],
名称=[$1], 手机=[CAST($2):VA
Hi,
抱歉, 这里 typo 了, 应该是 1.16.1. 我在 1.16.1 上验证了你之前发的 query, 是可以正常删除的. 可以在 1.16.1
上尝试下, 也可以试试在 1.15.2 上使用 EXPLAIN CHANGELOG_MODE INSERT INTO...[1] 将 plan
打印出来看看.
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
祝好!
Jane
On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪
hi 你好
目前没有1.16.2版本的吧? 我看flink官网都是1.16.0 或者是1.16.1的
在 2023-03-02 11:52:41,"Jane Chan" 写道:
>Hi,
>
>可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个
>query 在 1.16.2 上验证没有问题
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/ex
Hi,
可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个
query 在 1.16.2 上验证没有问题
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
Best,
Jane
On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 wrote:
> flink ,kafka连接 jdbc连接版本都是1.15.2的
>
>
>
>
>
>
>
>
flink ,kafka连接 jdbc连接版本都是1.15.2的
在 2023-03-01 18:14:35,"陈佳豪" 写道:
>问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。
>String kafka = "CREATE TABLE `电话` (`rowid`
>VARCHAR(2147483647),`63fd65fb36521f81a2cfab90`
>VARCHAR(2147483647),`63fd660536521f81a2cfabad`
>VARCHAR(65535),`63fd66
问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。
String kafka = "CREATE TABLE `电话` (`rowid`
VARCHAR(2147483647),`63fd65fb36521f81a2cfab90`
VARCHAR(2147483647),`63fd660536521f81a2cfabad`
VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( 'connector'
= 'kafka', 'topic' =
'sz_work
Hi
目前SQL还不支持watermark对齐,目前有FLIP正在讨论中
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240884405
Best,
Shammon
On Wed, Feb 22, 2023 at 3:15 PM haishui wrote:
> Hi, all
> 以并行度4读取kafka的topic1和topic2形成两个流,然后IntervalJoin。在kafka堆积大量数据的情况下,我分别用SQL和DataStream
> API实现了上述功能。
>
>
> 使用SQL
Hi, all
以并行度4读取kafka的topic1和topic2形成两个流,然后IntervalJoin。在kafka堆积大量数据的情况下,我分别用SQL和DataStream
API实现了上述功能。
使用SQL实现的作业中IntervalJoin算子的状态会逐渐增大,直到checkpoint失败。原因是在8个Source分区中输出水位线差距很大。
使用API实现的作业,在使用Flink15版本的水位线对齐后可以保证正常读取topic内的所有数据。
想请教一下大家如何在SQL上解决Source处水位线差距过大,数据堆积导致checkpoint失败问题。还有如果只有一个topic
dob_dim_account 维表如果使用 jdbc 的 connector, flink 会在初始化的时候一次性读取所有的数据,
后续数据库中更新并不会触发 flink 计算。
要解决这个问题, dob_dim_account 需要变成流表。
Zhiwen Sun
On Thu, Nov 17, 2022 at 1:56 PM Jason_H wrote:
> hi,你好
> 这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题
>
>
> | |
> Jason_H
>
hi,你好
这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题
| |
Jason_H
|
|
hyb_he...@163.com
|
Replied Message
| From | 任召金 |
| Date | 11/15/2022 09:52 |
| To | user-zh |
| Subject | Re: flinksql join |
hello,你可以试下,将mysql的数据通过CDC变成流数据,然后跟主流inner join,注意状态的TTL
hi,你好
我想基于现有的flinksql的join实现这种情况,当维表更新慢的时候,事实数据会放在状态中等待。
| |
Jason_H
|
|
hyb_he...@163.com
|
Replied Message
| From | RS |
| Date | 11/15/2022 09:07 |
| To | user-zh@flink.apache.org |
| Subject | Re:flinksql join |
Hi,
我的理解是后插入的维表数据,关联不到是正常现象,
如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据
hi,你好
我想基于现有的flinksql的join实现这种情况,当维表更新慢的时候,事实数据会放在状态中等待。
| |
Jason
|
|
hyb_he...@163.com
|
Replied Message
| From | RS |
| Date | 11/15/2022 09:07 |
| To | user-zh@flink.apache.org |
| Subject | Re:flinksql join |
Hi,
我的理解是后插入的维表数据,关联不到是正常现象,
如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据
\n" +
"SUPER_ORG_ID string, \n" +
"IS_OUTSIDE BIGINT \n" +
") \n" +
"WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = '***',\n" +
" 'driver' = 'com.mysql.c
\n" +
"SUPER_ORG_ID string, \n" +
"IS_OUTSIDE BIGINT \n" +
") \n" +
"WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = '***',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver'
7;***',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
//" 'lookup.cache
用普通的 join, 不要用 lookup join
Zhiwen Sun
On Fri, Nov 11, 2022 at 11:10 AM Jason_H wrote:
>
>
> hi,大家好
>
> 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下:
> kakfa输入:
> 账号 金额 笔数
> 100 1
hi,大家好
我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下:
kakfa输入:
账号 金额 笔数
100 1 -> 未匹配
100 1 -> 未匹配
100 1 -> 匹配上
维表
账号 企业
-> 后插入的账号信息
实际输出结果
企业 金额
Hi.
可以看看这个回答[1]。
Best,
Alibaba
[1]
https://stackoverflow.com/questions/71779752/why-is-redis-source-connector-not-available-for-flink
Jason_H 于2022年10月26日周三 14:23写道:
> hi,
> 请问一下,flinksql读写redis,官方有没有成熟的插件。如果没有,请教一下大家用的什么插件去读写redis,来做维表join的,谢谢。
>
>
> | |
> Jason_H
>
Hi,
Flink 的 lookup join 目前不支持对维表进行预处理, 并且需要有对维表原始字段的等值连接条件(因为需要通过确定的字段值去查找)
示例中 t4 字段不做计算应该是 work 的, 比如 udf(t1.telephone_no) = t4.de_mobile
Best,
Lincoln Lee
Fei Han 于2022年10月27日周四 12:12写道:
> 大家好!请教几个问题
> 1. FlinkSQL维表join的时候,能不能把维表先用flinksql 当做一个临时表,然后在用临时表做维表join
> 例如:临
hi,
请问一下,flinksql读写redis,官方有没有成熟的插件。如果没有,请教一下大家用的什么插件去读写redis,来做维表join的,谢谢。
| |
Jason_H
|
|
hyb_he...@163.com
|
大家好!请教几个问题
1. FlinkSQL维表join的时候,能不能把维表先用flinksql 当做一个临时表,然后在用临时表做维表join
例如:临时表
WITH employee_tmp AS(
select
userid as userid,
name as name,
mobile as de_mobile
from ygp_dwd_catalog.flink_dwd.employee
)
select
*
from ( select
*
from ygp_dwd_catalog.flink_dwd.xxx ) t1
left join
hi,
请问一下,flinksql读写redis,官方有没有成熟的插件。如果没有,请教一下大家用的什么插件去读写redis,来做维表join的,谢谢。
| |
Jason_H
|
|
hyb_he...@163.com
|
Dear Flink:
一需求, FlinkSQL可以用udf实现,捕获标签值的变化。
例如: 若当前用户由低端用户变为中端用户或由中端用户变为高端用户,输出只要用户state状态发生变化,结果用户状态打标为1,反之为0;
有什么好的实现方式没?
/StreamExecGroupWindowAggregateBase.scala#L171
在 2022-06-01 15:41:05,"hdxg1101300...@163.com" 写道:
>您好:
> 最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
> 比如这样一条sql语句:
> select
>dim,
>count(*) as pv,
>sum(price) as sum_price,
>max(price) as max_price,
>min(price) as min_price
Dear:
您可以首先建一个这样的对象
class Acc{
long sum;
long max;
long min;
...
}
在 AggregateFunction 里面维护这样的 ACC ,
就可以在 add 方法里面对维护的 acc 和新传入的值之间实现多需求下的结果更新。
不知道你想了解的是不是这个意思
-- Original --
From: "Lincoln Lee"
flink sql 的实现可以参考下 flink-table planner&runtime 部分的代码
从 datastream 层面来说, 可以基于 KeyedProcessFunction, 比如
datastream.keyby(...).process(keyedProcessFunction)... 来实现自己定义的逻辑
Best,
Lincoln Lee
hdxg1101300...@163.com 于2022年6月1日周三 15:49写道:
> 您好:
>最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4
您好:
最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
比如这样一条sql语句:
select
dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval
dxg1101300...@163.com
>
>
> *发件人:* Jingsong Li
> *发送时间:* 2022-05-26 14:47
> *收件人:* hdxg1101300123
> *抄送:* dev
> *主题:* Re: Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误
> Please don't use Chinese on the dev mailing list to discuss issues, I've
>
>> 你好:
>> 我在使用flink1.12.4版本和hive1.1.0版本时遇到下面的错误:
>>
>>
>> 场景时使用hive的catalog管理元数据,用flinksql把Kafka的数据注册成输入表,关联hive维表做数据拉宽;提交任务到yarn时遇到如下错误;
>>sql如下:
>>create view if not exists dwm_ai_robot_contact_view as select
>> CALLER,
Hello:
Now we cann't add a shuffle-operation in a sql-job.
Sometimes , for example, I have a kafka-source(three partitions) with
parallelism three. And then I have a lookup-join function, I want process the
data distribute by id so that the data can split into thre parallelism evenly
(The so
?? dlink ?? FlinkSQL??
https://github.com/DataLinkDC/dlink
-- --
??:
"us
get();
LOG.info("Job {} is submitted successfully", jobID);
}
}
}
Best,
Yang
吕宴全 <1365976...@qq.com.invalid> 于2022年4月24日周日 14:45写道:
> 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application
> mode)和即席查询(Session mode)。在Application模式下,从jar中构建job
SQL Client的Application模式现在还不支持,方案在设计中。
https://issues.apache.org/jira/browse/FLINK-26541
吕宴全 <1365976...@qq.com.invalid> 于2022年4月24日周日 14:45写道:
> 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application
> mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下:
>
>
??KyuubiFlinkSQL??k8s(Application
mode)??(Session
mode)Application??jar??jobgraph??SQL??
CREATE TABLE T (
id INT
) WITH (
'connector.type' = 'filesystem',
'connector.path' = '
??KyuubiFlinkSQL??k8s(Application
mode)??(Session
mode)Application??jar??jobgraph??SQL??
CREATE TABLE T (
id INT
) WITH (
'connector.type' = 'filesystem',
'connector.path' = '
你好,可以贴下客户端的具体提交命令吗?
Best,
Zhanghao Chen
From: 799590...@qq.com.INVALID <799590...@qq.com.INVALID>
Sent: Tuesday, April 12, 2022 10:46
To: user-zh
Subject: flinksql执行时提示自定义UDF无法加载的
环境信息
flink-1.13.6_scala_2.11
java 1.8
使用的是standalonesession集群模式,n
环境信息
flink-1.13.6_scala_2.11
java 1.8
使用的是standalonesession集群模式,node01为jobmanager node02和node03为taskmanager
UDF代码
package com.example.udf;
import org.apache.flink.table.functions.ScalarFunction;
public class SubStr extends ScalarFunction {
public String eval(String s, Integer start,Inte
看不到图
赵旭晨 于2022年3月15日周二 12:25写道:
> flink版本:1.14.3 场景如下:
> sql:
> set table.exec.state.ttl=1 day;
> describe t_k_chargeorder;
> describe t_k_appointment;
> SELECT
> ReportTime,
> sum( InsertAppointmentCount ) + sum( InsertChargeOrderCount )
> kpitotalcount,
> sum( InsertActualPriceCount ) Insert
flink版本:1.14.3 场景如下:
sql:
set table.exec.state.ttl=1 day;
describe t_k_chargeorder;
describe t_k_appointment;
SELECT
ReportTime,
sum( InsertAppointmentCount ) + sum( InsertChargeOrderCount ) kpitotalcount,
sum( InsertActualPriceCount ) InsertActualPriceCount,
sum( InsertAppointmentCount ) Insert
Hello , Flink SQL 会将SQL解析、优化并最终翻译成DataStream作业,所以本质上跟直接用DataStream
API直接写Flink作业没有根本的区别,反而会因为一些通用的优化和代码生成,在性能上可能会有一些提升。
Pinjie Huang 于2022年2月18日周五 11:28写道:
> Hi all,
>
> 同样一个task 用flinksql 写和DataStreamAPI写比较,是否会占用更多的CPU和memory?是否有performance
> 比较的benchmark?
>
> Thanks,
> Pinjie Huang
>
我理解只是不同api而已
> 2022年2月18日 上午11:18,Pinjie Huang 写道:
>
> Hi all,
>
> 同样一个task 用flinksql 写和DataStreamAPI写比较,是否会占用更多的CPU和memory?是否有performance
> 比较的benchmark?
>
> Thanks,
> Pinjie Huang
Hi all,
同样一个task 用flinksql 写和DataStreamAPI写比较,是否会占用更多的CPU和memory?是否有performance
比较的benchmark?
Thanks,
Pinjie Huang
hello , listagg,
-- --
??:
"Mr.S"
/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
陈卓宇 <2572805...@qq.com.invalid> 于2021年12月25日周六 23:30写道:
> 您好社区:
> 场景是这样的:
>
> 我司要求将标签数据每日同步一份到mongodb供业务开发同学进行使用,面临这样一个问题,我是不能先删表在建表的,这样会导致接口查询mongodb出现数据查询不到的风险。而是使用切表,将同步表设为:表名_时间戳,将历史表删除,在将同步表改为正确表
??
??
mongodbmongodb??:_??flinksql
陈卓宇 你好,
在默认情况下,所有提交后的DML都是异步执行的,详见TableEnvironment.executeSql(String
statement)的注释。使用.await()和不使用.await()的区别是使用await()后会等待异步查询返回第一行结果(题外话:请注意INSERT和SELECT的区别),详见TableResult.await()注解,具体代码见TableResultImpl.awaitInternal(long
timeout, TimeUnit unit), 由于此时入参timeout为-1,导致future.get()被调用,
强制等待resultProv
??
String initialValues =
"INSERT INTO kafka\n"
+ "SELECT CAST(price AS DECIMAL(10, 2)), currency, "
+ " CAST(d AS DATE), CAST(t AS TIME(0)), CAST(ts AS
TIMESTAMP(3))\n"
+ "FROM (VALUES (2.02,'Euro','2019-12-12', '00:00:01',
'2019-1
在自定义udaf函数实现中使用了一些flinksql不支持的数据类型
想请问如何进行自定义数据类型的实现
Exception in thread "main" org.apache.flink.table.api.ValidationException:
SQL validation failed. An error occurred in the type inference logic of
function 'Average'.
at
org.apache.flink.table.planner.calcite.FlinkPla
> 于2021年12月20日周一 17:00写道:
> 在自定义udaf函数实现中使用了一些flinksql不支持的数据类型
> 想请问如何进行自定义数据类型的实现
>
>
>
>
>
>
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. An error occurred in the type
udafflinksql
??
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL
validation failed. An error occurred in the type inference logic of function
'A
-- --
??:
"user-zh"
您好,目前flinkSQL KafkaConnector目前不支持Source算子并行度配置以及数据分发策略选择:
导致的问题:
1、当并行度 > source中间件消息分区时, 超出的部分会空跑占用资源
2、假设: 并行度>消息分区, source vertex: Source: TableSourceScan->Calc(select…);
当Calc(select..)为cpu密集(json解析)算子,其将会成为任务的性能瓶颈,Calc算子无法享受到扩容带来的资源(超出kafka分区的并发没有数据流入)。
请问下目前社区有没有现有的 / 计划中的解决方案?
您好,目前flinkSQL KafkaConnector目前不支持Source算子并行度配置以及数据分发策略选择:
导致的问题:
1、当并行度 > source中间件消息分区时, 超出的部分会空跑占用资源
2、假设: 并行度>消息分区, source vertex: Source: TableSourceScan->Calc(select…);
当Calc(select..)为cpu密集(json解析)算子,其将会成为任务的性能瓶颈,Calc算子无法享受到扩容带来的资源(超出kafka分区的并发没有数据流入)。
请问下目前社区有没有现有的 / 计划中的解决方案?
您好,目前flinkSQL KafkaConnector目前不支持Source算子并行度配置以及数据分发策略选择:
导致的问题:
1、当并行度 > source中间件消息分区时, 超出的部分会空跑占用资源
2、假设: 并行度>消息分区, source vertex: Source: TableSourceScan->Calc(select…);
当Calc(select..)为cpu密集(json解析)算子,其将会成为任务的性能瓶颈,Calc算子无法享受到扩容带来的资源(超出kafka分区的并发没有数据流入)。
请问下目前社区有没有现有的 / 计划中的解决方案?
Hi!
stmtSet.execute() 默认是异步的,只是提交作业而不会等待作业完成。如果需要等待作业完成再进行后续步骤,需要用
stmtSet.execute().await()。
陈卓宇 <2572805...@qq.com.invalid> 于2021年12月10日周五 20:25写道:
> 您好社区:
>
> 我在使用flinksql将数据表A_now写入到数据库中后还有一步操作:将表A删除,完成将A_now更名为A,的切表操作。
> 发现当执行:
> //sql 插入数据到数据库操作
>
??
flinksqlA_now:AA_now??A??
//sql
StatementSet stmtSet = tenv.createStatementSet () ;
stmtSet.addInsertSql ( insertSqlMongoDB ) ;
stmtSet.addInsertSql ( insertSql
hi,有人清楚如上问题吗,确认下是不是bug,我感觉是某种情况下会导致的问题,这个情况大概率是flink应该兼容考虑的。
yidan zhao 于2021年11月26日周五 下午2:19写道:
> 我认为这个应该是bug。
>
> yidan zhao 于2021年11月26日周五 上午11:18写道:
>
>> 如题,注意,非检查点本身失败,而是检查点完成后导致任务失败。
>>
>> 目前跟进报错是PartitionTimeCommitTrigger.committablePartitions部分如下代码报的异常:
>>
>> if (!watermarks.containsKey
我认为这个应该是bug。
yidan zhao 于2021年11月26日周五 上午11:18写道:
> 如题,注意,非检查点本身失败,而是检查点完成后导致任务失败。
>
> 目前跟进报错是PartitionTimeCommitTrigger.committablePartitions部分如下代码报的异常:
>
> if (!watermarks.containsKey(checkpointId)) {
> throw new IllegalArgumentException(
> String.format(
>
如题,注意,非检查点本身失败,而是检查点完成后导致任务失败。
目前跟进报错是PartitionTimeCommitTrigger.committablePartitions部分如下代码报的异常:
if (!watermarks.containsKey(checkpointId)) {
throw new IllegalArgumentException(
String.format(
"Checkpoint(%d) has not been snapshot. The
watermark information is
这个可以,非常感谢。
select user, sum(num * IF(flag=1, 1, 0)) as num
> from (
> select user, ord, LAST_VALUE(num) as num, LAST_VALUE(flag) as flag
> from tmpTable,
> group by user, ord
> ) t1
> group by user
---
> 2021年11月25日 11:19,Tony Wei 写道:
>
> 上一封的 sql 稍微有誤,不需要 group by user, ord 才對:
>
>
上一封的 sql 稍微有誤,不需要 group by user, ord 才對:
select user, sum(num) as num
> from (
> select user, ord, num * IF(flag=1, 1, -1) as num
> from tmpTable
> ) t1
> group by user
或者也可以考慮這種寫法:
select user, sum(num * IF(flag=1, 1, 0)) as num
> from (
> select user, ord, LAST_VALUE(num) as num, LAST
Hi,
對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為:
+--+---+
| user | num |
+--+---+
| b | 20|
+--+---+
因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。
或許可以考慮把 sql 寫法改為這樣試試?
select user, sum(num) as num
> from (
> select use
Hi!
这是因为我们有配置关闭 subplan reuse 和 source reuse,因此需要先把 plan 拆开,然后再判断是否允许
reuse,如果允许才能合并。
岳晗 于2021年11月24日周三 下午3:55写道:
> Hi,
>
>
> 请问下FlinkSQL物理计划转换为ExecNodeGraph的时候,拿到optimizedRelNodes后,
>
>
> 首先执行:SameRelObjectShuttle Rewrite same rel object to different rel objects
Hi,
这三条数据的话:
new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new
Tuple4<>("a","a1",30,0)
计算结果是:
| +I | a | 30 |
| +I | b | 20 |
| -D | a | 30 |
实际想要的是
a 30
b
Hi!
无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
CloseableIterator,然后通过 Row#getKind 获得该 row 对应的 op。
顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
wushijjian5 于2021年11月24日周三 下午9:05写道:
>
> DataStream> dataStream =
> env.fromElements(
> new Tuple4<>("a",
Hi??
??FlinkSQL??ExecNodeGraphoptimizedRelNodes
??SameRelObjectShuttle Rewrite same rel object to different rel objects.
e.g.
Join
Join
/ \
/ \
Filter1 Filter2 => Filter1
Filt
这是个依赖问题,你检查下你环境中是否只使用sql connector 的jar,即 flink-sql-connector-elasticsearch7,
如果不是 datastream 作业是不需要 flink-connector-elasticsearch7 这个
jar包的。如果不是这个问题,你可以分析下你作业里使用的 es 相关依赖,可以参考异常栈确定类再去确定jar包,看下是不是多加了一些无用的jar。
祝好,
Leonard
> 在 2021年11月22日,12:30,mispower 写道:
>
> 你好,咨询一下后续你这个问题是如何解决的?
>
>
>
hi,还是这个问题,请问有什么确定的方法能确认某个文件属于无用,还是有用吗。
比如一种复杂的方式是:判定任务当前运行到什么时间点,比如14点,认为12点的数据已经完整了,则12点对应的分区中.开头文件都可以删除。但这种判定需要结合任务的watermark看任务跑到什么时间等,复杂性较高。
话说success文件可行吗,compact结束才有success?还是先有success后再慢慢compact呢。如果是前者,我可以写个ct脚本,遍历目录下存在success的情况下,则可以删除该目录下.开头的全部文件。
yidan zhao 于2021年11月17日周三 上午10:22写道:
>
好的
RS 于2021年11月18日周四 上午9:32写道:
> 1. 文件名是不带.zlib后缀的
> 2.
> ORC格式默认是配置了ZIP压缩的,并且开启的,你可以配置'orc.compress'='NONE'测试下,看下不压缩的大小,没有压缩的文件应该是更大的
>
>
> 在 2021-11-16 17:29:17,"yidan zhao" 写道:
> >我看了下,默认不带.zlib之类的后缀,我加了也看不出来到底有没有压缩。
> >其次,orc.compression官方介绍默认是zlib,貌似默认就有开启压缩?
> >
> >RS 于2021年11月15日周一 上午9:
1. 文件名是不带.zlib后缀的
2. ORC格式默认是配置了ZIP压缩的,并且开启的,你可以配置'orc.compress'='NONE'测试下,看下不压缩的大小,没有压缩的文件应该是更大的
在 2021-11-16 17:29:17,"yidan zhao" 写道:
>我看了下,默认不带.zlib之类的后缀,我加了也看不出来到底有没有压缩。
>其次,orc.compression官方介绍默认是zlib,貌似默认就有开启压缩?
>
>RS 于2021年11月15日周一 上午9:55写道:
>
>> 官网里面有介绍这个,你是要这个吧
>>
>> https://nightlies.
还有基于检查点启动,首先数据完整性最终实际没问题对吧。
yidan zhao 于2021年11月17日周三 上午10:22写道:
> 出错原因是因为机器不稳定,tm超时等。
> 话说这种有什么判别方法用于定期清理吗。
>
> Caizhi Weng 于2021年11月17日周三 上午9:50写道:
>
>> Hi!
>>
>> 因为 compact 是在每次 checkpoint 的时候进行的,在做 checkpoint 之前产生的文件都是以 .
>> 开头的,表示当前不可见。只有
>> checkpoint
>> 之后才会重命名为可见文件。因此如果任务频繁出现错误,这些不可见文件就
出错原因是因为机器不稳定,tm超时等。
话说这种有什么判别方法用于定期清理吗。
Caizhi Weng 于2021年11月17日周三 上午9:50写道:
> Hi!
>
> 因为 compact 是在每次 checkpoint 的时候进行的,在做 checkpoint 之前产生的文件都是以 . 开头的,表示当前不可见。只有
> checkpoint
> 之后才会重命名为可见文件。因此如果任务频繁出现错误,这些不可见文件就会留在目录里,导致文件数增加。建议首先把任务为什么频繁出错查出来。
>
> yidan zhao 于2021年11月16日周二 下午5:36写道:
>
> >
> >
Hi!
因为 compact 是在每次 checkpoint 的时候进行的,在做 checkpoint 之前产生的文件都是以 . 开头的,表示当前不可见。只有
checkpoint
之后才会重命名为可见文件。因此如果任务频繁出现错误,这些不可见文件就会留在目录里,导致文件数增加。建议首先把任务为什么频繁出错查出来。
yidan zhao 于2021年11月16日周二 下午5:36写道:
>
> 如题,目前没有具体看是没生效,还是来不及compact。任务正常情况没问题,但是如果任务出现频繁错误,导致过一会重启一次,这种情况导致文件数暴增,compact功能不生效。
>
如题,目前没有具体看是没生效,还是来不及compact。任务正常情况没问题,但是如果任务出现频繁错误,导致过一会重启一次,这种情况导致文件数暴增,compact功能不生效。
我看了下,默认不带.zlib之类的后缀,我加了也看不出来到底有没有压缩。
其次,orc.compression官方介绍默认是zlib,貌似默认就有开启压缩?
RS 于2021年11月15日周一 上午9:55写道:
> 官网里面有介绍这个,你是要这个吧
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/orc/
>
>
> Orc format also supports table properties from Table properties. For
> e
; 引擎 查询性能是可以接受的
>
>
>
> ---
> Best,
> WuKong
>
>
> 发件人: Caizhi Weng
> 发送时间: 2021-11-12 11:32
> 收件人: flink中文邮件组
> 主题: Re: FlinkSQL 1.12 Temporal Joins 多表关联问题
>
>
> Hi!
>
> 这是说每次主流来一条数据,都要去维表里查询一次吗?然后你想每次攒一批数据,一次性查询以提高性能?
>
> 如果是的话,一部分维
:
mongosql ??sql:
CREATE TABLE label (
distinct_id BIGINT,
xwho String,
sync5 decimal,
sync4 decimal,
sync6 string,
syncea1 string,
aa string,
ceshi string,
ttt string,
tongji decimal,
qweqwe array
or
> 我看是因为array source到hdfs的一张orc的表
>
> 陈卓宇
>
>
>
>
>
>
>
> -- 原始邮件 --
> 发件人:
> "user-zh"
>
??debug
??flink1.12.5??flink-orc_2.11org/apache/flink/orc/vector/AbstractOrcColumnVector.java
??createFlinkVector??ListColumnVectorflink??master??2021/5/12??wangwei1025??pr??1.12.5
??:
string_tag string
number_tag number
boolean_tag boolean
datetime_tag datetime
arr_tag array
??:
string_tag string
number_tag number
boolean_tag boolean
datetime_tag datetime
arr_tag array
??:
string_tag string
number_tag number
boolean_tag boolean
datetime_tag datetime
arr_tag array
---
Best,
WuKong
Caizhi Weng
?? 2021-11-12 11:32
flink??
?? Re: FlinkSQL 1.12 Temporal Joins
Hi??
?? jdbc ?? hbase
Hi :
第一个 我了解了Cache 不太适合我的场景,因为我的表都是几十亿量级,同时 我要根据一些关键键 去数据库里查询,所以 我先在Job 中
聚合一些主键,通过In 条件 去查询。
第二个 好像是我理解的问题,最初想通过Flink Sql 把整体逻辑 下发到数据库去查询,因为有些OLAP 引擎 查询性能是可以接受的
---
Best,
WuKong
发件人: Caizhi Weng
发送时间: 2021-11-12 11:32
收件人: flink中文邮件组
主题: Re: FlinkSQL 1.12 Temporal Joins 多表关联问题
Hi
Hi!
这是说每次主流来一条数据,都要去维表里查询一次吗?然后你想每次攒一批数据,一次性查询以提高性能?
如果是的话,一部分维表(如 jdbc 和 hbase)支持 cache 功能 [1]。cache 功能可以在每次 cache 刷新的时候把数据加载到
task manager 内存中,这样主流来数据时只需要从 task manager 内存中查询对应数据即可,不必去外部系统查询。
另外查询逻辑下沉到数据库具体指的是什么?能否详细说明一下。
[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connec
Hi!
filesystem sink 的文件数量与 sink 并发数有关。如果数据量不大可以考虑在 sink DDL 的 with 参数里加入
'sink.parallelism' = '1' 设置 sink 并发度为 1。
陈卓宇 <2572805...@qq.com.invalid> 于2021年11月11日周四 下午4:50写道:
> 问题描述:写到本地产生了8个part-817677cc-2f4b-464a-bf9e-11957bcf9c76-0-0 ...的文件
> 请问,我只想写成一个csv文件,如果关闭这种文件分区
>
>
>
> Flink SQL:
> String
共有 613 项搜索結果,以下是第 1 - 100 matches
Mail list logo