设定kafka消费结束位置

2022-03-20 Thread
我想通过Table API 的方式消费kafka
需要设定消费kafka 开始位置(时间)和结束位置(时间)
请问有没有相关属性 例如DataStream中的setUnbounded属性,
有没有其他办法可以实现类似功能?

设定kafka消费结束位置

2022-03-20 Thread
我想通过Table API 的方式消费kafka
需要设定消费kafka 开始位置(时间)和结束位置(时间)
请问有没有相关属性 例如DataStream中的setUnbounded属性,
有没有其他办法可以实现类似功能?

Table Api Connectors kafka bounded

2022-03-18 Thread
现在有一个需求是,创建一个任务,消费kafka,仅消费一个片段 即设定起始消费点和结束消费位置
我看到DataStream Connectors kafka 中有一个setBounded (setUnbounded)属性 ,可以满足需求。
问题;
我想使用 Table API 完成上面的需求,该怎么办?
Table API 是否有相关属性? 
有其他办法满足这个需求吗?
流处理批处理都行。

退订

2021-12-28 Thread
退订

Tumbling Windows 窗口可开的最小单位

2021-11-04 Thread
滚动窗口最小可开多大,100ms?
对性能有什么影响吗?

Re:Re: table.exec.state.ttl

2021-08-26 Thread
你好:

我现在想在 execution environment 里面设置微批和stateValue的过期时间该怎么设?

这样 conf.setString("exec.state.ttl","15 s");
或者这样  conf.setString("stream.exec.state.ttl","15 s");











在 2021-08-26 19:05:07,"Caizhi Weng"  写道:
>Hi!
>
>table 层的配置是加在 table environment 的 table config 里的,加在 execution environment
>里无效。
>
>李航飞  于2021年8月26日周四 下午7:02写道:
>
>> Configuration conf = new Configuration();
>> conf.setString("table.exec.mini-batch.enabled","true");
>> conf.setString("table.exec.mini-batch.allow-latency","15s");
>> conf.setString("table.exec.mini-batch.size","50");
>> conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment
>> execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> execEnv.configure(conf,this.getClass().getClassLoader());
>> EnvironmentSetting setting = ...
>> StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2
>> 微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
>> 上次那个 allow 也就算了,这次这个 table.exec.state.ttl
>> 设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。
>> 程序是通过StatementSet .execute()执行的


table.exec.state.ttl

2021-08-26 Thread
Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");
conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment 
execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
execEnv.configure(conf,this.getClass().getClassLoader()); EnvironmentSetting 
setting = ... StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2
微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
上次那个 allow 也就算了,这次这个 table.exec.state.ttl 
设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。
程序是通过StatementSet .execute()执行的

mini-batch配置信息没产生效果

2021-08-25 Thread


Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");


StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.configure(conf,this.getClass().getClassLoader());
EnvironmentSetting setting = ...
StreamTableEnvironment.create(execEnv,setting);


基于flink1.13.2
微批配置信息如上,kafka流数据,测试效果数据计算没有延迟15s 

Re:Re: mini-batch 设置后没效果

2021-08-24 Thread
设置的延迟15秒,实际测试发现数据来了之后就处理了,没有延迟15秒








在 2021-08-25 11:12:19,"Caizhi Weng"  写道:
>Hi!
>
>所谓的没效果指的是什么现象呢?建议详细描述一下场景与问题。
>
>李航飞  于2021年8月25日周三 上午11:04写道:
>
>> 通过Configuration 设置
>> table.exec.mini-batch.enabled= true;
>> table.exec.mini-batch.allow-latency = 15s;
>> table.exec.mini-batch.size = 50;
>> 上面的配置测试了,没效果,下面的测试
>>  table.exec.mini-batch.allow-latency = 15000
>> 也没效果什么原因?
>>
>>


mini-batch 设置后没效果

2021-08-24 Thread
通过Configuration 设置
table.exec.mini-batch.enabled= true;
table.exec.mini-batch.allow-latency = 15s;
table.exec.mini-batch.size = 50;
上面的配置测试了,没效果,下面的测试
 table.exec.mini-batch.allow-latency = 15000
也没效果什么原因?



Re:Re: cumulate累加函数输出问题

2021-08-20 Thread



你好:
我使用的场景是要实时统计一天的数据,在小窗口进行即时输出,延迟不能太高,cumulate window符合要求,tumble window 延迟太高了。
在 2021-08-20 16:01:57,"Caizhi Weng"  写道:
>Hi!
>
>你可能想要的是 tumble window 而不是 cumulate window。
>
>李航飞  于2021年8月20日周五 下午3:26写道:
>
>> 能不能通过设置,每分钟只输出 当前小窗口中计算数据的结果,之前计算的结果不再输出
>>
>>
>> 目前测试发现,输入的数据越多,到下次输出的数据也会越来越多,
>> 不同窗口的计算结果,都会再下次窗口中输出,
>>
>>


cumulate累加函数输出问题

2021-08-20 Thread
能不能通过设置,每分钟只输出 当前小窗口中计算数据的结果,之前计算的结果不再输出


目前测试发现,输入的数据越多,到下次输出的数据也会越来越多,
不同窗口的计算结果,都会再下次窗口中输出,



Re:Re: Re:Re: flink-connector-redis连接器upsert()模式插入问题

2021-08-19 Thread
你好:

SELECT window_start,window_end,SUM(price),item 
 FROM TABLE(
CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '1' MINUTES,INTERVAL'10' HOUR))

GROUP BY window_start,window_end,item
语句没有问题,正常每1分钟输出一次,过期时间代码已注释,
public ChangelogMode  getChanglogMode(ChangelogMode arg0){
   return ChangelogMode.upsert();
}
实现RedisMapper 方法  落地redis 有输出语句,每1分钟都会落地一次,我确定数据每次都一样
这upsert 不合理啊
在 2021-08-20 11:15:17,"Caizhi Weng"  写道:
>Hi!
>
>之前没注意到是 cumulate 窗口,具体的 group by 语句是什么样的呢?描述里写着“小窗口 1 分钟计算一次”,那 sink
>确实应该每分钟收到一条消息。
>
>sink 在 streaming 场景可以接收 upsert 消息,只要 getChangelogMode 返回的是 upsert 即可。
>
>李航飞  于2021年8月20日周五 上午10:03写道:
>
>> 你好:
>> 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗?
>>
>>
>>
>> 在 2021-08-20 09:10:44,"李航飞"  写道:
>> >你好:
>> >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。
>> >
>> >
>> >我在RichMapFunction接口里面实现open方法
>> >设置过StateTtlConfig;
>> >之后在RedisConmmand.SETEX设置过期时间
>> >都注释了,但upsert()方法还是没效
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >在 2021-08-19 17:44:02,"Caizhi Weng"  写道:
>> >>Hi!
>> >>
>> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
>> >>
>> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
>> >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
>> >>
>> >>李航飞  于2021年8月19日周四 下午5:03写道:
>> >>
>> >>> 版本 flink1.13.2
>> >>> 具体场景
>> >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
>> >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
>> >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>> >>>
>> >>>
>> >>> 问题:
>> >>> 测试发现,每1分钟都会输出一次,落地的数据一样,
>> >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>> >>>
>> >>>
>>


Re:Re: Re: cumulate函数和比较函数连用报错

2021-08-19 Thread
你好:
具体场景是对agg结果之前进行过滤,现在通过create view进行提取过滤了
现在我想通过DynameicTable的方式,以upsert写入redis里面




在 2021-08-20 10:31:18,"Caizhi Weng"  写道:
>Hi!
>
>具体的条件是什么样的呢?理论上如果是和原本的 group by ... having ... 等价的语句应该是支持的。
>
>李航飞  于2021年8月18日周三 下午4:34写道:
>
>> 哦哦,好的,谢谢你。比较函数不可以连用,简单的一些的条件(where a = '2')可以用,这个功能后续还会调整吗
>> 在 2021-08-18 16:21:20,"Caizhi Weng"  写道:
>> >Hi!
>> >
>> >目前 window tvf 只能应用于 window agg 和 window top-n 两种场景。如果 where 条件是用来对 window
>> >agg 的结果进行过滤的,可以使用 having 而不是 where;若是对进入 window 之前的数据进行过滤的,可以 create view。
>> >
>> >李航飞  于2021年8月18日周三 下午3:55写道:
>> >
>> >> 通过flinksql建立数据处理通道
>> >> SELECT window_start,window_end,SUM(price)
>> >>
>> >> FROM TABLE(
>> >>
>> >> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL
>> '10'
>> >> MINUTES))
>> >>
>> >> GROUP BY window_start,window_end;
>> >>
>> >> 大致语句如上,该语句通过 StreamTableEnvironment对象 env.sqlQuery(sql)执行成功,没有问题
>> >> 关键一步是 StatementSet对象 sta.execute() 执行报错
>> >> java.lang.UnsupportedOperationException:
>> >> Currently Flink doesn't support individual window table-valued function
>> >> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
>> >>  Please use window table-valued function with aggregate together using
>> >> window_start and window_end as group keys.
>> >> 执行环境是flink1.13.1  去掉where条件可以正常执行,加上就不行。
>> >>
>> >>
>>


Re:Re:Re: flink-connector-redis连接器upsert()模式插入问题

2021-08-19 Thread
你好:
我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗?



在 2021-08-20 09:10:44,"李航飞"  写道:
>你好:
>我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。
>
>
>我在RichMapFunction接口里面实现open方法
>设置过StateTtlConfig;
>之后在RedisConmmand.SETEX设置过期时间
>都注释了,但upsert()方法还是没效
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-08-19 17:44:02,"Caizhi Weng"  写道:
>>Hi!
>>
>>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
>>
>>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
>>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
>>
>>李航飞  于2021年8月19日周四 下午5:03写道:
>>
>>> 版本 flink1.13.2
>>> 具体场景
>>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
>>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
>>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>>>
>>>
>>> 问题:
>>> 测试发现,每1分钟都会输出一次,落地的数据一样,
>>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>>>
>>>


Re:Re: flink-connector-redis连接器upsert()模式插入问题

2021-08-19 Thread
你好:
我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。


我在RichMapFunction接口里面实现open方法
设置过StateTtlConfig;
之后在RedisConmmand.SETEX设置过期时间
都注释了,但upsert()方法还是没效














在 2021-08-19 17:44:02,"Caizhi Weng"  写道:
>Hi!
>
>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
>
>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
>
>李航飞  于2021年8月19日周四 下午5:03写道:
>
>> 版本 flink1.13.2
>> 具体场景
>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>>
>>
>> 问题:
>> 测试发现,每1分钟都会输出一次,落地的数据一样,
>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>>
>>


flink-connector-redis连接器upsert()模式插入问题

2021-08-19 Thread
版本 flink1.13.2
具体场景
flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次


问题:
测试发现,每1分钟都会输出一次,落地的数据一样,
根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?



Re:Re: cumulate函数和比较函数连用报错

2021-08-18 Thread
哦哦,好的,谢谢你。比较函数不可以连用,简单的一些的条件(where a = '2')可以用,这个功能后续还会调整吗
在 2021-08-18 16:21:20,"Caizhi Weng"  写道:
>Hi!
>
>目前 window tvf 只能应用于 window agg 和 window top-n 两种场景。如果 where 条件是用来对 window
>agg 的结果进行过滤的,可以使用 having 而不是 where;若是对进入 window 之前的数据进行过滤的,可以 create view。
>
>李航飞  于2021年8月18日周三 下午3:55写道:
>
>> 通过flinksql建立数据处理通道
>> SELECT window_start,window_end,SUM(price)
>>
>> FROM TABLE(
>>
>> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10'
>> MINUTES))
>>
>> GROUP BY window_start,window_end;
>>
>> 大致语句如上,该语句通过 StreamTableEnvironment对象 env.sqlQuery(sql)执行成功,没有问题
>> 关键一步是 StatementSet对象 sta.execute() 执行报错
>> java.lang.UnsupportedOperationException:
>> Currently Flink doesn't support individual window table-valued function
>> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
>>  Please use window table-valued function with aggregate together using
>> window_start and window_end as group keys.
>> 执行环境是flink1.13.1  去掉where条件可以正常执行,加上就不行。
>>
>>


cumulate函数和比较函数连用报错

2021-08-18 Thread
通过flinksql建立数据处理通道
SELECT window_start,window_end,SUM(price)

FROM TABLE(

CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10' 
MINUTES))

GROUP BY window_start,window_end;

大致语句如上,该语句通过 StreamTableEnvironment对象 env.sqlQuery(sql)执行成功,没有问题
关键一步是 StatementSet对象 sta.execute() 执行报错
java.lang.UnsupportedOperationException:
Currently Flink doesn't support individual window table-valued function 
CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
 Please use window table-valued function with aggregate together using 
window_start and window_end as group keys.
执行环境是flink1.13.1  去掉where条件可以正常执行,加上就不行。



Re:Re:Re: cumulate 不能和比较函数连用

2021-08-11 Thread
抱歉,sql语句是我手打的,没注意到,我确定和这个descriptor没关系。我去掉where条件就能正常运行,同时,我测试in,not in 
函数的时候,会报同样的错误。
At 2021-08-11 13:51:16, "李航飞"  wrote:
>org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
>Could not execute application: 
>org.apache.flink.client.program.ProgramInvocationException: The main 
>method caused an error: Currently Flink doesn't support individual window 
>table-valued function CUMULATE(time_col=[ts], max_size=[10 min], step=[1 
>min]). Please use window table-valued function with aggregate together 
>using window_start and window_end as group keys.at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> ~[flink-clients_2.12-1.13.1.jar:1.13.1]at 
>org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-clients_2.12-1.13.1.jar:1.13.1] at 
>org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
>~[flink-clients_2.12-1.13.1.jar:1.13.1]at 
>org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
> ~
>在 2021-08-11 12:44:38,"Caizhi Weng"  写道:
>>Hi!
>>
>>descriptor 拼错了吧。我在本地没有复现这个问题,Flink 版本是多少呢?
>>
>>李航飞  于2021年8月11日周三 上午11:41写道:
>>
>>> sql语句如下:
>>> select count(clicknum) as num
>>>
>>> from table(
>>>
>>> cumulate(table testTable, desctiptor(crtTime),interval '1'minutes,
>>> interval '10' minutes))
>>>
>>> where clicknum <>'-99'
>>>
>>> group by window_start,window_end
>>>
>>>
>>> 报错 信息:
>>> Flink doesn't support individual window table-valued function
>>> cumulate(time_col=[app_date],max_size=[10 min],step=[1 min]...
>>>
>>>
>>> 请问如何解决,谢谢


Re:Re: cumulate 不能和比较函数连用

2021-08-10 Thread
org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
Could not execute application: 
org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: Currently Flink doesn't support individual window 
table-valued function CUMULATE(time_col=[ts], max_size=[10 min], step=[1 
min]). Please use window table-valued function with aggregate together 
using window_start and window_end as group keys. at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-clients_2.12-1.13.1.jar:1.13.1]at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-clients_2.12-1.13.1.jar:1.13.1] at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-clients_2.12-1.13.1.jar:1.13.1]at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
 ~
在 2021-08-11 12:44:38,"Caizhi Weng"  写道:
>Hi!
>
>descriptor 拼错了吧。我在本地没有复现这个问题,Flink 版本是多少呢?
>
>李航飞  于2021年8月11日周三 上午11:41写道:
>
>> sql语句如下:
>> select count(clicknum) as num
>>
>> from table(
>>
>> cumulate(table testTable, desctiptor(crtTime),interval '1'minutes,
>> interval '10' minutes))
>>
>> where clicknum <>'-99'
>>
>> group by window_start,window_end
>>
>>
>> 报错 信息:
>> Flink doesn't support individual window table-valued function
>> cumulate(time_col=[app_date],max_size=[10 min],step=[1 min]...
>>
>>
>> 请问如何解决,谢谢


cumulate 不能和比较函数连用

2021-08-10 Thread
sql语句如下:
select count(clicknum) as num 

from table(

cumulate(table testTable, desctiptor(crtTime),interval '1'minutes, interval 
'10' minutes))

where clicknum <>'-99'

group by window_start,window_end


报错 信息:
Flink doesn't support individual window table-valued function 
cumulate(time_col=[app_date],max_size=[10 min],step=[1 min]...


请问如何解决,谢谢