Re: flink滑动窗口输出结果的问题

2021-01-17 文章 marble.zh...@coinflex.com.INVALID
是的, 现在的问题是sliding会产生多个结果,而我只要输出最早的那个窗口的结果数据。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink滑动窗口输出结果的问题

2021-01-15 文章 marble.zh...@coinflex.com.INVALID
大家好, 我现在的场景需求是,窗口size是1个小时,每分钟触发统计一次结果。比如现在是10点钟,则统计9点到10点的结果。 下一分钟则在10:01分时触发统计9:01到10:01的结果。 如果用Sliding window, 比如.timeWindow(Time.hours(1L), Time.minutes(1)), 则会输出60/1=60个结果集,这不是我想要的结果,我只想要当前时间往前一个小时的结果。 除了在window function api做逻辑过虑外,还有什么方法可以实现这种场景?

Re: flink在source中定义了eventtime和watermark,可以在sink时从context中获取eventtime吗

2020-11-02 文章 marble.zh...@coinflex.com.INVALID
方便贴出代码吗,这样容易理解。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

大家好 kafka table connector eventTime的问题

2020-11-02 文章 marble.zh...@coinflex.com.INVALID
你好。 在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample, 我是这样用的, 1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 2. 在connector里指定watermark,其中transTime是消息里的字段 "rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, '-MM-dd HH:mm:ss')), \n " + "

大家好 kafka table connector eventTime的问题

2020-11-02 文章 marble.zh...@coinflex.com.INVALID
你好。在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample,我是这样用的,1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime);2. 在connector里指定watermark,其中transTime是消息里的字段"rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, '-MM-dd HH:mm:ss')), \n " +"

kafka table connector eventTime的问题

2020-11-02 文章 marble.zh...@coinflex.com.INVALID
你好。 在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample, 我是这样用的, 1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 2. 在connector里指定watermark,其中transTime是消息里的字段 "rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, '-MM-dd HH:mm:ss')), \n " + "

大家好 kafka table connector eventTime的问题

2020-11-02 文章 marble.zh...@coinflex.com.INVALID
你好。 在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample, 我是这样用的, 1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 2. 在connector里指定watermark,其中transTime是消息里的字段 "rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, '-MM-dd HH:mm:ss')), \n " + "

No pooled slot available and request to ResourceManager for new slot failed

2020-10-28 文章 marble.zh...@coinflex.com.INVALID
大家好, 我已经分配了8个taskmanager.numberOfTaskSlots,但还是遇到如下exception, 我为job/task分配了每个3G的总内存。有没有什么建议?, 谢谢 Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request 294b93e601744edd7be66dec41e8d8ed. Requested resource profile

No pooled slot available and request to ResourceManager for new slot failed

2020-10-28 文章 marble.zh...@coinflex.com.INVALID
大家好。 只有一个job,设置了jm/tm各总内存为3G,一个taskmanager,总共10个slot,为什么还是报这个错? Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed ... 27 more 有没有一些建议,谢谢。 -- Sent from:

tumbling的窗口更新随着job运行时间越长,delay越久,sliding不会

2020-10-28 文章 marble.zh...@coinflex.com.INVALID
大家好。 我用的tumbling window, ds.keyBy(CandleView::getMarketCode) .timeWindow(Time.minutes(5L)) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) .aggregate(new OhlcAggregateFunction(), new OhlcWindowFunction()) .addSink(new

Re: flink state.savepoints.dir 目录配置问题

2020-10-27 文章 marble.zh...@coinflex.com.INVALID
刚钉钉群里建议我把路径指到jobId/chk-xx目录,这样就可以恢复了。 但是如果这样,这个xx随着checkpoint的变化而变化,这样怎么做到自动提交job? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re:无法从checkpoint中恢复state

2020-10-27 文章 marble.zh...@coinflex.com.INVALID
/opt/flink/bin/flink run -d -s /opt/flink/savepoints -c com.xxx.flink.ohlc.kafka.OrderTickCandleView /home/service-ohlc-*-SNAPSHOT.jar 在启动job时,已经指定这个目录,但会报以下错, Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager. at

Re: flink state.savepoints.dir 目录配置问题

2020-10-27 文章 marble.zh...@coinflex.com.INVALID
谢谢, 我把这个folder 设置为一个755就可以了。 但现在我遇到一个问题,我目前的环境是用docker 创建了一个jobmanager, 二个taskmanager, 这三个container都map到了主机上的一个地址, 用于放checkpoints/savepoints,理论上这三个container都可以访问得到。 但尝试用这个命令恢复state启动job时报以下错误, /opt/flink/bin/flink run -d -s /opt/flink/savepoints -c

flink state.savepoints.dir 目录配置问题

2020-10-26 文章 marble.zh...@coinflex.com.INVALID
你好, 我在flink jobmanager里的flink-conf.yaml添加了以加三个个关的state配置参数, state.backend: filesystem state.checkpoints.dir: file:///opt/flink/savepoints state.savepoints.dir: file:///opt/flink/savepoints 但在做./flink savepoint 时还是报以下的错, Caused by: java.io.IOException: Failed to create savepoint directory at

Re: 请教 table /sql API, 窗口frist/last value

2020-10-21 文章 marble.zh...@coinflex.com.INVALID
up一下, 有给些建议的吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

请教 table /sql API, 窗口frist/last value

2020-10-21 文章 marble.zh...@coinflex.com.INVALID
你好, 我有一个sql, select marketCode as market_code, CURRENT_TIMESTAMP as process_time, sum((CASE WHEN msgType = 'FUTURE' THEN matchedPrice WHEN msgType = 'SPOT' THEN matchedPrice ELSE 0.5 * (leg1Price + leg2Price) END) *

kafka table connector保留多久的数据

2020-10-14 文章 marble.zh...@coinflex.com.INVALID
你好, 用kafka table connector接过来的数据,在flink这边会保留多久,在参数列表里没有看到有这个设置,如果保留太久,内存会撑暴,比如我只想保留半个小时,之前的数据可以清除。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

有docker jobmanager HA的 1.11.2 docker-entrypoint.sh

2020-09-30 文章 marble.zh...@coinflex.com.INVALID
大家好, 有有docker jobmanager HA的 1.11.2 docker-entrypoint.sh脚本吗? 在官方的github里没有看到。 另外,有没有docker-compose.yml的配HA的例子吗? 谢谢。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Processing time attribute 'pt' is not of type SQL_TIMESTAMP.

2020-09-24 文章 marble.zh...@coinflex.com.INVALID
hi, 在用sql api时遇到下面这个exception, Processing time attribute 'proctime' is not of type SQL_TIMESTAMP. 我是用这个方式map出来的, tEnv.connect(getPulsarDescriptor(inTopic)) .withSchema( new Schema() .field("marketId", DataTypes.BIGINT())

A group window expects a time attribute for grouping in a stream environment.

2020-09-24 文章 marble.zh...@coinflex.com.INVALID
hi, 在用table window时报下面的exception, 需要在groupBy里增加什么吗? window W已经指定的proctime字段pt了。 报Exception, Caused by: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment. Table outTable = tEnv.from(tableName)

Re: flink 填补窗口问题

2020-09-15 文章 marble.zh...@coinflex.com.INVALID
可以补吗? 比如我现在是1分钟的窗口,要是这一分种 没有message,那就以上一个窗口的数据作为这一窗口的数据。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink table Kafka 重新连接的问题

2020-09-10 文章 marble.zh...@coinflex.com.INVALID
自己顶一下, 有人给些建议吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink 填补窗口问题

2020-09-10 文章 marble.zh...@coinflex.com.INVALID
大家好, flink第二个窗口无数据时,怎么填充第一个窗口数据;flink是以事件为驱动,这种需求能实现吗?通过状态可以保存上一个窗口数据吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink table Kafka 重新连接的问题

2020-09-09 文章 marble.zh...@coinflex.com.INVALID
你好。 当connector连接kafka,如果某个message出现exception时,task就停了, 没有自动重新连接, 看了kafka connector的配置,没有这方面的设置,这个有什么重连机制吗? Thanks. -- Sent from: http://apache-flink.147419.n8.nabble.com/