Hi Oytun,

I think there is a regression in 1.8 in how we handle output
tags. The problem is that we do not call the ClosureCleaner on the OutputTag.
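An anonymous subclass created inside an instance method (like `new OutputTag<Project>(...) {}` inside getPending) carries a hidden reference to the enclosing object, and Java serialization follows that reference. Roughly speaking, Flink's ClosureCleaner drops such a reference when the class body does not use it. The stdlib-only Java sketch below imitates that idea. Tag, Operator and Job are hypothetical stand-ins for OutputTag, SingleOutputStreamOperator and the class that owns getPending. Unlike the real ClosureCleaner, this simplified cleanOuterReference nulls the reference unconditionally and does not first check whether the class uses it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;

public class CleanSketch {

    // Hypothetical stand-in for OutputTag: serializable, meant to be subclassed anonymously.
    static class Tag<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        Tag(String id) { this.id = id; }
    }

    // Hypothetical stand-in for SingleOutputStreamOperator: NOT serializable.
    static class Operator { }

    // Hypothetical stand-in for the job class that owns getPending().
    static class Job implements Serializable {
        private static final long serialVersionUID = 1L;
        final Operator operator = new Operator();

        Tag<String> newTag() {
            // Anonymous subclass in an instance method: javac adds a synthetic
            // this$0 field that references this Job, and through it the Operator.
            return new Tag<String>("invitation-pending-projects") {};
        }
    }

    // Simplified "closure cleaning": null out synthetic outer-instance fields.
    // Assumption: unlike Flink's ClosureCleaner, there is no check that the field is unused.
    static void cleanOuterReference(Object obj) {
        for (Field f : obj.getClass().getDeclaredFields()) {
            if (f.isSynthetic() && f.getName().startsWith("this$")) {
                try {
                    f.setAccessible(true);
                    f.set(obj, null);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }

    // Returns true if Java serialization accepts the whole object graph.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Tag<String> tag = new Job().newTag();
        System.out.println("before cleaning: " + isSerializable(tag));
        cleanOuterReference(tag);
        System.out.println("after cleaning:  " + isSerializable(tag));
    }
}
```

With javac, the uncleaned tag should fail to serialize because the Operator is reached through this$0, and the cleaned tag should serialize. This is the kind of cleaning the OutputTag was not getting in 1.8.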

There are two ways to work around this issue:

1. Declare the OutputTag as a static field.

2. Clean the closure explicitly, as Guowei suggested, by calling clean on
your StreamExecutionEnvironment instance (env below):
env.clean(pendingProjectsTag)
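Why the first workaround helps can be shown without Flink: an anonymous subclass created in a static field initializer has no enclosing instance, so serialization has nothing extra to follow. In this stdlib-only Java sketch, Tag, Operator and PendingProjects are hypothetical stand-ins for OutputTag, SingleOutputStreamOperator and the class that owns getPending. It illustrates the plain Java behaviour, not Flink's own code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticTagSketch {

    // Hypothetical stand-in for OutputTag.
    static class Tag<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        Tag(String id) { this.id = id; }
    }

    // Hypothetical stand-in for SingleOutputStreamOperator: NOT serializable.
    static class Operator { }

    // Hypothetical stand-in for the class that owns getPending().
    static class PendingProjects implements Serializable {
        private static final long serialVersionUID = 1L;
        final Operator operator = new Operator();

        // Workaround 1: created in a static context, so the anonymous
        // subclass has no enclosing instance to drag along.
        static final Tag<String> PENDING_PROJECTS_TAG =
                new Tag<String>("invitation-pending-projects") {};

        // Original pattern: created inside an instance method, so the anonymous
        // subclass captures this PendingProjects and, through it, the Operator.
        Tag<String> localTag() {
            return new Tag<String>("invitation-pending-projects") {};
        }
    }

    // Returns true if Java serialization accepts the whole object graph.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PendingProjects job = new PendingProjects();
        System.out.println("tag created in instance method: " + isSerializable(job.localTag()));
        System.out.println("static tag:                     " + isSerializable(PendingProjects.PENDING_PROJECTS_TAG));
    }
}
```

The first check should report false and the second true. In the actual job, the analogous change would be a `private static final OutputTag<Project>` field on the class that contains getPending.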

I also opened a JIRA issue to fix this: FLINK-12297 [1].

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-12297

On 22/04/2019 03:06, Guowei Ma wrote:
> I think you could try
> StreamExecutionEnvironment.clean(pendingProjectsTag). 
>
>
> Oytun Tez <oy...@motaword.com> wrote on Friday, April 19, 2019 at 9:58 PM:
>
>     Forgot to answer one of your points: the parent class works fine
>     without this CEP selector (the one with the timeout signature)...
>
>
>     ---
>     Oytun Tez
>
>     *M O T A W O R D*
>     The World's Fastest Human Translation Platform.
>     oy...@motaword.com — www.motaword.com
>
>
>     On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <oy...@motaword.com> wrote:
>
>         Hey JingsongLee!
>
>         Here are some findings...
>
>           * flatSelect *without timeout* works normally:
>             patternStream.flatSelect(PatternFlatSelectFunction) runs fine.
>           * Converting both the timeout and select selectors to an
>             *inner class* (not static) gave the same result: it fails.
>           * flatSelect *without* timeout, but with an inner class for
>             the PatternFlatSelectFunction, works (same as the first bullet).
>           * Tried both selectors with an _empty_ body, just a skeleton
>             class. That fails too. The empty-body example is in my
>             first email.
>           * Tried making both selectors *public static nested* classes;
>             that fails too.
>           * Extracted both the timeout and flat select selectors into their
>             own *independent classes* in separate files. Still fails.
>           * I am putting the *error stack* below.
>           * Without the timeout selector, in any class or lambda form,
>             with an empty or full body, flatSelect works fine.
>
>         Would these findings help? Any ideas?
>
>         Here is an error stack:
>
>         09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler     -
>         org.apache.flink.api.common.InvalidProgramException: The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.
>             at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>             at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
>             at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
>             at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
>             at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
>             at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
>             at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
>             at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
>             at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
>             at com.motaword.ipm.kernel.Application.main(Application.java:63)
>         Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
>             at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>             at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>             at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>             at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>             at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>             at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>             at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>             at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>             at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>             at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>             at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>             at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>             at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>             at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>             at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
>             at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
>             ... 9 more
>
>         ---
>         Oytun Tez
>
>         On Fri, Apr 19, 2019 at 3:14 AM JingsongLee
>         <lzljs3620...@aliyun.com> wrote:
>
>             Hi @Oytun Tez,
>             It looks like your *PatternFlatSelectFunction* is
>             not serializable.
>             Because you use an anonymous inner class, check the class
>             that getPending belongs to: maybe that class is not
>             serializable?
>             Alternatively, avoid (non-static) inner classes and use a
>             static nested class instead.
>
>             Best, JingsongLee
>
>                 ------------------------------------------------------------------
>                 From: Oytun Tez <oy...@motaword.com>
>                 Send Time: Friday, April 19, 2019 03:38
>                 To: user <user@flink.apache.org>
>                 Subject: PatternFlatSelectAdapter - Serialization issue
>                 after 1.8 upgrade
>
>                 Hi all,
>
>                 We are just migrating from 1.6 to 1.8. I encountered a
>                 serialization error which, if memory serves, we didn't
>                 have before: The implementation of the
>                 *PatternFlatSelectAdapter* is not serializable. The
>                 object probably contains or references non
>                 serializable fields.
>
>                 The method below simply takes a PatternStream from
>                 CEP.pattern() and uses the side output for timed-out
>                 events. Can you see anything weird here? (WorkerEvent
>                 is the input, but the collectors collect Project
>                 objects.)
>
>                 protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
>                     OutputTag<Project> pendingProjectsTag =
>                             new OutputTag<Project>("invitation-pending-projects") {};
>
>                     return patternStream.flatSelect(
>                             pendingProjectsTag,
>                             new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
>                                 @Override
>                                 public void timeout(Map<String, List<WorkerEvent>> map, long l,
>                                         Collector<Project> collector) {
>                                 }
>                             },
>                             new PatternFlatSelectFunction<WorkerEvent, Project>() {
>                                 @Override
>                                 public void flatSelect(Map<String, List<WorkerEvent>> pattern,
>                                         Collector<Project> collector) {
>                                 }
>                             }
>                     ).name("Select pending projects for invitation").getSideOutput(pendingProjectsTag);
>                 }
>
>                 ---
>                 Oytun Tez
>
> -- 
> Best,
> Guowei
