hi:
flink keyby()时能否获取到subTask的编号,根据编号分组,让上游数据可以继续保持原有的数据依然在同一个subTask中进行后续计算。
在AbstractRichFunction类中this.getRuntimeContext().getIndexOfThisSubtask()可以获取编号,但是keyby()的KeySelector类中没有getRuntimeContext()方法。
hi,
感谢你的回复。
报错是在 getValue 的时候。
at GroupAggsHandler$439.getValue(Unknown Source)
at
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:146)
我的疑问是使用一个 Class 包装下 ListView 就能正常工作,而直接使用 ListView 是会报错。
比如使用 AggregateFunction
通过metrics观察,下游的input queue都是0, 上游queue是满的
观察监控,发现当出现速率跌零的时候,下游的input queue是0
max by
(exported_task_name)(flink_taskmanager_job_task_Network_Input_0_maxQueueLen{job_name=~'$job_name'})
此时上游的队列还是满的
(flink_taskmanager_job_task_Network_Output_0_maxQueueLen{job_name=~'$job_name'})
这个问
Hi all,
看table
store的介绍也是关于数据湖存储以及用于实时流式读取的,那在定位上与iceberg、hudi等项目有什么不一样么,为什么要再开发一个项目?
Best.
Hi,
理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。
实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象
--
Best!
Xuyang
在 2022-09-07 16:23:25,"Zhiwen Sun" 写道:
Hi,
理论上来说,在你的ca
Hi
有 savepoint/checkpoint 失败时的具体 jobmanager log 以及失败 task 对应的 taskmanager log
的话可以发一下,大家帮助看一下
Best,
Congxian
Xuyang 于2022年8月30日周二 23:18写道:
>
> Hi,看起来这个报错是用于输出信息的文件找不到了,可以尝试加一下这个配置再试一下“taskmanager.log.path”,找一下导致tasks超时的根本原因。
> 还可以试一下用火焰图或jstack查看一下那几个tasks超时的时候是卡在哪个方法上。
>
>
>
>
>
>
>
>
>
>
>
Hello all,
我看 ListView 使用的时候,有以下示例
public class MyAccumulator {
public ListView list = new ListView<>();
// or explicit:
// {@literal @}DataTypeHint("ARRAY")
// public ListView list = new ListView<>();
public long count = 0L;
}
public class MyAggregateFunction extends A