Hi Oytun,

I think there is a regression introduced in 1.8 in how we handle output tags. The problem is that we do not call the ClosureCleaner on the OutputTag.
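The failure can be reproduced in plain Java, without Flink. In the sketch below, `Tag` is a hypothetical stand-in for `OutputTag`, and `Operator` is a hypothetical stand-in for a non-serializable field such as the `SingleOutputStreamOperator` named in the stack trace. An anonymous subclass like `new Tag<String>("...") {}` created inside an instance method holds a hidden reference to the enclosing object, so serializing the tag drags that object and its fields along. The same subclass created in a static context has no enclosing instance to capture. This illustrates the mechanism under those assumptions; it is not Flink code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class CaptureDemo implements Serializable {

    // Hypothetical stand-in for Flink's OutputTag: a Serializable class that is
    // subclassed anonymously (new Tag<T>("id") {}) to keep its generic type.
    static class Tag<T> implements Serializable {
        private final String id;
        Tag(String id) { this.id = id; }
        String getId() { return id; }
    }

    // Hypothetical stand-in for a non-serializable field of the enclosing class.
    static class Operator {}

    // Declared static: created in a static context, so there is no enclosing instance.
    static final Tag<String> STATIC_TAG = new Tag<String>("static-tag") {};

    private final Operator operator = new Operator();

    // The pattern from the thread: the anonymous subclass keeps a hidden
    // reference to `this`, and through it to `operator`.
    Tag<String> newInstanceTag() {
        return new Tag<String>("instance-tag") {};
    }

    // Returns true if Java serialization accepts the whole object graph.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("static tag serializable:   " + serializes(STATIC_TAG));
        System.out.println("instance tag serializable: " + serializes(new CaptureDemo().newInstanceTag()));
    }
}
```

`serializes` should return `true` for `STATIC_TAG` and `false` for the tag returned by `newInstanceTag()`, where the underlying `NotSerializableException` names `Operator`, much like the `Caused by` line in the trace.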
There are two options for working around this issue:

1. Declare the OutputTag static.
2. Clean the closure explicitly, as Guowei suggested: StreamExecutionEnvironment.clean(pendingProjectsTag)

I also opened a JIRA issue to fix this (FLINK-12297 [1]).

Best,
Dawid

[1] https://issues.apache.org/jira/browse/FLINK-12297

On 22/04/2019 03:06, Guowei Ma wrote:
> I think you could try
> StreamExecutionEnvironment.clean(pendingProjectsTag).
>
> Oytun Tez <oy...@motaword.com> wrote on Friday, April 19, 2019 at 9:58 PM:
>
> > Forgot to answer one of your points: the parent class compiles
> > well without this CEP selector (with timeout signature)...
> >
> > ---
> > Oytun Tez
> >
> > *M O T A W O R D*
> > The World's Fastest Human Translation Platform.
> > oy...@motaword.com
> > www.motaword.com
> >
> > On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <oy...@motaword.com> wrote:
> >
> > > Hey JingsongLee!
> > >
> > > Here are some findings...
> > >
> > > * flatSelect *without timeout* works normally:
> > >   patternStream.flatSelect(PatternFlatSelectFunction) compiles well.
> > > * Converted both the timeout and select selectors to an *inner class*
> > >   (not static): same result, doesn't compile.
> > > * flatSelect *without* timeout, but with an inner class for
> > >   PatternFlatSelectFunction: compiles (same as the first bullet).
> > > * Tried both of these selectors with an _empty_ body, just a skeleton
> > >   class: doesn't compile either. The empty-body example is in my first email.
> > > * Tried making both selectors *static public inner* classes: doesn't
> > >   compile either.
> > > * Extracted both the timeout and flat selectors to their own
> > >   *independent classes* in separate files: doesn't compile.
> > > * I am putting the *error stack* below.
> > > * Without the timeout selector, in any class or lambda shape, with an
> > >   empty or full body, flatSelect compiles well.
> > >
> > > Would these findings help? Any ideas?
> > >
> > > Here is an error stack:
> > >
> > > 09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler -
> > > org.apache.flink.api.common.InvalidProgramException: The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.
> > >     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> > >     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
> > >     at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
> > >     at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
> > >     at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
> > >     at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
> > >     at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
> > >     at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
> > >     at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
> > >     at com.motaword.ipm.kernel.Application.main(Application.java:63)
> > > Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> > >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> > >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> > >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> > >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> > >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> > >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> > >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> > >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > >     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> > >     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
> > >     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> > >     ... 9 more
> > >
> > > ---
> > > Oytun Tez
> > >
> > > On Fri, Apr 19, 2019 at 3:14 AM JingsongLee <lzljs3620...@aliyun.com> wrote:
> > >
> > > > Hi @Oytun Tez
> > > > It looks like your *PatternFlatSelectFunction* is not serializable.
> > > > Because you use an anonymous inner class, check the class to which
> > > > getPending belongs; maybe that class is not serializable?
> > > > Alternatively, avoid non-static inner classes and use a static nested class.
> > > >
> > > > Best, JingsongLee
> > > >
> > > > ------------------------------------------------------------------
> > > > From: Oytun Tez <oy...@motaword.com>
> > > > Send Time: Friday, April 19, 2019 03:38
> > > > To: user <user@flink.apache.org>
> > > > Subject: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade
> > > >
> > > > Hi all,
> > > >
> > > > We are just migrating from 1.6 to 1.8. I encountered a serialization
> > > > error which, if memory serves, we didn't have before: The implementation
> > > > of the *PatternFlatSelectAdapter* is not serializable. The object
> > > > probably contains or references non serializable fields.
> > > >
> > > > The method below simply takes a PatternStream from CEP.pattern() and
> > > > uses the side output for timed-out events. Can you see anything weird
> > > > here (WorkerEvent is the input, but the collectors collect Project objects)?
> > > >
> > > > protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
> > > >     OutputTag<Project> pendingProjectsTag =
> > > >             new OutputTag<Project>("invitation-pending-projects"){};
> > > >
> > > >     return patternStream.flatSelect(
> > > >             pendingProjectsTag,
> > > >             new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
> > > >                 @Override
> > > >                 public void timeout(Map<String, List<WorkerEvent>> map, long l,
> > > >                                     Collector<Project> collector) {
> > > >                 }
> > > >             },
> > > >             new PatternFlatSelectFunction<WorkerEvent, Project>() {
> > > >                 @Override
> > > >                 public void flatSelect(Map<String, List<WorkerEvent>> pattern,
> > > >                                        Collector<Project> collector) {
> > > >                 }
> > > >             }
> > > >     ).name("Select pending projects for invitation").getSideOutput(pendingProjectsTag);
> > > > }
> > > >
> > > > ---
> > > > Oytun Tez
>
> --
> Best,
> Guowei
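Cleaning the tag explicitly helps because Flink's ClosureCleaner, when it finds a synthetic `this$...` field that the class body never reads, sets that field to `null` before checking serializability. The plain-Java sketch below imitates only that nulling step. `cleanOuterReference` is a hypothetical helper: unlike Flink's cleaner, it does not analyze bytecode to confirm that the enclosing instance is unused, so it is only safe for empty bodies such as `new OutputTag<Project>("invitation-pending-projects"){}`. `Tag` and `Operator` are hypothetical stand-ins for `OutputTag` and `SingleOutputStreamOperator`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;

class CleanSketch implements Serializable {

    // Hypothetical stand-in for Flink's OutputTag.
    static class Tag<T> implements Serializable {
        private final String id;
        Tag(String id) { this.id = id; }
        String getId() { return id; }
    }

    // Hypothetical stand-in for a non-serializable field such as SingleOutputStreamOperator.
    static class Operator {}

    private final Operator operator = new Operator();

    // Same shape as getPending(): an anonymous subclass created in an instance method.
    Tag<String> newTag() {
        return new Tag<String>("invitation-pending-projects") {};
    }

    // Simplified, hypothetical cleaner: nulls every "this$..." field in place.
    // Flink's ClosureCleaner first checks (via bytecode analysis) that the field
    // is never read; this sketch skips that check.
    static <T> T cleanOuterReference(T obj) {
        for (Field f : obj.getClass().getDeclaredFields()) {
            if (f.getName().startsWith("this$")) {
                try {
                    f.setAccessible(true);
                    f.set(obj, null);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Could not clear " + f.getName(), e);
                }
            }
        }
        return obj;
    }

    // Returns true if Java serialization accepts the whole object graph.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Tag<String> tag = new CleanSketch().newTag();
        System.out.println("before cleaning: " + serializes(tag));
        cleanOuterReference(tag);
        System.out.println("after cleaning:  " + serializes(tag));
    }
}
```

In the job itself, the call goes through the environment instance, since `clean` is an instance method of `StreamExecutionEnvironment`: for example `env.clean(pendingProjectsTag)` before the tag is passed to `flatSelect`, where `env` is whichever environment the job uses.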