Forgot to answer one of your points: the parent class compiles fine without
this CEP selector (the one with the timeout signature)...

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <oy...@motaword.com> wrote:

> Hey JingsongLee!
>
> Here are some findings...
>
>    - flatSelect *without timeout* works normally:
>    patternStream.flatSelect(PatternFlatSelectFunction), this compiles
>    well.
>    - Converted both the timeout and select selectors to an *inner class*
>    (not static); this yielded the same result and doesn't compile.
>    - flatSelect *without* timeout, but with an inner class for
>    PatternFlatSelectFunction, it compiles (same as first bullet).
>    - Tried both of these selectors with *empty* body. Just a skeleton
>    class. Doesn't compile either. Empty body example is in my first email.
>    - Tried making both selectors *static public inner* classes, doesn't
>    compile either.
>    - Extracted both timeout and flat selectors to their own *independent
>    classes* in separate files. Doesn't compile.
>    - I am putting the *error stack* below.
>    - Without the timeout selector in any class or lambda shape, with
>    empty or full body, flatSelect compiles well.
>
> Would these findings help? Any ideas?
>
> Here is an error stack:
>
> 09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler     -
> org.apache.flink.api.common.InvalidProgramException: The implementation of
> the PatternFlatSelectAdapter is not serializable. The object probably
> contains or references non serializable fields.
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
> at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
> at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
> at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
> at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
> at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
> at com.motaword.ipm.kernel.Application.main(Application.java:63)
> Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> ... 9 more
>
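The trace above bottoms out in plain Java serialization (InstantiationUtil.serializeObject calling ObjectOutputStream.writeObject), so the same check can presumably be reproduced outside Flink. Below is a minimal sketch that assumes nothing about Flink itself. StreamHandle, LeakyFunction, CleanFunction and findOffendingClass are hypothetical names made up for illustration; StreamHandle only stands in for whatever non-serializable object ends up being referenced.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationProbe {

    // Hypothetical stand-in for a non-serializable dependency (for example a stream handle).
    static class StreamHandle { }

    // Serializable itself, but it drags a non-serializable field along.
    static class LeakyFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        final StreamHandle handle = new StreamHandle();
    }

    // Serializable, with only serializable fields.
    static class CleanFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name = "clean";
    }

    /**
     * Tries to serialize obj with standard Java serialization.
     * Returns null on success, otherwise the name of the class that was rejected.
     */
    static String findOffendingClass(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            // By default the exception message is the name of the offending class.
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException("Unexpected I/O failure on an in-memory stream", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(findOffendingClass(new CleanFunction()));
        System.out.println(findOffendingClass(new LeakyFunction()));
    }
}
```

Passing the real selector objects to a helper like this in a unit test might show which class gets pulled in, though it will not by itself explain why the reference exists.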
>
> On Fri, Apr 19, 2019 at 3:14 AM JingsongLee <lzljs3620...@aliyun.com>
> wrote:
>
>> Hi @Oytun Tez
>> It looks like your *PatternFlatSelectFunction* is not serializable.
>> Because you use an anonymous inner class, check the class to which
>> getPending belongs; maybe that class is not serializable?
>>
>> Alternatively, avoid inner classes and use a static nested class
>> instead.
>>
>> Best, JingsongLee
>>
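The mechanism behind this suggestion can be sketched in plain Java, without Flink. An anonymous class created in an instance method keeps a hidden reference to the enclosing object, so serializing it also tries to serialize that object; a static nested class has no such reference. Selector, Job and isSerializable are hypothetical names for illustration only, and this shows the general mechanism rather than the cause of this particular error (the findings earlier in the thread report that static classes did not help here).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class InnerClassCapture {

    // Hypothetical stand-in for a serializable callback interface.
    interface Selector extends Serializable {
        String select(String input);
    }

    // Stand-in for a job class that is NOT Serializable.
    static class Job {
        final String jobName = "invitation";

        Selector anonymousSelector() {
            // Reads a field of the enclosing Job, so it must hold a reference to Job.this.
            return new Selector() {
                @Override
                public String select(String input) {
                    return jobName + ":" + input;
                }
            };
        }

        // Static nested class: no reference to any Job instance.
        static class StaticSelector implements Selector {
            @Override
            public String select(String input) {
                return input;
            }
        }

        Selector staticSelector() {
            return new StaticSelector();
        }
    }

    /** Returns true if obj survives standard Java serialization. */
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException("Unexpected I/O failure on an in-memory stream", e);
        }
    }

    public static void main(String[] args) {
        Job job = new Job();
        System.out.println("anonymous: " + isSerializable(job.anonymousSelector()));
        System.out.println("static nested: " + isSerializable(job.staticSelector()));
    }
}
```

In this sketch the anonymous selector fails only because Job is not Serializable; making Job serializable, or not touching its state from the callback class, are the two usual ways out.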
>> ------------------------------------------------------------------
>> From: Oytun Tez <oy...@motaword.com>
>> Send Time: Friday, April 19, 2019, 03:38
>> To: user <user@flink.apache.org>
>> Subject: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade
>>
>> Hi all,
>>
>> We are just migrating from 1.6 to 1.8. I encountered a serialization
>> error which we didn't have before, if memory serves: The implementation
>> of the *PatternFlatSelectAdapter* is not serializable. The object
>> probably contains or references non serializable fields.
>>
>> The method below simply takes a PatternStream from CEP.pattern() and
>> uses the side output for timed-out events. Can you see anything
>> weird here (WorkerEvent is the input, but the collectors collect Project
>> objects)?
>>
>> protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
>>     OutputTag<Project> pendingProjectsTag =
>>             new OutputTag<Project>("invitation-pending-projects") {};
>>
>>     return patternStream.flatSelect(
>>             pendingProjectsTag,
>>             new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
>>                 @Override
>>                 public void timeout(Map<String, List<WorkerEvent>> map, long l,
>>                         Collector<Project> collector) {
>>                 }
>>             },
>>             new PatternFlatSelectFunction<WorkerEvent, Project>() {
>>                 @Override
>>                 public void flatSelect(Map<String, List<WorkerEvent>> pattern,
>>                         Collector<Project> collector) {
>>                 }
>>             }
>>     ).name("Select pending projects for invitation")
>>      .getSideOutput(pendingProjectsTag);
>> }
>>
