We solved the serialization problem by marking transient the fields that
were being captured as part of the closure. So we no longer get
serialization errors - everything works properly without the Future.
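
Roughly, the change looks like this (a simplified sketch - the class and
field names are illustrative, not our actual code):

import org.apache.avro.Schema

// this field used to be a plain val, got captured by an operator
// closure, and dragged Avro's non-serializable Schema into Java
// serialization; @transient lazy keeps it out of the serialized
// closure and rebuilds it on first access on the task side
class RideLogic extends Serializable {
  @transient lazy val rideSchema: Schema = TaxiRide.getClassSchema()
}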

I realize that, because of the statics, concurrent job submission would be
an issue. But we are submitting only one job - the difference is that it
goes through a Future. So there is no concurrent submission, unless I am
missing something.

regards.

On Thu, Sep 19, 2019 at 12:54 PM Biao Liu <mmyy1...@gmail.com> wrote:

> Hi Debasish,
>
> I think something critical about your usage is hidden. It might help if
> you could provide more details.
>
> It still confuses me how you solved the serialization issue. Why do the
> non-transient fields only affect serialization inside a Future?
>
> WRT this ProgramAbortException issue, do you submit jobs concurrently in
> one process?
> Currently job submission is not thread-safe. It relies on some static
> variables which could be affected by other concurrent submissions in the
> same process.
> I am asking because job submission usually does not go through
> OptimizerPlanEnvironment, which appears in your exception stack trace.
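>
> To illustrate with a sketch of the pattern (not the actual Flink code;
> IIRC StreamExecutionEnvironment keeps a static context-environment
> factory internally):
>
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>
> object SubmissionContext {
>   // process-wide mutable state: installed before the user's main()
>   // runs and read back by getExecutionEnvironment(); two threads
>   // submitting in the same process can overwrite each other's entry
>   @volatile var contextFactory: Option[() => StreamExecutionEnvironment] = None
> }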
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> I think what you are pointing to is asynchronous DataStream operations.
>> In our case we want to submit the entire job in a Future, something like
>> the following ..
>>
>> def execute(..) = {
>>   // this does all data stream manipulation, joins etc.
>>   buildComputationGraph()
>>
>>   // submits for execution with StreamExecutionEnvironment
>>   env.execute(..)
>> }
>>
>> and we want to do ..
>>
>> val jobExecutionResultFuture = Future(execute(..))
>>
>> and this gives that exception.
>>
>> regards.
>>
>> On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>
>>> Hi Debasish,
>>>
>>> Have you taken a look at the AsyncIO API for running async operations? I
>>> think this is the preferred way of doing it. [1]
>>> So it would look something like this:
>>>
>>> class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
>>>
>>>     /** The database-specific client that can issue concurrent requests
>>>         with callbacks */
>>>     lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)
>>>
>>>     /** The context used for the future callbacks */
>>>     implicit lazy val executor: ExecutionContext =
>>>         ExecutionContext.fromExecutor(Executors.directExecutor())
>>>
>>>     override def asyncInvoke(str: String,
>>>                              resultFuture: ResultFuture[(String, String)]): Unit = {
>>>
>>>         // issue the asynchronous request, receive a future for the result
>>>         val resultFutureRequested: Future[String] = client.query(str)
>>>
>>>         // set the callback to be executed once the request by the client
>>>         // is complete; the callback simply forwards the result to the
>>>         // result future
>>>         resultFutureRequested.onSuccess {
>>>             case result: String => resultFuture.complete(Iterable((str, result)))
>>>         }
>>>     }
>>> }
>>>
>>> // create the original stream
>>> val stream: DataStream[String] = ...
>>>
>>> // apply the async I/O transformation
>>> val resultStream: DataStream[(String, String)] =
>>>     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(),
>>>                                   1000, TimeUnit.MILLISECONDS, 100)
>>>
>>>
>>> Thanks,
>>> Rafi
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api
>>>
>>> On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> ok, the above problem was due to some serialization issues, which we
>>>> fixed by marking some of the captured fields transient. That fixes the
>>>> serialization errors .. but now when I try to execute in a Future I hit
>>>> upon this ..
>>>>
>>>>
>>>> java.util.concurrent.ExecutionException: Boxed Error
>>>> at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>>> at
>>>> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>>> at
>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>>> at scala.concurrent.Promise.complete(Promise.scala:53)
>>>> at scala.concurrent.Promise.complete$(Promise.scala:52)
>>>> at
>>>> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
>>>> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
>>>> at
>>>> java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
>>>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>>>> at
>>>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>>>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>>>> at
>>>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>>>>
>>>> Caused by:
>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>> at
>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>> at
>>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
>>>> at
>>>> pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
>>>> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
>>>> at scala.util.Success.$anonfun$map$1(Try.scala:255)
>>>> at scala.util.Success.map(Try.scala:213)
>>>> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
>>>> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
>>>> ... 7 more
>>>>
>>>> I found this issue in JIRA
>>>> https://issues.apache.org/jira/browse/FLINK-10381 which is still open
>>>> and talks about a related issue. But we are not submitting multiple jobs -
>>>> we are submitting just one job, asynchronously in a Future. I am not clear
>>>> why this should create the problem that I see.
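>>>>
>>>> My only guess so far, as an untested sketch (execute() is our method
>>>> from earlier in this thread, arguments elided): ProgramAbortException
>>>> extends Error, and scala.concurrent treats non-Exception Throwables as
>>>> fatal, wrapping them in ExecutionException("Boxed Error"). Rethrowing
>>>> the cause on the awaiting thread would at least surface the original
>>>> exception:
>>>>
>>>> import java.util.concurrent.ExecutionException
>>>> import scala.concurrent.{Await, Future}
>>>> import scala.concurrent.ExecutionContext.Implicits.global
>>>> import scala.concurrent.duration.Duration
>>>>
>>>> // the Future fails with ExecutionException("Boxed Error") whose
>>>> // cause is the original ProgramAbortException
>>>> val submission = Future(execute())
>>>> try Await.result(submission, Duration.Inf)
>>>> catch { case e: ExecutionException => throw e.getCause }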
>>>>
>>>> Can anyone please help with an explanation?
>>>>
>>>> regards.
>>>>
>>>> On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <
>>>> ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> I think the issue may not be linked to the Future. What happens is that
>>>>> when this piece of code is executed ..
>>>>>
>>>>> val rides: DataStream[TaxiRide] =
>>>>>   readStream(inTaxiRide)
>>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>     .keyBy("rideId")
>>>>>
>>>>> val fares: DataStream[TaxiFare] =
>>>>>   readStream(inTaxiFare)
>>>>>     .keyBy("rideId")
>>>>>
>>>>> val processed: DataStream[TaxiRideFare] =
>>>>>   rides
>>>>>     .connect(fares)
>>>>>     .flatMap(new EnrichmentFunction)
>>>>>
>>>>> somehow the ClosureCleaner gets executed, as is evident from the
>>>>> following, which tries to serialize Avro data. Is there any way to pass
>>>>> the custom Avro serializer that I am using?
>>>>>
>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>>>>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>>>>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>>>>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>>>>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>>>>> serializable. The object probably contains or references non serializable
>>>>> fields.
>>>>>             at
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>             at
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>             at
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>             at
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>             at
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>             at
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>             at
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>             at
>>>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>             at
>>>>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>             at
>>>>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>             at
>>>>> pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
>>>>>             at
>>>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
>>>>>             at
>>>>> pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
>>>>>             at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>>>             at scala.util.Try$.apply(Try.scala:213)
>>>>>             at pipelines.runner.Runner$.run(Runner.scala:43)
>>>>>             at pipelines.runner.Runner$.main(Runner.scala:30)
>>>>>             at pipelines.runner.Runner.main(Runner.scala)
>>>>>             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>> Method)
>>>>>             at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>             at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>             at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>             at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>>>             at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>>>             at
>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>             at
>>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>>>             at
>>>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>             at
>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>             at
>>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>             at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>             at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>             at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.io.NotSerializableException:
>>>>> org.apache.avro.Schema$Field
>>>>>             at
>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>             at
>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>             at java.util.ArrayList.writeObject(ArrayList.java:766)
>>>>>             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>> Method)
>>>>>             at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>             at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>             at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>             at
>>>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
>>>>>             at
>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>>>             at
>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>             at
>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>             at
>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>             at
>>>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>>>>>             at
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
>>>>>
>>>>>
>>>>> I also tried the following ..
>>>>>
>>>>> class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide,
>>>>> TaxiFare, TaxiRideFare] {
>>>>>
>>>>>     @transient var rideState: ValueState[TaxiRide] = null
>>>>>     @transient var fareState: ValueState[TaxiFare] = null
>>>>>
>>>>>     override def open(params: Configuration): Unit = {
>>>>>       super.open(params)
>>>>>       rideState = getRuntimeContext.getState(
>>>>>         new ValueStateDescriptor[TaxiRide]("saved ride",
>>>>> classOf[TaxiRide]))
>>>>>       fareState = getRuntimeContext.getState(
>>>>>         new ValueStateDescriptor[TaxiFare]("saved fare",
>>>>> classOf[TaxiFare]))
>>>>>     }
>>>>>
>>>>>     // flatMap1 / flatMap2 elided ..
>>>>> }
>>>>>
>>>>> and moved the state initialization to the open method. But I still get
>>>>> the same result.
>>>>>
>>>>> Help?
>>>>>
>>>>> regards.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>>>>
>>>>>> Hi Debasish,
>>>>>>
>>>>>> I guess the reason is that something is unexpectedly involved in
>>>>>> serialization due to a reference from an inner class (anonymous class or
>>>>>> lambda expression).
>>>>>> When Flink serializes such an inner class instance, it also serializes
>>>>>> all referenced objects, for example the outer class instance. If the
>>>>>> outer class is not serializable, this error happens.
>>>>>>
>>>>>> You could try moving that piece of code to a named non-inner class, as
>>>>>> sketched below.
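>>>>>>
>>>>>> For example, a sketch only (TaxiRide and readStream are from your code,
>>>>>> the class name is made up):
>>>>>>
>>>>>> import org.apache.flink.api.common.functions.FilterFunction
>>>>>>
>>>>>> // a named top-level class captures no enclosing instance, so only
>>>>>> // the function object itself goes through the ClosureCleaner
>>>>>> class StartedRidesFilter extends FilterFunction[TaxiRide] {
>>>>>>   override def filter(ride: TaxiRide): Boolean = ride.getIsStart().booleanValue
>>>>>> }
>>>>>>
>>>>>> // then, instead of the inline lambda:
>>>>>> readStream(inTaxiRide).filter(new StartedRidesFilter)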
>>>>>>
>>>>>> Thanks,
>>>>>> Biao /'bɪ.aʊ/
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <
>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>
>>>>>>> My main question is why serialisation kicks in when I try to execute
>>>>>>> within a `Future` and not otherwise.
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <
>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Yes, they are generated from an Avro schema and implement
>>>>>>>> Serializable ..
>>>>>>>>
>>>>>>>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <
>>>>>>>> deepakmc...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Does TaxiRide or TaxiRideFare implement Serializable?
>>>>>>>>>
>>>>>>>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <
>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello -
>>>>>>>>>>
>>>>>>>>>> The following piece of code is an example of a connected data
>>>>>>>>>> streams ..
>>>>>>>>>>
>>>>>>>>>> val rides: DataStream[TaxiRide] =
>>>>>>>>>>   readStream(inTaxiRide)
>>>>>>>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>>>     .keyBy("rideId")
>>>>>>>>>>
>>>>>>>>>> val fares: DataStream[TaxiFare] =
>>>>>>>>>>   readStream(inTaxiFare)
>>>>>>>>>>     .keyBy("rideId")
>>>>>>>>>>
>>>>>>>>>> val processed: DataStream[TaxiRideFare] =
>>>>>>>>>>   rides
>>>>>>>>>>     .connect(fares)
>>>>>>>>>>     .flatMap(new EnrichmentFunction)
>>>>>>>>>>
>>>>>>>>>> When I execute the above logic using
>>>>>>>>>> StreamExecutionEnvironment.execute(..) it runs fine.
>>>>>>>>>> But if I try to execute the above from within a
>>>>>>>>>> scala.concurrent.Future, I get the following exception ..
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId
>>>>>>>>>> type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2,
>>>>>>>>>> passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon 
>>>>>>>>>> type:FLOAT
>>>>>>>>>> pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat
>>>>>>>>>> type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG 
>>>>>>>>>> pos:10] is
>>>>>>>>>> not serializable. The object probably contains or references non
>>>>>>>>>> serializable fields.
>>>>>>>>>>             at
>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>>             at
>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>             at
>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>             at
>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>             at
>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>             at
>>>>>>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>>             at
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>>>>>>             at
>>>>>>>>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>>>>>>             at
>>>>>>>>>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>>>>>>             at
>>>>>>>>>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>>>>>>   ...
>>>>>>>>>>
>>>>>>>>>> Caused by: java.io.NotSerializableException:
>>>>>>>>>> org.apache.avro.Schema$Field
>>>>>>>>>>             at
>>>>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>>>>>
>>>>>>>>>> Any thoughts on why this may happen?
>>>>>>>>>>
>>>>>>>>>> regards.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Debasish Ghosh
>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>
>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Thanks
>>>>>>>>> Deepak
>>>>>>>>> www.bigdatabig.com
>>>>>>>>> www.keosha.net
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Debasish Ghosh
>>>>>>>> http://manning.com/ghosh2
>>>>>>>> http://manning.com/ghosh
>>>>>>>>
>>>>>>>> Twttr: @debasishg
>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>
>>>>>>> --
>>>>>>> Sent from my iPhone
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Debasish Ghosh
>>>>> http://manning.com/ghosh2
>>>>> http://manning.com/ghosh
>>>>>
>>>>> Twttr: @debasishg
>>>>> Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
>>>>>
>>>>
>>>>
>>>> --
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2
>>>> http://manning.com/ghosh
>>>>
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com
>>>> Code: http://github.com/debasishg
>>>>
>>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
