?????? BLinkPlanner sql join????????

2020-06-10 文章 op
??Blinkplanner??oldplanner??1.10 package test.table.sql import java.util.Properties import com.souhu.msns.huyou.PublicParams import com.souhu.msns.huyou.utils.KafkaPbSchema import org.apache.flink.api.common.time.Time import org.apache.fli

Re: BLinkPlanner sql join状态清理

2020-06-10 文章 Leonard Xu
Hi, 可以稳定复现吗?麻烦贴下flink版本和你的case, 我可以帮忙跟进确认下 Best, Leonard Xu > 在 2020年6月11日,14:30,op <520075...@qq.com> 写道: > > 大家好,最近发现一个问题 > 发现同一套代码,用oldPlanner设置IdleStateRetentionTime后join两边表的状态可以被定时清理,但是用blinkplanner同样的代码,运行却发现状态不会被清理,这是个bug吗?

Re: FLINK SQL DDL写入hbase问题

2020-06-10 文章 Leonard Xu
Hi 你服务器上是否配置了hadoop_classpath? 建议hbase在试用时 用 hadoop_classpath + flink-hbase jar,不然依赖问题会比较麻烦。 祝好 Leonard Xu > 在 2020年6月11日,14:24,酷酷的浑蛋 写道: > > > > 在使用flink sql ddl语句向hbase中写的时候报如下错误: > java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration >at > org.apache.flink.addon

BLinkPlanner sql join????????

2020-06-10 文章 op
??oldPlannerIdleStateRetentionTime??join??blinkplannerbug

flink on yarn模式的代码运行位置问题

2020-06-10 文章 zjfpla...@hotmail.com
Hi, 我在使用flink的过程中,有些疑问请教下各位: 1.flink分为jobmanger和taskmanager,我怎么区分哪些代码是运行在jobmanager,哪些在taskmanager? 2.假设我jarA中使用AbstractYarnClusterDescriptor.deployJobCluster()替代flink run命令(想直接通过jar包启动方式直接提交flink任务上yarn),部署jarB到yarn上,jarB中mainClass中使用StreamExecutionEnvironment.execute去执行流任务,通过java -c

Re:Re: flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Zhou Zach
3ku 在 2020-06-11 14:10:53,"Leonard Xu" 写道: >Hi, > >JDBC connector 之前不支持 unsigned 类型,unsigned 会比signed 类型更长。 >bigint(20) unsigned(range is 0 to 18446744073709551615) 超过了 bigint (range is >-9223372036854775808 to 9223372036854775807)的长度。 > > >最新的代码已经修复这个问题了[1],你可以等1.11发布后试用,或者编译下

FLINK SQL DDL写入hbase问题

2020-06-10 文章 酷酷的浑蛋
在使用flink sql ddl语句向hbase中写的时候报如下错误: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration at org.apache.flink.addons.hbase.HBaseUpsertTableSink.consumeDataStream(HBaseUpsertTableSink.java:87) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSin

Re: flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Leonard Xu
Hi, JDBC connector 之前不支持 unsigned 类型,unsigned 会比signed 类型更长。 bigint(20) unsigned(range is 0 to 18446744073709551615) 超过了 bigint (range is -9223372036854775808 to 9223372036854775807)的长度。 最新的代码已经修复这个问题了[1],你可以等1.11发布后试用,或者编译下最新的代码,flink 中对应表 声明decimal(20, 0)处理。 祝好, Leonard Xu [1] https://i

Re:Re:flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Zhou Zach
项目里引用的mysql: mysql mysql-connector-java 5.1.46 使用的Mysql版本是5.7.18-log 如果mysql里面的字段是bigint,建表转换成int吗,会有截断风险吧 At 2020-06-11 13:39:18, "chaojianok" wrote: >检查一下你项目里引入的 MySQL 包的版本是否和你使用的 MySQL 版本一致,或者也可以直接转换一下数据类型。 > > > > > > > > > > > > > > > > > >At 2020-06-11 13:22:07, "

Re:Re: flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Zhou Zach
flink版本是1.10.0, mysql表: CREATE TABLE `analysis_gift_consume` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `times` int(8) NOT NULL COMMENT '时间[MMdd]', `gid` int(4) NOT NULL DEFAULT '0' COMMENT '礼物ID', `gname` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '礼物

Re: flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Leonard Xu
Hi, 用的 flink 版本是多少? 数据库的字段确定是 bigint 类型吗? > Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast > to java.lang.Long java.math.BigInteger 的范围比 java.lang.Long的范围大很多,是不能cast的,应该是你数据类型对应错误了,可以把mysql 表的schema贴下吗? 祝好, Leonard Xu > 在 2020年6月11日,13:22,Zhou Zach 写道: > > SLF4

Re:flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 chaojianok
检查一下你项目里引入的 MySQL 包的版本是否和你使用的 MySQL 版本一致,或者也可以直接转换一下数据类型。 At 2020-06-11 13:22:07, "Zhou Zach" wrote: >SLF4J: Class path contains multiple SLF4J bindings. > >SLF4J: Found binding in >[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-i

flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Zhou Zach
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/

Re: keyed state在不同算子中使用

2020-06-10 文章 Congxian Qiu
你好 现在 KeyedState 是不能跨算子使用的,也就是不同的算子使用的是不同的 state。 Best, Congxian Z-Z 于2020年6月11日周四 上午10:11写道: > 请问,假设两个算子以相同的字段keyby,它们可以使用相同的StateDescriptor从而使用相同的的keyed state吗

keyed state????????????????

2020-06-10 文章 Z-Z
??keybyStateDescriptorkeyed state??

Re: kafka????????

2020-06-10 文章 ??????
Mikey

Re: flink 1.10SQL 报错问题求教

2020-06-10 文章 godfrey he
hi chenkaibit 欢迎将fix贡献回社区 chenkaibit 于2020年6月9日周二 上午10:34写道: > 我也遇到过这个问题,这个可能是 checkpoint 的 bug,我修改了下 flink 源码后不报错了,可以参考下这个patch > https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19 > > > 在 2020-06-05 15:06:48,"hb" <343122...@163.com> 写道: > >Flink SQL 作业, 开始运行

Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-10 文章 Congxian Qiu
Hi 从错误栈看是因为 task 端 snapshot 出问题了,原因是 “Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 5”,需要看一下为什么会走到这里 Best, Congxian 李奇 <359502...@qq.com> 于2020年6月10日周三 下午5:57写道: > > 哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内

Re: 关于DataStreamUtils.reinterpretasKeyedStream的使用

2020-06-10 文章 Jark Wu
Hi, 你可以再用 DataStreamUtils.reinterpretAsKeyedStream(proStream, new MyKeySelector2(groupKeys) ) 把它解释成 KeyedStream。 因为你的 flatmap 和 上游节点之间如果并发一样的话,运行时是会 chain 在一起的,所以key的分布没有变。 Best, Jark On Wed, 10 Jun 2020 at 21:15, Yichao Yang <1048262...@qq.com> wrote: > Hi > > > flatmap之后返回的本身就不是Keyedstream哈,k

Re: kafka相关问题

2020-06-10 文章 Jark Wu
Hi, 我的理解是你想要获取 kafka 里面的最新一条数据,然后就结束? 类似于 kafka 的命令? ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic xxx --max-messages 1 在 Flink 里面表达出来就是 select * from kafka limit 1 的批处理结果,只不过现在这个 query 会一直运行(流模式),不会结束。 Best, Jarrk On Wed, 10 Jun 2020 at 21:44, Mikey <359502...@qq.com> wrot

Re: kafka相关问题

2020-06-10 文章 Mikey
hi,可以取最新的一条数据: select id, last_value(value) over (partition by id order by id range between 1 prodding and current row ) as cur_value from table_ddl 通过分组分组获取最新的一条数据。 具体可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html

Re: kafka????????

2020-06-10 文章 ??????
kafka

??????????DataStreamUtils.reinterpretasKeyedStream??????

2020-06-10 文章 Yichao Yang
Hi flatmapKeyedstreamkeyby??keyedstream?? Best, Yichao Yang --  -- ??: ""<318666...@qq.com>; : 2020??6??10??(??) 7:18 ??: "user-zh"

Re: 关于flinksql between问题

2020-06-10 文章 Leonard Xu
> > 非常感谢,使用的flink1.10.0 中流转成表是是否有字段的数目的限制 我把流转成表 我的流是一个封装的实体类 转换的时候没有这个字段数目的限制的,另外看你的字段也不是很多,一般业务上几百个字段都正常的,你检查下你字段的对应关系 祝好, Leonard Xu > tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id) >     > tnv.registerDataStream("OMstream",value,'original_netw

回复:kafka相关问题

2020-06-10 文章 kcz
你这个表达,实时kafka的一条记录,你要最新的那个是吧,你最新的判断标准是什么?根据什么特性来,表达清楚一点哇。 -- 原始邮件 -- 发件人: 小学生 <201782...@qq.com> 发送时间: 2020年6月10日 18:15 收件人: user-zh

Re:flink??????????????????

2020-06-10 文章 Matt Wang
kafka ?? 0.11.0 ?? flink EXACTLY-ONCE?? send ?? kafka commit ?? kafka ?? isolation.level commit --- Best,

关于DataStreamUtils.reinterpretasKeyedStream的使用

2020-06-10 文章 绘梦飘雪
hi   我有这样一个场景,以多个相同的key.做keyby, DataStream resStream =  demoStream.keyBy(groupKeys)             .flatMap(new MyFlatmapFunction())             .keyBy(groupKeys)             .process(new MyProcessFunction())             .keyBy(groupKeys)             .timeWindow(Time.seconds(1))             .aggregate

Re: FLINK SQL文档示例是否正确

2020-06-10 文章 Jark Wu
Good catch! 在 Flink 中需要用 mod(a, 4) 做取余运算。 % 不是一个 SQL 标准操作符。 我开了个 issue 去跟进这个问题:要么改文档,要么允许 % 操作符。 https://issues.apache.org/jira/browse/FLINK-18240 Best, Jark On Wed, 10 Jun 2020 at 18:34, 张韩 wrote: > 问题: > 文档( > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/querie

FLINK SQL文档示例是否正确

2020-06-10 文章 张韩
问题: 文档(https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#scan-projection-and-filter)使用'%'求余运算,在SQL解析报错: org.apache.calcite.sql.parser.SqlParseException: Percent remainder '%' is not allowed under the current SQL conformance level MYSQL conformance 支持'%'运算,使用

Re: kafka????????

2020-06-10 文章 ??????
??kafkamysql??where??kafka

Re: kafka相关问题

2020-06-10 文章 Jingsong Li
Hi, 小学生 你可以仔细描述下你的业务场景吗?然后再描述下问题,没懂到底是想要什么。 Best, Jingsong Lee On Wed, Jun 10, 2020 at 3:46 PM 方盛凯 wrote: > 那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整. > 如有错误欢迎指正 > > 小学生 <201782...@qq.com> 于2020年6月10日周三 下午3:26写道: > > > 您好,我是通过select * from > > table_ddl这个去触发的,但是就是因为table_ddl

Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-10 文章 李奇
哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。 > 在 2020年6月10日,下午1:24,Zhefu PENG 写道: > > 补充一下,在TaskManager发现了如下错误日志: > > 2020-06-10 12:44:40,688 ERROR > org.apache.flink.streaming.runtime.tasks.StreamTask - E

Re: kafka????????

2020-06-10 文章 ??????
??table_ddlselect * from table_ddl

Re: kafka相关问题

2020-06-10 文章 方盛凯
那你有没有尝试过修改connector中property中connector.startup-mode 设置为latest-offset,这样子每次从kafka读取都是读取最新的消息。 另外,我想问一下 你的sql是一直运行的吗? 我给的limit方案是一个upersert流。 小学生 <201782...@qq.com> 于2020年6月10日周三 下午5:31写道: > limit 没有用呀。有没有切实可行的方案呢,pyflink下。

Re: sink mysql 失败

2020-06-10 文章 李奇
用户名密码没有设置。 > 在 2020年6月10日,下午5:42,Zhou Zach 写道: > > 感谢回复!忘记设置用户名和密码了。。 > > > > > > > > > > > > > > > > > > At 2020-06-10 16:54:43, "wangweigu...@stevegame.cn" > wrote: >> >> Caused by: java.sql.SQLException: Access denied for user ''@'localhost' >> (using password: NO) >> 得指

Re:回复: sink mysql 失败

2020-06-10 文章 Zhou Zach
感谢回复!忘记设置用户名和密码了。。 At 2020-06-10 16:54:43, "wangweigu...@stevegame.cn" wrote: > >Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using >password: NO) >得指定下有操作mysql这个表的权限账号了! > > > >发件人: Zhou Zach >发送时间: 2020-06-10 16:32 >收件人: Flink user-zh mailing list

Re: kafka????????

2020-06-10 文章 ??????
limit ??pyflink

Re: Flink sql 状态清理问题

2020-06-10 文章 Benchao Li
Hi, Join算子的state是支持清理的。 可以提供下以下信息: - Flink 版本 - planner (blink planner / old planner) op <520075...@qq.com> 于2020年6月10日周三 下午4:08写道: > hi, > 写了个测试程序: > > .. > > val tConfig = bstEnv.getConfig > > confg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25)) > > .. > > val q1=bstEnv.

Re: TTL 支不支持自然日

2020-06-10 文章 Jingsong Li
Hi, 我觉得可以有基于watermark的状态清理这种机制。 但是 SQL的语义不太好描述这种机制,所以业务上能不能算出一个day来加上?比如group by的字段加上这个day,这样可以隔天后数据独立? Best, Jingsong Lee On Wed, Jun 10, 2020 at 3:12 PM star <3149768...@qq.com> wrote: > 感谢您的建议,实时和离线的sink的目标表是一样的。 > 举个场景: > 比如计算用户购买商品的种类数量(select count(distinct product) from > order),这种计算需要基

回复: sink mysql 失败

2020-06-10 文章 wangweigu...@stevegame.cn
Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using password: NO) 得指定下有操作mysql这个表的权限账号了! 发件人: Zhou Zach 发送时间: 2020-06-10 16:32 收件人: Flink user-zh mailing list 主题: sink mysql 失败 SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:

??????flink??????????????????

2020-06-10 文章 1193216154
read uncommit ??read commint. read uncommit ??flink??commit?? read commit?? --  -- ??: "??"

sink mysql 失败

2020-06-10 文章 Zhou Zach
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/

????: ?????? ????flinksql between????

2020-06-10 文章 wangweigu...@stevegame.cn
??valuemysqlinst ?? ?? 2020-06-10 15:25 user-zh ?? ?? flinksql between flink1.10.0 ?? tnv.registerDataStrea

Flink sql ????????????

2020-06-10 文章 op
hi?? .. val tConfig = bstEnv.getConfigconfg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25))..val q1=bstEnv.sqlQuery( """select createTime,feedid from source |where circleName is not null |and circleName not in('','_') |and action = 'C_FEED_ED

Re: kafka相关问题

2020-06-10 文章 方盛凯
那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整. 如有错误欢迎指正 小学生 <201782...@qq.com> 于2020年6月10日周三 下午3:26写道: > 您好,我是通过select * from > table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from > table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)

?????? ????flinksql between????

2020-06-10 文章 ??????
flink1.10.0 ?? tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id)     tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'

Re: kafka????????

2020-06-10 文章 ??????
??select * from table_ddl??table_ddl??select * from table_ddl??table_ddl??

Re: kafka相关问题

2020-06-10 文章 方盛凯
我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。 至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗? 我个人猜可能有两种方案: 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始; 2.定期向文件系统写入数据。 小学生 <201782...@qq.com> 于2020年6月10日周三 下午2:48写道: > 各位大佬好,请教一个问题: > 利用

??????TTL ??????????????

2020-06-10 文章 star
??sink?? ?? select count(distinct product) from order),?? 2100w???

Re: Re: flink如何传递全局变量

2020-06-10 文章 zjfpla...@hotmail.com
我看到这篇文章介绍的比较详细:https://www.cnblogs.com/029zz010buct/p/10362451.html zjfpla...@hotmail.com 发件人: Px New 发送时间: 2020-06-10 09:54 收件人: user-zh 主题: Re: flink如何传递全局变量 对 正如 -> 1048262223 所说的一样 , 目前我就是通过BroadCast 动态更细一些规则带到下游并在Process method 中 进行操作 |  zjfpla...@hotmail.com 于2020年6月9日周二 下午8:14写道: