A group window expects a time attribute for grouping in a stream environment谢谢

2020-12-08 文章 Appleyuchi
代码是: https://paste.ubuntu.com/p/gVGrj2V7ZF/ 报错: A group window expects a time attribute for grouping in a stream environment. 但是代码的数据源中已经有时间属性了. 请问应该怎么修改代码? 谢谢

Re:Re:Re:Re:Flink SQL读取复杂JSON格式

2020-12-08 文章 破极
刚才搜到了,谢谢 在 2020-12-09 15:20:07,"hailongwang" <18868816...@163.com> 写道: >http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259 >这个邮件列表有相似的问题,你看下有没有帮助。 >PS:1.12 即将发布,也支持了 Raw 类型[1],也可以使用这个类型,然后代码自己 UDF 再处理。使用 Raw 类型也有个好处是,Source >消费不会因为 format 解析慢导致任务的瓶颈在拉数据

Re:Re:Re:Flink SQL读取复杂JSON格式

2020-12-08 文章 hailongwang
http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259 这个邮件列表有相似的问题,你看下有没有帮助。 PS:1.12 即将发布,也支持了 Raw 类型[1],也可以使用这个类型,然后代码自己 UDF 再处理。使用 Raw 类型也有个好处是,Source 消费不会因为 format 解析慢导致任务的瓶颈在拉数据慢,因为往往 Source 的并发度最大也只能是中间件的分区数,比如 Kakfa。 [1] https://ci.apache.org/projects/flink/flink-do

Re: Flink HA目录下数据不完整,导致JobManager启动失败。

2020-12-08 文章 赵一旦
第二个问题原因找到了,为啥ha目录下没有知道了,这个是我傻了,我本身这次更新容器就是更换了ha目录的。 所以问题变为,1个是为什么这种情况会导致JobManager失败,其次2是当前这种case是不是需要删除zk中信息,我看删除还挺复杂,因为zk不支持删除非空目录,我需要一个一个子目录删除嘛是? 赵一旦 于2020年12月9日周三 下午3:07写道: > 基于公司自研的pass平台部署,3个机器,pass自带recover。 > 正常运作中,直接重启pass容器,导致任务失败,等容器重启后,3个机器就都处于类似的无限循环状态。 > 目前初步分析是因为JobManager启动失败,进而

Re: Flink HA目录下数据不完整,导致JobManager启动失败。

2020-12-08 文章 赵一旦
基于公司自研的pass平台部署,3个机器,pass自带recover。 正常运作中,直接重启pass容器,导致任务失败,等容器重启后,3个机器就都处于类似的无限循环状态。 目前初步分析是因为JobManager启动失败,进而由pass平台自动重启容器,然后无限循环了。 这里(1)为什么恢复任务失败会导致JobManager进程失败。(2)任务恢复失败从日志来看是因为flink的ha目录下确实部分文件,这个是什么原因呢?不排除是文件系统原因,目前用的bos://是百度的对象服务,想知道如果这个没写成功会显示检查点成功嘛,至少我操作重启前任务的检查点是成功的。之前倒是没注意去看是否这个目录一直

Flink HA目录下数据不完整,导致JobManager启动失败。

2020-12-08 文章 赵一旦
看日志,JobManager启动后有恢复任务,然后进程失败。 日志如下: 14:55:55.304 [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 14:55:55.305 [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Preconfiguration

Re:flink sql 实时计算百分比CodeGenException: Unsupported cast from 'ROW' to 'ROW'.

2020-12-08 文章 hailongwang
Hi, 我理解这边两个问题。 1. `Row` 2 `Row` 的转换在 1.12 支持了:https://issues.apache.org/jira/browse/FLINK-17049 2. 这个 Select 语句貌似不会产生这个错误,方便发个完整的不 Best, hailong 在 2020-12-09 12:51:21,"bigdata" <1194803...@qq.com> 写道: >flink1.10.1,应该如何计算error_1006_cnt_permillage >sql如下: >SELECT >|DATE_FORMAT(TIMESTAMPAD

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-08 文章 xiao cai
Hi Jark sorry,是1.12.0, 我打错了 Original Message Sender: Jark Wu Recipient: user-zh Date: Wednesday, Dec 9, 2020 14:40 Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 Hi 赵一旦, 这部分 jackson 组件已经自动处理了这部分逻辑。 Hi xiaocai, 你有什么 issue 是需要1.12.1的? 1.12.0 这两天即将发布。 Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao ca

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-08 文章 xiao cai
Hi Jark Original Message Sender: Jark Wu Recipient: user-zh Date: Wednesday, Dec 9, 2020 14:40 Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 Hi 赵一旦, 这部分 jackson 组件已经自动处理了这部分逻辑。 Hi xiaocai, 你有什么 issue 是需要1.12.1的? 1.12.0 这两天即将发布。 Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao cai wrote: > 好的,计划

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-08 文章 Jark Wu
Hi 赵一旦, 这部分 jackson 组件已经自动处理了这部分逻辑。 Hi xiaocai, 你有什么 issue 是需要1.12.1的? 1.12.0 这两天即将发布。 Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao cai wrote: > 好的,计划下周升级测试下,另:1.12.1计划何时发布呢 > > > Original Message > Sender: Jark Wu > Recipient: user-zh > Date: Tuesday, Dec 8, 2020 13:41 > Subject: Re: Fli

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-08 文章 xiao cai
好的,计划下周升级测试下,另:1.12.1计划何时发布呢 Original Message Sender: Jark Wu Recipient: user-zh Date: Tuesday, Dec 8, 2020 13:41 Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 hailong 说的定义成 STRING 是在1.12 版本上支持的, https://issues.apache.org/jira/browse/FLINK-18002 1.12 这两天就会发布,如果能升级的话,可以尝试一下。 Best, Jark On Tue,

flink sql ??????????????CodeGenException: Unsupported cast from 'ROW' to 'ROW'.

2020-12-08 文章 bigdata
flink1.10.1??error_1006_cnt_permillage sql?? SELECT |DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), '-MM-dd') `day`, |UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), '-MM-dd HH:mm:ss'

flink sql ??????????????CodeGenException: Unsupported cast from 'ROW' to 'ROW'.

2020-12-08 文章 bigdata
flink1.10.1??error_1006_cnt_permillage sql?? SELECT |DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), '-MM-dd') `day`, |UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), '-MM-dd HH:mm:ss'

flink??????planner??blink??????????????????cep sql??a->b

2020-12-08 文章 ??????
flink cep sql blink PATTERN (e1{3 } -> e2{1 }?)??

Re:Re:Flink SQL读取复杂JSON格式

2020-12-08 文章 破极
比如下面这种消息: 第一条消息: {"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]} 第二条消息: {"source":"transaction_2020202020200","data":[{"name":"d"},{"age":18}]} 第三条消息: {"source":"transaction_2020202020200","data":[]} 我想直接在创建表时用一个字段来表示data这个属性的所有值。 在 2020-12-09 13:21:41,"Apple

Re:Flink SQL读取复杂JSON格式

2020-12-08 文章 Appleyuchi
怎么个动态法? 在 2020-12-09 13:18:56,"破极" 写道: >Hello,各位大佬: >请教下大佬们,在Flink >SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create >table时候schema呢?我定义了array,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。 >请问各位大佬有啥高招呢?谢谢。 > > >kafka消息样例(data的value是动态的): >{"source":"transaction_20202020

Flink SQL读取复杂JSON格式

2020-12-08 文章 破极
Hello,各位大佬: 请教下大佬们,在Flink SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create table时候schema呢?我定义了array,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。 请问各位大佬有啥高招呢?谢谢。 kafka消息样例(data的value是动态的): {"source":"transaction_2020202020200","data":[{"name":"d"},{"age":18}]} 我定义的schem

关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-08 文章 jindy_liu
场景上: 目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。 目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。 目前测试了一版本flink sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。 所以产生以下想法,不

flink sql 1.11 kafka cdc??holo sink

2020-12-08 文章 ????
flink sql 1.11 kafka source ?? ??kafka??canal??mysql ??'format' = 'canal-json'?? ?? 1sourcemsyql??schemadata[{}]??table??ts?? 2??topicmysql binlog??kafka source??s

Re: flink11 SQL 如何支持双引号字符串

2020-12-08 文章 zhisheng
是跟这个 Issue 有关吗?https://issues.apache.org/jira/browse/FLINK-20537 赵一旦 于2020年12月9日周三 上午10:17写道: > MARK,学习下。等回复。 > > 莫失莫忘 于2020年12月8日周二 下午6:49写道: > > > 我在迁移hive  sql 到 flink引擎。原来的很多 hive  sql  中 > > 字符串都是用双引号表示,例如  select * from table1 where column1 = > > "word"。我如何在不修改SQL的前提下,使flink SQL 支持 双引号字符串

Re: Flink1.10执行sql超出内存限制被yarn杀掉

2020-12-08 文章 kuailuohe
关闭了RocksDB的内存控制后,是不是应该把taskmanager.memory.managed.size设置成0? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink 1.11.2 on yarn报错

2020-12-08 文章 Jacob
*【环境】* Flink 版本:1.11.2 Hadoop 版本 :2.6.0-cdh5.8.3 Java 版本: 1.8.0_144 - *【命令】* [jacob@localhost flink-1.11.2]$ ./bin/yarn-session.sh -jm 1024m -tm 2048m *【现象】* 2020-12-08 18:06:00,134 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli [] -

Re: flink sql实时计算UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-12-08 文章 赵一旦
旧版 'connector.type' = 'jdbc',新版 'connector' = 'jdbc'。 新旧区别,旧版根据查询决定key,新版你只需要定义了key就是upsert了,不需要查询符合一定要求。 Leonard Xu 于2020年12月7日周一 下午5:11写道: > Hi, > 你是不是没有订阅flink的用户邮件列表,所以有些邮件你看不到。 > 你可以发送任意内容的邮件到user-zh-subscr...@flink.apache.org user-zh-subscr...@flink.apache.org> 即可订阅用户邮件列表,订阅后邮件列表里大家的提问和回答

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-08 文章 赵一旦
@JarkWu,你的意思是定义成string,但是输入数据是一个json object也可以支持?这岂不是需要对json部分的反序列化做特殊定制。 比如a字段是string,但数据中a属性是这样的: "a":{ .. }。在java中相当于需要先将a这部分反序列化为一个map,然后再序列化为json(字符串)后作为a这个string的值。 ?是吗。 Jark Wu 于2020年12月8日周二 下午1:42写道: > hailong 说的定义成 STRING 是在1.12 版本上支持的, > https://issues.apache.org/jira/browse/FLINK-18

Re: 关于 stream-stream Interval Join 的问题

2020-12-08 文章 Benchao Li
Hi macia, 一旦回答的基本比较完整了。 watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 如果是两侧都有数据,watermark不前进,也都可以正常输出。 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致 你的没有join到的数据下发会延迟很多了。 你也可以尝试下用

Re: flink11 SQL 如何支持双引号字符串

2020-12-08 文章 赵一旦
MARK,学习下。等回复。 莫失莫忘 于2020年12月8日周二 下午6:49写道: > 我在迁移hive  sql 到 flink引擎。原来的很多 hive  sql  中 > 字符串都是用双引号表示,例如  select * from table1 where column1 = > "word"。我如何在不修改SQL的前提下,使flink SQL 支持 双引号字符串。 > ps:我看到flink SQL中字符串都必须用 单引号,例如 select * from table1 where column1 = > 'word' 。如何使 字符串 既可以是单引号 也可以是  双引号呢

Re: 在map算子中对redis进行sadd写入之后再sget读取

2020-12-08 文章 赵一旦
没搞懂你怎么个不得解,是不去除redis的sdk使用还是咋的,问题描述太简单了。 这个貌似就是map内你通过redis client操作redis就好啦呀。 追梦的废柴 于2020年12月8日周二 下午8:44写道: > 各位: > 晚上好! > 对于redis作为数据源或者sink网上有很多参照的案例,那么请问如何在一个map算子里面,先写入set结构中,然后再读取该set的长度呢? > 业务需要,百思不得解,还望各位指点迷津! > 祝好! > > > | | > 追梦的废柴 > | > | > 邮箱:zhuimeng...@163.com > | > > 签名由 网易邮箱大师 定制

Re: 关于 stream-stream Interval Join 的问题

2020-12-08 文章 赵一旦
重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left join。 (2)此外,还有一个点,这个我也不确认。如果是datastream api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 input_database中定义了wat

回复:怎样定时更新广播变量的值

2020-12-08 文章 熊云昆
最简单的方式就是自定义Source,里面定时检测配置文件是否有更新,如果有更新的话就发送配置文件的内容,下游算子把这个source发送的内容当做广播变量 | | 熊云昆 | | 邮箱:xiongyun...@163.com | 签名由 网易邮箱大师 定制 在2020年12月08日 17:36,Lei Wang 写道: flink 程序读配置文件,配置文件的内容作为广播变量广播出去。 如果配置文件更新了,怎样能把广播变量的内容也更新呢? 谢谢, 王磊

Re: 关于 stream-stream Interval Join 的问题

2020-12-08 文章 macia kk
@Benchao Li 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 FLink,可能我的Case 太特殊了. 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要 filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. 还要注意的是 even time 是 create_time, 这里问题非常大: 1. 很多表都有 create tim

在map算子中对redis进行sadd写入之后再sget读取

2020-12-08 文章 追梦的废柴
各位: 晚上好! 对于redis作为数据源或者sink网上有很多参照的案例,那么请问如何在一个map算子里面,先写入set结构中,然后再读取该set的长度呢? 业务需要,百思不得解,还望各位指点迷津! 祝好! | | 追梦的废柴 | | 邮箱:zhuimeng...@163.com | 签名由 网易邮箱大师 定制

flink sql ddl????????????java.lang.IncompatibleClassChangeError: Implementing class

2020-12-08 文章 bigdata
??flink1.10.1,pom??

Re:关于flink sql往postgres写数据遇到的timestamp问题

2020-12-08 文章 李轲
报错信息: Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) Caused by: org.apache.flink.table.api.TableException: Unsupported conversion

关于flink sql往postgres写数据遇到的timestamp问题

2020-12-08 文章 李轲
项目需求要向 postgres 中插入数据,用 catalog 之后,插入数据貌似需要和数据库表定义完全一致,而且没找到只插入部分字段的写法 在时间转 TIMESTAMP(6) WITH LOCAL TIME ZONE 时报了错,这个格式是 postgres 中的时间戳定义 select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZONE); 有没有什么转换方法?或者只插入部分数据的方法?

flink11 SQL ????????????????????

2020-12-08 文章 ????????
hive  sql ?? flink hive  sql  ??   select * from table1 where column1 = "word"SQLflink SQL ?? psflink SQL  select * from table1 where column1 = 'word' ???

Re: Flink 1.11版本LeaseRenewer线程不释放

2020-12-08 文章 zilong xiao
作业数据流是 kafka -> flink -> http/prometheus,目前这类型的作业很多,但是就只有那几个有问题,而且是必现,每次都只能重启,然后看着线程数上涨。。 我再debug看看~ Paul Lam 于2020年12月8日周二 下午6:00写道: > Hi, > > 我之前说的多个集群的情况主要指写入数据到 HDFS。如果只有 checkpoint 依赖 HDFS 而出现这种情况的话,的确是非常奇怪。 > > Best, > Paul Lam > > > 2020年12月8日 11:03,zilong xiao 写道: > > > > Hi Paul, > >

Re: flink sql 任务滑动窗口失效

2020-12-08 文章 xushanshan
-- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: FlinkSQL中创建表,视图等一些元数据信息都是存放在什么地方(没看到像Hive那样使用mysql存储元数据信息)

2020-12-08 文章 邮件帮助中心
感谢您的答复!! 在 2020-12-08 15:57:32,"Leonard Xu" 写道: >Hi, >Flink 的元数据存放在catalog中的,也支持多种catalog(embedded, >HIve,JDBC,自定义catalog),默认Flink使用内置的GenericInMemoryCatalog,这个是in >memory的catalog,元数据都在这里,生产环境上可以使用HiveCatalog > > >祝好 >Leonard >[1] >https://ci.apache.org/projects/flink/flink-doc

Re: Flink 1.11版本LeaseRenewer线程不释放

2020-12-08 文章 Paul Lam
Hi, 我之前说的多个集群的情况主要指写入数据到 HDFS。如果只有 checkpoint 依赖 HDFS 而出现这种情况的话,的确是非常奇怪。 Best, Paul Lam > 2020年12月8日 11:03,zilong xiao 写道: > > Hi Paul, >线程名称是一模一样的,都是user1@cluserA,HDFS client版本对于用户来说是透明的,作业使用的是Flink > 1.11版本,该Flink版本使用HDFS版本好像是2.8.1,在Flink中和集群有持续交互的就只能想到checkpoint,开了DEBUG日志也没能找到root > cau

Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-08 文章 bradyMk
好的,谢谢大佬解答~ - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql ddl????????????java.lang.IncompatibleClassChangeError: Implementing class

2020-12-08 文章 bigdata
??flink1.10.1,pom??

求助如何用flink1.11.2 on yarn集成CDH的hbase2.0版本

2020-12-08 文章 site
根据官方提供的方法,用HADOOP_CLASSPATH=`hadoop classpath`集成hadoop成功。 因为flink on yarn是用的cdh6集群,所以我想利用现有的classpath中的包含的hbase库,使用 export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/*:$HADOOP_CLASSPATH 然后用yarn-session创建一个flink运行环境,再用sql-client连接这个容器,创建hbase映射表,这种用法失败:分析提示是找不到hbase包。 ./bin/yarn

怎样定时更新广播变量的值

2020-12-08 文章 Lei Wang
flink 程序读配置文件,配置文件的内容作为广播变量广播出去。 如果配置文件更新了,怎样能把广播变量的内容也更新呢? 谢谢, 王磊