[ANNOUNCE] Apache Flink 1.11.1 released

2020-07-21 文章 Dian Fu
The Apache Flink community is very happy to announce the release of Apache Flink 1.11.1, which is the first bugfix release for the Apache Flink 1.11 series. Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streamin

?????? flinksql1.11????????????????

2020-07-21 文章 ??????????????
?? ?? --  -- ??:

Re: flinksql1.11中主键声明的问题

2020-07-21 文章 Leonard Xu
Hello 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。 祝好 Leonard Xu > 在 2020年7月22日,14:13,1129656...@qq.com 写道: > > 输出结果仍然没有被更新

答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 文章 刘首维
Hi JingSong, 感谢回复,真心期待一个理想的解决方案~ 发件人: Jingsong Li 发送时间: 2020年7月22日 13:58:51 收件人: user-zh; Jark Wu 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题 Hi 首维, 非常感谢你的信息,我们试图 去掉太灵活的“DataStream”,但是也逐渐发现一些额外的需求,再考虑和评估下你的需求。 CC: @Jark Wu Best, Jingsong On Wed, Jul 22, 2020

回复: Re: flinksql1.11中主键声明的问题

2020-07-21 文章 1129656...@qq.com
您好: 非常感谢您的建议,我已经成功解决了这个问题,但是我又发现了一个新的问题,我这里设置的超时时间是一分钟或者超时行数是5000行, 我在这期间更新了维表数据,但是我发现已经超过了超时时间,输出结果仍然没有被更新,是我理解的有问题么? 我尝试了停止输入流数据直到达到超时时间后仍然没有更新维表,除非停止整个程序,否则我的维表数据都不会被更新。 请问这个问题有解决的办法么? def register_mysql_source(st_env): source_ddl = \ """ CREATE TABLE dim_mysql ( id int,

回复:flink1.11启动问题

2020-07-21 文章 酷酷的浑蛋
这是我的启动命令:./bin/flink run -m yarn-cluster -p 2 -ys 2 -yqu rt_constant -c com.xx.Main -yjm 1024 -ynm RTC_TEST xx.jar 任务到yarn上后就一直在占用core,core数量和内存数量一直在增加 在2020年07月22日 12:48,JasonLee<17610775...@163.com> 写道: HI 你使用的什么模式?启动任务的命令发出来看一下吧 | | JasonLee | | 邮箱:17610775...@163.com | Signature is cust

Re: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 文章 Jingsong Li
Hi 首维, 非常感谢你的信息,我们试图 去掉太灵活的“DataStream”,但是也逐渐发现一些额外的需求,再考虑和评估下你的需求。 CC: @Jark Wu Best, Jingsong On Wed, Jul 22, 2020 at 1:49 PM 刘首维 wrote: > Hi JingSong, > > > 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL > SDK > 下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子 > > > 1. 我

Re: flink1.11 tablefunction

2020-07-21 文章 Benchao Li
Hi, 如果只是想打平array,Flink有个内置的方法,可以参考[1]的”Expanding arrays into a relation“部分 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins Jark Wu 于2020年7月22日周三 上午11:17写道: > Hi, > > Row[] 作为 eval 参数,目前还不支持。社区已经有一个 issue 在跟进支持这个功能: > https://issues.apache.org/jira/bro

答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 文章 刘首维
Hi JingSong, 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL SDK 下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子 1. 我们现在有某种特定的Kafka数据格式,1条Kafka数据 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。 2. 我们目前封装了一些自己的Sink,我们会在Sink之前增加一个proc

Re: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 文章 Jingsong Li
可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗? Best Jingsong On Wed, Jul 22, 2020 at 12:36 PM 刘首维 wrote: > Hi all, > > > > 很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~ > > 我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL > SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transform

Re: 想知道state写到checkpoint文件没有

2020-07-21 文章 Congxian Qiu
Hi Checkpoint 包括两部分:1)meta 文件;2)具体的数据。如果是 Meta 部分可以参考 CheckpointMetadataLoadingTest[1] 自己写一个测试,如果你知道具体的内容,或许也可以看一下 StatePorcessAPI[2] [1] https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java [2] https://c

Re: Re: Flink catalog的几个疑问

2020-07-21 文章 dixingxin...@163.com
@Godfrey @Jingsong 感谢回复,很好的解答了我的疑惑! 背景是这样的,目前我们正打算实现一套支持持久化的catalog,同时基于这个catalog实现一个metaserver,对外暴露REST接口,用来支持日常管理操作,比如: 1.基于原生DDL管理source,sink,支持多种connector,并将这些元数据持久化到mysql中。 2.做统一的权限控制 我们面临两种选择: 1.基于hive catalog建设自己的catalog(或者说直接使用hive catalog): 优势:鉴于hive catalog已经相对比较完善,直接使用可以减少开发量。 劣势:不太明确社区

Re: flink1.11 web ui没有DAG

2020-07-21 文章 Congxian Qiu
Hi 这边说的 UI 上不显示数据接受和发送的条数,能否截图发一下,这样大家能更好的理解这个问题。另外 flink 作业有数据输入和处理吗? Best, Congxian 小学生 <201782...@qq.com> 于2020年7月22日周三 上午10:47写道: > 本地linux下单机版安装的,提交flink代码运行后,正常运行,有日志,但是为啥UI上面却不显示数据接收和发送的条数,求大佬解答

Re: flink1.11任务启动

2020-07-21 文章 Congxian Qiu
Hi 你可以把的启动命令贴一下,然后说一下你期望的行为是什么,现在看到的行为是什么。 Best, Congxian 酷酷的浑蛋 于2020年7月22日周三 下午12:43写道: > 现在启动任务(yarn)怎么指定-ys 和-p 都不管用了? 自动就分配好多core? > 默认启动个任务给我启动了100个core?100个container?我擦,啥情况啊,现在指定什么参数才生效啊?我默认配置也没有配置过多少个core啊,默认不是1个吗

回复:flink1.11启动问题

2020-07-21 文章 JasonLee
Hi 报错显示的是资源不足了 你确定yarn上的资源是够的吗 看下是不是节点挂了 1.11我这边提交任务都是正常的 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月21日 16:36,酷酷的浑蛋 写道: 服了啊,这个flink1.11启动怎么净是问题啊 我1.7,1.8,1.9 都没有问题,到11就不行 ./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm

Re: flink1.11启动问题

2020-07-21 文章 Yang Wang
可以的话,发一下client端和JM端的log 1.11是对提交方式有一些变化,但应该都是和之前兼容的,你的提交命令看着也是没有问题的 我自己试了一下也是可以正常运行的 Best, Yang 酷酷的浑蛋 于2020年7月22日周三 上午11:06写道: > jm里面没有日志啊,关键是配置都是一样的,我在1.9里运行就没问题,在flink1.11就一直卡在那里,不分配资源,到底启动方式改变了啥呢? > 集群资源是有的,可是任务一直卡在那说没资源,这怎么办 > > > > > 在2020年07月21日 17:22,Shuiqiang Chen 写道: > Hi, > > 可以尝试在j

回复:flink1.11启动问题

2020-07-21 文章 JasonLee
HI 你使用的什么模式?启动任务的命令发出来看一下吧 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月22日 12:44,酷酷的浑蛋 写道: 现在是为什么啊?启动任务自动占用所有core?core数量一直在增加,直到达到队列最大值,然后就卡住了,这flink1.11啥情况啊?

flink1.11启动问题

2020-07-21 文章 酷酷的浑蛋
现在是为什么啊?启动任务自动占用所有core?core数量一直在增加,直到达到队列最大值,然后就卡住了,这flink1.11啥情况啊?

flink1.11任务启动

2020-07-21 文章 酷酷的浑蛋
现在启动任务(yarn)怎么指定-ys 和-p 都不管用了? 自动就分配好多core? 默认启动个任务给我启动了100个core?100个container?我擦,啥情况啊,现在指定什么参数才生效啊?我默认配置也没有配置过多少个core啊,默认不是1个吗

关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 文章 刘首维
Hi all, 很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~ 我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内

答复: Flink catalog的几个疑问

2020-07-21 文章 刘首维
hi all, 我在想如果社区提供一个unified metastore server是不是会解决这个问题,然后写一个(一系列)catalog和这个metastore对应 发件人: Jark Wu 发送时间: 2020年7月22日 11:22:56 收件人: user-zh 主题: Re: Flink catalog的几个疑问 非常欢迎贡献开源一个轻量的 catalog 实现 :) On Wed, 22 Jul 2020 at 10:53, Jingsong Li wrote: > Hi, > > HiveCat

Re: flink1.11 sql

2020-07-21 文章 Leonard Xu
Hi 必须可以呢,参考[1] Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/hive/hive_functions.html > 在 2020年7月22日,12:14,Dream-底限 写道: > > hi > flink支持配置h

flink1.11 sql

2020-07-21 文章 Dream-底限
hi flink支持配置hive方言,那么flink可以直接使用hive内自定义的udf、udtf函数吗

flink1.11 实现tablefunction报错

2020-07-21 文章 Dream-底限
hi、 我这面实现了一个tablefunction想打撒数据,但是现在我运行官方demo样式的demo都无法成功,请问下面是什么原因: @FunctionHint(output = @DataTypeHint("ROW")) public static class FlatRowFunction extends TableFunction { private static final long serialVersionUID = 1L; public void eval(String rows) { for (String row : rows.spli

Re: Flink catalog的几个疑问

2020-07-21 文章 Jark Wu
非常欢迎贡献开源一个轻量的 catalog 实现 :) On Wed, 22 Jul 2020 at 10:53, Jingsong Li wrote: > Hi, > > HiveCatalog就是官方唯一的可以保存所有表的持久化Catalog,包括kafka,jdbc,hbase等等connectors。 > > > 后续有可能转正为flink 默认的catalog实现吗? > > 目前不太可能,你看,Flink连Hadoop的依赖都没有打进来。Hive的依赖更不会默认打进来。 依赖都没有,也不会成为默认的。 > > > hive catalog是不支持大小写敏感的 > > 是的,

Re: flink1.11 tablefunction

2020-07-21 文章 Jark Wu
Hi, Row[] 作为 eval 参数,目前还不支持。社区已经有一个 issue 在跟进支持这个功能: https://issues.apache.org/jira/browse/FLINK-17855 Best, Jark On Wed, 22 Jul 2020 at 10:45, Dream-底限 wrote: > hi, > 我想将一个array打散成多行,但是并没有成功 > > @FunctionHint(input =@DataTypeHint("ARRAY STRING,rule_type_name STRING,`result` INT,in_path BOOL

Re: Flink catalog的几个疑问

2020-07-21 文章 godfrey he
hi Xingxing, 1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog, postgres catalog, 可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1] 2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive catalog写新的meta。 是否会转为默认catalog,据我所知,目前没有。 3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。 Best, Godfrey di

回复: flink1.11启动问题

2020-07-21 文章 酷酷的浑蛋
jm里面没有日志啊,关键是配置都是一样的,我在1.9里运行就没问题,在flink1.11就一直卡在那里,不分配资源,到底启动方式改变了啥呢? 集群资源是有的,可是任务一直卡在那说没资源,这怎么办 在2020年07月21日 17:22,Shuiqiang Chen 写道: Hi, 可以尝试在jm的log里看看是在申请哪个资源的时候超时了, 对比下所申请的资源规格和集群可用资源 Best, Shuiqiang 酷酷的浑蛋 于2020年7月21日周二 下午4:37写道: 服了啊,这个flink1.11启动怎么净是问题啊 我1.7,1.8,1.9 都没有问题,到11就不行

Re: flinksql1.11中主键声明的问题

2020-07-21 文章 Leonard Xu
Hi, 你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。 在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。 我理解你把connector的with参数更新成新的就解决问题了。 Best Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options

Re: Flink catalog的几个疑问

2020-07-21 文章 Jingsong Li
Hi, HiveCatalog就是官方唯一的可以保存所有表的持久化Catalog,包括kafka,jdbc,hbase等等connectors。 > 后续有可能转正为flink 默认的catalog实现吗? 目前不太可能,你看,Flink连Hadoop的依赖都没有打进来。Hive的依赖更不会默认打进来。 依赖都没有,也不会成为默认的。 > hive catalog是不支持大小写敏感的 是的,就像Godfrey说的,特别是JDBC对接的某些大小写敏感的db,这可能导致字段名对应不了。 Best, Jingsong On Wed, Jul 22, 2020 at 10:39 AM

flink1.11 web ui????DAG

2020-07-21 文章 ??????
linuxflink??UI

Re: flink1.11 tablefunction

2020-07-21 文章 Dream-底限
hi, 我想将一个array打散成多行,但是并没有成功 @FunctionHint(input =@DataTypeHint("ARRAY>") ,output = @DataTypeHint("ROW")) public static class FlatRowFunction extends TableFunction { private static final long serialVersionUID = 1L; public void eval(Row[] rows) { for (Row row : rows) {

flinksql1.11中主键声明的问题

2020-07-21 文章 1129656...@qq.com
hi: 我在使用pyflink1.11过程中,使用flinksql维表时声明了主键primary key 但是还是会报错说我没有用声明主键,另外,当我使用inner join代替left join就不会有这个问题,请问这是什么问题 下面我附录了报错信息和代码。谢谢! 报错附录 Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco return f(*a, **

flink1.11 web ui????DAG

2020-07-21 文章 ??????
linuxflink??UI

Re: flink-1.11 ddl kafka-to-hive问题

2020-07-21 文章 Jingsong Li
你的Source表是怎么定义的?确定有watermark前进吗?(可以看Flink UI) 'sink.partition-commit.trigger'='partition-time' 去掉试试? Best, Jingsong On Wed, Jul 22, 2020 at 12:02 AM Leonard Xu wrote: > HI, > > Hive 表时在flink里建的吗? 如果是建表时使用了hive dialect吗?可以参考[1]设置下 > > Best > Leonard Xu > [1] > https://ci.apache.org/projects/fli

Re: flink-1.11 ddl kafka-to-hive问题

2020-07-21 文章 Leonard Xu
HI, Hive 表时在flink里建的吗? 如果是建表时使用了hive dialect吗?可以参考[1]设置下 Best Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect > 在

Flink catalog的几个疑问

2020-07-21 文章 dixingxin...@163.com
Hi Flink社区: 有几个疑问希望社区小伙伴们帮忙解答一下: 1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip 2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗? 3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。 Best, Xingxing Di

回复:flink-1.11 ddl kafka-to-hive问题

2020-07-21 文章 kcz
一直都木有数据 我也不知道哪里不太对 hive有这个表了已经。我测试写ddl hdfs 是OK的 -- 原始邮件 -- 发件人: JasonLee <17610775...@163.com> 发送时间: 2020年7月21日 20:39 收件人: user-zh

Re: FlinkKafkaConsumer API 维表关联

2020-07-21 文章 Jark Wu
你需要用 DDL 去声明这张 kafka 表[1], 目前不建议使用 `Kafka` 和 `StreamTableDescriptor` API。 Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html On Thu, 16 Jul 2020 at 11:43, 郑斌斌 wrote: > 各位好: > > 请教一下,用FlinkKafkaConsumer API的话,如何支持SQL的方式,和维表关联。(之前用Kafka >

Re: flink-1.11 ddl kafka-to-hive问题

2020-07-21 文章 Jark Wu
rolling 策略配一下? https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#sink-rolling-policy-rollover-interval Best, Jark On Tue, 21 Jul 2020 at 20:38, JasonLee <17610775...@163.com> wrote: > hi > hive表是一直没有数据还是过一段时间就有数据了? > > > | | > JasonLee > | > | > 邮箱

Re: Flink sql中可以使用自定义窗口触发器吗

2020-07-21 文章 Jark Wu
Hi, 目前是不支持的。不过有个实验性功能可以指定提前输出的策略和迟到处理的策略 [1],可能可以满足你的需求。 Best, Jark [1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L175 On Tue, 21 Jul 2020 at 22:28, 462329521 <462329...@qq

Re: flink解析kafka json数据

2020-07-21 文章 Jark Wu
目前是不支持的。这个需求有点太业务特定了。flink 不可能为了一个错误日志去抽象、对接各种存储系统。 一种方案是社区可以考虑支持下打印到日志里,然后用户可以通过自定义插件 log appender 写入外部存储。 Best, Jark On Tue, 21 Jul 2020 at 18:53, Dream-底限 wrote: > hi > json.ignore-parse-errors那只配置这个就好了, 其实我想把解析失败的数据存储到外部系统,而不是直接丢弃 > > Leonard Xu 于2020年7月21日周二 下午4:18写道: > > > Hi, > > 我理解应该

Flink sql中可以使用自定义窗口触发器吗

2020-07-21 文章 462329521
Hi,想问下现在的Flink sql支持使用自定义窗口触发器吗?

Re: flink 1.11 cdc: kafka中存了canal-json格式的多张表信息,需要按表解析做处理,sink至不同的下游,要怎么支持?

2020-07-21 文章 Jark Wu
Hi, 目前 Flink SQL CDC 是不支持自动感知新表的,得要提前定义要表的 schema 然后提交同步作业。比如你上面的例子,就需要定义两个 source 表: CREATE TABLE `test` ( `id` int, `name` string, `time` timestamp(3), `status` int ) with ( 'connector' = 'kafka', 'format' = 'canal-json', ... ); insert into downstream1 select * from `test`; CRE

Re: SQL 报错只有 flink runtime 的 NPE

2020-07-21 文章 Jark Wu
这个异常一般是由于 UDF 的实现用了主类型(int),但是实际的字段值有 null 值。 你可以试试先做个 where 条件过滤,将 null 值过滤掉? Best, Jark On Mon, 20 Jul 2020 at 15:28, godfrey he wrote: > 看不到图片信息,换一个图床工具上传图片吧 > > Luan Cooper 于2020年7月17日周五 下午4:11写道: > > > 附一个 Job Graph 信息,在 Cal 处挂了 > > [image: image.png] > > > > On Fri, Jul 17, 2020 at 4:01

Re:Re: [sql-client] 如何绕过交互式模式去做ddl

2020-07-21 文章 zhanglianzhg
看下CliClient.java源码 open接口, final Optional cmdCall = parseCommand(line); cmdCall.ifPresent(this::callCommand); 可以看出解析字符串后执行响应命令。 目前我们这边一个项目也在做相似的,可以界面写好slq,以分号作为分隔符表示ddl或则DMl作为分隔符。 然后以文件方式保存(可以作为日志等用作)。 然后自己实现一个excutor类包装了tableEnvironment,主要功能用作string命令解析以及命令执行,可以简单的把flink的解析以及 callCommand拿过来,然后加以改造

??????flink-1.11 ddl kafka-to-hive????

2020-07-21 文章 JasonLee
hi hive?? | | JasonLee | | ??17610775...@163.com | Signature is customized by Netease Mail Master ??2020??07??21?? 19:09??kcz ?? hive-1.2.1 chk ??chkchk??kafkahive??

Re: [sql-client] 如何绕过交互式模式去做ddl

2020-07-21 文章 godfrey he
sql-client.sh的-u是指update语句,目前只支持insert。 Jark Wu 于2020年7月21日周二 下午6:47写道: > Hi, > > 你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。 > 社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。 > 不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828. > > Best, > Jark > > On Thu, 16 Jul 20

Re: [sql-client] 如何绕过交互式模式去做ddl

2020-07-21 文章 Harold.Miao
谢谢 我暂时这样改了一下 public boolean submitUpdate(String statement) { terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi()); terminal.writer().println(new AttributedString(statement).toString()); terminal.flush(); final Optional parsedStatement = parseCo

Re: flink1.11 tablefunction

2020-07-21 文章 godfrey he
可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide Dream-底限 于2020年7月21日周二 下午7:25写道: > hi > > 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中

Re: flink table??????????????????????????????

2020-07-21 文章 ??????
1.??insert??checkg_sink_unit 2.kafkag_unit

Re: flink table同时作为写出及输入时下游无数据

2020-07-21 文章 咿咿呀呀
就是没有数据,我这个是简化版本的,都切换为kafka的初始源是没问题的 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink1.11 tablefunction

2020-07-21 文章 Dream-底限
hi 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)

Re: Flink 1.11 submit job timed out

2020-07-21 文章 Congxian Qiu
Hi 不确定 k8s 环境中能否看到 pod 的完整日志?类似 Yarn 的 NM 日志一样,如果有的话,可以尝试看一下这个 pod 的完整日志有没有什么发现 Best, Congxian SmileSmile 于2020年7月21日周二 下午3:19写道: > Hi,Congxian > > 因为是测试环境,没有配置HA,目前看到的信息,就是JM刷出来大量的no hostname could be > resolved,jm失联,作业提交失败。 > 将jm内存配置为10g也是一样的情况(jobmanager.memory.pprocesa.size:10240m)。 > >

flink-1.11 ddl kafka-to-hive????

2020-07-21 文章 kcz
hive-1.2.1 chk ??chkchk??kafkahive?? String hiveSql = "CREATE TABLE stream_tmp.fs_table (\n" + " host STRING,\n" + " url STRING," + " public_date STRING" + ") partitioned by (public

Re: flink解析kafka json数据

2020-07-21 文章 Dream-底限
hi json.ignore-parse-errors那只配置这个就好了, 其实我想把解析失败的数据存储到外部系统,而不是直接丢弃 Leonard Xu 于2020年7月21日周二 下午4:18写道: > Hi, > 我理解应该做不到,因为这两个format参数在format里就做的。 > json.ignore-parse-errors 是在 > format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field > 是标记如果字段少时是否失败还是继续(缺少的字段用null补上) > 这两个不能同时为ture,语义上就是互斥的。 > > Bes

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-21 文章 Yang Wang
configmap "flink-config-k8s-session-1" not found的报错是正常的 因为目前的实现是先创建JobManager Deployment,然后再创建ConfigMap并设置owner reference到deployment 所以你才会看到创建Pod的时候报ConfigMap还没有创建出来,这个是正常的信息,K8s会自动重试创建Pod 你现在是任务起不来吗,还是有什么其他的问题? Best, Yang Yvette zhai 于2020年7月14日周二 上午10:20写道: > 补充一下,kubernetes版本是1.18 > Yvet

Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-21 文章 Jark Wu
这个对应关系是通过 Factory#factoryIdentifier 来决定的。 比如 DebeziumJsonFormatFactory#factoryIdentifier() 就是返回了 'debezium-json' Best, Jark On Thu, 16 Jul 2020 at 22:29, wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > > 谢谢,我理解了。 > > > > wangl...@geekplus.com.cn > > Sender: Harold.Miao > Send Time

Re: [sql-client] 如何绕过交互式模式去做ddl

2020-07-21 文章 Jark Wu
Hi, 你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。 社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。 不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828. Best, Jark On Thu, 16 Jul 2020 at 19:43, Harold.Miao wrote: > hi flink users > > 众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,

Re: 回复: Flink SQL处理Array型的JSON

2020-07-21 文章 wxpcc
补充: 最终查询为 SELECT t.* FROM kafka_source, LATERAL TABLE( fromJson(data) ) as t -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复: Flink SQL处理Array型的JSON

2020-07-21 文章 wxpcc
如果不等待最新版本的话也可以这样 将 纯数组的数据作为字符串 从source消费,增加自定义的json解析函数,判断 isArray 之后 遍历进行 collect if (Objects.nonNull(str)) { if (isArray) { JsonNode node = objectMapper.readTree(str); if (node.isArray()) { Iterator nodeIterator = n

Re: flink table同时作为写出及输入时下游无数据

2020-07-21 文章 godfrey he
你可以先只跑第一个insert,然后check一下g_sink_unit是否有数据。 另外,你可以把query 改为都读取 kafka_source_tab 再分别写到两个不同的sink: sql1='''Insert into g_sink_unit select alarm_id,trck_id from kafka_source_tab''' sql2='''Insert into g_summary_base select alarm_id,trck_id from kafka_source_tab;''' 小学生 <201782...@qq.com> 于2020年7月21日周

Re: Re: flink 1.11 sql类型问题

2020-07-21 文章 Jark Wu
你是说输出的时候想带 'Z' 后缀? 如果这样的话,我觉得 json.timestamp-format.standard = 'ISO-8601' 这个参数应该能解决你的问题。 Best, Jark On Thu, 16 Jul 2020 at 10:02, sunfulin wrote: > > > > hi, leonard > 感谢回复。我在es的ddl with参数里加了这个,貌似还是报错。我再简单描述下我的场景: > 我的es sink的ddl如下: > create table es_sink ( > a varchar, > b varchar, > c T

flink table??????????????????????????????

2020-07-21 文章 ??????
flink??g_unit()??kafka??g_unit??g_summary??g_line from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode fro

Re: flink1.11启动问题

2020-07-21 文章 Shuiqiang Chen
Hi, 可以尝试在jm的log里看看是在申请哪个资源的时候超时了, 对比下所申请的资源规格和集群可用资源 Best, Shuiqiang 酷酷的浑蛋 于2020年7月21日周二 下午4:37写道: > > > 服了啊,这个flink1.11启动怎么净是问题啊 > > > 我1.7,1.8,1.9 都没有问题,到11就不行 > ./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm > 1024 -ynm sql_test ./examples/batch/WordCount.jar --i

Re: connector hive依赖冲突

2020-07-21 文章 Dream-底限
hi, 不排除依赖的话环境都起不来的哈, java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:756) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defi

????: flink table??????????????????????????????

2020-07-21 文章 chengyanan1...@foxmail.com
?? sql1='''Insert into g_unit_sink_ddl select alarm_id,trck_id from kafka_source_tab''' sql2='''Insert into g_summary_ddl select alarm_id,trck_id from g_unit_sink_ddl''' g_sink_unit ?? g_summary_base chengyanan1...@foxmail.com ?? ?? 2020-07-

flink table??????????????????????????????

2020-07-21 文章 ??????
flink??g_unit()??kafka??g_unit??g_summary??g_line from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode fro

flink table??????????????????????????????

2020-07-21 文章 ??????
flink??g_unit()??kafka??g_unit??g_summary??g_line from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode fro

flink1.11启动问题

2020-07-21 文章 酷酷的浑蛋
服了啊,这个flink1.11启动怎么净是问题啊 我1.7,1.8,1.9 都没有问题,到11就不行 ./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm 1024 -ynm sql_test ./examples/batch/WordCount.jar --input hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a 报错: Caused by: org.apache.flink.r

????: ?????? pyflink1.11.0window

2020-07-21 文章 chengyanan1...@foxmail.com
Hi?? kafkaTableSink??AppendStreamTableSinkfinal_result2source1??group by??final_result??mysqljoingroup by??StreamTable RetractStreamTableSink?? ??AppendStreamTableSink??RetractStreamTab

Re: flink解析kafka json数据

2020-07-21 文章 Leonard Xu
Hi, 我理解应该做不到,因为这两个format参数在format里就做的。 json.ignore-parse-errors 是在 format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field 是标记如果字段少时是否失败还是继续(缺少的字段用null补上) 这两个不能同时为ture,语义上就是互斥的。 Best Leonard Xu > 在 2020年7月21日,16:08,Dream-底限 写道: > > json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解

flink解析kafka json数据

2020-07-21 文章 Dream-底限
hi 我这面在使用sql api解析kafka json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储 json.ignore-parse-errors son.fail-on-missing-field

Re: flink1.11 pyflink stream job 退出

2020-07-21 文章 Xingbo Huang
是的,execute是1.10及以前使用的,execute_sql是1.11之后推荐使用的 Best, Xingbo lgs <9925...@qq.com> 于2020年7月21日周二 下午3:57写道: > 谢谢。加上后就可以了。 > > 改成原来的sql_update然后st_env.execute("job")好像也可以。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

Re: flink1.11 pyflink stream job 退出

2020-07-21 文章 lgs
谢谢。加上后就可以了。 改成原来的sql_update然后st_env.execute("job")好像也可以。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.11 pyflink stream job 退出

2020-07-21 文章 Xingbo Huang
Hi, execute_sql是一个异步非阻塞的方法,所以你需要在你的代码末尾加上 sql_result.get_job_client().get_job_execution_result().result() 对此我已经创建了JIRA[1] [1] https://issues.apache.org/jira/browse/FLINK-18598 Best, Xingbo lgs <9925...@qq.com> 于2020年7月21日周二 下午3:35写道: > python flink_cep_example.py 过几秒就退出了,应该一直运行不退出的啊。 > 代码如下,使用了

flink1.11 pyflink stream job 退出

2020-07-21 文章 lgs
python flink_cep_example.py 过几秒就退出了,应该一直运行不退出的啊。 代码如下,使用了MATCH_RECOGNIZE: s_env = StreamExecutionEnvironment.get_execution_environment() b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() st_env = StreamTableEnvironment.create(s_env, envir

Re: flink 1.11 cdc: kafka中存了canal-json格式的多张表信息,需要按表解析做处理,sink至不同的下游,要怎么支持?

2020-07-21 文章 godfrey he
http://apache-flink.147419.n8.nabble.com/flink-1-10-sql-kafka-format-json-schema-json-object-tt4665.html 这个邮件里提到了类似的问题。 https://issues.apache.org/jira/browse/FLINK-18002 这个issue完成后(1.12),你可以将 “data”,“mysqlType”等格式不确定的字段定义为String类型, 下游通过udf自己再解析对应的json Best, Godfrey jindy_liu <286729...@qq.com>

Re: Flink 1.11 submit job timed out

2020-07-21 文章 SmileSmile
Hi,Congxian 因为是测试环境,没有配置HA,目前看到的信息,就是JM刷出来大量的no hostname could be resolved,jm失联,作业提交失败。 将jm内存配置为10g也是一样的情况(jobmanager.memory.pprocesa.size:10240m)。 在同一个环境将版本回退到1.10没有出现该问题,也不会刷如上报错。 是否有其他排查思路? Best! | | a511955993 | | 邮箱:a511955...@163.com | 签名由 网易邮箱大师 定制 On 07/16/2020 13:17, Congxian Q

回复: (无主题)

2020-07-21 文章 罗显宴
hi,我想到解决办法了,可以用全局window,我一直以为是要分区在做窗口运算其实可以直接用timewindowAll来算,然后用状态保存就够了 val result = num.timeWindowAll(Time.seconds(20)) //.trigger(ContinuousEventTimeTrigger.of(Time.seconds(20))) .process(new ProcessAllWindowFunction[IncreaseNumPerHour,IncreasePerHour,TimeWindow] { private var itemStat