Re: flinksql 经过优化后,group by字段少了

2024-05-20 文章 Lincoln Lee
flink中是仍然存在这个问题。 > > > > > -- 原始邮件 -- > 发件人: > "user-zh" > < > libenc...@apache.org>; > 发送时间: 2024年5月20日(星期一) 中午12:51 > 收件人

Re: flinksql 经过优化后,group by字段少了

2024-05-19 文章 Benchao Li
> 发送时间: 2024年5月20日(星期一) 上午10:32 > 收件人: "user-zh" > 主题: Re: flinksql 经过优化后,group by字段少了 > > > > 看起来像是因为 "dt = cast(CURRENT_DATE  as string)" 推导 dt 这个字段是个常量,进而被优化掉了。 > > 将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛

Re: flinksql 经过优化后,group by字段少了

2024-05-19 文章 Benchao Li
看起来像是因为 "dt = cast(CURRENT_DATE as string)" 推导 dt 这个字段是个常量,进而被优化掉了。 将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛? ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月19日周日 01:01写道: > > create view tmp_view as > SELECT > dt, -- 2 > uid, -- 0 > uname, -- 1 > uage -

Re: FlinkSQL connector jdbc 用户密码是否可以加密/隐藏

2024-03-10 文章 gongzhongqiang
hi, 东树 隐藏sql中的敏感信息,这个需要外部的大数据平台来做。 比如:StreamPark 的变量管理,可以提前维护好配置信息,编写sql时引用配置,由平台提交至flink时解析sql并替换变量。 Best, Zhongqiang Gong 杨东树 于2024年3月10日周日 21:50写道: > 各位好, >考虑到数据库用户、密码安全性问题,使用FlinkSQL connector > jdbc时,请问如何对数据库的用户密码进行加密/隐藏呢。例如如下常用sql中的password: > CREATE TABLE wo

Re: FlinkSQL connector jdbc 用户密码是否可以加密/隐藏

2024-03-10 文章 Feng Jin
1. 目前 JDBC connector 本身不支持加密, 我理解可以在提交 SQL 给 SQL 文本来做加解密的操作,或者做一些变量替换来隐藏密码。 2. 可以考虑提前创建好 jdbc catalog,从而避免编写 DDL 暴露密码。 Best, Feng On Sun, Mar 10, 2024 at 9:50 PM 杨东树 wrote: > 各位好, >考虑到数据库用户、密码安全性问题,使用FlinkSQL connector > jdbc时,请问如何对数据库的用户密码进行加密/隐藏呢。例如如下常用sql中的password: >

FlinkSQL connector jdbc 用户密码是否可以加密/隐藏

2024-03-10 文章 杨东树
各位好, 考虑到数据库用户、密码安全性问题,使用FlinkSQL connector jdbc时,请问如何对数据库的用户密码进行加密/隐藏呢。例如如下常用sql中的password: CREATE TABLE wordcount_sink ( word String, cnt BIGINT, primary key (word) not enforced ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localho

flinksql以时间函数过滤数据场景求助

2024-01-11 文章 张河川
flink版本1.18 场景如下: A表字段: id,update_time(date格式) 一条数据: 1,2023-01-12 现在我需要保留update_time+1年,大于当前日。 简单地写一个sql: select id,update_time from A where TIMESTAMPADD(YEAR,1,update_time) > CURRENT_DATE; 结果: 在2024年1月11日这一天,where条件达成,这条数据不会被过滤掉; 在2024年1月12日,sql并不会触发计算来过滤掉此条数据。 在真实的场景中,update_time跨度很多年

回复: FlinkSQL大窗口小步长的滑动窗口解决方案

2023-05-28 文章 tanjialiang
式写入一个OLAP系统(譬如doris/ck),读时再聚合(需要一个稳定可靠的外部存储) 你这边用flink做滑动窗口的计算会遇到这样的问题吗?是否还有其他更好解决办法? 十分期待你的反馈 best, tanjialiang. 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年5月29日 09:08 | | 收件人 | | | 主题 | Re: FlinkSQL大窗口小步长的滑动窗口解决方案 | Hi, 这是窗口触发后发送的数据量过大吗?调大资源,加大窗口计算的并发度是否可以缓解这个问题? Best, Shammon FY

Re: FlinkSQL大窗口小步长的滑动窗口解决方案

2023-05-28 文章 Shammon FY
Hi, 这是窗口触发后发送的数据量过大吗?调大资源,加大窗口计算的并发度是否可以缓解这个问题? Best, Shammon FY On Fri, May 26, 2023 at 2:03 PM tanjialiang wrote: > Hi, all. > 我在使用FlinkSQL的window tvf滑动窗口时遇到一些问题。 > 滑动步长为5分钟,窗口为24小时,group by > user_id的滑动窗口,当任务挂掉了或者从kafka的earliest-offset消费,checkpoint很难成功。 > 因为从earliest开始消费,数据很

FlinkSQL大窗口小步长的滑动窗口解决方案

2023-05-25 文章 tanjialiang
Hi, all. 我在使用FlinkSQL的window tvf滑动窗口时遇到一些问题。 滑动步长为5分钟,窗口为24小时,group by user_id的滑动窗口,当任务挂掉了或者从kafka的earliest-offset消费,checkpoint很难成功。 因为从earliest开始消费,数据很快就会堆满缓冲区产生背压,这时这一批数据可能会触发N次窗口计算往下游发,每次触发的操作成本是(用户基数 * 24 * 60 / 5),checkpoint barrier可能会一直卡住。 这时候有什么办法可以破局吗? best, tanjialiang.

Re: Re:Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-08 文章 Jane Chan
从 plan 上看起来在 sink 节点这里因为推导不出 upsert key 加上了 SinkUpsertMaterializer[1], 这里会按照 sink 表定义的主键进行 keyby shuffle[2], 只能保证最终一致性. 另外你的操作描述中 schema 为三列, 但 DDL 是四列, 且格式乱了. 一些可能的建议如下 1. 如果上游数据有主键并且也是 rowid 的话, 建议在 Flink source 表上声明 PK, 避免额外生成 materializer 节点; 同时注意在声明 Flink source 表时不要带上 metadata 列 (比如 op), 这会

Re: flinkSQL无法实现两个timestamp(3) 相减

2023-03-06 文章 Shammon FY
Hi 如果没有现成的系统函数,你可以写个自定义udf来实现 https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/ Best, Shammon On Mon, Mar 6, 2023 at 7:46 PM 唐世伟 wrote: > > 我们需要对两个时间相减,精度为毫秒。但是无论是TIMESTAMPDIFF函数,还是先转成UNIX_TIMESTAMP,都只支持秒的精度。请问还有其他方法吗?

flinkSQL无法实现两个timestamp(3) 相减

2023-03-06 文章 唐世伟
我们需要对两个时间相减,精度为毫秒。但是无论是TIMESTAMPDIFF函数,还是先转成UNIX_TIMESTAMP,都只支持秒的精度。请问还有其他方法吗?

Re:Re:Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-05 文章 陈佳豪
刚做了一下测试 目前假定有3行数据需要同步(全量): | 编号 | 电话 | 座机 | | 1 | 1311313 | 123 | | 2 | 1311313 | 456 | | 3 | 1311313 | 789 | 这个时候我修改第四行数据的两个字段(增量): | 1 | 电话 | 座机 | | 1 | 1311313 | 123 | | 2 | 1311313 | 456 | | 3 | 13113133110 | 888 | 修改完后我删除字段2这个时候去mysql看结果2是正确被删除,且无新增的(操作正确). 然后我继续删除数据3

Re:Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-05 文章 陈佳豪
hi 早上好 我将flink升级到了1.16.1的版本去执行kafka同步到mysql的任务,发现还是存在一样的问题,我本机执行了explain的执行过程给的输出如下 == Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机]) +- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"], 名称=[$1], 手机=[CAST($2):VA

Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-04 文章 Jane Chan
Hi, 抱歉, 这里 typo 了, 应该是 1.16.1. 我在 1.16.1 上验证了你之前发的 query, 是可以正常删除的. 可以在 1.16.1 上尝试下, 也可以试试在 1.15.2 上使用 EXPLAIN CHANGELOG_MODE INSERT INTO...[1] 将 plan 打印出来看看. [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/ 祝好! Jane On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪

Re:Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-04 文章 陈佳豪
hi 你好 目前没有1.16.2版本的吧? 我看flink官网都是1.16.0 或者是1.16.1的 在 2023-03-02 11:52:41,"Jane Chan" 写道: >Hi, > >可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个 >query 在 1.16.2 上验证没有问题 > >[1] >https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/ex

Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-01 文章 Jane Chan
Hi, 可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个 query 在 1.16.2 上验证没有问题 [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/ Best, Jane On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 wrote: > flink ,kafka连接 jdbc连接版本都是1.15.2的 > > > > > > > >

Re:使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-01 文章 陈佳豪
flink ,kafka连接 jdbc连接版本都是1.15.2的 在 2023-03-01 18:14:35,"陈佳豪" 写道: >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 >String kafka = "CREATE TABLE `电话` (`rowid` >VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` >VARCHAR(2147483647),`63fd660536521f81a2cfabad` >VARCHAR(65535),`63fd66

使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-01 文章 陈佳豪
问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 String kafka = "CREATE TABLE `电话` (`rowid` VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` VARCHAR(2147483647),`63fd660536521f81a2cfabad` VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( 'connector' = 'kafka', 'topic' = 'sz_work

Re: FlinkSql如何实现水位线对齐

2023-02-22 文章 Shammon FY
Hi 目前SQL还不支持watermark对齐,目前有FLIP正在讨论中 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240884405 Best, Shammon On Wed, Feb 22, 2023 at 3:15 PM haishui wrote: > Hi, all > 以并行度4读取kafka的topic1和topic2形成两个流,然后IntervalJoin。在kafka堆积大量数据的情况下,我分别用SQL和DataStream > API实现了上述功能。 > > > 使用SQL

FlinkSql如何实现水位线对齐

2023-02-21 文章 haishui
Hi, all 以并行度4读取kafka的topic1和topic2形成两个流,然后IntervalJoin。在kafka堆积大量数据的情况下,我分别用SQL和DataStream API实现了上述功能。 使用SQL实现的作业中IntervalJoin算子的状态会逐渐增大,直到checkpoint失败。原因是在8个Source分区中输出水位线差距很大。 使用API实现的作业,在使用Flink15版本的水位线对齐后可以保证正常读取topic内的所有数据。 想请教一下大家如何在SQL上解决Source处水位线差距过大,数据堆积导致checkpoint失败问题。还有如果只有一个topic

Re: flinksql join

2022-11-16 文章 Zhiwen Sun
dob_dim_account 维表如果使用 jdbc 的 connector, flink 会在初始化的时候一次性读取所有的数据, 后续数据库中更新并不会触发 flink 计算。 要解决这个问题, dob_dim_account 需要变成流表。 Zhiwen Sun On Thu, Nov 17, 2022 at 1:56 PM Jason_H wrote: > hi,你好 > 这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题 > > > | | > Jason_H >

Re: flinksql join

2022-11-16 文章 Jason_H
hi,你好 这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题 | | Jason_H | | hyb_he...@163.com | Replied Message | From | 任召金 | | Date | 11/15/2022 09:52 | | To | user-zh | | Subject | Re: flinksql join | hello,你可以试下,将mysql的数据通过CDC变成流数据,然后跟主流inner join,注意状态的TTL

Re: flinksql join

2022-11-14 文章 Jason_H
hi,你好 我想基于现有的flinksql的join实现这种情况,当维表更新慢的时候,事实数据会放在状态中等待。 | | Jason_H | | hyb_he...@163.com | Replied Message | From | RS | | Date | 11/15/2022 09:07 | | To | user-zh@flink.apache.org | | Subject | Re:flinksql join | Hi, 我的理解是后插入的维表数据,关联不到是正常现象, 如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据

Re: flinksql join

2022-11-14 文章 Jason_H
hi,你好 我想基于现有的flinksql的join实现这种情况,当维表更新慢的时候,事实数据会放在状态中等待。 | | Jason | | hyb_he...@163.com | Replied Message | From | RS | | Date | 11/15/2022 09:07 | | To | user-zh@flink.apache.org | | Subject | Re:flinksql join | Hi, 我的理解是后插入的维表数据,关联不到是正常现象, 如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据

Re: flinksql join

2022-11-10 文章 Jason_H
\n" + "SUPER_ORG_ID string, \n" + "IS_OUTSIDE BIGINT \n" + ") \n" + "WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = '***',\n" + " 'driver' = 'com.mysql.c

Re: flinksql join

2022-11-10 文章 Jason_H
\n" + "SUPER_ORG_ID string, \n" + "IS_OUTSIDE BIGINT \n" + ") \n" + "WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = '***',\n" + " 'driver' = 'com.mysql.cj.jdbc.Driver'

Re: flinksql join

2022-11-10 文章 Jason_H
7;***',\n" + " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" + " 'username' = 'root',\n" + " 'password' = '123456',\n" + //" 'lookup.cache

Re: flinksql join

2022-11-10 文章 Zhiwen Sun
用普通的 join, 不要用 lookup join Zhiwen Sun On Fri, Nov 11, 2022 at 11:10 AM Jason_H wrote: > > > hi,大家好 > > 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下: > kakfa输入: > 账号 金额 笔数 > 100 1

flinksql join

2022-11-10 文章 Jason_H
hi,大家好 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下: kakfa输入: 账号 金额 笔数 100 1 -> 未匹配 100 1 -> 未匹配 100 1 -> 匹配上 维表 账号 企业 -> 后插入的账号信息 实际输出结果 企业 金额

Re: flinksql-redis插件

2022-10-27 文章 Shengkai Fang
Hi. 可以看看这个回答[1]。 Best, Alibaba [1] https://stackoverflow.com/questions/71779752/why-is-redis-source-connector-not-available-for-flink Jason_H 于2022年10月26日周三 14:23写道: > hi, > 请问一下,flinksql读写redis,官方有没有成熟的插件。如果没有,请教一下大家用的什么插件去读写redis,来做维表join的,谢谢。 > > > | | > Jason_H >

Re: flinksql 维表join

2022-10-27 文章 Lincoln Lee
Hi, Flink 的 lookup join 目前不支持对维表进行预处理, 并且需要有对维表原始字段的等值连接条件(因为需要通过确定的字段值去查找) 示例中 t4 字段不做计算应该是 work 的, 比如 udf(t1.telephone_no) = t4.de_mobile Best, Lincoln Lee Fei Han 于2022年10月27日周四 12:12写道: > 大家好!请教几个问题 > 1. FlinkSQL维表join的时候,能不能把维表先用flinksql 当做一个临时表,然后在用临时表做维表join > 例如:临

flinksql-redis-connector

2022-10-26 文章 Jason_H
hi, 请问一下,flinksql读写redis,官方有没有成熟的插件。如果没有,请教一下大家用的什么插件去读写redis,来做维表join的,谢谢。 | | Jason_H | | hyb_he...@163.com |

flinksql 维表join

2022-10-26 文章 Fei Han
大家好!请教几个问题 1. FlinkSQL维表join的时候,能不能把维表先用flinksql 当做一个临时表,然后在用临时表做维表join 例如:临时表 WITH employee_tmp AS( select userid as userid, name as name, mobile as de_mobile from ygp_dwd_catalog.flink_dwd.employee ) select * from ( select * from ygp_dwd_catalog.flink_dwd.xxx ) t1 left join

flinksql-redis插件

2022-10-25 文章 Jason_H
hi, 请问一下,flinksql读写redis,官方有没有成熟的插件。如果没有,请教一下大家用的什么插件去读写redis,来做维表join的,谢谢。 | | Jason_H | | hyb_he...@163.com |

Flinksql 计算特征标签自身状态变化

2022-07-25 文章 andrew
Dear Flink: 一需求, FlinkSQL可以用udf实现,捕获标签值的变化。 例如: 若当前用户由低端用户变为中端用户或由中端用户变为高端用户,输出只要用户state状态发生变化,结果用户状态打标为1,反之为0; 有什么好的实现方式没?

Re:关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 Xuyang
/StreamExecGroupWindowAggregateBase.scala#L171 在 2022-06-01 15:41:05,"hdxg1101300...@163.com" 写道: >您好: > 最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4) > 比如这样一条sql语句: > select >dim, >count(*) as pv, >sum(price) as sum_price, >max(price) as max_price, >min(price) as min_price

Re: 关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 陈铜玖
Dear: 您可以首先建一个这样的对象 class Acc{    long sum;    long max;    long min;    ... } 在 AggregateFunction 里面维护这样的 ACC , 就可以在 add 方法里面对维护的 acc 和新传入的值之间实现多需求下的结果更新。 不知道你想了解的是不是这个意思       -- Original -- From:  "Lincoln Lee"

Re: 关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 Lincoln Lee
flink sql 的实现可以参考下 flink-table planner&runtime 部分的代码 从 datastream 层面来说, 可以基于 KeyedProcessFunction, 比如 datastream.keyby(...).process(keyedProcessFunction)... 来实现自己定义的逻辑 Best, Lincoln Lee hdxg1101300...@163.com 于2022年6月1日周三 15:49写道: > 您好: >最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4

关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 hdxg1101300...@163.com
您好: 最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4) 比如这样一条sql语句: select dim, count(*) as pv, sum(price) as sum_price, max(price) as max_price, min(price) as min_price, -- 计算 uv 数 count(distinct user_id) as uv, UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval

Re: Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误

2022-05-25 文章 Jingsong Li
dxg1101300...@163.com > > > *发件人:* Jingsong Li > *发送时间:* 2022-05-26 14:47 > *收件人:* hdxg1101300123 > *抄送:* dev > *主题:* Re: Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误 > Please don't use Chinese on the dev mailing list to discuss issues, I've >

Re: flinksql关联hive维表java.lang.UnsupportedOperationException错误

2022-05-25 文章 Jingsong Li
>> 你好: >> 我在使用flink1.12.4版本和hive1.1.0版本时遇到下面的错误: >> >> >> 场景时使用hive的catalog管理元数据,用flinksql把Kafka的数据注册成输入表,关联hive维表做数据拉宽;提交任务到yarn时遇到如下错误; >>sql如下: >>create view if not exists dwm_ai_robot_contact_view as select >> CALLER,

【Could we support distribute by For FlinkSql】

2022-05-08 文章 lpengdr...@163.com
Hello: Now we cann't add a shuffle-operation in a sql-job. Sometimes , for example, I have a kafka-source(three partitions) with parallelism three. And then I have a lookup-join function, I want process the data distribute by id so that the data can split into thre parallelism evenly (The so

?????? FlinkSQL ????k8s??????????

2022-04-25 文章 ??????
?? dlink ?? FlinkSQL?? https://github.com/DataLinkDC/dlink --  -- ??: "us

Re: FlinkSQL 对接k8s的提交问题

2022-04-25 文章 Yang Wang
get(); LOG.info("Job {} is submitted successfully", jobID); } } } Best, Yang 吕宴全 <1365976...@qq.com.invalid> 于2022年4月24日周日 14:45写道: > 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application > mode)和即席查询(Session mode)。在Application模式下,从jar中构建job

Re: FlinkSQL 对接k8s的提交问题

2022-04-25 文章 LuNing Wang
SQL Client的Application模式现在还不支持,方案在设计中。 https://issues.apache.org/jira/browse/FLINK-26541 吕宴全 <1365976...@qq.com.invalid> 于2022年4月24日周日 14:45写道: > 我准备使用Kyuubi对接FlinkSQL,将任务运行在k8s环境中,任务可能包括离线作业(Application > mode)和即席查询(Session mode)。在Application模式下,从jar中构建jobgraph。SQL示例如下: > >

FlinkSQL ????k8s??????????

2022-04-23 文章 ??????
??KyuubiFlinkSQL??k8s(Application mode)??(Session mode)Application??jar??jobgraph??SQL?? CREATE TABLE T ( id INT  ) WITH ( 'connector.type' = 'filesystem', 'connector.path' = '

FlinkSQL

2022-04-23 文章 ??????
??KyuubiFlinkSQL??k8s(Application mode)??(Session mode)Application??jar??jobgraph??SQL?? CREATE TABLE T ( id INT  ) WITH ( 'connector.type' = 'filesystem', 'connector.path' = '

Re: flinksql执行时提示自定义UDF无法加载的

2022-04-11 文章 Zhanghao Chen
你好,可以贴下客户端的具体提交命令吗? Best, Zhanghao Chen From: 799590...@qq.com.INVALID <799590...@qq.com.INVALID> Sent: Tuesday, April 12, 2022 10:46 To: user-zh Subject: flinksql执行时提示自定义UDF无法加载的 环境信息 flink-1.13.6_scala_2.11 java 1.8 使用的是standalonesession集群模式,n

flinksql执行时提示自定义UDF无法加载的

2022-04-11 文章 799590...@qq.com.INVALID
环境信息 flink-1.13.6_scala_2.11 java 1.8 使用的是standalonesession集群模式,node01为jobmanager node02和node03为taskmanager UDF代码 package com.example.udf; import org.apache.flink.table.functions.ScalarFunction; public class SubStr extends ScalarFunction { public String eval(String s, Integer start,Inte

Re: flinksql 使用kafka connector (自定义的cdc格式) 消费删除语义的数据没有纳入计算

2022-03-31 文章 Guo Thompson
看不到图 赵旭晨 于2022年3月15日周二 12:25写道: > flink版本:1.14.3 场景如下: > sql: > set table.exec.state.ttl=1 day; > describe t_k_chargeorder; > describe t_k_appointment; > SELECT > ReportTime, > sum( InsertAppointmentCount ) + sum( InsertChargeOrderCount ) > kpitotalcount, > sum( InsertActualPriceCount ) Insert

flinksql 使用kafka connector (自定义的cdc格式) 消费删除语义的数据没有纳入计算

2022-03-14 文章 赵旭晨
flink版本:1.14.3 场景如下: sql: set table.exec.state.ttl=1 day; describe t_k_chargeorder; describe t_k_appointment; SELECT ReportTime, sum( InsertAppointmentCount ) + sum( InsertChargeOrderCount ) kpitotalcount, sum( InsertActualPriceCount ) InsertActualPriceCount, sum( InsertAppointmentCount ) Insert

Re: FlinkSQL vs DataStream API 对硬件的需求

2022-02-17 文章 yue ma
Hello , Flink SQL 会将SQL解析、优化并最终翻译成DataStream作业,所以本质上跟直接用DataStream API直接写Flink作业没有根本的区别,反而会因为一些通用的优化和代码生成,在性能上可能会有一些提升。 Pinjie Huang 于2022年2月18日周五 11:28写道: > Hi all, > > 同样一个task 用flinksql 写和DataStreamAPI写比较,是否会占用更多的CPU和memory?是否有performance > 比较的benchmark? > > Thanks, > Pinjie Huang >

Re: FlinkSQL vs DataStream API 对硬件的需求

2022-02-17 文章 Jie Han
我理解只是不同api而已 > 2022年2月18日 上午11:18,Pinjie Huang 写道: > > Hi all, > > 同样一个task 用flinksql 写和DataStreamAPI写比较,是否会占用更多的CPU和memory?是否有performance > 比较的benchmark? > > Thanks, > Pinjie Huang

FlinkSQL vs DataStream API 对硬件的需求

2022-02-17 文章 Pinjie Huang
Hi all, 同样一个task 用flinksql 写和DataStreamAPI写比较,是否会占用更多的CPU和memory?是否有performance 比较的benchmark? Thanks, Pinjie Huang

??????flinksql ????collect??????????????????Multiset????????java udf????????????????????????????????

2021-12-27 文章 Mr.S
hello , listagg, --  -- ??: "Mr.S"

Re: flinksql钩子函数

2021-12-26 文章 Caizhi Weng
/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html 陈卓宇 <2572805...@qq.com.invalid> 于2021年12月25日周六 23:30写道: > 您好社区: >     场景是这样的: > > 我司要求将标签数据每日同步一份到mongodb供业务开发同学进行使用,面临这样一个问题,我是不能先删表在建表的,这样会导致接口查询mongodb出现数据查询不到的风险。而是使用切表,将同步表设为:表名_时间戳,将历史表删除,在将同步表改为正确表

flinksql????????

2021-12-25 文章 ??????
??     ?? mongodbmongodb??:_??flinksql

Re: flinksql的await

2021-12-21 文章 Jing Ge
陈卓宇 你好, 在默认情况下,所有提交后的DML都是异步执行的,详见TableEnvironment.executeSql(String statement)的注释。使用.await()和不使用.await()的区别是使用await()后会等待异步查询返回第一行结果(题外话:请注意INSERT和SELECT的区别),详见TableResult.await()注解,具体代码见TableResultImpl.awaitInternal(long timeout, TimeUnit unit), 由于此时入参timeout为-1,导致future.get()被调用, 强制等待resultProv

flinksql??await

2021-12-21 文章 ??????
?? String initialValues = "INSERT INTO kafka\n" + "SELECT CAST(price AS DECIMAL(10, 2)), currency, " + " CAST(d AS DATE), CAST(t AS TIME(0)), CAST(ts AS TIMESTAMP(3))\n" + "FROM (VALUES (2.02,'Euro','2019-12-12', '00:00:01', '2019-1

回复: flinksql自定义udaf函数

2021-12-20 文章 Chuang Li
在自定义udaf函数实现中使用了一些flinksql不支持的数据类型 想请问如何进行自定义数据类型的实现 Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'Average'.     at org.apache.flink.table.planner.calcite.FlinkPla

Re: flinksql自定义udaf函数

2021-12-20 文章 Caizhi Weng
> 于2021年12月20日周一 17:00写道: > 在自定义udaf函数实现中使用了一些flinksql不支持的数据类型 > 想请问如何进行自定义数据类型的实现 > > > > > > > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. An error occurred in the type

flinksql??????udaf????

2021-12-20 文章 ??????
udafflinksql ?? Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'A

??????flinksql source ????????????????????????

2021-12-14 文章 ????
--  -- ??: "user-zh"

flinksql source 算子并行度与数据分发策略

2021-12-13 文章 janke
您好,目前flinkSQL KafkaConnector目前不支持Source算子并行度配置以及数据分发策略选择: 导致的问题: 1、当并行度 > source中间件消息分区时, 超出的部分会空跑占用资源 2、假设: 并行度>消息分区, source vertex: Source: TableSourceScan->Calc(select…); 当Calc(select..)为cpu密集(json解析)算子,其将会成为任务的性能瓶颈,Calc算子无法享受到扩容带来的资源(超出kafka分区的并发没有数据流入)。 请问下目前社区有没有现有的 / 计划中的解决方案?

flinksql source 算子并行度与数据分发策略

2021-12-13 文章 吴Janick
您好,目前flinkSQL KafkaConnector目前不支持Source算子并行度配置以及数据分发策略选择: 导致的问题: 1、当并行度 > source中间件消息分区时, 超出的部分会空跑占用资源 2、假设: 并行度>消息分区, source vertex: Source: TableSourceScan->Calc(select…); 当Calc(select..)为cpu密集(json解析)算子,其将会成为任务的性能瓶颈,Calc算子无法享受到扩容带来的资源(超出kafka分区的并发没有数据流入)。 请问下目前社区有没有现有的 / 计划中的解决方案?

flinksql source 算子并行度与数据分发策略

2021-12-13 文章 吴Janick
您好,目前flinkSQL KafkaConnector目前不支持Source算子并行度配置以及数据分发策略选择: 导致的问题: 1、当并行度 > source中间件消息分区时, 超出的部分会空跑占用资源 2、假设: 并行度>消息分区, source vertex: Source: TableSourceScan->Calc(select…); 当Calc(select..)为cpu密集(json解析)算子,其将会成为任务的性能瓶颈,Calc算子无法享受到扩容带来的资源(超出kafka分区的并发没有数据流入)。 请问下目前社区有没有现有的 / 计划中的解决方案?

Re: flinksql相关问题

2021-12-11 文章 Caizhi Weng
Hi! stmtSet.execute() 默认是异步的,只是提交作业而不会等待作业完成。如果需要等待作业完成再进行后续步骤,需要用 stmtSet.execute().await()。 陈卓宇 <2572805...@qq.com.invalid> 于2021年12月10日周五 20:25写道: > 您好社区: >      > 我在使用flinksql将数据表A_now写入到数据库中后还有一步操作:将表A删除,完成将A_now更名为A,的切表操作。 > 发现当执行: > //sql 插入数据到数据库操作 >

flinksql????????

2021-12-10 文章 ??????
??      flinksqlA_now:AA_now??A?? //sql StatementSet stmtSet = tenv.createStatementSet () ; stmtSet.addInsertSql ( insertSqlMongoDB ) ; stmtSet.addInsertSql ( insertSql

Re: FlinkSQL kafka2hive每次检查点导致任务失败

2021-11-28 文章 yidan zhao
hi,有人清楚如上问题吗,确认下是不是bug,我感觉是某种情况下会导致的问题,这个情况大概率是flink应该兼容考虑的。 yidan zhao 于2021年11月26日周五 下午2:19写道: > 我认为这个应该是bug。 > > yidan zhao 于2021年11月26日周五 上午11:18写道: > >> 如题,注意,非检查点本身失败,而是检查点完成后导致任务失败。 >> >> 目前跟进报错是PartitionTimeCommitTrigger.committablePartitions部分如下代码报的异常: >> >> if (!watermarks.containsKey

Re: FlinkSQL kafka2hive每次检查点导致任务失败

2021-11-25 文章 yidan zhao
我认为这个应该是bug。 yidan zhao 于2021年11月26日周五 上午11:18写道: > 如题,注意,非检查点本身失败,而是检查点完成后导致任务失败。 > > 目前跟进报错是PartitionTimeCommitTrigger.committablePartitions部分如下代码报的异常: > > if (!watermarks.containsKey(checkpointId)) { > throw new IllegalArgumentException( > String.format( >

FlinkSQL kafka2hive每次检查点导致任务失败

2021-11-25 文章 yidan zhao
如题,注意,非检查点本身失败,而是检查点完成后导致任务失败。 目前跟进报错是PartitionTimeCommitTrigger.committablePartitions部分如下代码报的异常: if (!watermarks.containsKey(checkpointId)) { throw new IllegalArgumentException( String.format( "Checkpoint(%d) has not been snapshot. The watermark information is

Re: FlinkSql回撤流

2021-11-24 文章 wushijjian5
这个可以,非常感谢。 select user, sum(num * IF(flag=1, 1, 0)) as num > from ( > select user, ord, LAST_VALUE(num) as num, LAST_VALUE(flag) as flag > from tmpTable, > group by user, ord > ) t1 > group by user --- > 2021年11月25日 11:19,Tony Wei 写道: > > 上一封的 sql 稍微有誤,不需要 group by user, ord 才對: > >

Re: FlinkSql回撤流

2021-11-24 文章 Tony Wei
上一封的 sql 稍微有誤,不需要 group by user, ord 才對: select user, sum(num) as num > from ( > select user, ord, num * IF(flag=1, 1, -1) as num > from tmpTable > ) t1 > group by user 或者也可以考慮這種寫法: select user, sum(num * IF(flag=1, 1, 0)) as num > from ( > select user, ord, LAST_VALUE(num) as num, LAST

Re: FlinkSql回撤流

2021-11-24 文章 Tony Wei
Hi, 對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為: +--+---+ | user | num | +--+---+ | b | 20| +--+---+ 因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。 或許可以考慮把 sql 寫法改為這樣試試? select user, sum(num) as num > from ( > select use

Re: FlinkSQL源码分段优化中,物理计划转换为ExecNodeGraph的时候,SameRelObjectShuttle、SubplanReuseShuttle一拆一合目的是啥

2021-11-24 文章 Caizhi Weng
Hi! 这是因为我们有配置关闭 subplan reuse 和 source reuse,因此需要先把 plan 拆开,然后再判断是否允许 reuse,如果允许才能合并。 岳晗 于2021年11月24日周三 下午3:55写道: > Hi, > > > 请问下FlinkSQL物理计划转换为ExecNodeGraph的时候,拿到optimizedRelNodes后, > > > 首先执行:SameRelObjectShuttle Rewrite same rel object to different rel objects

Re: FlinkSql回撤流

2021-11-24 文章 wushijjian5
Hi, 这三条数据的话: new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0) 计算结果是: | +I | a | 30 | | +I | b | 20 | | -D | a | 30 | 实际想要的是 a 30 b

Re: FlinkSql回撤流

2021-11-24 文章 Caizhi Weng
Hi! 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的 CloseableIterator,然后通过 Row#getKind 获得该 row 对应的 op。 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。 wushijjian5 于2021年11月24日周三 下午9:05写道: > > DataStream> dataStream = > env.fromElements( > new Tuple4<>("a",

FlinkSQL??????????????????????????????ExecNodeGraph????????SameRelObjectShuttle??SubplanReuseShuttle????????????????

2021-11-23 文章 ????
Hi?? ??FlinkSQL??ExecNodeGraphoptimizedRelNodes ??SameRelObjectShuttle Rewrite same rel object to different rel objects. e.g.       Join                       Join      /    \                     /    \  Filter1 Filter2     =>     Filter1 Filt

Re: FlinkSQL ES7连接器无法使用

2021-11-22 文章 Leonard Xu
这是个依赖问题,你检查下你环境中是否只使用sql connector 的jar,即 flink-sql-connector-elasticsearch7, 如果不是 datastream 作业是不需要 flink-connector-elasticsearch7 这个 jar包的。如果不是这个问题,你可以分析下你作业里使用的 es 相关依赖,可以参考异常栈确定类再去确定jar包,看下是不是多加了一些无用的jar。 祝好, Leonard > 在 2021年11月22日,12:30,mispower 写道: > > 你好,咨询一下后续你这个问题是如何解决的? > > >

Re: FlinkSQL写hive orc表开启compact,任务异常重启,compact不生效?

2021-11-19 文章 yidan zhao
hi,还是这个问题,请问有什么确定的方法能确认某个文件属于无用,还是有用吗。 比如一种复杂的方式是:判定任务当前运行到什么时间点,比如14点,认为12点的数据已经完整了,则12点对应的分区中.开头文件都可以删除。但这种判定需要结合任务的watermark看任务跑到什么时间等,复杂性较高。 话说success文件可行吗,compact结束才有success?还是先有success后再慢慢compact呢。如果是前者,我可以写个ct脚本,遍历目录下存在success的情况下,则可以删除该目录下.开头的全部文件。 yidan zhao 于2021年11月17日周三 上午10:22写道: >

Re: Re: flinksql 写 hive ,orc格式,应该支持下压缩。

2021-11-17 文章 yidan zhao
好的 RS 于2021年11月18日周四 上午9:32写道: > 1. 文件名是不带.zlib后缀的 > 2. > ORC格式默认是配置了ZIP压缩的,并且开启的,你可以配置'orc.compress'='NONE'测试下,看下不压缩的大小,没有压缩的文件应该是更大的 > > > 在 2021-11-16 17:29:17,"yidan zhao" 写道: > >我看了下,默认不带.zlib之类的后缀,我加了也看不出来到底有没有压缩。 > >其次,orc.compression官方介绍默认是zlib,貌似默认就有开启压缩? > > > >RS 于2021年11月15日周一 上午9:

Re:Re: flinksql 写 hive ,orc格式,应该支持下压缩。

2021-11-17 文章 RS
1. 文件名是不带.zlib后缀的 2. ORC格式默认是配置了ZIP压缩的,并且开启的,你可以配置'orc.compress'='NONE'测试下,看下不压缩的大小,没有压缩的文件应该是更大的 在 2021-11-16 17:29:17,"yidan zhao" 写道: >我看了下,默认不带.zlib之类的后缀,我加了也看不出来到底有没有压缩。 >其次,orc.compression官方介绍默认是zlib,貌似默认就有开启压缩? > >RS 于2021年11月15日周一 上午9:55写道: > >> 官网里面有介绍这个,你是要这个吧 >> >> https://nightlies.

Re: FlinkSQL写hive orc表开启compact,任务异常重启,compact不生效?

2021-11-16 文章 yidan zhao
还有基于检查点启动,首先数据完整性最终实际没问题对吧。 yidan zhao 于2021年11月17日周三 上午10:22写道: > 出错原因是因为机器不稳定,tm超时等。 > 话说这种有什么判别方法用于定期清理吗。 > > Caizhi Weng 于2021年11月17日周三 上午9:50写道: > >> Hi! >> >> 因为 compact 是在每次 checkpoint 的时候进行的,在做 checkpoint 之前产生的文件都是以 . >> 开头的,表示当前不可见。只有 >> checkpoint >> 之后才会重命名为可见文件。因此如果任务频繁出现错误,这些不可见文件就

Re: FlinkSQL写hive orc表开启compact,任务异常重启,compact不生效?

2021-11-16 文章 yidan zhao
出错原因是因为机器不稳定,tm超时等。 话说这种有什么判别方法用于定期清理吗。 Caizhi Weng 于2021年11月17日周三 上午9:50写道: > Hi! > > 因为 compact 是在每次 checkpoint 的时候进行的,在做 checkpoint 之前产生的文件都是以 . 开头的,表示当前不可见。只有 > checkpoint > 之后才会重命名为可见文件。因此如果任务频繁出现错误,这些不可见文件就会留在目录里,导致文件数增加。建议首先把任务为什么频繁出错查出来。 > > yidan zhao 于2021年11月16日周二 下午5:36写道: > > > > >

Re: FlinkSQL写hive orc表开启compact,任务异常重启,compact不生效?

2021-11-16 文章 Caizhi Weng
Hi! 因为 compact 是在每次 checkpoint 的时候进行的,在做 checkpoint 之前产生的文件都是以 . 开头的,表示当前不可见。只有 checkpoint 之后才会重命名为可见文件。因此如果任务频繁出现错误,这些不可见文件就会留在目录里,导致文件数增加。建议首先把任务为什么频繁出错查出来。 yidan zhao 于2021年11月16日周二 下午5:36写道: > > 如题,目前没有具体看是没生效,还是来不及compact。任务正常情况没问题,但是如果任务出现频繁错误,导致过一会重启一次,这种情况导致文件数暴增,compact功能不生效。 >

FlinkSQL写hive orc表开启compact,任务异常重启,compact不生效?

2021-11-16 文章 yidan zhao
如题,目前没有具体看是没生效,还是来不及compact。任务正常情况没问题,但是如果任务出现频繁错误,导致过一会重启一次,这种情况导致文件数暴增,compact功能不生效。

Re: flinksql 写 hive ,orc格式,应该支持下压缩。

2021-11-16 文章 yidan zhao
我看了下,默认不带.zlib之类的后缀,我加了也看不出来到底有没有压缩。 其次,orc.compression官方介绍默认是zlib,貌似默认就有开启压缩? RS 于2021年11月15日周一 上午9:55写道: > 官网里面有介绍这个,你是要这个吧 > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/orc/ > > > Orc format also supports table properties from Table properties. For > e

Re: Re: FlinkSQL 1.12 Temporal Joins 多表关联问题

2021-11-15 文章 Shengkai Fang
; 引擎 查询性能是可以接受的 > > > > --- > Best, > WuKong > >   > 发件人: Caizhi Weng > 发送时间: 2021-11-12 11:32 > 收件人: flink中文邮件组 > 主题: Re: FlinkSQL 1.12 Temporal Joins 多表关联问题 > > > Hi! >   > 这是说每次主流来一条数据,都要去维表里查询一次吗?然后你想每次攒一批数据,一次性查询以提高性能? >   > 如果是的话,一部分维

?????? Flinksql ????????full join ????????

2021-11-12 文章 ??????
: mongosql ??sql: CREATE TABLE label (   distinct_id BIGINT,   xwho String, sync5 decimal, sync4 decimal, sync6 string, syncea1 string, aa string, ceshi string, ttt string, tongji decimal, qweqwe array

Re: Flinksql 多表进行full join 出现异常

2021-11-11 文章 Jingsong Li
or > 我看是因为array source到hdfs的一张orc的表  > > 陈卓宇 > > >   > > > > > -- 原始邮件 -- > 发件人: > "user-zh" >

?????? Flinksql ????????full join ????????

2021-11-11 文章 ??????
??debug ??flink1.12.5??flink-orc_2.11org/apache/flink/orc/vector/AbstractOrcColumnVector.java ??createFlinkVector??ListColumnVectorflink??master??2021/5/12??wangwei1025??pr??1.12.5

?????? Flinksql ????????full join ????????

2021-11-11 文章 ??????
??: string_tag string number_tag number boolean_tag boolean datetime_tag datetime arr_tag array

?????? Flinksql ????????full join ????????

2021-11-11 文章 ??????
??: string_tag string number_tag number boolean_tag boolean datetime_tag datetime arr_tag array

?????? Flinksql ????????full join ????????

2021-11-11 文章 ??????
??: string_tag string number_tag number boolean_tag boolean datetime_tag datetime arr_tag array

Re: Re: FlinkSQL 1.12 Temporal Joins ????????????

2021-11-11 文章 ????
--- Best, WuKong    Caizhi Weng ?? 2021-11-12 11:32  flink?? ?? Re: FlinkSQL 1.12 Temporal Joins Hi??     ?? jdbc ?? hbase

Re: Re: FlinkSQL 1.12 Temporal Joins 多表关联问题

2021-11-11 文章 WuKong
Hi : 第一个 我了解了Cache 不太适合我的场景,因为我的表都是几十亿量级,同时 我要根据一些关键键 去数据库里查询,所以 我先在Job 中 聚合一些主键,通过In 条件 去查询。 第二个 好像是我理解的问题,最初想通过Flink Sql 把整体逻辑 下发到数据库去查询,因为有些OLAP 引擎 查询性能是可以接受的 --- Best, WuKong 发件人: Caizhi Weng 发送时间: 2021-11-12 11:32 收件人: flink中文邮件组 主题: Re: FlinkSQL 1.12 Temporal Joins 多表关联问题 Hi

Re: FlinkSQL 1.12 Temporal Joins 多表关联问题

2021-11-11 文章 Caizhi Weng
Hi! 这是说每次主流来一条数据,都要去维表里查询一次吗?然后你想每次攒一批数据,一次性查询以提高性能? 如果是的话,一部分维表(如 jdbc 和 hbase)支持 cache 功能 [1]。cache 功能可以在每次 cache 刷新的时候把数据加载到 task manager 内存中,这样主流来数据时只需要从 task manager 内存中查询对应数据即可,不必去外部系统查询。 另外查询逻辑下沉到数据库具体指的是什么?能否详细说明一下。 [1] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connec

Re: flinksql sink写csv小文件问题

2021-11-11 文章 Caizhi Weng
Hi! filesystem sink 的文件数量与 sink 并发数有关。如果数据量不大可以考虑在 sink DDL 的 with 参数里加入 'sink.parallelism' = '1' 设置 sink 并发度为 1。 陈卓宇 <2572805...@qq.com.invalid> 于2021年11月11日周四 下午4:50写道: > 问题描述:写到本地产生了8个part-817677cc-2f4b-464a-bf9e-11957bcf9c76-0-0 ...的文件 > 请问,我只想写成一个csv文件,如果关闭这种文件分区 > > > > Flink SQL: > String

  1   2   3   4   5   6   7   >