Hi Debasish,

I guess the reason is something unexpectedly involved in serialization due
to a reference from inner class (anonymous class or lambda expression).
When Flink serializes this inner class instance, it would also serialize
all referenced objects, for example, the outer class instance. If the outer
class is not serializable, this error would happen.

You could have a try to move the piece of codes to a named non-inner class.

Thanks,
Biao /'bɪ.aʊ/



On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> My main question is why serialisation kicks in when I try to execute
> within a `Future` and not otherwise.
>
> regards.
>
> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Yes, they are generated from Avro Schema and implements Serializable ..
>>
>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <deepakmc...@gmail.com>
>> wrote:
>>
>>> Does TaxiRide or TaxiRideFare implements Serializable?
>>>
>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> Hello -
>>>>
>>>> The following piece of code is an example of a connected data streams ..
>>>>
>>>> val rides: DataStream[TaxiRide] =
>>>>   readStream(inTaxiRide)
>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>     .keyBy("rideId")
>>>>
>>>> val fares: DataStream[TaxiFare] =
>>>>   readStream(inTaxiFare)
>>>>     .keyBy("rideId")
>>>>
>>>> val processed: DataStream[TaxiRideFare] =
>>>>   rides
>>>>     .connect(fares)
>>>>     .flatMap(new EnrichmentFunction)
>>>>
>>>> When I execute the above logic using
>>>> StreamExecutionEnvironment.execute(..) it runs fine.
>>>> But if I try to execute the above from within a scala.concurrent.Future,
>>>> I get the following exception ..
>>>>
>>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>>>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>>>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>>>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>>>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>>>> serializable. The object probably contains or references non serializable
>>>> fields.
>>>>             at
>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>             at
>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>             at
>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>             at
>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>             at
>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>             at
>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>             at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>             at
>>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>             at
>>>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>             at
>>>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>   ...
>>>>
>>>> Caused by: java.io.NotSerializableException:
>>>> org.apache.avro.Schema$Field
>>>>             at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>
>>>> Any thoughts why this may happen ?
>>>>
>>>> regards.
>>>>
>>>> --
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2
>>>> http://manning.com/ghosh
>>>>
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com
>>>> Code: http://github.com/debasishg
>>>>
>>>
>>>
>>> --
>>> Thanks
>>> Deepak
>>> www.bigdatabig.com
>>> www.keosha.net
>>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
> --
> Sent from my iPhone
>

Reply via email to