Re: 关于flink中端对端的精确一次性理解问题

2019-08-27 Thread Yun Tang
Hi 可以看一下TwoPhaseCommitSinkFunction的实现,preCommit是在snapshotState时调用,会将当前的currentTransactionHolder存储到pendingCommitTransactions,直到notifyCheckpointComplete时(也就是commit时),将pendingCommitTransactions取出进行事务性操作。所以preCommit时候不需要写是不符合语义的。 如果借助TwoPhaseCommitSinkFunction,确实需要适当减少checkpoint interval,否则可能很久都没有输出

在子查询上使用row_number over返回的rn都是1

2019-08-27 Thread ddwcg
如果直接查询表是没问题,但是业务需求是按汇总后的amount排序,所以有一个from子查询,请问有没有什么方法汇总后求topN select id,province,amount,rn from( select id,province,amount, row_number() over(partition by province order by amount desc ) as rn from ( select id,province,sum(amount) amount from mytable group by id,province )m )a wh

flink1.7.2如何进行hdfs的kerberos认证

2019-08-27 Thread 杨文生-java开发
public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool .fromPropertiesFile(BizlogStreamWithEventTimeCleaner.class.getResourceAsStream(PROPERTIES_FILE_NAME)) .mergeWith(ParameterTool.fromArgs(args)

Fwd: PathIsNotEmptyDirectoryException 异常

2019-08-27 Thread Andrew Lin
https://issues.apache.org/jira/browse/FLINK-13856 可能是 s3a的问题,删除操作是不是阻塞的? 报错看起来是因删除文件夹时候前面的删除操作还没完成。文件不是空的 > 下面是被转发的邮件: > > 发件人: 王佩 > 主题: PathIsNotEmptyDirectoryException 异常 > 日期: 2019年8月24日 GMT+8 下午4:49:36 > 收件人: user-zh > 回复-收件人: user-zh@f

Re: 任务内存增长

2019-08-27 Thread Xintong Song
你用的是heap state backend吗?可以看下checkpoint size是否持续在增大,如果是的话很可能就是state增大导致的。作业运行后,随着处理的数据越来越多,state的key数量也会越来越多,大小随之增大。解决方案要么是改用RocksDB,要么是把tm内存配大为state增大留出富裕。 另外,如果checkpoint size持续增长没有趋于平缓的趋势,那么也可能state的使用有问题。 如果观察到不是state的问题,那么可能需要dump下tm的内存,看看是否哪里有内存泄露的情况。 Thank you~ Xintong Song On Mon, Aug

kafka流与hive表join问题

2019-08-27 Thread like
请问一下,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了? | | like | | likeg...@163.com | 签名由网易邮箱大师定制

Re: PathIsNotEmptyDirectoryException 异常

2019-08-27 Thread Yun Tang
在Flink看来,删除操作都是同一个线程内顺序执行的,S3AFileSystem.delete 接口看上去也不是异步执行的。出异常的时候,能检查一下chk-50832 目录里面还有什么么? From: Andrew Lin Sent: Tuesday, August 27, 2019 16:53 To: wang...@cmcm.com ; user-zh@flink.apache.org Subject: Fwd: PathIsNotEmptyDirectoryException 异常 https://issu

kafka流与hive表join问题

2019-08-27 Thread like
请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会join输出了?

Re: 任务内存增长

2019-08-27 Thread 张坤
感谢您的回复,checkpoint使用的rocksDB,现在查看GC日志得到以下信息,堆内存使用正常,线程数使用在500左右,线程回收,但是线程占用的内存好像并没有回收掉。 在 2019/8/27 下午5:02,“Xintong Song” 写入: 你用的是heap state backend吗?可以看下checkpoint size是否持续在增大,如果是的话很可能就是state增大导致的。作业运行后,随着处理的数据越来越多,state的key数量也会越来越多,大小随之增大。解决方案要么是改用RocksDB,要么是把tm内存配大为state增大留出富裕。

Re: kafka流与hive表join问题

2019-08-27 Thread Jeff Zhang
你是怎么join hive表的,能share你的代码吗? like 于2019年8月27日周二 下午5:15写道: > 请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有一个问题是hive表加载完之后不会再做checkpoint?我目前使用的是1.7.1版本,看了1.9的维表join,blink文档说(必须加上FOR > SYSTEM_TIME AS OF > PROCTIME(),表示JOIN维表当前时刻所看到的每条数据),这也就意味着跟我现在一样,没有加载完hive就会joi

Re: flink1.7.2如何进行hdfs的kerberos认证

2019-08-27 Thread Jeff Zhang
See https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#kerberos-based-security 杨文生-java开发 于2019年8月27日周二 下午3:30写道: > > >public static void main(String[] args) throws Exception { > final ParameterTool parameterTool = ParameterTool > > .fromPropertiesFile(Bizlo

Re: 任务内存增长

2019-08-27 Thread Xintong Song
这个邮件列表看不到图片附件的,文本内容可以直接贴出来,图片的话需要放外部链接 Thank you~ Xintong Song On Tue, Aug 27, 2019 at 5:17 PM 张坤 wrote: > > 感谢您的回复,checkpoint使用的rocksDB,现在查看GC日志得到以下信息,堆内存使用正常,线程数使用在500左右,线程回收,但是线程占用的内存好像并没有回收掉。 > > 在 2019/8/27 下午5:02,“Xintong Song” 写入: > > 你用的是heap state backend吗?可以看下checkpoint > > si

回复: kafka流与hive表join问题

2019-08-27 Thread like
我通过HCatInputFormat读取了hive的数据注册了一张表,然后读取kafka的数据也注册了一张表,join就是通过sql写的,没有什么代码逻辑呢。 | | like | | likeg...@163.com | 签名由网易邮箱大师定制 在2019年8月27日 17:17,Jeff Zhang 写道: 你是怎么join hive表的,能share你的代码吗? like 于2019年8月27日周二 下午5:15写道: 请问一下各位大佬,如何保证先加载完hive表,然后再与流join,我发现在hive还没有加载完就已经有join的结果出来,这样刚开始出来的结果是不准确的,还有

Re: kafka流与hive表join问题

2019-08-27 Thread 苏 欣
我之前试过两种方式,但都有各自的问题: 1.注册hiveCatalog然后关联kafka,这种方式会存在hive表数据读取一次后不再更新的问题。 2.实现LookupableTableSource方法,通过jdbc方式读取hive维表来关联kafka,但是这种方式可以做到实时加载hive数据,但是效率比较低,尤其是读取数据量较大的hive表时。 sean...@live.com 发件人: like 发送时间: 2019-08-27 17:15 收件人: user-zh@fl

回复: kafka流与hive表join问题

2019-08-27 Thread like
我通过hive union kafka数据来解决hive数据不更新的问题,我现在碰到的问题是hive没有加载完就开始有join数据输出,比如我取最大日期,hive读取到26号就输出了26,过一会读取到27号,又会输出27,有没有办法让hive全部加载完再join输出,这样就只会有一个值输出了? 在2019年8月27日 17:33,苏 欣 写道: 我之前试过两种方式,但都有各自的问题: 1.注册hiveCatalog然后关联kafka,这种方式会存在hive表数据读取一次后不再更新的问题。 2.实现LookupableTableSource方法,通过jdbc方式读取hive维表来关联ka

flink基于yarn的方式提交,log在web上看太卡了。能不能直接看log文件?

2019-08-27 Thread 陈帅
flink基于yarn的方式提交,log在web上看太卡了。能不能直接看log文件?

flink基于yarn提交,需要依赖很多第三方的包,有没有办法添加classpath之类的,本地测试总是报错

2019-08-27 Thread 陈帅
现在都是把代码打成一个胖包,每次这样,传输太麻烦了

flink 1.8 sql rowtime window ????

2019-08-27 Thread 1142632215
1.mysql binlog??200-3002.??250??4001200-300?? 3.source?? 2 ??id??RetractStream??timestamp??Ka

Re: Re:回复: Re: flink1.9 blink planner table ddl 使用问题

2019-08-27 Thread 徐骁
这部分有文档吗,看了好几圈没看到 hb <343122...@163.com> 于2019年8月26日周一 下午3:34写道: > 感谢,解决了, 指定 'connector.version' = '0.11' 就可以了. > Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题. > > 在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道: > >kafka版本是 kafka_2.11-1.1.0, > >支持的kafka版本有哪些 > >在 2019-08-26 14:23:19,"pengcheng...@bonc.com

Re: flink基于yarn的方式提交,log在web上看太卡了。能不能直接看log文件?

2019-08-27 Thread Jeff Zhang
你是通过flink UI看log还是yarn ui 看log ? 陈帅 于2019年8月27日周二 下午5:55写道: > flink基于yarn的方式提交,log在web上看太卡了。能不能直接看log文件? > -- Best Regards Jeff Zhang

Checkpoint使用

2019-08-27 Thread yanggang_it_job
关于flink从checkpoint的问题: 1、如果我的并行度发生了改变,怎么从checkpoint启动? 2、是否可以动态设置checkpoint触发时间?

Re: Checkpoint使用

2019-08-27 Thread Congxian Qiu
Hi 1. 你可以按照文档[1]从一个 retained checkpoint 进行恢复,并发度可以改变,但是你需要保证最大并发是一样的 2. 不能动态设置 checkpoint 触发时间 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint Best, Congxian yanggang_it_job 于2019年8月27日周二 下午6:08写道: > 关于flink从checkpo

flink异常恢复

2019-08-27 Thread 王金海
讨论下flink异常重启问题 从kafka消费数据,checkpoint周期在5分钟,如果在第6分钟,因为异常导致flink任务重启,flink是从上个checkpoint恢复呢?还是从异常时的offset恢复呢? csbl...@163.com Have a nice day !

Re: flink异常恢复

2019-08-27 Thread Jeff Zhang
上个checkpoint 王金海 于2019年8月27日周二 下午6:14写道: > 讨论下flink异常重启问题 > > > 从kafka消费数据,checkpoint周期在5分钟,如果在第6分钟,因为异常导致flink任务重启,flink是从上个checkpoint恢复呢?还是从异常时的offset恢复呢? > > > > csbl...@163.com > Have a nice day ! > > -- Best Regards Jeff Zhang

maven配置错误百出

2019-08-27 Thread like
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/hive/

Re: flink1.9 blink planner table ddl 使用问题

2019-08-27 Thread Jark Wu
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector > 在 2019年8月27日,17:59,徐骁 写道: > > 这部分有文档吗,看了好几圈没看到 > > hb <343122...@163.com> 于2019年8月26日周一 下午3

flink????????????

2019-08-27 Thread ????
??flink??ERROR ??flink??log4j.properties??INFO LogManager.getRootLogger().setLevel(Level.ERROR);open??

Re: flink1.9 blink planner table ddl 使用问题

2019-08-27 Thread 徐骁
🤗感谢 Jark Wu 于2019年8月27日周二 下午6:49写道: > > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector > < > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector > > > > > 在 2019年8月27日,17:59,徐骁 写道: > > > > 这部分有文档吗

Re: flink日志级别问题

2019-08-27 Thread Zili Chen
另一种思路如果可以 download 日志的话可以直接文本处理挑出 ERROR 日志(x Best, tison. 陈思 <58683...@qq.com> 于2019年8月27日周二 下午6:56写道: > 目的:调整flink作业日志级别为ERROR > > > 背景:公司提交flink作业只能通过公司的可视化平台,因此不能修改集群上的log4j.properties文件,现在的日志级别是INFO,日志太多不方便排错 > > 目前情况:打算在代码中设置日志级别,使用LogManager.getRootLogger().setLevel(Level.ERROR);在算子的open方

回复: flink日志级别问题

2019-08-27 Thread 王金海
日志同步到ES,然后检索error类型的 至于是否可以代码自定义,就不太清楚了 csbl...@163.com Have a nice day ! 在2019年08月27日 19:29,Zili Chen 写道: 另一种思路如果可以 download 日志的话可以直接文本处理挑出 ERROR 日志(x Best, tison. 陈思 <58683...@qq.com> 于2019年8月27日周二 下午6:56写道: 目的:调整flink作业日志级别为ERROR 背景:公司提交flink作业只能通过公司的可视化平台,因此不能修改集群上的log4j.properties文件,

回复: flink日志级别问题

2019-08-27 Thread 王金海
可以日志同步到ES,然后检索error类型的 至于是否可以代码自定义,自己也没试过 csbl...@163.com Have a nice day ! 在2019年08月27日 19:46,王金海 写道: 日志同步到ES,然后检索error类型的 至于是否可以代码自定义,就不太清楚了 csbl...@163.com Have a nice day ! 在2019年08月27日 19:29,Zili Chen 写道: 另一种思路如果可以 download 日志的话可以直接文本处理挑出 ERROR 日志(x Best, tison. 陈思 <58683...@qq.com>

flink1.9 hadoop3 on yarn "StoppableFunction not found"

2019-08-27 Thread Michael Ran
deal ALL : 目前在CDH6.2.0 hadoop3 上,编译了 flink 1.9 提交的时候 异常: CONSOLE#org.apache.flink.client.program.ProgramInvocationException: The program caused an error: CONSOLE# at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironm

Fwd: Checkpoint使用

2019-08-27 Thread Andrew Lin
1,你说的应该是savepoint吧,checkpoint是运行中failover自动恢复的,savepoint如果设置了uid,改变并行度是可以从savepoint启动的 > 下面是被转发的邮件: > > 发件人: yanggang_it_job > 主题: Checkpoint使用 > 日期: 2019年8月27日 GMT+8 下午6:08:24 > 收件人: user-zh@flink.apache.org > 回复-收件人: user-zh@flink.apache.org > > 关于flink从checkpoint的问题: > 1、如果我的并行度发生了改变

Re:Fwd: Checkpoint使用

2019-08-27 Thread yanggang_it_job
您说的也对,但是我们重启的时候也是可以从checkpoint启动的。当我们安装flink的时候可以指定checkpoint的保存地址,同时我们编写任务的时候设置checkpoint的生成规则,当我们需要重启任务的时候直接指定这个地址就可以了。相对于savepoint来说checkpoint是自动生成的同时更轻量。所以我们现在是基于checkpoint重启的,然后就遇到了我提的那个问题。 在 2019-08-27 19:57:37,"Andrew Lin" 写道: >1,你说的应该是savepoint吧,checkpoint是运行中failover自动恢复的,savepoint如果设置了ui

Re: 在子查询上使用row_number over返回的rn都是1

2019-08-27 Thread Jark Wu
Hi, 你使用的是 flink 1.9 blink planner 吧? 首先你的 topn query 没有问题。结果也没有问题。 因为你是根据 province 分组求 top5,也就是每个省份排名前5的 id。但是现在你的数据中,每个省份只有一个 id,所以大家的排名都是1。 如果你想求全局前5名的省份,那么row_number 那里不需要定义 partition by province。 Best, Jark > 在 2019年8月27日,15:16,ddwcg <3149768...@qq.com> 写道: > > 6> (true,id001,上海,647.55,1)

Re: 在子查询上使用row_number over返回的rn都是1

2019-08-27 Thread ddwcg
谢谢回复,是我sql理解的问题,给您添麻烦了。 另外我还想问一下,这个retractstream后面接了一个process算子来维护top5的list,这个算子是不是还需要自己实现维护state的代码? > 在 2019年8月28日,09:53,Jark Wu 写道: > > Hi, > > 你使用的是 flink 1.9 blink planner 吧? > > 首先你的 topn query 没有问题。结果也没有问题。 > 因为你是根据 province 分组求 top5,也就是每个省份排名前5的 id。但是现在你的数据中,每个省份只有一个 id,所以大家的排名都是1。 >

Re: kafka流与hive表join问题

2019-08-27 Thread Jark Wu
Hi, 看了你的问题,主要有两个问题。 1. join hive 维表,没加载完就有 join 输出了。 2. hive 加载完后,就不再做 checkpoint 了。 第一个问题,目前flink 还没有内置支持hive 维表的支持。你可以自己实现一个 udtf 去拉取 hive 数据到内存,udtf 的 eval 方法在加载完 hive 数据之前不返回,这样可以避免没有加载完就有输出的问题。 第二个问题,目前 streaming job 中如果存在 finish vertex,是无法做 checkpoint 的。 Best, Jark > 在 2019年8月27日,17:4

Re: 在子查询上使用row_number over返回的rn都是1

2019-08-27 Thread Jark Wu
为什么还需要后面接 process operator 呢? Flink TopN 已经帮你维护好了 state,直接输出到一个 update sink 中就可以了。 Best, Jark > 在 2019年8月28日,10:08,ddwcg <3149768...@qq.com> 写道: > > process

Re: 在子查询上使用row_number over返回的rn都是1

2019-08-27 Thread ddwcg
因为sink到hbase,使用一个column存了top5的list,sink前我要组合一下这个list > 在 2019年8月28日,10:12,Jark Wu 写道: > > 为什么还需要后面接 process operator 呢? Flink TopN 已经帮你维护好了 state,直接输出到一个 update sink > 中就可以了。 > > > Best, > Jark > >> 在 2019年8月28日,10:08,ddwcg <3149768...@qq.com> 写道: >> >> process >

回复: kafka流与hive表join问题

2019-08-27 Thread like
Hi Jark 非常感谢你提供的方案,我不了解udtf加载完后是一种什么情况,是每一个task都会维护一份hive数据吗?这应该会带来很大的开销。还有一个问题就是1.9中的维表join是不是跟现在一样呢?有什么特别的地方吗? 在2019年8月28日 10:10,Jark Wu 写道: Hi, 看了你的问题,主要有两个问题。 1. join hive 维表,没加载完就有 join 输出了。 2. hive 加载完后,就不再做 checkpoint 了。 第一个问题,目前flink 还没有内置支持hive 维表的支持。你可以自己实现一个 udtf 去拉取 hive 数

flink1.9 Blink planner create view 问题

2019-08-27 Thread hb
注册了T表后,创建view报错 tEnv.sqlUpdate(s"create view v1 as select * from T") Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported node type SqlCreateView 是用错方法了,还是不支持

Re: kafka流与hive表join问题

2019-08-27 Thread Jark Wu
Hi like, > udtf加载完后是一种什么情况,是每一个task都会维护一份hive数据吗? 是的 > 还有一个问题就是1.9中的维表join是不是跟现在一样呢?有什么特别的地方吗? 1.9 中支持的维表 join,只支持 lookup 方式查询维表,其实现方式和 UDTF 是类似的。 Best, Jark > 在 2019年8月28日,10:57,like 写道: > > Hi Jark > > > 非常感谢你提供的方案,我不了解udtf加载完后是一种什么情况,是每一个task都会维护一份hive数据吗?这应该会带来很大的开销。还有一个问题就是1.9中的维表j

Re: flink1.9 Blink planner create view 问题

2019-08-27 Thread Jark Wu
1.9 还不支持 create view 语法。如果要注册一个 view,可以通过下面的办法: Table table = tEnv.sqlQuery(“select * from T”) tEnv.registerTable(“v1”, table); 然后你就可以在之后的sql 中直接查询 v1了 Best, Jark > 在 2019年8月28日,11:39,hb <343122...@163.com> 写道: > >

Re:回复: flink日志级别问题

2019-08-27 Thread 高飞龙
我也遇到同样的问题,目的是希望在web上只看到error的日志,我们不用es -- 高飞龙 手机 +86 18710107193 gaofeilong198...@163.com 在 2019-08-27 19:51:35,"王金海" 写道: >可以日志同步到ES,然后检索error类型的 >至于是否可以代码自定义,自己也没试过 > > >csbl...@163.com >Have a nice day ! > > >在2019年08月27日 19:46,王金海 写道: >日志同步到ES,然后检索error类型的 >至于是否可以代码自定义,就不太清楚了 > > >csbl

?????? flink????????????

2019-08-27 Thread ????
webERROR -- -- ??: "??"; : 2019??8??28??(??) 1:50 ??: "user-zh"; : Re:?? flink weberrores -- ??

Re: 回复: flink日志级别问题

2019-08-27 Thread wang jinhai
Flink的UI是有Exceptions页面的啊 各位 在 2019/8/28 下午2:05,“陈思”<58683...@qq.com> 写入: 是的,我的目的也是只想在web页面上面看到ERROR日志 -- 原始邮件 -- 发件人: "高飞龙"; 发送时间: 2019年8月28日(星期三) 中午1:50 收件人: "user-zh"; 主题: Re:回复: flink日志级别问题

回复: 回复: flink日志级别问题

2019-08-27 Thread 陈思
Exceptions看到的是运行时异常吧,看不到log4j输出的ERROR日志。我的目的是代码控制log4j的级别是ERROR -- 原始邮件 -- 发件人: "wang jinhai"; 发送时间: 2019年8月28日(星期三) 下午2:15 收件人: "user-zh@flink.apache.org"; 主题: Re: 回复: flink日志级别问题 Flink的UI是有Exceptions页面的啊 各位 在 2019/8/28 下午2:05,“陈思”<58683...@qq.com> 写入:

?????? ????flink????????????????????????????

2019-08-27 Thread 1900
hi, ?? public class Sink extends TwoPhaseCommitSinkFunction { //private Connection connection; public Sink() { super(new KryoSerializer <>(Connection.clas