嗯嗯!感谢

Yun Tang <myas...@live.com> 于2019年7月25日周四 下午9:37写道:

> Hi
>
> 你给的代码跟你的异常栈其实还是对不上,前文已经说了,出问题的是operator state,但是你的代码都是keyed
> state相关的代码。不过从你出问题的operator name "KeyedProcess -> async wait operator ->
> Flat Map -> Sink",
> 以及异常栈中的StreamElementSerializer使用和一致性问题的表象,我推测应该是应该是AsyncWaitOperator中的operator
> state "_async_wait_operator_state_"相关。最近fix的
> https://issues.apache.org/jira/browse/FLINK-13063
> 问题,正是暂时解决一致性问题,可以考虑cherry pick相关的fix重新部署你们的Flink作业,观察该问题是否还会复现。
>
> 祝好
> 唐云
> ________________________________
> From: 戴嘉诚 <a773807...@gmail.com>
> Sent: Thursday, July 25, 2019 21:07
> To: user-zh <user-zh@flink.apache.org>
> Subject: Re: Re: Flink checkpoint 并发问题
>
> Hi 唐云
>
>
> 这个问题在这个job上是必现的,即使是failvoer后自动重启,也会出现,但是在同一个处理逻辑,在其他的场景下是没有出现过,而这其中的区别是,这个job是数据量比较大,将近一分钟80万左右。
>
> 这个其中的ProcessElement大致的代码是这样的:
> .process(new KeyedProcessFunction<String, Map<String, Object>, Map<String,
> Object>>() {
>           private static final long serialVersionUID =
> 5245290869789704294L;
>
>           private MapState<String, Long> accumulateStateMap;
>           Map<String, Object> resultMap = new HashMap<>();
>           private transient Long hourClear = 24L;
>
>
>           @Override
>           public void open(Configuration parameters) throws Exception {
>             MapStateDescriptor<String, Long> accumulateState = new
> MapStateDescriptor<>("accumulateState", StringSerializer.INSTANCE,
> LongSerializer.INSTANCE);
>             accumulateStateMap =
> getRuntimeContext().getMapState(accumulateState);
>           }
>
>           @Override
>           public void processElement(Map<String, Object> value, Context
> ctx,
>               Collector<Map<String, Object>> out) throws Exception {
>             logger.info("来数据了:{}", value);
>             realData.increment();
>             resultMap.clear();
>             String valueFieldValue =
> String.valueOf(value.get(getLastName(valueFieldName)));
>             Long timeFieldValue =
> Long.parseLong(String.valueOf(value.get(timeFieldName)));
>             //写到state中
>             //判断,state是否存在fieldValue,  如果fieldValue
> 存在,再判断state的时间是否小于time(用于判断乱序时间
>             if (!accumulateStateMap.contains(valueFieldValue) ||
> accumulateStateMap.get(valueFieldValue) < timeFieldValue) {
>               accumulateStateMap.put(valueFieldValue, timeFieldValue);
>             }
>             //判断配置是否已经刷进来了
>             if (value.containsKey("config")) {
>               Map<String, String> config = (Map<String, String>)
> value.get("config");
>               Integer configCount =
> Integer.parseInt(config.get(countFieldName));
>               Long configTime =
> Long.parseLong(config.get(timeRangeFieldName)) * 1000;
>               //在配置时间范围前
>               long lastTimeStamp = timeFieldValue - configTime;
>               //状态里面有多少个值
>               int stateSize = 0;
>               //遍历state, 删除过时的时间
>               Iterator<Entry<String, Long>> iterator =
> accumulateStateMap.iterator();
>               while (iterator.hasNext()) {
>                 ++stateSize;
>                 Entry<String, Long> next = iterator.next();
>                 if (lastTimeStamp >= next.getValue()) {
>                   iterator.remove();
>                   --stateSize;
>                 }
>               }
>               //state的值的数量大于阈值
>               if (stateSize >= configCount) {
>                 resultMap.put("id", config.get("id"));
>                 resultMap.put("config_id", config.get("_id"));
>                 resultMap.put("config_version", config.get("_version"));
>                 resultMap.put("config_score", config.get("score"));
>                 resultMap.put("config_ttl", config.get("ttl"));
>                 resultMap.put("startTime", lastTimeStamp);
>                 resultMap.put("endTime", timeFieldValue);
>                 resultMap.put("key", ctx.getCurrentKey());
>                 resultMap.put("value", valueFieldValue);
>                 resultMap.put("count", stateSize);
>                 out.collect(resultMap);
>               }
>               logger.info("当前key为:{}, 聚集数量为:{}", ctx.getCurrentKey(),
> stateSize);
>               //根据配置时间乘以2,错信息范围加上来注册指定清理state的时间
>               hourClear = (configTime * 2  + EXPIRATION_TIME) / 3600000;
>               LocalDateTime localDateTime =
>
> Instant.ofEpochMilli(timeFieldValue).atZone(ZoneId.systemDefault()).toLocalDateTime();
>               localDateTime =
> localDateTime.withMinute(0).withSecond(0).withNano(0).plusHours(hourClear);
>               long timeClean =
> localDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();
>               timeTimerGauge = String.valueOf(timeClean);
>               //注册过期时间
>               ctx.timerService().registerEventTimeTimer(timeClean);
>             }
>           }
>
>           @Override
>           public void onTimer(long timestamp, OnTimerContext ctx,
>               Collector<Map<String, Object>> out) throws Exception {
>             //减去时间,防止删除中间要累积的数据
>             Long lasttime = timestamp - (hourClear * 3600000);
>             //删除过期时间
>             int stateSize = 0;
>             int removeState = 0;
>             Iterator<Entry<String, Long>> iterator =
> accumulateStateMap.iterator();
>             while (iterator.hasNext()) {
>               ++stateSize;
>               Entry<String, Long> next = iterator.next();
>               if (lasttime >= next.getValue()) {
>                 iterator.remove();
>                 --stateSize;
>                 ++removeState;
>               }
>             }
>             if (stateSize == 0) {
>               accumulateStateMap.clear();
>             }
>             //把这个定时器删除掉
>             ctx.timerService().deleteEventTimeTimer(timestamp);
>           }
>         })
>
> Yun Tang <myas...@live.com> 于2019年7月25日周四 下午8:39写道:
>
> > Hi 戴嘉诚
> >
> > 从异常栈上看,DefaultOperatorStateBackendSnapshotStrategy 88行左右的代码[1]是对list
> > state进行deepcopy,但是你告诉我的state descriptor却是MapStateDescriptor<String,
> >
> Long>,这二者之间明显是对不上的,还请再校验一下你的代码。异常栈出现在checkpoint同步阶段的内容拷贝时的ConcurrentModification,这个确实是很奇怪的,所以还需要麻烦回答一下下面几个问题:
> >
> >   *   这个问题是必现的么?作业failover或者重新提交之后也会出现么?
> >   *   由于operator
> >
> state的snapshot/restore需要由用户提供,所以如果你继承了ListCheckpointed接口,请提供一下snapshotState和restoreState接口的实现(如果是继承了CheckpointedFunction接口,请提供一下snapshotState和initializeState的实现)。另外,这个operator
> > state的申明以及相关的使用地方也最好提供一下。
> >
> > [1]
> >
> https://github.com/apache/flink/blob/480875f045a9777877ed1a90f9e0c6e01b7e03c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java#L88
> >
> > 祝好
> > 唐云
> > ________________________________
> > From: 戴嘉诚 <a773807...@gmail.com>
> > Sent: Thursday, July 25, 2019 19:26
> > To: user-zh <user-zh@flink.apache.org>
> > Subject: Re: Re: Flink checkpoint 并发问题
> >
> > hi
> >     你好,我用的flink是1.8,但是是根据hadoop 2.7.3.2.6.5.0-292  自行打包的,  operator state
> > descriptor是使用MapStateDescriptor<String, Long>,
> > 谢谢!
> >
> > Yun Tang <myas...@live.com> 于2019年7月25日周四 下午7:10写道:
> >
> > > Hi  all
> > >
> > > 你们讨论的已经越来越偏了,出问题的是operator state
> > > backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。
> > >
> > > To 戴嘉诚
> > > 你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么?
> > >
> > > 祝好
> > > 唐云
> > > ________________________________
> > > From: 戴嘉诚 <a773807...@gmail.com>
> > > Sent: Thursday, July 25, 2019 19:04
> > > To: user-zh@flink.apache.org <user-zh@flink.apache.org>
> > > Subject: Re: Re: Flink checkpoint 并发问题
> > >
> > > 这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
> > > 7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数
> > >
> > > athlon...@gmail.com <athlon...@gmail.com>于2019年7月25日 周四18:50写道:
> > >
> > > > 那你用window和evictor 不可以吗?
> > > > 类似这样,因为我理解你的业务需求可以用这个来实现
> > > >
> > 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
> > > >
> > > > ------------------------------
> > > > athlon...@gmail.com
> > > >
> > > >
> > > > *发件人:* 戴嘉诚 <a773807...@gmail.com>
> > > > *发送时间:* 2019-07-25 18:45
> > > > *收件人:* user-zh <user-zh@flink.apache.org>
> > > > *主题:* Re: Re: Flink checkpoint 并发问题
> > > >
> > > >
> > > >
> > >
> >
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> > > > 对
> > > >
> > > > athlon...@gmail.com <athlon...@gmail.com> 于2019年7月25日周四 下午6:40写道:
> > > >
> > > > > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> > > > >
> > > > >
> > > > >
> > > > > athlon...@gmail.com
> > > > >
> > > > > 发件人: 戴嘉诚
> > > > > 发送时间: 2019-07-25 18:24
> > > > > 收件人: user-zh
> > > > > 主题: Re: Flink checkpoint 并发问题
> > > > > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> > > > >
> > > > > athlon...@gmail.com <athlon...@gmail.com> 于2019年7月25日周四 下午6:20写道:
> > > > >
> > > > > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > > > > >
> > > > > >
> > > > > >
> > > > > > athlon...@gmail.com
> > > > > >
> > > > > > 发件人: 戴嘉诚
> > > > > > 发送时间: 2019-07-25 18:07
> > > > > > 收件人: user-zh
> > > > > > 主题: Flink checkpoint 并发问题
> > > > > > 大家好:
> > > > > >
> > > > > >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > > > > >
> > > > > >
> > > > > > java.lang.Exception: Could not perform checkpoint 550 for
> operator
> > > > > > KeyedProcess -> async wait operator -> Flat Map -> Sink:
> 写入redis库存
> > > > > > (16/20).
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > > > > >
> > > > > >          at org.apache.flink.streaming.runtime.io
> > > > > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > > > > >
> > > > > >          at
> > > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > > > >
> > > > > >          at java.lang.Thread.run(Thread.java:748)
> > > > > >
> > > > > > Caused by: java.lang.Exception: Could not complete snapshot 550
> for
> > > > > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > > > > 写入redis库存 (16/20).
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > > > > >
> > > > > >          ... 8 more
> > > > > >
> > > > > > Caused by: java.util.ConcurrentModificationException
> > > > > >
> > > > > >          at
> > > java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > > > > >
> > > > > >          at
> java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > > > > >
> > > > > >          at
> java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > > > > >
> > > > > >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> > > > > >
> > > > > >          ... 13 more
> > > > > >
> > > > >
> > > >
> > > >
> > >
> >
>

Reply via email to