Re: 实时数据入库怎样过滤中间状态,保证最终一致
谢谢,这种是可以。 取窗口内最新的数据怎么写合适呢,我直接这样试了下不符合预期: env.addSource(consumer).keyBy(new KeySelector() { @Override public String getKey(String value) throws Exception { return value; } }).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink() 实际上逆序输出了窗口内的所有记录。 谢谢, 王磊 On Mon, Feb 28, 2022 at 9:59 AM 18703416...@163.com <18703416...@163.com> wrote: > keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小 > > > 2022年2月25日 下午6:45,Lei Wang 写道: > > > > 场景描述: > > Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下: > > order_id status > > 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。 > > > > 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id > > 最终的状态不丢,但这个最终的状态也不确定是多少。 > > > > 我的做法是 KeyBy orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id > > 最后来的两条记录时间间隔太小,会导致最终的状态丢失。 > > > > 请问有什么其他的解决方法吗? > > > > 谢谢, > > 王磊 > >
Re: keyBy 后的 getKey 函数调用了两次
谢谢,了解了。 另外一个问题,我 timeWindown 之后只想保留最后一条在这个 window 中的数据直接输出: env.addSource(consumer).keyBy(new KeySelector() { @Override public String getKey(String value) throws Exception { return value; } }).*timeWindow(Time.seconds(10)).reduce((a,b)->b).*addSink(new SinkTest(1)); 上面的代码我测试了下不符合预期,其实是逆序输出了窗口中左右的记录。 需要用什么方式实现这个功能比较合适呢? On Tue, Mar 1, 2022 at 10:52 AM yidan zhao wrote: > > keyBy的时候调用一次,sink的时候应该也会调用一次。因为keyBy是hash分区,前后是不chain在一起的。sink部分处理输入的ele的时候需要基于keySelector获取key。 > > Lei Wang 于2022年3月1日周二 10:49写道: > > > 接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。 > > > > env.addSource(consumer).keyBy(new KeySelector() { > > @Override > > public String getKey(String value) throws Exception { > > System.out.println(value); > > return value; > > } > > }).addSink(new SinkTest(1)); > > > > > > 我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.println(value) 被调用了两次。 > > > > 为什么会这样呢,哪个地方还调用了 getKey 这个函数呢? > > > > > > 谢谢, > > > > 王磊 > > >
keyBy 后的 getKey 函数调用了两次
接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。 env.addSource(consumer).keyBy(new KeySelector() { @Override public String getKey(String value) throws Exception { System.out.println(value); return value; } }).addSink(new SinkTest(1)); 我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.println(value) 被调用了两次。 为什么会这样呢,哪个地方还调用了 getKey 这个函数呢? 谢谢, 王磊
实时数据入库怎样过滤中间状态,保证最终一致
场景描述: Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下: order_id status 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id 最终的状态不丢,但这个最终的状态也不确定是多少。 我的做法是 KeyBy orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id 最后来的两条记录时间间隔太小,会导致最终的状态丢失。 请问有什么其他的解决方法吗? 谢谢, 王磊
Re: flink 以阿里云 oss 作为 checkpoint cpu 过高
确实是跟 OSS 有关,我换成 HDFS 作为 checkpoint 后端就没有这种现象了,但我也不明白为什么会这样。 程序中设置了增量 checkpoit,但 flink web UI 中显示的 checkpoint data size 一直不断变高,三天就到了 1G On Mon, Oct 18, 2021 at 10:44 AM Michael Ran wrote: > 应该和OSS没关系吧,毕竟只是个存储。 > 我们CPU 你先看看消耗在哪个线程或者方法类呗 > > > > 在 2021-10-08 16:34:47,"Lei Wang" 写道: > > > > flink 程序以 RocksDB 作为 stateBackend, aliyun OSS 作为 checkpoint 数据最终的物理位置。 > 我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。 > > > > > > > 这个可能的原因是什么?会跟 OSS 有关吗? > > > 谢谢, > 王磊
flink 以阿里云 oss 作为 checkpoint cpu 过高
flink 程序以 RocksDB 作为 stateBackend, aliyun OSS 作为 checkpoint 数据最终的物理位置。 我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。 [image: image.png] 这个可能的原因是什么?会跟 OSS 有关吗? 谢谢, 王磊
Re: flink 提交job后 task 一直是 schedule 状态
flink-conf.yaml 中加入下面的配置就可以了,但我不知道为什么。 taskmanager.host: localhost On Fri, Jun 18, 2021 at 1:43 PM Lei Wang wrote: > flink-1.11.2 > ./bin/start-cluster.sh 启动然后 > ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname > localhost --port > > 但是 jobmanger 页面 task 一直是 scheduler 状态,过了一段那时间后输出错误 > > 2021-06-18 13:34:26,683 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: > Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b2 > 28577f2) switched from SCHEDULED to FAILED on not deployed. > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Could not allocate the required slot within slot request timeout. Please > make sure tha > t the cluster has enough resources. > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441) > ~[flink-dist_2.11-1.11.2.jar:1 > .11.2] > at > org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422) > ~[flink-dist_2.11-1.11.2.jar:1.11.2 > > 但是 slot 资源是有的。我在其他的机器上执行这种操作是正常的。 > > 有大神帮助解释一下吗? > > 谢谢, > 王磊 >
flink 提交job后 task 一直是 schedule 状态
flink-1.11.2 ./bin/start-cluster.sh 启动然后 ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname localhost --port 但是 jobmanger 页面 task 一直是 scheduler 状态,过了一段那时间后输出错误 2021-06-18 13:34:26,683 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b2 28577f2) switched from SCHEDULED to FAILED on not deployed. org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout. Please make sure tha t the cluster has enough resources. at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441) ~[flink-dist_2.11-1.11.2.jar:1 .11.2] at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422) ~[flink-dist_2.11-1.11.2.jar:1.11.2 但是 slot 资源是有的。我在其他的机器上执行这种操作是正常的。 有大神帮助解释一下吗? 谢谢, 王磊
Flink 提交 job 后 task 始终是schedule 状态
用 standone 方式在一台机器上启动,提交job 后 org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources. slots 是充足的。 我用的是 flink-1.11.2 ,我看了下跟 https://issues.apache.org/jira/browse/FLINK-19237 类似,但是我看不懂是什么意思。 但奇怪的是,我在其他服务器上做相同的操作就没有这个问题。有大神给解释下吗? 谢谢, 王磊
flink 在不同的 operator 之间传递通过基类方式,在 operator 中能转换为子类型吗?
flink 变量在 operator 之间传递是需要序列话的。如果 DataStream<> 泛型通过基类引用,到后面的 operator 上能保留子类的完整信息并强制转换吗? 比如: DataStream stream = source.from(SubClass); stream.keyBy( ) { 这里的代码能判断并强制转换吗。 SubClass subObj = (SubClass) baseObj; } 谢谢, 王磊
flink standalone 模式运行任务的问题
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' 按照日志中的建议,我把 metaspace size 从 256M 调整到了 512M,但还是出现了这个错误。 经过我的观察,在新提交任务的时候比较容易出现这个问题,任务一直正常跑很少出现这个错误。 我 flink 是 standAlone 模式部署的。就两个机器, 每台机器 一个 taskManager,总共运行了 9 个 job,所有 job 都打在了一个 jar 中,jar 大小为 42M. 我自己的猜想是程序正常运行时 metaspace 基本已经满了,再新提交一个任务导致又重新初始化 jar 中所有类的 符号 引用空间不够导致了这个错误。不知道这个想法对不对。 但我还有一个疑问,standalone 模式不同 job 实际上跑在了相同的 TaskMgr 进程上,只有一个 JVM,怎么实现代码隔离呢? 比如下面的例子: job1 和 job2 打在了同一个 jar 中,都用到了代码中的一个 static 变量。 kill 掉 job1, 更改了 这个 static 变种的值,再提交 job1,那更改后的static 变量值 对 job2 会生效吗? 谢谢, 王磊
Flink RocksDBStateBackend 可以设置阿里云 OSS 存储吗?
如题, 可以直接这样写吗? env.setStateBackend(new RocksDBStateBackend(“oss://”, true)); 谢谢, 王磊
怎样定时更新广播变量的值
flink 程序读配置文件,配置文件的内容作为广播变量广播出去。 如果配置文件更新了,怎样能把广播变量的内容也更新呢? 谢谢, 王磊
Flink Sink function 的 close() 在程序停止时一定会被调用到吗?
我自己写了个 Sink 到数据库的 SinkFunction,SinkFunction 中指定只有数据到了一定条数(100) 才执行入库操作。我通过定义了一个 List 缓存需要入库的数据的方式实现。 public class SinkToJDBCWithJDBCStatementBatch extends RichSinkFunction { private List statementList = new ArrayList(); @Override public void close() throws Exception { writeToDatabase(); this.statementList.clear(); super.close(); if (dataSource != null) { dataSource.close(); } } @Override public void invoke(JDBCStatement statement, Context context) throws Exception { if (statementList.size() < 100) { statementList.add(statement); return; } writeToDatabase(); this.statementList.clear(); } public void writeToDatabase(){ . } } 我想确认一下 这个 close() 方法在程序停止的时候一定会被调用到吗?是通过怎样的机制实现的呢? 谢谢, 王磊
flink keyedState 能不能存储泛型或抽象类型
下面的业务逻辑 robot 传感器上报的信息,先按 robotId keyBy,之后要遍历很多的规则。每个规则存储一个之前的对象,实现如下: private transient MapState state; for (Entry entry : RulesFactory.getChargerTwoRecordRules().entrySet()) { String faultName = entry.getKey(); IChargerTwoRecordRule rule = entry.getValue(); RobotData old = state.get(faultName); rule.handleLogMsg(old, current); } 现在有部分规则存储的对象不能用 RobotData 表示,有没有可能用类似泛型或继承的方式实现 MapState value 存储不同类型的数据呢? 比如 MapState state; 之后根据不同的规则 把 Object 转换成具体的类 谢谢, 王磊
Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?
用 session windown 确实能满足功能: robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x, y) -> y); 按照这种写法, 我理解 window state 中只保存了最近的一条记录。 正常情况下 robot 都是会上报日志的,也就是说我这个 window 正常情况下会一直被保存下去。我不清楚会不会有性能影响。 On Thu, Nov 12, 2020 at 5:25 PM hailongwang <18868816...@163.com> wrote: > > > > 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。 > 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。 > > > > > 在 2020-11-12 14:34:59,"Danny Chan" 写道: > >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑 > > > >Lei Wang 于2020年11月11日周三 下午2:03写道: > > > >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 > >> > >> 比如 > >> robot1 2020-11-11 12:00:00 msginfo > >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 > 就发出报警呢? > >> > >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢? > >> > >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。 > >> > >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。 > >> 我们必须 按 robotId 做 keyBy > >> > >> 求大神指教。 > >> > >> 谢谢, > >> 王磊 > >> >
怎样实现超过一定时间没有收到消息就发出报警的功能?
有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 比如 robot1 2020-11-11 12:00:00 msginfo 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 就发出报警呢? flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢? 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。 我们必须 按 robotId 做 keyBy 求大神指教。 谢谢, 王磊