I think you could try StreamExecutionEnvironment.clean(pendingProjectsTag).
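
To illustrate why the tag itself may matter here: `new OutputTag<Project>("...") {}` is an anonymous subclass, and when it is created inside an instance method it can keep a hidden reference to the enclosing object. If that object holds a non-serializable field, Java serialization of the tag fails. The sketch below reproduces this with plain Java only. `Tag`, `Job` and `CaptureDemo` are hypothetical stand-ins, not Flink classes, and this is one possible explanation of the error in the quoted stack trace, not a confirmed diagnosis.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a serializable library type such as OutputTag.
class Tag implements Serializable {
    private static final long serialVersionUID = 1L;
    final String id;

    Tag(String id) {
        this.id = id;
    }
}

// Hypothetical stand-in for the job class; note that it is NOT Serializable.
class Job {
    // Stand-in for a non-serializable member, e.g. a stream handle.
    private final Object stream = new Object();

    // Anonymous subclass created in an instance method:
    // the new object keeps a reference to the enclosing Job.
    Tag tagFromInstanceMethod() {
        return new Tag("pending") {};
    }

    // Anonymous subclass created in a static method:
    // there is no enclosing instance to capture.
    static Tag tagFromStaticMethod() {
        return new Tag("pending") {};
    }
}

class CaptureDemo {
    // Tries plain Java serialization and reports whether it succeeded.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("instance context: " + isSerializable(new Job().tagFromInstanceMethod()));
        System.out.println("static context:   " + isSerializable(Job.tagFromStaticMethod()));
    }
}
```

If this is indeed the cause, creating the tag in a static context (for example as a `static final` field) would be an alternative to passing it through the closure cleaner, which is meant to drop such unused outer references.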


On Fri, Apr 19, 2019 at 9:58 PM, Oytun Tez <oy...@motaword.com> wrote:

> Forgot to answer one of your points: the parent class compiles well
> without this CEP selector (with timeout signature)...
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <oy...@motaword.com> wrote:
>
>> Hey JingsongLee!
>>
>> Here are some findings...
>>
>>    - flatSelect *without timeout* works normally:
>>    patternStream.flatSelect(PatternFlatSelectFunction), this compiles
>>    well.
>>    - Converted both the timeout and select selectors to an *inner class*
>>    (not static); same result, doesn't compile.
>>    - flatSelect *without* timeout, but with an inner class for
>>    PatternFlatSelectFunction, it compiles (same as first bullet).
>>    - Tried both of these selectors with *empty* body. Just a skeleton
>>    class. Doesn't compile either. Empty body example is in my first email.
>>    - Tried making both selectors *static public inner* classes, doesn't
>>    compile either.
>>    - Extracted both timeout and flat selectors to their own *independent
>>    classes* in separate files. Doesn't compile.
>>    - I am putting the *error stack* below.
>>    - Without the timeout selector in any class or lambda shape, with
>>    empty or full body, flatSelect compiles well.
>>
>> Would these findings help? Any ideas?
>>
>> Here is an error stack:
>>
>> 09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler     - org.apache.flink.api.common.InvalidProgramException: The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
>> at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
>> at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
>> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
>> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
>> at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
>> at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
>> at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
>> at com.motaword.ipm.kernel.Application.main(Application.java:63)
>> Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
>> ... 9 more
>>
>>
>> On Fri, Apr 19, 2019 at 3:14 AM JingsongLee <lzljs3620...@aliyun.com>
>> wrote:
>>
>>> Hi @Oytun Tez
>>> It looks like your *PatternFlatSelectFunction* is not serializable.
>>> Because you use an anonymous inner class, check the class that
>>> getPending belongs to; maybe that class is not serializable?
>>>
>>> Alternatively, avoid inner classes and use a static nested class
>>> instead.
>>>
>>> Best, JingsongLee
>>>
>>> ------------------------------------------------------------------
>>> From:Oytun Tez <oy...@motaword.com>
>>> Send Time: 2019-04-19 (Friday) 03:38
>>> To:user <user@flink.apache.org>
>>> Subject:PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade
>>>
>>> Hi all,
>>>
>>> We are just migrating from 1.6 to 1.8. I encountered a serialization
>>> error which we didn't have before, if memory serves: The implementation
>>> of the *PatternFlatSelectAdapter* is not serializable. The object
>>> probably contains or references non serializable fields.
>>>
>>> The method below simply takes a PatternStream from CEP.pattern() and
>>> uses the side output for timed-out events. Can you see anything
>>> weird here (WorkerEvent is the input, but the collectors collect Project
>>> objects)?
>>>
>>> protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
>>>     OutputTag<Project> pendingProjectsTag =
>>>             new OutputTag<Project>("invitation-pending-projects"){};
>>>
>>>     return patternStream.flatSelect(
>>>             pendingProjectsTag,
>>>             new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
>>>                 @Override
>>>                 public void timeout(Map<String, List<WorkerEvent>> map, long l, Collector<Project> collector) {
>>>                 }
>>>             },
>>>             new PatternFlatSelectFunction<WorkerEvent, Project>() {
>>>                 @Override
>>>                 public void flatSelect(Map<String, List<WorkerEvent>> pattern, Collector<Project> collector) {
>>>                 }
>>>             }
>>>     ).name("Select pending projects for invitation").getSideOutput(pendingProjectsTag);
>>> }
>>>
>>>
>>> --
Best,
Guowei
