Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
1.10.1最近正在准备发布,还有几个blocker的issue,应该快了。 1.11的话,应该还比较久,现在都还没有feature freeze。 如果你可以在master上复现这个问题的话,可以建一个issue。 111 于2020年4月16日周四 上午11:32写道: > Hi, > 是的,我都有修改. > 那我去jira里面重新开个issue? > > > 另外,1.10.1或者1.11大概什么时间发布呢?我已经合并了很多PR,现在的版本有点乱了。 > Best, > Xinghalo -- Benchao Li School of Electronics

回复: flink-1.10-sql 维表问题

2020-04-15 文章 111
Hi, 是的,我都有修改. 那我去jira里面重新开个issue? 另外,1.10.1或者1.11大概什么时间发布呢?我已经合并了很多PR,现在的版本有点乱了。 Best, Xinghalo

回复: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 xue...@outlook.com
双流join涉及的问题我罗列完整一下: 前提: 假设有两个流 其中一个流aStream非常庞大,基于时间单位是秒,源源不断的生产数据 另外一个流bStream也是很大,是不太可能基于内存做维度数据全局缓冲或者LRU淘汰,因为aStream使用bStream足够分散和随机,基于时间单位是天或者月,会持续不断的变化,部分数据或者长期不变 影响流的因子: 1、 系统集群资源,主要是内存 2、流速 3、不同流数据变化的时间单位不一致 4、同一流内数据变化的时间单位不一致 目标: 因两个流的数据都原样的保留下来,重算时,要保持每次

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
https://issues.apache.org/jira/browse/FLINK-16068 https://issues.apache.org/jira/browse/FLINK-16345 上面这两个issue的修改都加到了1.10上了么?如果是的话,那这可能是还有其他的bug。 如果你可以在1.10和或者master分支的最新代码上复现这个问题的话,可以建一个issue来跟踪下这个问题。 111 于2020年4月16日周四 上午10:46写道: > Hi, > 基于1.10 源码按照jira里面的PR修改不行么? > 跟hbase的ddl关系应该不大,就发一个kafka的吧

Re: 求依赖包

2020-04-15 文章 Benchao Li
Hi, 这个问题是一个已知问题[1],已经在1.10.1和master修复了。但是现在1.10.1还没有发布。 你可以直接下载flink源码,用release-1.10分支编译一个。 [1] https://issues.apache.org/jira/browse/FLINK-16170 samuel@ubtrobot.com 于2020年4月16日周四 上午9:57写道: > 大家好,有哪位大神有现成的包,非常感谢! > > flink-connector-elasticsearch7_2.11 > > -- >

回复: flink-1.10-sql 维表问题

2020-04-15 文章 111
Hi, 基于1.10 源码按照jira里面的PR修改不行么? 跟hbase的ddl关系应该不大,就发一个kafka的吧。 //代码占位符 Flink SQL> CREATE TABLE kafka_test1 ( //代码占位符 Flink SQL> CREATE TABLE kafka_test1 ( > id varchar, > a varchar, > b int, > ts as PROCTIME() > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.11

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
Hi, 你提到的这两个issue都是在1.10.1版本中才会修复,但是现在还没有release1.10.1版本。 你现在是用release-1.10 branch编译的么? 此外,是否方便也贴一下完整的DDL以及query呢? 111 于2020年4月16日周四 上午8:22写道: > Hi, > 更正一下,我的问题跟这个类似,遇到的问题也在评论中: > > https://issues.apache.org/jira/browse/FLINK-16345?jql=text%20~%20%22Caused%20by%3A%20java.lang.AssertionError%3A%2

Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 Benchao Li
我感觉双流join如果要保证结果是一致的,需要用事件时间,而不是处理时间或者是摄入时间。 如果可能,建议尝试下基于事件时间的双流join。 xue...@outlook.com 于2020年4月16日周四 上午9:15写道: > 双流join对数据和环境的要求很严格,这样适用的业务场景就会有限。没有办法保证环境和数据一致是严格符合的 > > 发送自 Windows 10 版邮件应用 > > 发件人: tison > 发送时间: 202

Re: flink cep 匹配一段时间类A,B,C事件发生

2020-04-15 文章 Dian Fu
类似于这样? AA follow by BB follow by CC AA定义成A or B or C BB定义成(A or B or C)and BB.type != AA.type CC定义成(A or B or C)and CC.type != AA.type and CC.type != BB.type > 在 2020年4月16日,上午8:40,Peihui He 写道: > > hello,all > >我这个边需要匹配一段时间内A,B,C事件同时发生,但是不要求A,B,C事件的顺序,flink cep有什么好的方式不? > > 有

求依赖包

2020-04-15 文章 samuel....@ubtrobot.com
大家好,有哪位大神有现成的包,非常感谢! flink-connector-elasticsearch7_2.11 深圳市优必选科技股份有限公司 | 平台软件部 邱钺 Samuel Qiu 手机/微信: +0086 150 1356 8368 Email: samuel@ubtrobot.com UBTECH Robotics | www.ubtrobot.com 广东省深圳市南山区平山路鸿莱科创楼13栋3楼优必选 From: samuel@ubtrobot.com Date: 2020-04-15 17:37 To: user-zh Subject: flink

回复: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 xue...@outlook.com
双流join对数据和环境的要求很严格,这样适用的业务场景就会有限。没有办法保证环境和数据一致是严格符合的 发送自 Windows 10 版邮件应用 发件人: tison 发送时间: 2020年4月15日 22:26 收件人: user-zh 主题: Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题 FYI h

flink cep 匹配一段时间类A,B,C事件发生

2020-04-15 文章 Peihui He
hello,all 我这个边需要匹配一段时间内A,B,C事件同时发生,但是不要求A,B,C事件的顺序,flink cep有什么好的方式不? 有个方案是 定义多个模式组,每个模式组是A,B,C事件的一次排列组合,但是这样比较麻烦,如果事件个数多的话,需要写太多组合。 best wish

回复: flink-1.10-sql 维表问题

2020-04-15 文章 111
Hi, 我的时间字段就是proctime()产生的...因为当时有个time关键字的bug,所以按照这个confluence进行了修正。 后来使用时间字段的时候,就出了现在的问题。 https://issues.apache.org/jira/browse/FLINK-16068 Best, Xinghalo 在2020年04月15日 21:21,Benchao Li 写道: 这个原因是维表join的时候需要使用的时间是*有处理时间属性*[1] 的。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table

回复:flink1.9,后台提交job失败

2020-04-15 文章 胡泽康
是不是没有执行操作啊。例如print、collect等方法 --原始邮件-- 发件人:"guanyq "

回复: flink-1.10-sql 维表问题

2020-04-15 文章 111
Hi, 更正一下,我的问题跟这个类似,遇到的问题也在评论中: https://issues.apache.org/jira/browse/FLINK-16345?jql=text%20~%20%22Caused%20by%3A%20java.lang.AssertionError%3A%20Conversion%20to%20relational%20algebra%20failed%20to%20preserve%20datatypes%3A%22 Best, Xinghalo 在2020年04月16日 08:18,111 写道: Hi, 我的时间字段就是proctime()产生的..

Re:flink1.9,后台提交job失败

2020-04-15 文章 guanyq
ok 找到原因了!不好意思! 在 2020-04-16 08:03:29,"guanyq" 写道: >代码里面是有env.execute,提交job出现以下错误,可能时什么原因? >The program didn't contain a Flink job. Perhaps you forgot to call execute() >on the execution environment.

flink1.9,后台提交job失败

2020-04-15 文章 guanyq
代码里面是有env.execute,提交job出现以下错误,可能时什么原因? The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.

Re: flink 1.7.2 YARN Session模式提交任务问题求助

2020-04-15 文章 tison
注意环境变量和 fs.hdfs.hdfsdefault 要配置成 HDFS 路径或 YARN 集群已知的本地路径,不要配置成客户端的路径。因为实际起作用是在拉起 TM 的那台机器上解析拉取的。 Best, tison. Chief 于2020年4月15日周三 下午7:40写道: > hi Yangze Guo > 您说的环境变量已经在当前用户的环境变量文件里面设置了,您可以看看我的问题描述,现在如果checkpoint的路径设置不是namenode > ha的nameservice就不会报错,checkpoint都正常。 > > > > > -- 原

Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 tison
FYI https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html IngestionTime 的时间基准是进入 Flink 系统即 So

Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 tison
IngestionTime 多次运行结果不一样很正常啊,试试 event time? Best, tison. xuefli 于2020年4月15日周三 下午10:10写道: > 遇到一个非常头痛的问题 > > Flink1.10的集群,用hdfs做backend > > 一个流aStream准备了10亿的数据,另外一个流bStream百万 > 如果如下操作 > > 我遇到一个问题 双流Join > 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,

双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 xuefli
遇到一个非常头痛的问题 Flink1.10的集群,用hdfs做backend 一个流aStream准备了10亿的数据,另外一个流bStream百万 如果如下操作 我遇到一个问题 双流Join  带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同  。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后  再对cStream进行keyBy-->timeWind

Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

2020-04-15 文章 tao wang
多谢两位 Yangze and LakeShen,我研究一下。 Yangze Guo 于2020年4月15日周三 下午3:45写道: > 1. flink会去读YARN_CONF_DIR or HADOOP_CONF_DIR这两个环境变量 > 2. 我理解这和你flink运行的集群是解耦的,只要你dir的路径不变,就会从那个dir找checkpoint恢复 > > Best, > Yangze Guo > > On Wed, Apr 15, 2020 at 3:38 PM tao wang wrote: > > > > 多谢回复, 还有几个问题请教: > > 1、外部集群的hdfs

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
这个原因是维表join的时候需要使用的时间是*有处理时间属性*[1] 的。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html 111 于2020年4月15日周三 下午9:08写道: > Hi, > 现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime > 会报类型不匹配问题…timestamp(3)和time attribute 不匹配. > > > 所以现在只能使

回复: flink-1.10-sql 维表问题

2020-04-15 文章 111
Hi, 现在直接使用DDL声明可以正常使用吗?我这边使用的时候发现,FOR SYSTEM_TIME AS OF o.proctime 会报类型不匹配问题…timestamp(3)和time attribute 不匹配. 所以现在只能使用connector内部提供的Lookup function手动注册成 table function,使用lateral table xxx的语法来使用。 Best, Xinghalo

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Zhenghua Gao
JDBC connector 支持作为维表,DDL无需特殊字段指定。部分可选的参数可以控制temporary join行为[1]。 用作维表join时,需要使用特殊的join语法 [2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins *B

Re: flink-1.10-sql 维表问题

2020-04-15 文章 Benchao Li
Hi, 维表创建的DDL跟普通的source没有区别,主要是在使用的时候,需要使用维表join专有的语法。 SELECT o.amout, o.currency, r.rate, o.amount * r.rateFROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency guaishushu1...@163.com 于2020年4月15日周三 下午7:48写道: > hi 大家 > 想问下flink-1.10-sql支

flink-1.10-sql 维表问题

2020-04-15 文章 guaishushu1...@163.com
hi 大家 想问下flink-1.10-sql支持维表DDL吗,看社区文档好像mysql和hbase支持,但是需要什么字段显示声明为创建的表是维表呀? guaishushu1...@163.com

?????? flink 1.7.2 YARN Session????????????????????

2020-04-15 文章 Chief
hi Yangze Guo ??checkpoint??namenode ha??nameservicecheckpoint --  -- ??: "Yangze Guo"https://ci.apache.org/projects/flink/flink-docs-

Re: 关于状态TTL

2020-04-15 文章 Benchao Li
Hi, 你用的是哪个版本呢? 在1.9版本里面的确是有点问题,默认没有开启cleanup in background [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-15938 酷酷的浑蛋 于2020年4月15日周三 下午5:40写道: > > > 我在flink sql中设置了 > tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); > sql: select * fr

Re: flink-sql-connector-elasticsearch7_2.11-1.10.0.jar

2020-04-15 文章 Benchao Li
Hi, 这个是个已知问题[1],已经在1.10.1和master上修复了。你可以尝试下~ [1] https://issues.apache.org/jira/browse/FLINK-16170 samuel@ubtrobot.com 于2020年4月15日周三 下午5:37写道: > 在提交job后,发现不成功,这个问题要怎么解决? > 版本:Flink1.10.0 elasticsearch:7.6.0 > > 看了源码,确实是没这个类的: > > Caused by: java.lang.NoClassDefFoundError: > org/apache/fli

关于状态TTL

2020-04-15 文章 酷酷的浑蛋
我在flink sql中设置了 tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6)); sql: select * from test t join test2 t2 on t.a=t2.a 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明

flink-sql-connector-elasticsearch7_2.11-1.10.0.jar

2020-04-15 文章 samuel....@ubtrobot.com
在提交job后,发现不成功,这个问题要怎么解决? 版本:Flink1.10.0 elasticsearch:7.6.0 看了源码,确实是没这个类的: Caused by: java.lang.NoClassDefFoundError: org/apache/flink/elasticsearch7/shaded/org/elasticsearch/script/mustache/SearchTemplateRequest at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCall

Re: FlinkSQL构建流式应用checkpoint设置

2020-04-15 文章 godfrey he
Hi Even, 1. 目前 SQL CLI 支持通过在 sql-client-default.yaml 里设置 parallelism 和 max-parallelism 来控制 CLI 任务的默认并发。或者通过 set 命令,如 set execution.parallelism=10;放方式动态设置。例外,对于如果使用 blink planner,可以用 table.exec.resource.default-parallelism 来配置默认并发。[1] 另外 SQL CLI 还不支持 checkpoint 的设置。 2. 目前 SQL CLI 默认是 in-memory cata

Re: 关于FLINK PYTHON UDF

2020-04-15 文章 Xingbo Huang
Hi, 我刚刚在本地完全模拟了你的数据和核心的代码,是可以在sink里拿到结果的。 我把我的测试代码放到附件里面了, 你可以参考一下,如果还是不行的话,可以提供下你的代码再帮你看一下 Best, Xingbo 秦寒 于2020年4月15日周三 下午3:16写道: > 你好 > >我在使用kafka produce数据后,在python中使用UDF做一个add function,但是最后的sink > 文件里面没有任何数据, > > 如果不用UDF的话直接获取一个数据在最后的sink文件里面是有数据的如下所示,DEBUG很久也不清楚是什么原因是否能帮忙分下 > > >

Re: 关于flink检查点

2020-04-15 文章 half coke
是的,根据任务负载的变化自动调整checkpoint的间隔,或者可以通过用户写的逻辑调整检查点。 刚开始学习flink,想请教一下。 Congxian Qiu 于2020年4月15日周三 下午12:33写道: > hi > > 你说的间隔自适应是指什么呢?是指做 checkpoint 的间隔自动调整吗? > > Best, > Congxian > > > half coke 于2020年4月15日周三 下午12:24写道: > > > 请问下为什么flink没有支持自适应检查点间隔呢?是出于什么样的考虑吗? > > >

Re: flinksql如何控制结果输出的频率

2020-04-15 文章 Benchao Li
非常开心能够帮助到你~ 刘建刚 于2020年4月15日周三 下午3:57写道: > 感谢 Benchao,问题应解决了! > > 2020年4月15日 下午3:38,Benchao Li 写道: > > Hi 建刚, > > 现在Emit的原理是这样子的: > - *当某个key*下面来了第一条数据的时候,注册一个emit delay之后的*处理时间定时器*; > - 当定时器到了的时候, > - 检查当前的key下的聚合结果跟上次输出的结果是否有变化, > - 如果有变化,就发送-[old], +[new] 两条结果到下游; > - 如果是*没有变化,则不做

Re: flinksql如何控制结果输出的频率

2020-04-15 文章 刘建刚
感谢 Benchao,问题应解决了! > 2020年4月15日 下午3:38,Benchao Li 写道: > > Hi 建刚, > > 现在Emit的原理是这样子的: > - 当某个key下面来了第一条数据的时候,注册一个emit delay之后的处理时间定时器; > - 当定时器到了的时候, > - 检查当前的key下的聚合结果跟上次输出的结果是否有变化, > - 如果有变化,就发送-[old], +[new] 两条结果到下游; > - 如果是没有变化,则不做任何处理; > - 再次注册一个新的emit delay之后的处理时间定时器。 > > 你可

Re: Flink On Yarn , ResourceManager is HA , if active ResourceManager changed,what is flink task status ?

2020-04-15 文章 Xintong Song
Normally, Yarn RM switch should not cause any problem to the running Flink instance. Unless the RM switch takes too long and Flink happens to request new containers during that time, it might lead to resource allocation timeout. Thank you~ Xintong Song On Wed, Apr 15, 2020 at 3:49 PM LakeShen

Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

2020-04-15 文章 Yangze Guo
1. flink会去读YARN_CONF_DIR or HADOOP_CONF_DIR这两个环境变量 2. 我理解这和你flink运行的集群是解耦的,只要你dir的路径不变,就会从那个dir找checkpoint恢复 Best, Yangze Guo On Wed, Apr 15, 2020 at 3:38 PM tao wang wrote: > > 多谢回复, 还有几个问题请教: > 1、外部集群的hdfs-site, core-site 这些怎么配置? > 2、另外一个角度, 如果我把任务迁移到另外一个集群,如何让它从老的集群的checkpoint 恢复。 > > Yangze

Flink On Yarn , ResourceManager is HA , if active ResourceManager changed,what is flink task status ?

2020-04-15 文章 LakeShen
Hi community, I have a question about flink on yarn ha , if active resourcemanager changed, what is the flink task staus. Is flink task running normally? Should I must restart my flink task to run? Thanks to your reply. Best, LakeShen

Re: flinksql如何控制结果输出的频率

2020-04-15 文章 Benchao Li
Hi 建刚, 现在Emit的原理是这样子的: - *当某个key*下面来了第一条数据的时候,注册一个emit delay之后的*处理时间定时器*; - 当定时器到了的时候, - 检查当前的key下的聚合结果跟上次输出的结果是否有变化, - 如果有变化,就发送-[old], +[new] 两条结果到下游; - 如果是*没有变化,则不做任何处理*; - 再次注册一个新的emit delay之后的处理时间定时器。 你可以根据这个原理,再对照下你的数据,看看是否符合预期。 刘建刚 于2020年4月15日周三 下午3:32写道: > > 我们也经常有固定窗口定期触

Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

2020-04-15 文章 LakeShen
Hi tao wang, 你可以在你的 flink-conf.yaml 里面配置 Checkpoint 的目录,就像楼上 Yangze 所说 state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints/ Best, LakeShen Yangze Guo 于2020年4月15日周三 下午2:44写道: > checkpoint的目录设置key为state.checkpoints.dir > > 你可以这样设置 > state.checkpoints.dir: hdfs://namenode:port/flink/ch

FlinkSQL????????????checkpoint????

2020-04-15 文章 Even
Hi?? ?? 1?? Flink SQL CLI ??DDL??checkpoint?? 2?? Flink SQL CLI ??CLItable??

Re: flinksql如何控制结果输出的频率

2020-04-15 文章 刘建刚
我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码: public class EarlyEmitter { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);

关于FLINK PYTHON UDF

2020-04-15 文章 秦寒
你好 我在使用kafka produce数据后,在python中使用UDF做一个add function,但 是最后的sink文件里面没有任何数据, 如果不用UDF的话直接获取一个数据在最后的sink文件里面是有数据的如下所示,DEBUG 很久也不清楚是什么原因是否能帮忙分下 Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1} 测试结果 Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1} st_env.from_path("source")\

Re: flink java.util.concurrent.TimeoutException

2020-04-15 文章 Yangze Guo
日志上看是Taskmanager心跳超时了,如果tm还在,是不是网络问题呢?尝试把heartbeat.timeout调大一些试试? Best, Yangze Guo On Mon, Apr 13, 2020 at 10:40 AM 欧阳苗 wrote: > > job运行了两天就挂了,然后抛出如下异常,但是taskManager没有挂,其他的job还能正常在上面跑,请问这个问题是什么原因导致的,有什么好的解决办法吗 > > > 2020-04-13 06:20:31.379 ERROR 1 --- [ent-IO-thread-3] > org.apache.flink.runtim

Re: flink 1.7.2 YARN Session模式提交任务问题求助

2020-04-15 文章 Yangze Guo
Flink需要设置hadoop相关conf位置的环境变量 YARN_CONF_DIR or HADOOP_CONF_DIR [1] [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html Best, Yangze Guo On Mon, Apr 13, 2020 at 10:52 PM Chief wrote: > > 大家好 > 目前环境是flink 1.7.2,使用YARN Session模式提交任务,Hadoop 版本2.7.3,hdfs > name