是的, 现在的问题是sliding会产生多个结果,而我只要输出最早的那个窗口的结果数据。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
大家好, 我现在的场景需求是,窗口size是1个小时,每分钟触发统计一次结果。比如现在是10点钟,则统计9点到10点的结果。
下一分钟则在10:01分时触发统计9:01到10:01的结果。
如果用Sliding window, 比如.timeWindow(Time.hours(1L), Time.minutes(1)),
则会输出60/1=60个结果集,这不是我想要的结果,我只想要当前时间往前一个小时的结果。
除了在window function api做逻辑过虑外,还有什么方法可以实现这种场景?
方便贴出代码吗,这样容易理解。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你好。
在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample,
我是这样用的,
1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2. 在connector里指定watermark,其中transTime是消息里的字段
"rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, '-MM-dd
HH:mm:ss')), \n " +
"
你好。在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime,
没有找到一些相应的sample,我是这样用的,1.
设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime);2.
在connector里指定watermark,其中transTime是消息里的字段"rowtime AS
TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, '-MM-dd HH:mm:ss')), \n "
+"
你好。
在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample,
我是这样用的,
1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2. 在connector里指定watermark,其中transTime是消息里的字段
"rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, '-MM-dd
HH:mm:ss')), \n " +
"
你好。
在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample,
我是这样用的,
1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2. 在connector里指定watermark,其中transTime是消息里的字段
"rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, '-MM-dd
HH:mm:ss')), \n " +
"
大家好,
我已经分配了8个taskmanager.numberOfTaskSlots,但还是遇到如下exception,
我为job/task分配了每个3G的总内存。有没有什么建议?, 谢谢
Caused by:
org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
Could not fulfill slot request 294b93e601744edd7be66dec41e8d8ed. Requested
resource profile
大家好。
只有一个job,设置了jm/tm各总内存为3G,一个taskmanager,总共10个slot,为什么还是报这个错?
Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
No pooled slot available and request to ResourceManager for new slot failed
... 27 more
有没有一些建议,谢谢。
--
Sent from:
大家好。
我用的tumbling window,
ds.keyBy(CandleView::getMarketCode)
.timeWindow(Time.minutes(5L))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
.aggregate(new OhlcAggregateFunction(), new
OhlcWindowFunction())
.addSink(new
刚钉钉群里建议我把路径指到jobId/chk-xx目录,这样就可以恢复了。
但是如果这样,这个xx随着checkpoint的变化而变化,这样怎么做到自动提交job?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
/opt/flink/bin/flink run -d -s /opt/flink/savepoints -c
com.xxx.flink.ohlc.kafka.OrderTickCandleView
/home/service-ohlc-*-SNAPSHOT.jar
在启动job时,已经指定这个目录,但会报以下错,
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
instantiate JobManager.
at
谢谢, 我把这个folder 设置为一个755就可以了。
但现在我遇到一个问题,我目前的环境是用docker 创建了一个jobmanager, 二个taskmanager,
这三个container都map到了主机上的一个地址,
用于放checkpoints/savepoints,理论上这三个container都可以访问得到。
但尝试用这个命令恢复state启动job时报以下错误,
/opt/flink/bin/flink run -d -s /opt/flink/savepoints -c
你好,
我在flink jobmanager里的flink-conf.yaml添加了以加三个个关的state配置参数,
state.backend: filesystem
state.checkpoints.dir: file:///opt/flink/savepoints
state.savepoints.dir: file:///opt/flink/savepoints
但在做./flink savepoint 时还是报以下的错,
Caused by: java.io.IOException: Failed to create savepoint directory at
up一下, 有给些建议的吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你好,
我有一个sql,
select marketCode as market_code,
CURRENT_TIMESTAMP as process_time,
sum((CASE
WHEN msgType = 'FUTURE' THEN matchedPrice
WHEN msgType = 'SPOT' THEN matchedPrice
ELSE 0.5 * (leg1Price + leg2Price)
END) *
你好, 用kafka table
connector接过来的数据,在flink这边会保留多久,在参数列表里没有看到有这个设置,如果保留太久,内存会撑暴,比如我只想保留半个小时,之前的数据可以清除。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
大家好, 有有docker jobmanager HA的 1.11.2 docker-entrypoint.sh脚本吗? 在官方的github里没有看到。
另外,有没有docker-compose.yml的配HA的例子吗? 谢谢。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi,
在用sql api时遇到下面这个exception,
Processing time attribute 'proctime' is not of type SQL_TIMESTAMP.
我是用这个方式map出来的,
tEnv.connect(getPulsarDescriptor(inTopic))
.withSchema( new Schema()
.field("marketId", DataTypes.BIGINT())
hi,
在用table window时报下面的exception, 需要在groupBy里增加什么吗? window W已经指定的proctime字段pt了。
报Exception, Caused by: org.apache.flink.table.api.ValidationException: A
group window expects a time attribute for grouping in a stream environment.
Table outTable = tEnv.from(tableName)
可以补吗? 比如我现在是1分钟的窗口,要是这一分种 没有message,那就以上一个窗口的数据作为这一窗口的数据。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
自己顶一下, 有人给些建议吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
大家好, flink第二个窗口无数据时,怎么填充第一个窗口数据;flink是以事件为驱动,这种需求能实现吗?通过状态可以保存上一个窗口数据吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你好。
当connector连接kafka,如果某个message出现exception时,task就停了, 没有自动重新连接, 看了kafka
connector的配置,没有这方面的设置,这个有什么重连机制吗? Thanks.
--
Sent from: http://apache-flink.147419.n8.nabble.com/
24 matches
Mail list logo