Re: Re: jdbc sink无法插入数据
原因找到了,因为在JdbcSink.sink里使用了默认的JdbcExecutionOptions,里面默认的batchSize是5000,就是说数据量到这个数了再批量入库。 因为这个任务数据量少,很久才能凑到5000条,导致看起来数据没有入库的假象。 guoliubi...@foxmail.com 发件人: guoliubi...@foxmail.com 发送时间: 2020-12-21 09:05 收件人: user-zh 主题: Re: Re: jdbc sink无法插入数据 确实可行,多谢指点。 guoliubi...@foxmail.com 发件人: 赵一旦 发送时间: 2020-12-20 23:24 收件人: user-zh 主题: Re: jdbc sink无法插入数据 Hi,你这个绕太多弯路了吧。 Flink任务构建的就是一个DAG,本身一个DataStream就可以分拆出多条数据流,不需要sideoutput。 SideOutput的作用是当你需要根据“一定逻辑”输出2类不同结果时使用,你这里是相同的一份数据输出到kafka和mysql,是不需要sideoutput的。这样只会多了一个流程,影响性能。 按照你的代码,应该如下写: sideStream.addSink(new FlinkKafkaProducer<>( "ratio_value", new RatioValueSerializationSchema(suffix), PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), tool.get(SCHEMA_REGISTRY_URL)), FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); 如上,针对sideStream直接添加2个sink即可。 r pp 于2020年12月19日周六 下午12:15写道: > 一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢? > 去掉kafka sink ,看下 写入效果。 > 再对比下 加入kafka 后的效果。 > > 一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了 > > guoliubi...@foxmail.com 于2020年12月18日周五 下午2:01写道: > > > Hi, > > > > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下 > > .process(new ProcessFunction() { > > @Override > > public void processElement(RatioValuevalue, Context ctx, > > Collector out) throws Exception { > > out.collect(value); > > ctx.output(ratioOutputTag, value); > > } > > }); > > sideStream.addSink(new FlinkKafkaProducer<>( > > "ratio_value", > > new RatioValueSerializationSchema(suffix), > > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), > > tool.get(SCHEMA_REGISTRY_URL)), > > FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); > > DataStream ratioSideStream = > > sideStream.getSideOutput(ratioOutputTag); > > > ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); > > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。 > > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。 > > 想问下这种情况是否有什么排查手段? > > > > > > guoliubi...@foxmail.com > > >
Re: Re: 如何通过现实时间控制事件时间的窗口
因为表格样式被吃掉了,所以看不清,用图片说明下。 https://i.bmp.ovh/imgs/2020/12/78d7dee70d88ebc9.png 定义了3秒的滚动窗口 第一条消息的eventTime是9:00:01,是在系统实际时间9:00:01收到的。 第二条消息的eventTime是9:00:02,但是是在系统实际时间9:00:11分收到的。 想要达成的目标是在系统时间9:00:05时把这一窗口关闭掉进行运算,忽略迟到的第二条消息,更不必要等到第3条消息触发到下一个窗口的时间了再关闭这个窗口。 找了下用ProcessingTimeoutTrigger可以达到目的,不过不知道有没有更详细的文档说明trigger怎么用的。 guoliubi...@foxmail.com 发件人: 赵一旦 发送时间: 2020-12-20 23:30 收件人: user-zh 主题: Re: 如何通过现实时间控制事件时间的窗口 描述比较乱,看不懂。 guoliubi...@foxmail.com 于2020年12月17日周四 下午2:16写道: > Hi, > 我们现在以eventTime作为时间标准,每3秒做一次TumbleWindow,数据假设如下 > 系统时间 > 与上一条间隔 > 事件时间 > 与上一条间隔 > 9:00:01 > > 9:00:01 > > 9:00:11 > 10s > 9:00:02 > 1s > 9:00:12 > 1s > 9:00:12 > 10s > 从事件时间上看,第一条和第二条数据是归集到同一窗口的。 > 不过现在的处理需要在窗口有第一条数据后系统时间超时4s以上就强制关闭该窗口,即在系统时间9:00:05时关闭窗口,忽略掉第二条数据。 > 请问这种情况需要怎么生成watermark? > 使用过 > WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(4L)) > 或者 > > WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(4L)) > 结果都把第一条和第二条数据归集到同一个窗口中了, > 都没有达到预想的结果。 > 要如何设置才能在窗口中仅有一条数据而忽略第二条数据? > > > guoliubi...@foxmail.com >
Re: Re: jdbc sink无法插入数据
确实可行,多谢指点。 guoliubi...@foxmail.com 发件人: 赵一旦 发送时间: 2020-12-20 23:24 收件人: user-zh 主题: Re: jdbc sink无法插入数据 Hi,你这个绕太多弯路了吧。 Flink任务构建的就是一个DAG,本身一个DataStream就可以分拆出多条数据流,不需要sideoutput。 SideOutput的作用是当你需要根据“一定逻辑”输出2类不同结果时使用,你这里是相同的一份数据输出到kafka和mysql,是不需要sideoutput的。这样只会多了一个流程,影响性能。 按照你的代码,应该如下写: sideStream.addSink(new FlinkKafkaProducer<>( "ratio_value", new RatioValueSerializationSchema(suffix), PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), tool.get(SCHEMA_REGISTRY_URL)), FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); 如上,针对sideStream直接添加2个sink即可。 r pp 于2020年12月19日周六 下午12:15写道: > 一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢? > 去掉kafka sink ,看下 写入效果。 > 再对比下 加入kafka 后的效果。 > > 一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了 > > guoliubi...@foxmail.com 于2020年12月18日周五 下午2:01写道: > > > Hi, > > > > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下 > > .process(new ProcessFunction() { > > @Override > > public void processElement(RatioValuevalue, Context ctx, > > Collector out) throws Exception { > > out.collect(value); > > ctx.output(ratioOutputTag, value); > > } > > }); > > sideStream.addSink(new FlinkKafkaProducer<>( > > "ratio_value", > > new RatioValueSerializationSchema(suffix), > > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), > > tool.get(SCHEMA_REGISTRY_URL)), > > FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); > > DataStream ratioSideStream = > > sideStream.getSideOutput(ratioOutputTag); > > > ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); > > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。 > > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。 > > 想问下这种情况是否有什么排查手段? > > > > > > guoliubi...@foxmail.com > > >
Re: Re: flink clickhouse connector
有,但是贴做附件时因为超长没法发出去 : ezmlm-reject: fatal: Sorry, I don't accept messages larger than 100 bytes (#5.2.3) 发你私人邮箱是否方便 guoliubi...@foxmail.com From: magichuang Date: 2020-12-17 20:18 To: user-zh Subject: Re: Re: flink clickhouse connector 您是用java写的还是pyflink 啊? 我是用pyflink写的程序,所以需要一个jar包,您那里有嘛,我本地是新安装的maven,在打包 但是一直在下载依赖好多。。 > -- 原始邮件 -- > 发 件 人:"guoliubi...@foxmail.com" > 发送时间:2020-12-17 19:36:55 > 收 件 人:user-zh > 抄 送: > 主 题:Re: flink clickhouse connector > > 我这也是往clickhouse写数据,用官方的或是其他第三方的JDBC驱动(我用的https://github.com/blynkkk/clickhouse4j),然后用JdbcSink就能写入了,不需要另外写connector。 > > > > guoliubi...@foxmail.com > > From: magichuang > Date: 2020-12-17 18:41 > To: user-zh > Subject: flink clickhouse connector > hi 想问一下有小伙伴使用flink > 往clickhouse里面写数据嘛?我是使用的https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX > 这个flink-connector,但是运行报错了: > > Caused by: java.io.IOException: unable to establish connection to ClickHouse > > at > com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79) > > at > org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65) > > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > > at > org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) > > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: java.io.IOException: table `default`.`traffic` is not a > Distributed table > > at > com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96) > > at > com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76) > > ... 12 more > > > > > 但 traffic 这个表我在clickhouse里面创建了,flink版本是1.11 > > > > > 有小伙伴成功对接的嘛,可否分享一下connector呀 > > > > > > > > > > > > > > > > > > > > Best, > > MagicHuang > > > > > -- Best, MagicHuang
jdbc sink无法插入数据
Hi, 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下 .process(new ProcessFunction() { @Override public void processElement(RatioValuevalue, Context ctx, Collector out) throws Exception { out.collect(value); ctx.output(ratioOutputTag, value); } }); sideStream.addSink(new FlinkKafkaProducer<>( "ratio_value", new RatioValueSerializationSchema(suffix), PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), tool.get(SCHEMA_REGISTRY_URL)), FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); DataStream ratioSideStream = sideStream.getSideOutput(ratioOutputTag); ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。 想问下这种情况是否有什么排查手段? guoliubi...@foxmail.com
task manager内存使用问题
Hi, 现在使用的是flink1.12,使用standalone cluster模式运行。 在上面运行一个Job内存消耗大,会用满heap然后把整个task manager带崩掉。 想问下怎么给task manager的jvm加上heap dump相关参数。 还有是否有选项,可以在某个job吃满heap后是kill这个job而不是shutdown整个task manager,因为这个task manager还有其他job在跑,会导致其他job一起fail。 guoliubi...@foxmail.com
Re: flink clickhouse connector
我这也是往clickhouse写数据,用官方的或是其他第三方的JDBC驱动(我用的https://github.com/blynkkk/clickhouse4j),然后用JdbcSink就能写入了,不需要另外写connector。 guoliubi...@foxmail.com From: magichuang Date: 2020-12-17 18:41 To: user-zh Subject: flink clickhouse connector hi想问一下有小伙伴使用flink 往clickhouse里面写数据嘛?我是使用的https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX 这个flink-connector,但是运行报错了: Caused by: java.io.IOException: unable to establish connection to ClickHouse at com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79) at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: table `default`.`traffic` is not a Distributed table at com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96) at com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76) ... 12 more 但 traffic 这个表我在clickhouse里面创建了,flink版本是1.11 有小伙伴成功对接的嘛,可否分享一下connector呀 Best, MagicHuang
如何通过现实时间控制事件时间的窗口
Hi, 我们现在以eventTime作为时间标准,每3秒做一次TumbleWindow,数据假设如下 系统时间 与上一条间隔 事件时间 与上一条间隔 9:00:01 9:00:01 9:00:11 10s 9:00:02 1s 9:00:12 1s 9:00:12 10s 从事件时间上看,第一条和第二条数据是归集到同一窗口的。 不过现在的处理需要在窗口有第一条数据后系统时间超时4s以上就强制关闭该窗口,即在系统时间9:00:05时关闭窗口,忽略掉第二条数据。 请问这种情况需要怎么生成watermark? 使用过 WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(4L)) 或者 WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(4L)) 结果都把第一条和第二条数据归集到同一个窗口中了, 都没有达到预想的结果。 要如何设置才能在窗口中仅有一条数据而忽略第二条数据? guoliubi...@foxmail.com
回复: temparol table join后无法sink
找到原因了,数据问题,两个kafka source的earliest的数据timestamp差距比较大,导致在join时一直在堆积数据等待另一个队列的时间戳到达。调整offset让两个队列的时间戳一致后问题消失。 guoliubi...@foxmail.com 发件人: guoliubi...@foxmail.com 发送时间: 2020-12-16 07:36 收件人: user-zh 主题: temparol table join后无法sink Hi, 流程是从两个kafka队列中取数据,做完temparol table join后取滚动窗口做UDAF,然后sink,代码大概如下 joined_table = t_env.sql_query(""" SELECT o.exchangeCode_ as code, o.price, o.volume, o.eventTime FROM orders AS o INNER JOIN quotes FOR SYSTEM_TIME AS OF o.eventTime q ON o.exchangeCode_ = q.exchangeCode_ """) tumble_window = Tumble.over(expr.lit(500).millis) \ .on(expr.col("eventTime")) \ .alias("w") aggregate_table = joined_table.window(tumble_window) \ .group_by("w") \ .select("orderCalc(code, price, volume) as aggValue") \ .execute_insert("kafkaSink") 然后执行的时候数据都堆积在TemporalJoin环节,没法进入sink环节。执行图如下 https://ftp.bmp.ovh/imgs/2020/12/702ccb600bb01968.png 最后sink环节的bytes received一直是0,然后运行到最后就因为内存不足失败。 看了taskmanager的日志里面没有报错。 想问下这种问题应该从哪里进行排查,多谢。 guoliubi...@foxmail.com
temparol table join后无法sink
Hi, 流程是从两个kafka队列中取数据,做完temparol table join后取滚动窗口做UDAF,然后sink,代码大概如下 joined_table = t_env.sql_query(""" SELECT o.exchangeCode_ as code, o.price, o.volume, o.eventTime FROM orders AS o INNER JOIN quotes FOR SYSTEM_TIME AS OF o.eventTime q ON o.exchangeCode_ = q.exchangeCode_ """) tumble_window = Tumble.over(expr.lit(500).millis) \ .on(expr.col("eventTime")) \ .alias("w") aggregate_table = joined_table.window(tumble_window) \ .group_by("w") \ .select("orderCalc(code, price, volume) as aggValue") \ .execute_insert("kafkaSink") 然后执行的时候数据都堆积在TemporalJoin环节,没法进入sink环节。执行图如下 https://ftp.bmp.ovh/imgs/2020/12/702ccb600bb01968.png 最后sink环节的bytes received一直是0,然后运行到最后就因为内存不足失败。 看了taskmanager的日志里面没有报错。 想问下这种问题应该从哪里进行排查,多谢。 guoliubi...@foxmail.com
Re: Re: pyflink是否可调试
1.13版本pyflink很多优化项啊,期待下个版本 guoliubi...@foxmail.com 发件人: Xingbo Huang 发送时间: 2020-12-15 10:37 收件人: user-zh 主题: Re: Re: pyflink是否可调试 Hi, 客户端写的python代码会在一个客户端的进程通过py4j调用flink java的代码去编译你的作业(这里有一个客户端的Python进程,只是用来编译代码生成pipeline), 然后实际运行时,非python代码部分(就是非各种udf的逻辑是运行在JVM里面的),python部分(各种udf)是运行在另一个Python进程里面的。 实际上,在下一个1.13版本我们有考虑在你本地运行调试的时候,将实际运行python代码的重连回客户端那个编译Python代码的进程,这样可以更利于你的本地调试,就不用开启remote debug了。 Best, Xingbo guoliubi...@foxmail.com 于2020年12月15日周二 上午10:29写道: > Hi Xingbo, > > 多谢指导,亲测有效。 > 源python文件运行一会儿本身就结束运行了,过阵子后才会跳到断点里。 > 所以源python文件只是做了个提交的动作,实际执行都是异步执行,是否可以这么理解? > 如果是的话,之前已经运行过很多次源python文件,是否本地已经在后台异步运行了多次?是的话是否能监控到这些任务? > > > > guoliubi...@foxmail.com > > 发件人: Xingbo Huang > 发送时间: 2020-12-15 09:59 > 收件人: user-zh > 主题: Re: pyflink是否可调试 > Hi, > 想要调试可以使用的方式为 > 1. 在PyCharm里创建一个Python Remote Debug > run -> Python Remote Debug -> + -> 选择一个端口(比如6789) > > 2. 安装pydevd-pycharm(你PyCharm使用的python解释器) > pip install pydevd-pycharm > 其实上一步那个界面也有指导你安装了 > > 3. 将以下代码插入到你要断点的udaf的代码前面(这段代码其实也是来自第一步创建remote debug里面) > import pydevd_pycharm > pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, > stderrToServer=True) > > 例如 > @udaf(result_type=DataTypes.INT(), func_type="pandas") > def mean_udaf(v): > import pydevd_pycharm > pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, > stderrToServer=True) > v.mean() > > 4. 在你的PyCharm里面点击Debug启动你刚刚创建的Remote Debug Server > > 5. 直接点击Run运行你的作业,这个时候会断点在你的udaf的代码里面了 > > Best, > Xingbo > > guoliubi...@foxmail.com 于2020年12月15日周二 上午9:25写道: > > > > > > 基于flink1.12.0版本的pyflink开发了一个程序,使用了udaf,想在本地的PyCharm环境调试该功能,在udaf的第一行打了断点,但是没法进入。 > > 可以确认程序有正确运行,因为sink到kafka里看了是有数据的。 > > 请问是否需要什么配置才能进行调试。 > > > > > > > > guoliubi...@foxmail.com > > >
Re: Re: pyflink是否可调试
Hi Xingbo, 多谢指导,亲测有效。 源python文件运行一会儿本身就结束运行了,过阵子后才会跳到断点里。 所以源python文件只是做了个提交的动作,实际执行都是异步执行,是否可以这么理解? 如果是的话,之前已经运行过很多次源python文件,是否本地已经在后台异步运行了多次?是的话是否能监控到这些任务? guoliubi...@foxmail.com 发件人: Xingbo Huang 发送时间: 2020-12-15 09:59 收件人: user-zh 主题: Re: pyflink是否可调试 Hi, 想要调试可以使用的方式为 1. 在PyCharm里创建一个Python Remote Debug run -> Python Remote Debug -> + -> 选择一个端口(比如6789) 2. 安装pydevd-pycharm(你PyCharm使用的python解释器) pip install pydevd-pycharm 其实上一步那个界面也有指导你安装了 3. 将以下代码插入到你要断点的udaf的代码前面(这段代码其实也是来自第一步创建remote debug里面) import pydevd_pycharm pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True) 例如 @udaf(result_type=DataTypes.INT(), func_type="pandas") def mean_udaf(v): import pydevd_pycharm pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True) v.mean() 4. 在你的PyCharm里面点击Debug启动你刚刚创建的Remote Debug Server 5. 直接点击Run运行你的作业,这个时候会断点在你的udaf的代码里面了 Best, Xingbo guoliubi...@foxmail.com 于2020年12月15日周二 上午9:25写道: > > 基于flink1.12.0版本的pyflink开发了一个程序,使用了udaf,想在本地的PyCharm环境调试该功能,在udaf的第一行打了断点,但是没法进入。 > 可以确认程序有正确运行,因为sink到kafka里看了是有数据的。 > 请问是否需要什么配置才能进行调试。 > > > > guoliubi...@foxmail.com >
pyflink是否可调试
基于flink1.12.0版本的pyflink开发了一个程序,使用了udaf,想在本地的PyCharm环境调试该功能,在udaf的第一行打了断点,但是没法进入。 可以确认程序有正确运行,因为sink到kafka里看了是有数据的。 请问是否需要什么配置才能进行调试。 guoliubi...@foxmail.com
temporal table join SQL报错
我在使用flink 1.12.0,在按博客里的例子实现temporal table join https://flink.apache.org/news/2020/12/10/release-1.12.0.html#table-apisql-support-for-temporal-table-joins-in-sql 构造了类似的表格后,写了类似SQL -- Event-time temporal table join SELECT o.order_id, o.order_time, o.amount * r.currency_rate AS amount, r.currency FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time r ON o.currency = r.currency; 上传到flink后,提示 py4j.protocol.Py4JJavaError: An error occurred while calling o2.sqlQuery. : org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "ON" at line 8, column 5. Was expecting one of: "EXCEPT" ... "FETCH" ... "GROUP" ... 在把最后一行的ON改为WHERE后,这个SQL可以解析过去了,但是执行时报了另外的问题 py4j.protocol.Py4JJavaError: An error occurred while calling o153.executeInsert. : org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty. at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) 请问要如何正确的写该SQL,是否有关于temporal table join的更详细文档? guoliubi...@foxmail.com
回复: 如何让FlinkSQL访问到阿里云MaxCompute上的表?
Confluent Schema Registry参考这个 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html guoliubi...@foxmail.com 发件人: 陈帅 发送时间: 2020-12-14 23:33 收件人: user-zh@flink.apache.org 主题: 如何让FlinkSQL访问到阿里云MaxCompute上的表? 如何让FlinkSQL访问到阿里云MaxCompute上的表? 又或者是Confluent Schema Registry上那些带schema的kafka topic? 需要自己定义Catalog吗?有相关的教程和资料么?谢谢!
Re: Re: Pandas UDF处理过的数据sink问题
Hi xingbo, 文档中给的例子udtf需要和join一起使用,但是我现在不需要join,只是单纯的转换结果 如果直接调用了udtf后sink,会提示 Cause: Different number of columns. Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >] Sink schema: [buyQtl: BIGINT, aveBuy: INT] guoliubi...@foxmail.com 发件人: Xingbo Huang 发送时间: 2020-12-14 11:38 收件人: user-zh 主题: Re: Re: Pandas UDF处理过的数据sink问题 Hi, 你想要一列变多列的话,你需要使用UDTF了,具体使用方式,你可以参考文档[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions Best, Xingbo guoliubi...@foxmail.com 于2020年12月14日周一 上午11:00写道: > 多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。 > 但现在有另一个问题,根据文档 > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions > Vectorized Python aggregate functions takes one or more pandas.Series as > the inputs and return one scalar value as output. > Note The return type does not support RowType and MapType for the time > being. > udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。 > 现在是后面用另一个udf把这个string再做拆分,代码大概如下: > @udf(result_type=DataTypes.ROW( > [DataTypes.FIELD('value1', DataTypes.BIGINT()), > DataTypes.FIELD('value2', DataTypes.INT())])) > def flattenStr(inputStr): > ret_array = [int(x) for x in inputStr.split(',')] > return Row(ret_array[0], ret_array[1]) > t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table = > order_table.window(tumble_window) \ > .group_by("w") \ > .select("**调用udaf** as aggValue") > result_table = aggregate_table.select("flattenStr(aggValue) as retValue") > > result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的 > Caused by: java.lang.ArrayIndexOutOfBoundsException: 0 > at > org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99) > at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306) > at > org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151) > at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122) > > > guoliubi...@foxmail.com > > 发件人: Wei Zhong > 发送时间: 2020-12-14 10:38 > 收件人: user-zh > 主题: Re: Pandas UDF处理过的数据sink问题 > Hi Lucas, > > 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。 > > 你可以尝试将sql语句改成以下形式: > > select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1) > from `some_source` > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount > > 此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf” > > Best, > Wei > > > 在 2020年12月13日,13:13,Lucas 写道: > > > > 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下 > > > > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()], > > result_type=DataTypes.ROW( > > [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()), > > DataTypes.FIELD('aveBuy', DataTypes.INT())), > > func_type='pandas') > > def orderCalc(code, amount): > > > >df = pd.DataFrame({'code': code, 'amount': amount}) > > # pandas 数据处理后输入另一个dataframe output > > return (output['buyQtl'], output['aveBuy']) > > > > > > 定义了csv的sink如下 > > > > create table csvSink ( > >buyQtl BIGINT, > >aveBuy INT > > ) with ( > >'connector.type' = 'filesystem', > >'format.type' = 'csv', > >'connector.path' = 'e:/output' > > ) > > > > > > > > 然后进行如下的操作: > > > > result_table = t_env.sql_query(""" > > select orderCalc(code, amount) > > from `some_source` > > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount > > """) > > result_table.execute_insert("csvSink") > > > > > > > > 在执行程序的时候提示没法入库 > > > > py4j.protocol.Py4JJavaError: An error occurred while calling > > o98.executeInsert. > > > > : org.apache.flink.table.api.ValidationException: Column types of query > > result and sink for registered table > > 'default_catalog.default_database.csvSink' do not match. > > > > Cause: Different number of columns. > > > > > > > > Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >] > > > > Sink schema: [buyQtl: BIGINT, aveBuy: INT] > > > >at > > > org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx > > ception(DynamicSinkUtils.java:304) > > > >at > > > org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply > > ImplicitCast(DynamicSinkUtils.java:134) > > > > > > > > 是UDF的输出结构不对吗,还是需要调整sink table的结构? > > > > >
回复: Re: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
不好意思我没说清楚。 我这边用的是这样的SQL可以运作,你可以参考下。 CREATE TABLE `someTable` ( eventTime TIMESTAMP(3), WATERMARK FOR eventTime AS eventTime ) eventTime是java的Long类型,包含毫秒,SQL里可以直接转成TIMESTAMP select someFunc(field) from `someTable` group by TUMBLE(eventTime, INTERVAL '1' SECOND) guoliubi...@foxmail.com 发件人: kandy.wang 发送时间: 2020-12-14 11:23 收件人: user-zh 主题: Re:回复: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. hi guoliubin85: 一样的报错: Flink SQL> select mid,code,floor_id,TUMBLE_START(time_local/1000, INTERVAL '1' MINUTE) as log_minute,count(1) pv > from lightart_expose > where code is not null and floor_id is not null > group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' > MINUTE);[ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' to arguments of type '$TUMBLE(, )'. Supported form(s): '$TUMBLE(, )' '$TUMBLE(, , )' > group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' > MINUTE);[ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' to arguments of type '$TUMBLE(, )'. Supported form(s): '$TUMBLE(, )' '$TUMBLE(, , )' 在 2020-12-14 10:41:12,"guoliubi...@foxmail.com" 写道: >TUMBLE第一个参数需要的就是bigint,你这边time_local 直接用就好,不用转另外TIMESTAMP > > > >guoliubi...@foxmail.com > >发件人: kandy.wang >发送时间: 2020-12-14 10:28 >收件人: user-zh >主题: Window aggregate can only be defined over a time attribute column, but >TIMESTAMP(3) encountered. >[ERROR] Could not execute SQL statement. >Reason:org.apache.flink.table.api.TableException: Window aggregate can only be >defined over a time attribute column, but TIMESTAMP(3) encountered. > > >SQL 如下: >create temporary view expose as > >select > >mid > >,time_local > >,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, '-MM-dd HH:mm:ss')) as >log_ts > >,proctime > >from hive.temp.kafka_table > >; >time_local 是bigint > > > >select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv > >from expose > >group by TUMBLE(log_ts, INTERVAL '1' MINUTE); > > >window agg的字段报错,如何解决。
回复: Re: Pandas UDF处理过的数据sink问题
多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。 但现在有另一个问题,根据文档 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions Vectorized Python aggregate functions takes one or more pandas.Series as the inputs and return one scalar value as output. Note The return type does not support RowType and MapType for the time being. udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。 现在是后面用另一个udf把这个string再做拆分,代码大概如下: @udf(result_type=DataTypes.ROW( [DataTypes.FIELD('value1', DataTypes.BIGINT()), DataTypes.FIELD('value2', DataTypes.INT())])) def flattenStr(inputStr): ret_array = [int(x) for x in inputStr.split(',')] return Row(ret_array[0], ret_array[1]) t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table = order_table.window(tumble_window) \ .group_by("w") \ .select("**调用udaf** as aggValue") result_table = aggregate_table.select("flattenStr(aggValue) as retValue") result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的 Caused by: java.lang.ArrayIndexOutOfBoundsException: 0 at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306) at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122) guoliubi...@foxmail.com 发件人: Wei Zhong 发送时间: 2020-12-14 10:38 收件人: user-zh 主题: Re: Pandas UDF处理过的数据sink问题 Hi Lucas, 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。 你可以尝试将sql语句改成以下形式: select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1) from `some_source` group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount 此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf” Best, Wei > 在 2020年12月13日,13:13,Lucas 写道: > > 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下 > > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()], > result_type=DataTypes.ROW( > [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()), > DataTypes.FIELD('aveBuy', DataTypes.INT())), > func_type='pandas') > def orderCalc(code, amount): > >df = pd.DataFrame({'code': code, 'amount': amount}) > # pandas 数据处理后输入另一个dataframe output > return (output['buyQtl'], output['aveBuy']) > > > 定义了csv的sink如下 > > create table csvSink ( >buyQtl BIGINT, >aveBuy INT > ) with ( >'connector.type' = 'filesystem', >'format.type' = 'csv', >'connector.path' = 'e:/output' > ) > > > > 然后进行如下的操作: > > result_table = t_env.sql_query(""" > select orderCalc(code, amount) > from `some_source` > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount > """) > result_table.execute_insert("csvSink") > > > > 在执行程序的时候提示没法入库 > > py4j.protocol.Py4JJavaError: An error occurred while calling > o98.executeInsert. > > : org.apache.flink.table.api.ValidationException: Column types of query > result and sink for registered table > 'default_catalog.default_database.csvSink' do not match. > > Cause: Different number of columns. > > > > Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >] > > Sink schema: [buyQtl: BIGINT, aveBuy: INT] > >at > org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx > ception(DynamicSinkUtils.java:304) > >at > org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply > ImplicitCast(DynamicSinkUtils.java:134) > > > > 是UDF的输出结构不对吗,还是需要调整sink table的结构? >
回复: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
TUMBLE第一个参数需要的就是bigint,你这边time_local 直接用就好,不用转另外TIMESTAMP guoliubi...@foxmail.com 发件人: kandy.wang 发送时间: 2020-12-14 10:28 收件人: user-zh 主题: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. [ERROR] Could not execute SQL statement. Reason:org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. SQL 如下: create temporary view expose as select mid ,time_local ,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, '-MM-dd HH:mm:ss')) as log_ts ,proctime from hive.temp.kafka_table ; time_local 是bigint select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv from expose group by TUMBLE(log_ts, INTERVAL '1' MINUTE); window agg的字段报错,如何解决。
Tumble Window某个时间区间没数据的问题
Flink 1.12.0版本,java代码做了如下处理 txnStream .window(TumblingEventTimeWindows.of(Time.seconds(3L))) .process(new NetValueAggregateFunction()) 在某个3秒的区间没有数据的话,就不会进入process的function里, 是否有什么配置可以让每3秒必定进process的function吗 guoliubi...@foxmail.com