Re: Ads business: how to replace some Spark processing logic with Flink, and is ProcessFunction needed?
The logic as written is confusing; I can't follow the requirement. Is this the main-search business?

张锴 wrote on Wed, Aug 18, 2021 at 10:26 AM:
>
> Requirement: we need to replace an existing Spark program with Flink. While mapping
> out the logic, there is one piece I don't know how to implement in Flink; the Spark
> job runs in three-minute micro-batches.
> Details: ad logs arrive in the order ask -> bid -> show -> click. Every non-bid log
> must be merged with its bid log so the bid data stays complete; the unique key is
> sessionid + Adid.
> Logic: Spark reads several log topics (including xxtopic), formats them, and after a
> joinAll gets (string, pair). If pair.logType is 'bid', the record is written straight
> to bidtopic. For any other type, we look up the bid record in the HBase cache; on a
> match (it could be a show or a click, etc.) the records are merged and written to
> bidtopic. On a miss, pair.n counts the attempts and the record goes back to xxtopic:
> if n > 10 (about 30 minutes of cycling) with still no matching bid, the record is
> written to bidtopic as-is; if n <= 10, n is incremented and the record is written to
> xxtopic for the next batch.
> The limit of 10 attempts (i.e. a 30-minute cache) comes from the business side;
> without it, too much data would pile up in xxtopic. No computation is involved here,
> only merging, and no deduplication: if three identical records are found for a key,
> all three are merged.
>
> How can this be implemented in Flink?
Flink job intermittently fails when a checkpoint is triggered: concurrent access to a Map.
Below is the stack trace. I checked the failing task; it contains two operators, A and B. B is an async operator, currently with no state at all. A is a broadcast process operator (it takes a regular stream and a broadcast stream) and only uses broadcast state. Could anyone help analyze what causes the concurrent Map access?

2021-08-18 06:18:37
java.io.IOException: Could not perform checkpoint 575 for operator ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel (18/60)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 575 for operator ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel (18/60)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
    ... 20 more
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1479)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1477)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:260)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:234)
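For what it's worth, the bottom frames show Kryo's MapSerializer.copy iterating a HashMap inside a record while something else mutates it. With chained operators Flink copies records between them using the serializer, so one plausible cause (an assumption, not a diagnosis) is a mutable Map in an emitted record that is still being touched afterwards, e.g. from the async operator's callback thread or because the same object is reused after emission. The failure mode and the usual fix, handing downstream a snapshot rather than the live map, can be shown outside Flink; this is an illustrative Python analogue, not Flink code:

```python
def copy_like_kryo(live_map):
    """Iterate the live map while it is being mutated, as Kryo's
    MapSerializer.copy does when another thread touches the map."""
    out = {}
    for k, v in live_map.items():
        out[k] = v
        live_map[("extra", k)] = v  # stands in for the concurrent mutation
    return out

def copy_from_snapshot(live_map):
    """The fix: snapshot the entries first, then iterate the snapshot."""
    out = {}
    for k, v in list(live_map.items()):
        out[k] = v
        live_map[("extra", k)] = v  # mutation no longer disturbs the iteration
    return out

# Mutating during iteration raises, just like ConcurrentModificationException:
try:
    copy_like_kryo({"a": 1})
    raised = False
except RuntimeError:
    raised = True
print("live-map copy raised:", raised)           # True
print("snapshot copy:", copy_from_snapshot({"a": 1}))  # {'a': 1}
```

In the Flink job the equivalent of `copy_from_snapshot` is emitting an immutable (or freshly copied) Map and never mutating a record after `collect()`, so the mailbox thread's serializer copy never races with another writer.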
Ads business: how to replace some Spark processing logic with Flink, and is ProcessFunction needed?
Requirement: we need to replace an existing Spark program with Flink. While mapping out the logic, there is one piece I don't know how to implement in Flink; the Spark job runs in three-minute micro-batches.

Details: ad logs arrive in the order ask -> bid -> show -> click. Every non-bid log must be merged with its bid log so the bid data stays complete; the unique key is sessionid + Adid.

Logic: Spark reads several log topics (including xxtopic), formats them, and after a joinAll gets (string, pair). If pair.logType is 'bid', the record is written straight to bidtopic. For any other type, we look up the bid record in the HBase cache; on a match (it could be a show or a click, etc.) the records are merged and written to bidtopic. On a miss, pair.n counts the attempts and the record goes back to xxtopic: if n > 10 (about 30 minutes of cycling) with still no matching bid, the record is written to bidtopic as-is; if n <= 10, n is incremented and the record is written to xxtopic for the next batch.

The limit of 10 attempts (i.e. a 30-minute cache) comes from the business side; without it, too much data would pile up in xxtopic. No computation is involved here, only merging, and no deduplication: if three identical records are found for a key, all three are merged.

How can this be implemented in Flink?
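The three-minute-batch retry loop maps naturally onto a keyed, event-driven formulation: key the stream by sessionid + Adid with a KeyedProcessFunction, keep the bid and the not-yet-matched events in keyed state, and replace the n <= 10 re-queueing with a single 30-minute timer that flushes whatever never matched. The sketch below models that per-key state machine in plain Python; field names like logType come from the question, while the state dict, output list, and on_timer method stand in for Flink's keyed state, collector, and registered timer callback, so this is a model of the logic rather than actual Flink API code:

```python
class BidMergeState:
    """What keyed state would hold for one sessionid+Adid key."""
    def __init__(self):
        self.bid = None    # the bid record, once seen
        self.pending = []  # show/click/... events still waiting for the bid

class BidMerger:
    TIMEOUT_MS = 30 * 60 * 1000  # the business side's 30-minute limit

    def __init__(self):
        self.state = {}   # key -> BidMergeState; Flink scopes this per key
        self.output = []  # stands in for the bidtopic sink

    def on_event(self, key, event):
        st = self.state.setdefault(key, BidMergeState())
        if event["logType"] == "bid":
            st.bid = event
            self.output.append(event)        # bid goes straight to bidtopic
            for e in st.pending:             # merge every waiting event, no dedup
                self.output.append({**event, **e})
            st.pending = []
        elif st.bid is not None:
            self.output.append({**st.bid, **event})  # matched at once: merge
        else:
            st.pending.append(event)         # buffer; in Flink, also register
                                             # a timer for now + TIMEOUT_MS here

    def on_timer(self, key):
        # Fires 30 minutes after the first unmatched event for this key:
        # give up waiting for the bid and emit the buffered events as-is.
        st = self.state.get(key)
        if st is not None and st.bid is None:
            self.output.extend(st.pending)
            st.pending = []
```

A show arriving before its bid is buffered and merged once the bid turns up; on_timer replaces the n > 10 branch, flushing keys whose bid never arrives, and since every buffered event is merged, duplicates are preserved exactly as the requirement asks.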
Can the Operator state API be used inside a Flink SQL UDF?
Hi: we read a Kafka stream with Flink SQL and want to compare each user's info against its previous state value, emitting the latest user record whenever it changes.
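Not as far as I know: a scalar UDF only receives a FunctionContext, which exposes metrics and job parameters but no keyed or operator state. State in SQL usually comes from the engine's own operators (for example a deduplicating ROW_NUMBER query), or you drop to the DataStream API and hold the previous value per key in ValueState inside a KeyedProcessFunction. The change-detection core itself is tiny; modeled here in plain Python as a hypothetical sketch, where self.prev plays the role ValueState would play per user key:

```python
class ChangeDetector:
    """Emit a user record only when it differs from the previous record
    seen for that key; suppress unchanged records."""
    def __init__(self):
        self.prev = {}  # user_id -> last emitted record

    def process(self, user_id, record):
        if self.prev.get(user_id) == record:
            return None              # no change: emit nothing
        self.prev[user_id] = record  # remember the new "previous state value"
        return (user_id, record)     # changed (or first seen): emit latest info

d = ChangeDetector()
print(d.process("u1", {"city": "Beijing"}))   # first sighting: emitted
print(d.process("u1", {"city": "Beijing"}))   # unchanged: None
print(d.process("u1", {"city": "Shanghai"}))  # changed: emitted
```

In a real job the keyed operator gives you exactly this shape for free: the framework partitions by user_id, and ValueState holds one previous record per key with checkpointing included.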
Re: Re: How to monitor Kafka lag
@Jimmy Zhang Have a look at checkpoints/savepoints; intermediate results can be written out to external storage such as HDFS at intervals.

On 2021-08-09 09:51:21, "Jimmy Zhang" wrote:
>Hello, I saw you are using the Kafka-related metrics and would like to ask a
>question. Have you ever seen those metrics reset to zero after restarting a
>Kafka sink job? Doesn't that make it impossible to keep accumulating the counts?
>We want to do data reconciliation, querying output totals over different time
>ranges, and a reset to zero in the middle would break that, hence the question.
>Any reply is greatly appreciated!
>
>Best,
>Jimmy
>
>Signature is customized by Netease Mail Master
>
>On 2021-07-28 17:58, jie mei wrote:
>hi, all
>
>We alert via Grafana on the collected Flink Kafka metrics
>(taskmanager_job_task_operator_KafkaConsumer_records), using configured alert
>rules.
>
>xuhaiLong wrote on Wed, Jul 28, 2021 at 5:46 PM:
>
>> Take a look at kafka_exporter: collect the consumption status of every
>> group, then configure different rules to monitor them.
>>
>> On 2021-07-28 17:39, laohu <2372554...@qq.com.INVALID> wrote:
>> Hi comsir
>>
>> The Kafka console is fairly limited; if you want the lag you have to track
>> it yourself.
>>
>> How to track it:
>>
>> 1. For each service's topic: its log-end offset minus the group id's
>> committed offset.
>>
>> 2. Where possible, also derive the various consumption rates.
>>
>> 3. The RocketMQ console shows consumption progress and is worth a look for
>> reference.
>>
>> On 2021/7/28 11:02 AM, 龙逸尘 wrote:
>> Hi comsir,
>> The lag obtained by subtracting the current group offset from the offsets
>> in the Kafka cluster metadata is fairly accurate.
>> You need to maintain the group id yourself.
>>
>> comsir <609326...@qq.com.invalid> wrote on Tue, Jul 20, 2021 at 12:41 PM:
>>
>> hi all
>> For Flink jobs with Kafka as the source, how do you all monitor Kafka lag?
>> Why monitor it: 1. dashboard display, 2. alerting on lag.
>> A few questions:
>> 1. Flink ships many related metrics, but after digging in none seemed very
>> accurate; which one do you use?
>> 2. How do you obtain the groupId? With multiple groups consuming, how do
>> you tell them apart?
>> 3. Can lag be computed from the Kafka cluster-side metadata minus the
>> current offset?
>> 4. Is there an elegant way to implement this?
>> Many thanks, looking forward to answers.
>
>
>--
>
>*Best Regards*
>*Jeremy Mei*
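The subtraction laohu sketches in step 1 (per-partition log-end offset minus the group's committed offset) is small enough to pin down as code. Here the arithmetic is kept as a pure function; the offset-fetching is left as a comment because it needs a live cluster, the broker address, topic, and group id in it are made-up placeholders, and the kafka-python client shown is just one way to obtain the two offset maps:

```python
def consumer_lag(end_offsets, committed):
    """Per-partition lag = log-end offset - committed group offset.
    A partition with no committed offset counts its full log end as lag."""
    return {tp: end - (committed.get(tp) or 0) for tp, end in end_offsets.items()}

def total_lag(end_offsets, committed):
    """Total lag for the group across all partitions."""
    return sum(consumer_lag(end_offsets, committed).values())

# Fetching the two offset maps with kafka-python would look roughly like this
# (hypothetical broker/topic/group names; requires a reachable cluster):
#
#   from kafka import KafkaConsumer, TopicPartition
#   c = KafkaConsumer(bootstrap_servers="broker:9092", group_id="my-group")
#   tps = [TopicPartition("my-topic", p) for p in c.partitions_for_topic("my-topic")]
#   end = c.end_offsets(tps)                      # log-end offset per partition
#   committed = {tp: c.committed(tp) for tp in tps}  # group's committed offsets
#   print(total_lag(end, committed))
```

Keeping the arithmetic separate from the I/O also answers the accuracy concern upthread: the number comes straight from broker-side offsets, not from Flink metrics that reset on job restart.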
Re: Flink program fails to start under Flink on YARN HA deployment
Hello, my version is 1.13.1.

------ Original ------
From: "Yang Wang"
> https://issues.apache.org/jira/browse/FLINK-19212
> Best,
> Yang

周瑞
Re: Flink program fails to start under Flink on YARN HA deployment
Judging from the error this is a known issue [1], already fixed in 1.11.2.

[1] https://issues.apache.org/jira/browse/FLINK-19212

Best,
Yang

周瑞 wrote on Tue, Aug 17, 2021 at 11:04 AM:

> Hello: the Flink program is deployed on YARN and started in Application Mode. It
> started fine without HA; after configuring HA it fails to start. Could you help
> figure out what is causing this?
>
> HA configuration:
> high-availability: zookeeper
> high-availability.storageDir: hdfs://mycluster/flink/ha
> high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
> high-availability.zookeeper.path.root: /flink
> high-availability.cluster-id: /flink_cluster
>
> The exception:
> 2021-08-17 10:24:18,938 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
> 2021-08-17 10:25:09,706 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] - Unhandled exception.
> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to serialize the result for RPC call : requestTaskManagerDetailsInfo.
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_292]
> at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848) ~[?:1.8.0_292]
> at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168) ~[?:1.8.0_292]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.1.jar:1.13.1]
> Caused by: java.io.NotSerializableException: org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_292]
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_292]
> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.