Re: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-28 文章 Lei Wang
谢谢,这种是可以。

取窗口内最新的数据怎么写合适呢,我直接这样试了下不符合预期:

env.addSource(consumer).keyBy(new KeySelector() {
@Override
public String getKey(String value) throws Exception {
return value;
}
}).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink()

实际上逆序输出了窗口内的所有记录。

谢谢,

王磊



On Mon, Feb 28, 2022 at 9:59 AM 18703416...@163.com <18703416...@163.com>
wrote:

> keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小
>
> > 2022年2月25日 下午6:45,Lei Wang  写道:
> >
> > 场景描述:
> > Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
> > order_id   status
> > 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
> >
> > 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
> > 最终的状态不丢,但这个最终的状态也不确定是多少。
> >
> > 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
> > 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
> >
> > 请问有什么其他的解决方法吗?
> >
> > 谢谢,
> > 王磊
>
>


Re: keyBy 后的 getKey 函数调用了两次

2022-02-28 文章 Lei Wang
谢谢,了解了。

另外一个问题,我 timeWindown 之后只想保留最后一条在这个 window 中的数据直接输出:

env.addSource(consumer).keyBy(new KeySelector() {
@Override
public String getKey(String value) throws Exception {
return value;
}
}).*timeWindow(Time.seconds(10)).reduce((a,b)->b).*addSink(new SinkTest(1));

上面的代码我测试了下不符合预期,其实是逆序输出了窗口中左右的记录。

需要用什么方式实现这个功能比较合适呢?


On Tue, Mar 1, 2022 at 10:52 AM yidan zhao  wrote:

>
> keyBy的时候调用一次,sink的时候应该也会调用一次。因为keyBy是hash分区,前后是不chain在一起的。sink部分处理输入的ele的时候需要基于keySelector获取key。
>
> Lei Wang  于2022年3月1日周二 10:49写道:
>
> > 接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。
> >
> > env.addSource(consumer).keyBy(new KeySelector() {
> > @Override
> > public String getKey(String value) throws Exception {
> > System.out.println(value);
> > return value;
> > }
> > }).addSink(new SinkTest(1));
> >
> >
> > 我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.println(value) 被调用了两次。
> >
> > 为什么会这样呢,哪个地方还调用了 getKey 这个函数呢?
> >
> >
> > 谢谢,
> >
> > 王磊
> >
>


keyBy 后的 getKey 函数调用了两次

2022-02-28 文章 Lei Wang
接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。

env.addSource(consumer).keyBy(new KeySelector() {
@Override
public String getKey(String value) throws Exception {
System.out.println(value);
return value;
}
}).addSink(new SinkTest(1));


我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.println(value) 被调用了两次。

为什么会这样呢,哪个地方还调用了 getKey 这个函数呢?


谢谢,

王磊


实时数据入库怎样过滤中间状态,保证最终一致

2022-02-25 文章 Lei Wang
场景描述:
Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
order_id   status
只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。

对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
最终的状态不丢,但这个最终的状态也不确定是多少。

我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
最后来的两条记录时间间隔太小,会导致最终的状态丢失。

请问有什么其他的解决方法吗?

谢谢,
王磊


Re: flink 以阿里云 oss 作为 checkpoint cpu 过高

2021-10-19 文章 Lei Wang
确实是跟 OSS 有关,我换成 HDFS 作为 checkpoint 后端就没有这种现象了,但我也不明白为什么会这样。
程序中设置了增量 checkpoit,但 flink web UI 中显示的 checkpoint data size 一直不断变高,三天就到了 1G


On Mon, Oct 18, 2021 at 10:44 AM Michael Ran  wrote:

> 应该和OSS没关系吧,毕竟只是个存储。
> 我们CPU 你先看看消耗在哪个线程或者方法类呗
>
>
>
> 在 2021-10-08 16:34:47,"Lei Wang"  写道:
>
>
>
> flink 程序以 RocksDB 作为 stateBackend,  aliyun OSS 作为 checkpoint 数据最终的物理位置。
> 我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。
>
>
>
>
>
>
> 这个可能的原因是什么?会跟 OSS 有关吗?
>
>
> 谢谢,
> 王磊


flink 以阿里云 oss 作为 checkpoint cpu 过高

2021-10-08 文章 Lei Wang
flink 程序以 RocksDB 作为 stateBackend,  aliyun OSS 作为 checkpoint 数据最终的物理位置。
我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。

[image: image.png]

这个可能的原因是什么?会跟 OSS 有关吗?

谢谢,
王磊


Re: flink 提交job后 task 一直是 schedule 状态

2021-06-18 文章 Lei Wang
flink-conf.yaml 中加入下面的配置就可以了,但我不知道为什么。

taskmanager.host: localhost



On Fri, Jun 18, 2021 at 1:43 PM Lei Wang  wrote:

> flink-1.11.2
> ./bin/start-cluster.sh 启动然后
> ./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname
> localhost --port 
>
> 但是 jobmanger 页面 task 一直是 scheduler 状态,过了一段那时间后输出错误
>
> 2021-06-18 13:34:26,683 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b2
> 28577f2) switched from SCHEDULED to FAILED on not deployed.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure tha
> t the cluster has enough resources.
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ~[flink-dist_2.11-1.11.2.jar:1
> .11.2]
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2
>
> 但是 slot 资源是有的。我在其他的机器上执行这种操作是正常的。
>
> 有大神帮助解释一下吗?
>
> 谢谢,
> 王磊
>


flink 提交job后 task 一直是 schedule 状态

2021-06-17 文章 Lei Wang
flink-1.11.2
./bin/start-cluster.sh 启动然后
./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname
localhost --port 

但是 jobmanger 页面 task 一直是 scheduler 状态,过了一段那时间后输出错误

2021-06-18 13:34:26,683 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b2
28577f2) switched from SCHEDULED to FAILED on not deployed.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate the required slot within slot request timeout. Please
make sure tha
t the cluster has enough resources.
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
~[flink-dist_2.11-1.11.2.jar:1
.11.2]
at
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
~[flink-dist_2.11-1.11.2.jar:1.11.2

但是 slot 资源是有的。我在其他的机器上执行这种操作是正常的。

有大神帮助解释一下吗?

谢谢,
王磊


Flink 提交 job 后 task 始终是schedule 状态

2021-06-17 文章 Lei Wang
用 standone 方式在一台机器上启动,提交job 后

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate the required slot within slot request timeout. Please
make sure that the cluster has enough resources.

slots 是充足的。
我用的是 flink-1.11.2 ,我看了下跟 https://issues.apache.org/jira/browse/FLINK-19237
类似,但是我看不懂是什么意思。

但奇怪的是,我在其他服务器上做相同的操作就没有这个问题。有大神给解释下吗?


谢谢,
王磊


flink 在不同的 operator 之间传递通过基类方式,在 operator 中能转换为子类型吗?

2021-03-11 文章 Lei Wang
flink 变量在 operator 之间传递是需要序列话的。如果 DataStream<> 泛型通过基类引用,到后面的 operator
上能保留子类的完整信息并强制转换吗?


比如:

DataStream stream = source.from(SubClass);

stream.keyBy(   ) {

这里的代码能判断并强制转换吗。

SubClass subObj = (SubClass) baseObj;

}


谢谢,

王磊


flink standalone 模式运行任务的问题

2021-03-10 文章 Lei Wang
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
has occurred. This can mean two things: either the job requires a larger
size of JVM metaspace to load classes or there is a class loading leak. In
the first case 'taskmanager.memory.jvm-metaspace.size'

按照日志中的建议,我把 metaspace size 从 256M 调整到了 512M,但还是出现了这个错误。
经过我的观察,在新提交任务的时候比较容易出现这个问题,任务一直正常跑很少出现这个错误。
我 flink 是 standAlone 模式部署的。就两个机器, 每台机器 一个 taskManager,总共运行了 9 个 job,所有 job
都打在了一个 jar 中,jar 大小为 42M.

我自己的猜想是程序正常运行时 metaspace 基本已经满了,再新提交一个任务导致又重新初始化 jar 中所有类的 符号
引用空间不够导致了这个错误。不知道这个想法对不对。

但我还有一个疑问,standalone 模式不同  job 实际上跑在了相同的 TaskMgr 进程上,只有一个 JVM,怎么实现代码隔离呢?
比如下面的例子:

job1 和  job2 打在了同一个 jar 中,都用到了代码中的一个 static 变量。
kill 掉 job1, 更改了 这个 static 变种的值,再提交 job1,那更改后的static 变量值 对 job2 会生效吗?

谢谢,
王磊


Flink RocksDBStateBackend 可以设置阿里云 OSS 存储吗?

2020-12-28 文章 Lei Wang
如题, 可以直接这样写吗?

env.setStateBackend(new RocksDBStateBackend(“oss://”, true));

谢谢,
王磊


怎样定时更新广播变量的值

2020-12-08 文章 Lei Wang
flink 程序读配置文件,配置文件的内容作为广播变量广播出去。

如果配置文件更新了,怎样能把广播变量的内容也更新呢?

谢谢,
王磊


Flink Sink function 的 close() 在程序停止时一定会被调用到吗?

2020-11-24 文章 Lei Wang
我自己写了个 Sink 到数据库的 SinkFunction,SinkFunction 中指定只有数据到了一定条数(100)
才执行入库操作。我通过定义了一个 List 缓存需要入库的数据的方式实现。


public class SinkToJDBCWithJDBCStatementBatch extends
RichSinkFunction {

private List statementList = new
ArrayList();


@Override
public void close() throws Exception {
writeToDatabase();
this.statementList.clear();
super.close();
if (dataSource != null) {
dataSource.close();
}
}

@Override
public void invoke(JDBCStatement statement, Context context) throws
Exception {

if (statementList.size() < 100) {
statementList.add(statement);
return;
}
writeToDatabase();
this.statementList.clear();
}

public void writeToDatabase(){
.
}
}

我想确认一下 这个 close() 方法在程序停止的时候一定会被调用到吗?是通过怎样的机制实现的呢?

谢谢,
王磊


flink keyedState 能不能存储泛型或抽象类型

2020-11-16 文章 Lei Wang
下面的业务逻辑

robot 传感器上报的信息,先按 robotId keyBy,之后要遍历很多的规则。每个规则存储一个之前的对象,实现如下:

private transient MapState state;

for (Entry entry :
RulesFactory.getChargerTwoRecordRules().entrySet()) {
String faultName = entry.getKey();
IChargerTwoRecordRule rule = entry.getValue();
RobotData old = state.get(faultName);
rule.handleLogMsg(old, current);
}

现在有部分规则存储的对象不能用 RobotData 表示,有没有可能用类似泛型或继承的方式实现 MapState value 存储不同类型的数据呢?


比如

MapState state;

之后根据不同的规则 把 Object 转换成具体的类



谢谢,

王磊


Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-12 文章 Lei Wang
用 session windown 确实能满足功能:

robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x,
y) -> y);

按照这种写法, 我理解 window state 中只保存了最近的一条记录。


正常情况下 robot 都是会上报日志的,也就是说我这个 window 正常情况下会一直被保存下去。我不清楚会不会有性能影响。



On Thu, Nov 12, 2020 at 5:25 PM hailongwang <18868816...@163.com> wrote:

>
>
>
> 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
> 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
>
>
>
>
> 在 2020-11-12 14:34:59,"Danny Chan"  写道:
> >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
> >
> >Lei Wang  于2020年11月11日周三 下午2:03写道:
> >
> >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
> >>
> >> 比如
> >> robot1   2020-11-11 12:00:00 msginfo
> >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00
> 就发出报警呢?
> >>
> >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
> >>
> >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
> >>
> >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> >> 我们必须 按 robotId 做 keyBy
> >>
> >> 求大神指教。
> >>
> >> 谢谢,
> >> 王磊
> >>
>


怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-10 文章 Lei Wang
有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。

比如
robot1   2020-11-11 12:00:00 msginfo
之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00 就发出报警呢?

flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?

我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。

这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
我们必须 按 robotId 做 keyBy

求大神指教。

谢谢,
王磊