设定kafka消费结束位置
我想通过Table API 的方式消费kafka 需要设定消费kafka 开始位置(时间)和结束位置(时间) 请问有没有相关属性 例如DataStream中的setUnbounded属性, 有没有其他办法可以实现类似功能?
设定kafka消费结束位置
我想通过Table API 的方式消费kafka 需要设定消费kafka 开始位置(时间)和结束位置(时间) 请问有没有相关属性 例如DataStream中的setUnbounded属性, 有没有其他办法可以实现类似功能?
Table Api Connectors kafka bounded
现在有一个需求是,创建一个任务,消费kafka,仅消费一个片段 即设定起始消费点和结束消费位置 我看到DataStream Connectors kafka 中有一个setBounded (setUnbounded)属性 ,可以满足需求。 问题; 我想使用 Table API 完成上面的需求,该怎么办? Table API 是否有相关属性? 有其他办法满足这个需求吗? 流处理批处理都行。
退订
退订
Tumbling Windows 窗口可开的最小单位
滚动窗口最小可开多大,100ms? 对性能有什么影响吗?
Re:Re: table.exec.state.ttl
你好: 我现在想在 execution environment 里面设置微批和stateValue的过期时间该怎么设? 这样 conf.setString("exec.state.ttl","15 s"); 或者这样 conf.setString("stream.exec.state.ttl","15 s"); 在 2021-08-26 19:05:07,"Caizhi Weng" 写道: >Hi! > >table 层的配置是加在 table environment 的 table config 里的,加在 execution environment >里无效。 > >李航飞 于2021年8月26日周四 下午7:02写道: > >> Configuration conf = new Configuration(); >> conf.setString("table.exec.mini-batch.enabled","true"); >> conf.setString("table.exec.mini-batch.allow-latency","15s"); >> conf.setString("table.exec.mini-batch.size","50"); >> conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment >> execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); >> execEnv.configure(conf,this.getClass().getClassLoader()); >> EnvironmentSetting setting = ... >> StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2 >> 微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果 >> 上次那个 allow 也就算了,这次这个 table.exec.state.ttl >> 设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。 >> 程序是通过StatementSet .execute()执行的
table.exec.state.ttl
Configuration conf = new Configuration(); conf.setString("table.exec.mini-batch.enabled","true"); conf.setString("table.exec.mini-batch.allow-latency","15s"); conf.setString("table.exec.mini-batch.size","50"); conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); execEnv.configure(conf,this.getClass().getClassLoader()); EnvironmentSetting setting = ... StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2 微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果 上次那个 allow 也就算了,这次这个 table.exec.state.ttl 设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。 程序是通过StatementSet .execute()执行的
mini-batch配置信息没产生效果
Configuration conf = new Configuration(); conf.setString("table.exec.mini-batch.enabled","true"); conf.setString("table.exec.mini-batch.allow-latency","15s"); conf.setString("table.exec.mini-batch.size","50"); StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); execEnv.configure(conf,this.getClass().getClassLoader()); EnvironmentSetting setting = ... StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2 微批配置信息如上,kafka流数据,测试效果数据计算没有延迟15s
Re:Re: mini-batch 设置后没效果
设置的延迟15秒,实际测试发现数据来了之后就处理了,没有延迟15秒 在 2021-08-25 11:12:19,"Caizhi Weng" 写道: >Hi! > >所谓的没效果指的是什么现象呢?建议详细描述一下场景与问题。 > >李航飞 于2021年8月25日周三 上午11:04写道: > >> 通过Configuration 设置 >> table.exec.mini-batch.enabled= true; >> table.exec.mini-batch.allow-latency = 15s; >> table.exec.mini-batch.size = 50; >> 上面的配置测试了,没效果,下面的测试 >> table.exec.mini-batch.allow-latency = 15000 >> 也没效果什么原因? >> >>
mini-batch 设置后没效果
通过Configuration 设置 table.exec.mini-batch.enabled= true; table.exec.mini-batch.allow-latency = 15s; table.exec.mini-batch.size = 50; 上面的配置测试了,没效果,下面的测试 table.exec.mini-batch.allow-latency = 15000 也没效果什么原因?
Re:Re: cumulate累加函数输出问题
你好: 我使用的场景是要实时统计一天的数据,在小窗口进行即时输出,延迟不能太高,cumulate window符合要求,tumble window 延迟太高了。 在 2021-08-20 16:01:57,"Caizhi Weng" 写道: >Hi! > >你可能想要的是 tumble window 而不是 cumulate window。 > >李航飞 于2021年8月20日周五 下午3:26写道: > >> 能不能通过设置,每分钟只输出 当前小窗口中计算数据的结果,之前计算的结果不再输出 >> >> >> 目前测试发现,输入的数据越多,到下次输出的数据也会越来越多, >> 不同窗口的计算结果,都会再下次窗口中输出, >> >>
cumulate累加函数输出问题
能不能通过设置,每分钟只输出 当前小窗口中计算数据的结果,之前计算的结果不再输出 目前测试发现,输入的数据越多,到下次输出的数据也会越来越多, 不同窗口的计算结果,都会再下次窗口中输出,
Re:Re: Re:Re: flink-connector-redis连接器upsert()模式插入问题
你好: SELECT window_start,window_end,SUM(price),item FROM TABLE( CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '1' MINUTES,INTERVAL'10' HOUR)) GROUP BY window_start,window_end,item 语句没有问题,正常每1分钟输出一次,过期时间代码已注释, public ChangelogMode getChanglogMode(ChangelogMode arg0){ return ChangelogMode.upsert(); } 实现RedisMapper 方法 落地redis 有输出语句,每1分钟都会落地一次,我确定数据每次都一样 这upsert 不合理啊 在 2021-08-20 11:15:17,"Caizhi Weng" 写道: >Hi! > >之前没注意到是 cumulate 窗口,具体的 group by 语句是什么样的呢?描述里写着“小窗口 1 分钟计算一次”,那 sink >确实应该每分钟收到一条消息。 > >sink 在 streaming 场景可以接收 upsert 消息,只要 getChangelogMode 返回的是 upsert 即可。 > >李航飞 于2021年8月20日周五 上午10:03写道: > >> 你好: >> 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗? >> >> >> >> 在 2021-08-20 09:10:44,"李航飞" 写道: >> >你好: >> >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。 >> > >> > >> >我在RichMapFunction接口里面实现open方法 >> >设置过StateTtlConfig; >> >之后在RedisConmmand.SETEX设置过期时间 >> >都注释了,但upsert()方法还是没效 >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> >在 2021-08-19 17:44:02,"Caizhi Weng" 写道: >> >>Hi! >> >> >> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。 >> >> >> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key >> >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 >> >> >> >>李航飞 于2021年8月19日周四 下午5:03写道: >> >> >> >>> 版本 flink1.13.2 >> >>> 具体场景 >> >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, >> >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, >> >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 >> >>> >> >>> >> >>> 问题: >> >>> 测试发现,每1分钟都会输出一次,落地的数据一样, >> >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况? >> >>> >> >>> >>
Re:Re: Re: cumulate函数和比较函数连用报错
你好: 具体场景是对agg结果之前进行过滤,现在通过create view进行提取过滤了 现在我想通过DynameicTable的方式,以upsert写入redis里面 在 2021-08-20 10:31:18,"Caizhi Weng" 写道: >Hi! > >具体的条件是什么样的呢?理论上如果是和原本的 group by ... having ... 等价的语句应该是支持的。 > >李航飞 于2021年8月18日周三 下午4:34写道: > >> 哦哦,好的,谢谢你。比较函数不可以连用,简单的一些的条件(where a = '2')可以用,这个功能后续还会调整吗 >> 在 2021-08-18 16:21:20,"Caizhi Weng" 写道: >> >Hi! >> > >> >目前 window tvf 只能应用于 window agg 和 window top-n 两种场景。如果 where 条件是用来对 window >> >agg 的结果进行过滤的,可以使用 having 而不是 where;若是对进入 window 之前的数据进行过滤的,可以 create view。 >> > >> >李航飞 于2021年8月18日周三 下午3:55写道: >> > >> >> 通过flinksql建立数据处理通道 >> >> SELECT window_start,window_end,SUM(price) >> >> >> >> FROM TABLE( >> >> >> >> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL >> '10' >> >> MINUTES)) >> >> >> >> GROUP BY window_start,window_end; >> >> >> >> 大致语句如上,该语句通过 StreamTableEnvironment对象 env.sqlQuery(sql)执行成功,没有问题 >> >> 关键一步是 StatementSet对象 sta.execute() 执行报错 >> >> java.lang.UnsupportedOperationException: >> >> Currently Flink doesn't support individual window table-valued function >> >> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]). >> >> Please use window table-valued function with aggregate together using >> >> window_start and window_end as group keys. >> >> 执行环境是flink1.13.1 去掉where条件可以正常执行,加上就不行。 >> >> >> >> >>
Re:Re:Re: flink-connector-redis连接器upsert()模式插入问题
你好: 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗? 在 2021-08-20 09:10:44,"李航飞" 写道: >你好: >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。 > > >我在RichMapFunction接口里面实现open方法 >设置过StateTtlConfig; >之后在RedisConmmand.SETEX设置过期时间 >都注释了,但upsert()方法还是没效 > > > > > > > > > > > > > > >在 2021-08-19 17:44:02,"Caizhi Weng" 写道: >>Hi! >> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。 >> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 >> >>李航飞 于2021年8月19日周四 下午5:03写道: >> >>> 版本 flink1.13.2 >>> 具体场景 >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 >>> >>> >>> 问题: >>> 测试发现,每1分钟都会输出一次,落地的数据一样, >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况? >>> >>>
Re:Re: flink-connector-redis连接器upsert()模式插入问题
你好: 我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。 我在RichMapFunction接口里面实现open方法 设置过StateTtlConfig; 之后在RedisConmmand.SETEX设置过期时间 都注释了,但upsert()方法还是没效 在 2021-08-19 17:44:02,"Caizhi Weng" 写道: >Hi! > >可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。 > >如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key >value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 > >李航飞 于2021年8月19日周四 下午5:03写道: > >> 版本 flink1.13.2 >> 具体场景 >> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, >> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, >> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 >> >> >> 问题: >> 测试发现,每1分钟都会输出一次,落地的数据一样, >> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况? >> >>
flink-connector-redis连接器upsert()模式插入问题
版本 flink1.13.2 具体场景 flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 问题: 测试发现,每1分钟都会输出一次,落地的数据一样, 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
Re:Re: cumulate函数和比较函数连用报错
哦哦,好的,谢谢你。比较函数不可以连用,简单的一些的条件(where a = '2')可以用,这个功能后续还会调整吗 在 2021-08-18 16:21:20,"Caizhi Weng" 写道: >Hi! > >目前 window tvf 只能应用于 window agg 和 window top-n 两种场景。如果 where 条件是用来对 window >agg 的结果进行过滤的,可以使用 having 而不是 where;若是对进入 window 之前的数据进行过滤的,可以 create view。 > >李航飞 于2021年8月18日周三 下午3:55写道: > >> 通过flinksql建立数据处理通道 >> SELECT window_start,window_end,SUM(price) >> >> FROM TABLE( >> >> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10' >> MINUTES)) >> >> GROUP BY window_start,window_end; >> >> 大致语句如上,该语句通过 StreamTableEnvironment对象 env.sqlQuery(sql)执行成功,没有问题 >> 关键一步是 StatementSet对象 sta.execute() 执行报错 >> java.lang.UnsupportedOperationException: >> Currently Flink doesn't support individual window table-valued function >> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]). >> Please use window table-valued function with aggregate together using >> window_start and window_end as group keys. >> 执行环境是flink1.13.1 去掉where条件可以正常执行,加上就不行。 >> >>
cumulate函数和比较函数连用报错
通过flinksql建立数据处理通道 SELECT window_start,window_end,SUM(price) FROM TABLE( CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10' MINUTES)) GROUP BY window_start,window_end; 大致语句如上,该语句通过 StreamTableEnvironment对象 env.sqlQuery(sql)执行成功,没有问题 关键一步是 StatementSet对象 sta.execute() 执行报错 java.lang.UnsupportedOperationException: Currently Flink doesn't support individual window table-valued function CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]). Please use window table-valued function with aggregate together using window_start and window_end as group keys. 执行环境是flink1.13.1 去掉where条件可以正常执行,加上就不行。
Re:Re:Re: cumulate 不能和比较函数连用
抱歉,sql语句是我手打的,没注意到,我确定和这个descriptor没关系。我去掉where条件就能正常运行,同时,我测试in,not in 函数的时候,会报同样的错误。 At 2021-08-11 13:51:16, "李航飞" wrote: >org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - >Could not execute application: >org.apache.flink.client.program.ProgramInvocationException: The main >method caused an error: Currently Flink doesn't support individual window >table-valued function CUMULATE(time_col=[ts], max_size=[10 min], step=[1 >min]). Please use window table-valued function with aggregate together >using window_start and window_end as group keys.at >org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) > ~[flink-clients_2.12-1.13.1.jar:1.13.1]at >org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > ~[flink-clients_2.12-1.13.1.jar:1.13.1] at >org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) >~[flink-clients_2.12-1.13.1.jar:1.13.1]at >org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) > ~ >在 2021-08-11 12:44:38,"Caizhi Weng" 写道: >>Hi! >> >>descriptor 拼错了吧。我在本地没有复现这个问题,Flink 版本是多少呢? >> >>李航飞 于2021年8月11日周三 上午11:41写道: >> >>> sql语句如下: >>> select count(clicknum) as num >>> >>> from table( >>> >>> cumulate(table testTable, desctiptor(crtTime),interval '1'minutes, >>> interval '10' minutes)) >>> >>> where clicknum <>'-99' >>> >>> group by window_start,window_end >>> >>> >>> 报错 信息: >>> Flink doesn't support individual window table-valued function >>> cumulate(time_col=[app_date],max_size=[10 min],step=[1 min]... >>> >>> >>> 请问如何解决,谢谢
Re:Re: cumulate 不能和比较函数连用
org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Currently Flink doesn't support individual window table-valued function CUMULATE(time_col=[ts], max_size=[10 min], step=[1 min]). Please use window table-valued function with aggregate together using window_start and window_end as group keys. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-clients_2.12-1.13.1.jar:1.13.1]at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-clients_2.12-1.13.1.jar:1.13.1] at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-clients_2.12-1.13.1.jar:1.13.1]at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~ 在 2021-08-11 12:44:38,"Caizhi Weng" 写道: >Hi! > >descriptor 拼错了吧。我在本地没有复现这个问题,Flink 版本是多少呢? > >李航飞 于2021年8月11日周三 上午11:41写道: > >> sql语句如下: >> select count(clicknum) as num >> >> from table( >> >> cumulate(table testTable, desctiptor(crtTime),interval '1'minutes, >> interval '10' minutes)) >> >> where clicknum <>'-99' >> >> group by window_start,window_end >> >> >> 报错 信息: >> Flink doesn't support individual window table-valued function >> cumulate(time_col=[app_date],max_size=[10 min],step=[1 min]... >> >> >> 请问如何解决,谢谢
cumulate 不能和比较函数连用
sql语句如下: select count(clicknum) as num from table( cumulate(table testTable, desctiptor(crtTime),interval '1'minutes, interval '10' minutes)) where clicknum <>'-99' group by window_start,window_end 报错 信息: Flink doesn't support individual window table-valued function cumulate(time_col=[app_date],max_size=[10 min],step=[1 min]... 请问如何解决,谢谢