A question about barriers during windowing

2020-07-29 Posted by shuwen zhou
Hi all, I have a question about barriers.
Suppose I have the following operators:
.window()
.reduce()
and the order of elements in the stream is: tuple, then barrier.
While the tuple is held in the window, does the barrier block and wait for the tuple to leave the window before reaching the next operator, or does it not wait at the window and flow straight through to the next operator?
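As I understand the checkpointing model, a barrier is not held back by elements buffered in a window: when it reaches the window operator, the operator snapshots its state, including the buffered window contents, and forwards the barrier downstream immediately. A minimal simulation of that behavior (not Flink code; all names here are illustrative):

```python
# Conceptual model only (not Flink internals): a checkpoint barrier reaching
# a window operator snapshots the buffered window contents and is forwarded
# downstream at once -- it does not wait for the window to fire.
BARRIER = object()

class WindowOperator:
    def __init__(self):
        self.window_state = []   # tuples buffered, window not yet fired
        self.downstream = []     # what this operator emits, in order
        self.snapshot = None     # state captured when the barrier arrived

    def process(self, record):
        if record is BARRIER:
            # Snapshot the current window state, then forward the barrier.
            self.snapshot = list(self.window_state)
            self.downstream.append(BARRIER)
        else:
            # A tuple joins the window buffer; nothing is emitted yet.
            self.window_state.append(record)

op = WindowOperator()
op.process("tuple-1")   # tuple enters the window
op.process(BARRIER)     # barrier follows the tuple
assert op.downstream == [BARRIER]      # barrier went downstream immediately
assert op.window_state == ["tuple-1"]  # tuple is still held by the window
assert op.snapshot == ["tuple-1"]      # but it was captured in the snapshot
```

In this model the tuple's contribution to the checkpoint is the window state itself, so the barrier never needs to wait for the window to fire.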



Best Wishes,
Shuwen Zhou


Re: How Flink commits Kafka offsets on successful checkpoints when using Kafka as a source

2020-07-29 Posted by shuwen zhou
For example, say a message with offset 100 is read, and the pipeline is a Kafka Consumer source, one Process operator and one Sink operator.
If that message has reached the Process operator but not yet the Sink operator, is offset 100 committed at that point?
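For illustration, here is a minimal sketch of the commit behavior described in the reply below (not Flink code; the class and method names merely mirror the snapshot/notify callbacks): the source records the offset it has read when the checkpoint is triggered, and commits that snapshot back to Kafka only in notifyCheckpointComplete, i.e. after the checkpoint has completed across the whole pipeline.

```python
# Minimal sketch (not Flink's actual API) of offset committing driven by
# checkpoints: the offset is snapshotted at the source when the checkpoint
# starts, and committed only once the whole checkpoint has succeeded.
class KafkaSourceModel:
    def __init__(self):
        self.read_offset = -1          # last offset read from Kafka
        self.pending = {}              # checkpoint id -> snapshotted offset
        self.committed_offset = None   # what has been committed to Kafka

    def poll(self, offset):
        # Reading advances the position; it commits nothing by itself.
        self.read_offset = offset

    def snapshot_state(self, checkpoint_id):
        # Checkpoint barrier passes the source: remember the read position.
        self.pending[checkpoint_id] = self.read_offset

    def notify_checkpoint_complete(self, checkpoint_id):
        # Only now -- after the checkpoint completed end-to-end -- is the
        # snapshotted offset committed back to Kafka.
        self.committed_offset = self.pending.pop(checkpoint_id)

src = KafkaSourceModel()
src.poll(100)                          # offset 100 read
src.snapshot_state(checkpoint_id=1)    # checkpoint 1 starts at the source
assert src.committed_offset is None    # nothing committed yet
src.notify_checkpoint_complete(checkpoint_id=1)
assert src.committed_offset == 100     # committed once checkpoint 1 succeeds
```

Note that the commit is tied to checkpoint completion, not to any individual message reaching the sink; on failure the job rewinds to the offsets stored in the last completed checkpoint.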

On Wed, 29 Jul 2020 at 14:51, venn  wrote:

> The offset is committed when the checkpoint succeeds. Take a look at this method of the class FlinkKafkaConsumerBase:
> notifyCheckpointComplete
>
> -----Original Message-----
> From: user-zh-return-5976-wxchunjhyy=163@flink.apache.org
>  On Behalf Of
> shuwen zhou
> Sent: 29 Jul 2020 14:24
> To: user-zh@flink.apache.org
> Subject: How Flink commits Kafka offsets on successful checkpoints when using Kafka as a source
>
> Hi all, a question:
>
> When Flink uses Kafka as a source with checkpointing enabled, is a message's offset committed only after that message has been processed by the last operator in the pipeline?
> Or is it that once the message has been read from the Kafka source, its offset is committed whenever a checkpoint succeeds, regardless of which operator the message has reached?
>
> Also, could anyone point me to the file this code lives in?
> Thanks!
>
> --
> Best Wishes,
> Shuwen Zhou
>


-- 
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>


How Flink commits Kafka offsets on successful checkpoints when using Kafka as a source

2020-07-28 Posted by shuwen zhou
Hi all, a question:
When Flink uses Kafka as a source with checkpointing enabled, is a message's offset committed only after that message has been processed by the last operator in the pipeline?
Or is it that once the message has been read from the Kafka source, its offset is committed whenever a checkpoint succeeds, regardless of which operator the message has reached?

Also, could anyone point me to the file this code lives in?
Thanks!

-- 
Best Wishes,
Shuwen Zhou


Re: Error starting a job from a savepoint: migration for MapState currently isn't supported.

2019-11-20 Posted by shuwen zhou
Where would the serializer class be specified? I did not explicitly specify any serializer, I used Flink 1.9.0 both before and after, and the code is identical.
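For context, a rough sketch of why restore can fail with "The new state serializer cannot be incompatible" (not Flink's actual API; the class names and format strings here are illustrative only): the savepoint stores a snapshot of the serializer that wrote the state, and on restore the backend checks the newly registered serializer against that snapshot before reusing the state.

```python
# Illustrative model (not Flink's real classes) of the serializer
# compatibility check performed when state is restored from a savepoint.
class SerializerSnapshot:
    def __init__(self, fmt):
        self.fmt = fmt  # stands in for the serializer's identity/wire format

    def resolve_compatibility(self, new_serializer):
        # Flink's real check is richer (compatible-as-is, needs migration,
        # incompatible); this sketch collapses it to two outcomes.
        if new_serializer.fmt == self.fmt:
            return "COMPATIBLE_AS_IS"
        return "INCOMPATIBLE"

class StateMigrationException(Exception):
    pass

def restore_state(saved_snapshot, new_serializer):
    if saved_snapshot.resolve_compatibility(new_serializer) == "INCOMPATIBLE":
        raise StateMigrationException(
            "The new state serializer cannot be incompatible.")
    return "state restored"

# Same serializer as the one that wrote the savepoint: restore succeeds.
snap = SerializerSnapshot(fmt="MapState[String,Void]")
assert restore_state(snap, SerializerSnapshot("MapState[String,Void]")) \
    == "state restored"

# A serializer that resolves differently on restore fails the check.
try:
    restore_state(snap, SerializerSnapshot("MapState[String,Integer]"))
    raised = False
except StateMigrationException:
    raised = True
assert raised
```

So even with unchanged source code, the exception indicates that the serializer resolved for the state at restore time did not match the one recorded in the savepoint.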

On Wed, 20 Nov 2019 at 11:25, claylin <1012539...@qq.com> wrote:

> My earlier problem was solved by moving to version 1.9.1. Your error is a little different from mine; check whether anything else was changed as well. Looking at this error:
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer cannot be incompatible.
> Did you change the serializer class, so that it is incompatible with the previous one?
>
>
> ------ Original Message ------
> From: "shuwen zhou"
> Sent: Wednesday, 20 Nov 2019, 11:18
> To: "user-zh"
> Subject: Re: Error starting a job from a savepoint: migration for MapState currently isn't
> supported.
>
>
>
> Did that work?
> I am getting a different error: org.apache.flink.util.StateMigrationException: The new state
> serializer cannot be incompatible, on Flink 1.9.0.
> The steps were:
> after triggering a savepoint, reading back from that savepoint produces this error.
> The state is a MapState[String, Void] in Scala code.
>
>
> ... 25 more
> Caused by: java.io.IOException: Failed to open user defined function
> at org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:210)
> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:185)
> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Error while getting state
> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
> at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:243)
> at tv.freewheel.reporting.dip.ReaderFunction.open(SinkerReadState.scala:49)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:206)
> ... 6 more
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer cannot be incompatible.
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
> at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
>
> On Fri, 15 Nov 2019 at 10:10, claylin <1012539...@qq.com> wrote:
>
> > Thanks. I did indeed change the state's schema; I will give that a try now.
> >
> >
> >
> >
> > ------ Original Message ------
> > From: "Congxian Qiu"
> > Sent: Friday, 15 Nov 2019, 10:07
> > To: "user-zh"
> > Subject: Re: Error starting a job from a savepoint: migration for MapState currently isn't
> > supported.
> >
> >
> >
> > Hi
> > This looks like it is caused by MapState migration not being supported. You could try 1.9, which added MapState value schema evolution [1].
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-11947
> > Best,
> > Congxian
> >
> >
> > claylin <1012539...@qq.com> wrote on Thu, 14 Nov 2019 at 21:35:
> >
> > > I got the error below when starting from a savepoint, on version 1.8.1. Has anyone run into this? Looking for a solution.
> > > java.lang.RuntimeException: Error while getting state
> > > at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
> > > at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
> > > at com.yy.kafka.template.job.PushSe

Re: Error starting a job from a savepoint: migration for MapState currently isn't supported.

2019-11-19 Posted by shuwen zhou
> > at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
> > at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
> > at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
> > at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
> > at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
> > at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
> > at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
> > at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
> > at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
> > at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> > at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
> > ... 9 more



-- 
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>