cumulate 不能和比较函数连用

2021-08-10 Thread
sql语句如下: select count(clicknum) as num from table( cumulate(table testTable, desctiptor(crtTime),interval '1'minutes, interval '10' minutes)) where clicknum <>'-99' group by window_start,window_end 报错 信息: Flink doesn't support individual window table-valued function cumulate(time_col=[app_

Re:Re: cumulate 不能和比较函数连用

2021-08-10 Thread
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~ 在 2021-08-11 12:44:38,"Caizhi Weng" 写道: >Hi! > >descriptor 拼错了吧。我在本地没有复现这个问题,Flink 版本是多少呢? > >李航飞 于2021年8月11日周三 上午11:41写道: > >> sql语句如下: >> s

Re:Re:Re: cumulate 不能和比较函数连用

2021-08-10 Thread
抱歉,sql语句是我手打的,没注意到,我确定和这个descriptor没关系。我去掉where条件就能正常运行,同时,我测试in,not in 函数的时候,会报同样的错误。 At 2021-08-11 13:51:16, "李航飞" wrote: >org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - >Could not execute application: >org.apache.flink.client.program.Program

cumulate函数和比较函数连用报错

2021-08-18 Thread
通过flinksql建立数据处理通道 SELECT window_start,window_end,SUM(price) FROM TABLE( CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10' MINUTES)) GROUP BY window_start,window_end; 大致语句如上,该语句通过 StreamTableEnvironment对象 env.sqlQuery(sql)执行成功,没有问题 关键一步是 StatementSet对象 sta.execute() 执行报

Re:Re: cumulate函数和比较函数连用报错

2021-08-18 Thread
哦哦,好的,谢谢你。比较函数不可以连用,简单的一些的条件(where a = '2')可以用,这个功能后续还会调整吗 在 2021-08-18 16:21:20,"Caizhi Weng" 写道: >Hi! > >目前 window tvf 只能应用于 window agg 和 window top-n 两种场景。如果 where 条件是用来对 window >agg 的结果进行过滤的,可以使用 having 而不是 where;若是对进入 window 之前的数据进行过滤的,可以 create view。 > &g

flink-connector-redis连接器upsert()模式插入问题

2021-08-19 Thread
版本 flink1.13.2 具体场景 flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 问题: 测试发现,每1分钟都会输出一次,落地的数据一样, 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?

Re:Re: flink-connector-redis连接器upsert()模式插入问题

2021-08-19 Thread
置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key >value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 > >李航飞 于2021年8月19日周四 下午5:03写道: > >> 版本 flink1.13.2 >> 具体场景 >> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, >> 通过重写getChangelogMode方法,设置Changelo

Re:Re:Re: flink-connector-redis连接器upsert()模式插入问题

2021-08-19 Thread
你好: 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗? 在 2021-08-20 09:10:44,"李航飞" 写道: >你好: >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。 > > >我在RichMapFunction接口里面实现open方法 >设置过StateTtlConfig; >之后在RedisConmmand.SETEX设置过期时间 >都注释了,但upsert()方法还是没效 > > > > > >

Re:Re: Re: cumulate函数和比较函数连用报错

2021-08-19 Thread
你好: 具体场景是对agg结果之前进行过滤,现在通过create view进行提取过滤了 现在我想通过DynameicTable的方式,以upsert写入redis里面 在 2021-08-20 10:31:18,"Caizhi Weng" 写道: >Hi! > >具体的条件是什么样的呢?理论上如果是和原本的 group by ... having ... 等价的语句应该是支持的。 > >李航飞 于2021年8月18日周三 下午4:34写道: > >> 哦哦,好的,谢谢你。比较函数不可以连用,简单的一些的条件(

Re:Re: Re:Re: flink-connector-redis连接器upsert()模式插入问题

2021-08-19 Thread
arg0){ return ChangelogMode.upsert(); } 实现RedisMapper 方法 落地redis 有输出语句,每1分钟都会落地一次,我确定数据每次都一样 这upsert 不合理啊 在 2021-08-20 11:15:17,"Caizhi Weng" 写道: >Hi! > >之前没注意到是 cumulate 窗口,具体的 group by 语句是什么样的呢?描述里写着“小窗口 1 分钟计算一次”,那 sink >确实应该每分钟收到一条消息。 > >sink 在 streaming 场景可以接收 upsert

cumulate累加函数输出问题

2021-08-20 Thread
能不能通过设置,每分钟只输出 当前小窗口中计算数据的结果,之前计算的结果不再输出 目前测试发现,输入的数据越多,到下次输出的数据也会越来越多, 不同窗口的计算结果,都会再下次窗口中输出,

Re:Re: cumulate累加函数输出问题

2021-08-20 Thread
你好: 我使用的场景是要实时统计一天的数据,在小窗口进行即时输出,延迟不能太高,cumulate window符合要求,tumble window 延迟太高了。 在 2021-08-20 16:01:57,"Caizhi Weng" 写道: >Hi! > >你可能想要的是 tumble window 而不是 cumulate window。 > >李航飞 于2021年8月20日周五 下午3:26写道: > >> 能不能通过设置,每分钟只输出 当前小窗口中计算数据的结果,之前计算的结果不再输出 >> &

mini-batch 设置后没效果

2021-08-24 Thread
通过Configuration 设置 table.exec.mini-batch.enabled= true; table.exec.mini-batch.allow-latency = 15s; table.exec.mini-batch.size = 50; 上面的配置测试了,没效果,下面的测试 table.exec.mini-batch.allow-latency = 15000 也没效果什么原因?

Re:Re: mini-batch 设置后没效果

2021-08-24 Thread
设置的延迟15秒,实际测试发现数据来了之后就处理了,没有延迟15秒 在 2021-08-25 11:12:19,"Caizhi Weng" 写道: >Hi! > >所谓的没效果指的是什么现象呢?建议详细描述一下场景与问题。 > >李航飞 于2021年8月25日周三 上午11:04写道: > >> 通过Configuration 设置 >> table.exec.mini-batch.enabled= true; >> table.exec.mini-batch.allow-lat

mini-batch配置信息没产生效果

2021-08-25 Thread
Configuration conf = new Configuration(); conf.setString("table.exec.mini-batch.enabled","true"); conf.setString("table.exec.mini-batch.allow-latency","15s"); conf.setString("table.exec.mini-batch.size","50"); StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironm

table.exec.state.ttl

2021-08-26 Thread
Configuration conf = new Configuration(); conf.setString("table.exec.mini-batch.enabled","true"); conf.setString("table.exec.mini-batch.allow-latency","15s"); conf.setString("table.exec.mini-batch.size","50"); conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment execEnv = Strea

Re:Re: table.exec.state.ttl

2021-08-26 Thread
environment 的 table config 里的,加在 execution environment >里无效。 > >李航飞 于2021年8月26日周四 下午7:02写道: > >> Configuration conf = new Configuration(); >> conf.setString("table.exec.mini-batch.enabled","true"); >> conf.setString("table.exec.mini-batch.al

Tumbling Windows 窗口可开的最小单位

2021-11-04 Thread
滚动窗口最小可开多大,100ms? 对性能有什么影响吗?

退订

2021-12-28 Thread
退订

Table Api Connectors kafka bounded

2022-03-18 Thread
现在有一个需求是,创建一个任务,消费kafka,仅消费一个片段 即设定起始消费点和结束消费位置 我看到DataStream Connectors kafka 中有一个setBounded (setUnbounded)属性 ,可以满足需求。 问题; 我想使用 Table API 完成上面的需求,该怎么办? Table API 是否有相关属性? 有其他办法满足这个需求吗? 流处理批处理都行。

设定kafka消费结束位置

2022-03-20 Thread
我想通过Table API 的方式消费kafka 需要设定消费kafka 开始位置(时间)和结束位置(时间) 请问有没有相关属性 例如DataStream中的setUnbounded属性, 有没有其他办法可以实现类似功能?

设定kafka消费结束位置

2022-03-20 Thread
我想通过Table API 的方式消费kafka 需要设定消费kafka 开始位置(时间)和结束位置(时间) 请问有没有相关属性 例如DataStream中的setUnbounded属性, 有没有其他办法可以实现类似功能?