代码是:
https://paste.ubuntu.com/p/gVGrj2V7ZF/
报错:
A group window expects a time attribute for grouping in a stream environment.
但是代码的数据源中已经有时间属性了.
请问应该怎么修改代码?
谢谢
刚才搜到了,谢谢
在 2020-12-09 15:20:07,"hailongwang" <18868816...@163.com> 写道:
>http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259
>这个邮件列表有相似的问题,你看下有没有帮助。
>PS:1.12 即将发布,也支持了 Raw 类型[1],也可以使用这个类型,然后代码自己 UDF 再处理。使用 Raw 类型也有个好处是,Source
>消费不会因为 format 解析慢导致任务的瓶颈在拉数据
http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259
这个邮件列表有相似的问题,你看下有没有帮助。
PS:1.12 即将发布,也支持了 Raw 类型[1],也可以使用这个类型,然后代码自己 UDF 再处理。使用 Raw 类型也有个好处是,Source
消费不会因为 format 解析慢导致任务的瓶颈在拉数据慢,因为往往 Source 的并发度最大也只能是中间件的分区数,比如 Kakfa。
[1]
https://ci.apache.org/projects/flink/flink-do
第二个问题原因找到了,为啥ha目录下没有知道了,这个是我傻了,我本身这次更新容器就是更换了ha目录的。
所以问题变为,1个是为什么这种情况会导致JobManager失败,其次2是当前这种case是不是需要删除zk中信息,我看删除还挺复杂,因为zk不支持删除非空目录,我需要一个一个子目录删除嘛是?
赵一旦 于2020年12月9日周三 下午3:07写道:
> 基于公司自研的pass平台部署,3个机器,pass自带recover。
> 正常运作中,直接重启pass容器,导致任务失败,等容器重启后,3个机器就都处于类似的无限循环状态。
> 目前初步分析是因为JobManager启动失败,进而
基于公司自研的pass平台部署,3个机器,pass自带recover。
正常运作中,直接重启pass容器,导致任务失败,等容器重启后,3个机器就都处于类似的无限循环状态。
目前初步分析是因为JobManager启动失败,进而由pass平台自动重启容器,然后无限循环了。
这里(1)为什么恢复任务失败会导致JobManager进程失败。(2)任务恢复失败从日志来看是因为flink的ha目录下确实部分文件,这个是什么原因呢?不排除是文件系统原因,目前用的bos://是百度的对象服务,想知道如果这个没写成功会显示检查点成功嘛,至少我操作重启前任务的检查点是成功的。之前倒是没注意去看是否这个目录一直
看日志,JobManager启动后有恢复任务,然后进程失败。
日志如下:
14:55:55.304 [main] INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
14:55:55.305 [main] INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Preconfiguration
Hi,
我理解这边两个问题。
1. `Row` 2 `Row` 的转换在 1.12
支持了:https://issues.apache.org/jira/browse/FLINK-17049
2. 这个 Select 语句貌似不会产生这个错误,方便发个完整的不
Best,
hailong
在 2020-12-09 12:51:21,"bigdata" <1194803...@qq.com> 写道:
>flink1.10.1,应该如何计算error_1006_cnt_permillage
>sql如下:
>SELECT
>|DATE_FORMAT(TIMESTAMPAD
Hi Jark
sorry,是1.12.0, 我打错了
Original Message
Sender: Jark Wu
Recipient: user-zh
Date: Wednesday, Dec 9, 2020 14:40
Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型
Hi 赵一旦, 这部分 jackson 组件已经自动处理了这部分逻辑。 Hi xiaocai, 你有什么 issue 是需要1.12.1的? 1.12.0
这两天即将发布。 Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao ca
Hi Jark
Original Message
Sender: Jark Wu
Recipient: user-zh
Date: Wednesday, Dec 9, 2020 14:40
Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型
Hi 赵一旦, 这部分 jackson 组件已经自动处理了这部分逻辑。 Hi xiaocai, 你有什么 issue 是需要1.12.1的? 1.12.0
这两天即将发布。 Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao cai
wrote: > 好的,计划
Hi 赵一旦,
这部分 jackson 组件已经自动处理了这部分逻辑。
Hi xiaocai,
你有什么 issue 是需要1.12.1的? 1.12.0 这两天即将发布。
Best,
Jark
On Wed, 9 Dec 2020 at 14:34, xiao cai wrote:
> 好的,计划下周升级测试下,另:1.12.1计划何时发布呢
>
>
> Original Message
> Sender: Jark Wu
> Recipient: user-zh
> Date: Tuesday, Dec 8, 2020 13:41
> Subject: Re: Fli
好的,计划下周升级测试下,另:1.12.1计划何时发布呢
Original Message
Sender: Jark Wu
Recipient: user-zh
Date: Tuesday, Dec 8, 2020 13:41
Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型
hailong 说的定义成 STRING 是在1.12 版本上支持的,
https://issues.apache.org/jira/browse/FLINK-18002 1.12 这两天就会发布,如果能升级的话,可以尝试一下。
Best, Jark On Tue,
flink1.10.1??error_1006_cnt_permillage
sql??
SELECT
|DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10'
SECOND)), '-MM-dd') `day`,
|UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime,
INTERVAL '10' SECOND)), '-MM-dd HH:mm:ss'
flink1.10.1??error_1006_cnt_permillage
sql??
SELECT
|DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10'
SECOND)), '-MM-dd') `day`,
|UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime,
INTERVAL '10' SECOND)), '-MM-dd HH:mm:ss'
flink cep sql blink PATTERN (e1{3 } -> e2{1 }?)??
比如下面这种消息:
第一条消息:
{"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]}
第二条消息:
{"source":"transaction_2020202020200","data":[{"name":"d"},{"age":18}]}
第三条消息:
{"source":"transaction_2020202020200","data":[]}
我想直接在创建表时用一个字段来表示data这个属性的所有值。
在 2020-12-09 13:21:41,"Apple
怎么个动态法?
在 2020-12-09 13:18:56,"破极" 写道:
>Hello,各位大佬:
>请教下大佬们,在Flink
>SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create
>table时候schema呢?我定义了array,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。
>请问各位大佬有啥高招呢?谢谢。
>
>
>kafka消息样例(data的value是动态的):
>{"source":"transaction_20202020
Hello,各位大佬:
请教下大佬们,在Flink
SQL中读取Kafka中的数据,但Kafka中的数据比较复杂,其中json的data属性中的值是数组,但数组的内容是动态的,没有统一,如果定义create
table时候schema呢?我定义了array,想自己定义UDF来处理的,在JsonNode中的asText无法取到数据。
请问各位大佬有啥高招呢?谢谢。
kafka消息样例(data的value是动态的):
{"source":"transaction_2020202020200","data":[{"name":"d"},{"age":18}]}
我定义的schem
场景上:
目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。
目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。
目前测试了一版本flink
sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。
所以产生以下想法,不
flink sql 1.11 kafka source ?? ??kafka??canal??mysql ??'format'
= 'canal-json'?? ??
1sourcemsyql??schemadata[{}]??table??ts??
2??topicmysql binlog??kafka
source??s
是跟这个 Issue 有关吗?https://issues.apache.org/jira/browse/FLINK-20537
赵一旦 于2020年12月9日周三 上午10:17写道:
> MARK,学习下。等回复。
>
> 莫失莫忘 于2020年12月8日周二 下午6:49写道:
>
> > 我在迁移hive sql 到 flink引擎。原来的很多 hive sql 中
> > 字符串都是用双引号表示,例如 select * from table1 where column1 =
> > "word"。我如何在不修改SQL的前提下,使flink SQL 支持 双引号字符串
关闭了RocksDB的内存控制后,是不是应该把taskmanager.memory.managed.size设置成0?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
*【环境】*
Flink 版本:1.11.2
Hadoop 版本 :2.6.0-cdh5.8.3
Java 版本: 1.8.0_144
-
*【命令】*
[jacob@localhost flink-1.11.2]$ ./bin/yarn-session.sh -jm 1024m -tm 2048m
*【现象】*
2020-12-08 18:06:00,134 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli
[] -
旧版 'connector.type' = 'jdbc',新版 'connector' = 'jdbc'。
新旧区别,旧版根据查询决定key,新版你只需要定义了key就是upsert了,不需要查询符合一定要求。
Leonard Xu 于2020年12月7日周一 下午5:11写道:
> Hi,
> 你是不是没有订阅flink的用户邮件列表,所以有些邮件你看不到。
> 你可以发送任意内容的邮件到user-zh-subscr...@flink.apache.org user-zh-subscr...@flink.apache.org> 即可订阅用户邮件列表,订阅后邮件列表里大家的提问和回答
@JarkWu,你的意思是定义成string,但是输入数据是一个json object也可以支持?这岂不是需要对json部分的反序列化做特殊定制。
比如a字段是string,但数据中a属性是这样的:
"a":{ .. }。在java中相当于需要先将a这部分反序列化为一个map,然后再序列化为json(字符串)后作为a这个string的值。
?是吗。
Jark Wu 于2020年12月8日周二 下午1:42写道:
> hailong 说的定义成 STRING 是在1.12 版本上支持的,
> https://issues.apache.org/jira/browse/FLINK-18
Hi macia,
一旦回答的基本比较完整了。
watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
如果是两侧都有数据,watermark不前进,也都可以正常输出。
关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark
作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致
你的没有join到的数据下发会延迟很多了。
你也可以尝试下用
MARK,学习下。等回复。
莫失莫忘 于2020年12月8日周二 下午6:49写道:
> 我在迁移hive sql 到 flink引擎。原来的很多 hive sql 中
> 字符串都是用双引号表示,例如 select * from table1 where column1 =
> "word"。我如何在不修改SQL的前提下,使flink SQL 支持 双引号字符串。
> ps:我看到flink SQL中字符串都必须用 单引号,例如 select * from table1 where column1 =
> 'word' 。如何使 字符串 既可以是单引号 也可以是 双引号呢
没搞懂你怎么个不得解,是不去除redis的sdk使用还是咋的,问题描述太简单了。
这个貌似就是map内你通过redis client操作redis就好啦呀。
追梦的废柴 于2020年12月8日周二 下午8:44写道:
> 各位:
> 晚上好!
> 对于redis作为数据源或者sink网上有很多参照的案例,那么请问如何在一个map算子里面,先写入set结构中,然后再读取该set的长度呢?
> 业务需要,百思不得解,还望各位指点迷津!
> 祝好!
>
>
> | |
> 追梦的废柴
> |
> |
> 邮箱:zhuimeng...@163.com
> |
>
> 签名由 网易邮箱大师 定制
重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
(1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
join。
(2)此外,还有一个点,这个我也不确认。如果是datastream
api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
input_database中定义了wat
最简单的方式就是自定义Source,里面定时检测配置文件是否有更新,如果有更新的话就发送配置文件的内容,下游算子把这个source发送的内容当做广播变量
| |
熊云昆
|
|
邮箱:xiongyun...@163.com
|
签名由 网易邮箱大师 定制
在2020年12月08日 17:36,Lei Wang 写道:
flink 程序读配置文件,配置文件的内容作为广播变量广播出去。
如果配置文件更新了,怎样能把广播变量的内容也更新呢?
谢谢,
王磊
@Benchao Li 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
FLink,可能我的Case 太特殊了.
我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要
filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB
中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
还要注意的是 even time 是 create_time, 这里问题非常大:
1. 很多表都有 create tim
各位:
晚上好!
对于redis作为数据源或者sink网上有很多参照的案例,那么请问如何在一个map算子里面,先写入set结构中,然后再读取该set的长度呢?
业务需要,百思不得解,还望各位指点迷津!
祝好!
| |
追梦的废柴
|
|
邮箱:zhuimeng...@163.com
|
签名由 网易邮箱大师 定制
??flink1.10.1,pom??
报错信息:
Exception in thread "main" org.apache.flink.table.client.SqlClientException:
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.api.TableException: Unsupported conversion
项目需求要向 postgres 中插入数据,用 catalog 之后,插入数据貌似需要和数据库表定义完全一致,而且没找到只插入部分字段的写法
在时间转 TIMESTAMP(6) WITH LOCAL TIME ZONE 时报了错,这个格式是 postgres 中的时间戳定义
select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZONE);
有没有什么转换方法?或者只插入部分数据的方法?
hive sql ?? flink hive sql ??
select * from table1 where column1 =
"word"SQLflink SQL ??
psflink SQL select * from table1
where column1 = 'word' ???
作业数据流是 kafka -> flink ->
http/prometheus,目前这类型的作业很多,但是就只有那几个有问题,而且是必现,每次都只能重启,然后看着线程数上涨。。 我再debug看看~
Paul Lam 于2020年12月8日周二 下午6:00写道:
> Hi,
>
> 我之前说的多个集群的情况主要指写入数据到 HDFS。如果只有 checkpoint 依赖 HDFS 而出现这种情况的话,的确是非常奇怪。
>
> Best,
> Paul Lam
>
> > 2020年12月8日 11:03,zilong xiao 写道:
> >
> > Hi Paul,
> >
--
Sent from: http://apache-flink.147419.n8.nabble.com/
感谢您的答复!!
在 2020-12-08 15:57:32,"Leonard Xu" 写道:
>Hi,
>Flink 的元数据存放在catalog中的,也支持多种catalog(embedded,
>HIve,JDBC,自定义catalog),默认Flink使用内置的GenericInMemoryCatalog,这个是in
>memory的catalog,元数据都在这里,生产环境上可以使用HiveCatalog
>
>
>祝好
>Leonard
>[1]
>https://ci.apache.org/projects/flink/flink-doc
Hi,
我之前说的多个集群的情况主要指写入数据到 HDFS。如果只有 checkpoint 依赖 HDFS 而出现这种情况的话,的确是非常奇怪。
Best,
Paul Lam
> 2020年12月8日 11:03,zilong xiao 写道:
>
> Hi Paul,
>线程名称是一模一样的,都是user1@cluserA,HDFS client版本对于用户来说是透明的,作业使用的是Flink
> 1.11版本,该Flink版本使用HDFS版本好像是2.8.1,在Flink中和集群有持续交互的就只能想到checkpoint,开了DEBUG日志也没能找到root
> cau
好的,谢谢大佬解答~
-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
??flink1.10.1,pom??
根据官方提供的方法,用HADOOP_CLASSPATH=`hadoop classpath`集成hadoop成功。
因为flink on yarn是用的cdh6集群,所以我想利用现有的classpath中的包含的hbase库,使用
export
HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/*:$HADOOP_CLASSPATH
然后用yarn-session创建一个flink运行环境,再用sql-client连接这个容器,创建hbase映射表,这种用法失败:分析提示是找不到hbase包。
./bin/yarn
flink 程序读配置文件,配置文件的内容作为广播变量广播出去。
如果配置文件更新了,怎样能把广播变量的内容也更新呢?
谢谢,
王磊
43 matches
Mail list logo