Debasish, could you share a before-and-after example of your classes
for future reference?
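
A minimal sketch of what such a before/after could look like, under stated assumptions: this is not the actual code from the thread. `SchemaLike`, `EnricherBefore`, `EnricherAfter` and `TransientSketch` are hypothetical names, and the check uses plain Java serialization, which is what the ObjectOutputStream frames in the quoted trace suggest the ClosureCleaner goes through.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable dependency (for example an Avro Schema.Field).
class SchemaLike(val name: String)

// Before: the serializable function object drags the non-serializable field along.
class EnricherBefore extends Serializable {
  val schema: SchemaLike = new SchemaLike("rideId")
  def describe(value: Long): String = s"${schema.name}=$value"
}

// After: the field is skipped by Java serialization and rebuilt lazily on first use.
class EnricherAfter extends Serializable {
  @transient lazy val schema: SchemaLike = new SchemaLike("rideId")
  def describe(value: Long): String = s"${schema.name}=$value"
}

object TransientSketch {
  // Returns true if plain Java serialization accepts the object.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

With these definitions, `TransientSketch.isJavaSerializable(new EnricherBefore)` fails on the `schema` field, while the `EnricherAfter` variant passes because the transient field is never written.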

On Thu, 19 Sep 2019, 10:42 Debasish Ghosh, <ghosh.debas...@gmail.com> wrote:

> We solved the problem of serialization by making some things transient
> which were being captured as part of the closure. So we no longer have
> serialization errors. Everything works properly without the future.
>
> I realize that because of statics concurrent job submission will be an
> issue. But we are submitting one job only - the difference is that it's
> through a Future. So there is no concurrent submission unless I am missing
> something.
>
> regards.
>
> On Thu, Sep 19, 2019 at 12:54 PM Biao Liu <mmyy1...@gmail.com> wrote:
>
>> Hi Debasish,
>>
>> I think something critical about your usage is still hidden. It might help
>> if you could provide more details.
>>
>> I am still confused about how you solved the serialization issue. Why do
>> the non-transient fields affect serialization only inside a Future?
>>
>> WRT this ProgramAbortException issue, do you submit jobs concurrently in
>> one process?
>> Currently job submission is not thread-safe. It relies on some static
>> variables which could be affected by other concurrent submissions in the
>> same process.
>> Asking this because usually job submission is not through
>> OptimizerPlanEnvironment which appears in your exception stack trace.
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> I think what you are pointing at is asynchronous datastream operations.
>>> In our case we want to submit the entire job in a Future. Something like
>>> the following ..
>>>
>>> def execute(..) = {
>>>   // this does all data stream manipulation, joins etc.
>>>   buildComputationGraph()
>>>
>>>   // submits for execution with StreamExecutionEnvironment
>>>   env.execute(..)
>>> }
>>>
>>> and we want to do ..
>>>
>>> val jobExecutionResultFuture = Future(execute(..))
>>>
>>> and this gives that exception.
>>>
>>> regards.
>>>
>>> On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch <rafi.ar...@gmail.com>
>>> wrote:
>>>
>>>> Hi Debasish,
>>>>
>>>> Have you taken a look at the AsyncIO API for running async operations?
>>>> I think this is the preferred way of doing it. [1]
>>>> So it would look something like this:
>>>>
>>>> class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
>>>>
>>>>     /** The database specific client that can issue concurrent requests with callbacks */
>>>>     lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)
>>>>
>>>>     /** The context used for the future callbacks */
>>>>     implicit lazy val executor: ExecutionContext =
>>>>         ExecutionContext.fromExecutor(Executors.directExecutor())
>>>>
>>>>     override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
>>>>
>>>>         // issue the asynchronous request, receive a future for the result
>>>>         val resultFutureRequested: Future[String] = client.query(str)
>>>>
>>>>         // set the callback to be executed once the request by the client is complete
>>>>         // the callback simply forwards the result to the result future
>>>>         resultFutureRequested.onSuccess {
>>>>             case result: String => resultFuture.complete(Iterable((str, result)))
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> // create the original stream
>>>> val stream: DataStream[String] = ...
>>>>
>>>> // apply the async I/O transformation
>>>> val resultStream: DataStream[(String, String)] =
>>>>     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
>>>>
>>>>
>>>> Thanks,
>>>> Rafi
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api
>>>>
>>>> On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <
>>>> ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> OK, the above problem was due to some serialization issues, which we
>>>>> fixed by marking some of the things transient. But now when I try to
>>>>> execute in a Future I hit this ..
>>>>>
>>>>>
>>>>> *java.util.concurrent.ExecutionException: Boxed Error*
>>>>>     at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>>>>     at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>>>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>>>>     at scala.concurrent.Promise.complete(Promise.scala:53)
>>>>>     at scala.concurrent.Promise.complete$(Promise.scala:52)
>>>>>     at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
>>>>>     at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
>>>>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
>>>>>     at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
>>>>>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>>>>>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>>>>>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>>>>>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>>>>>
>>>>> *Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException*
>>>>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>     at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
>>>>>     at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
>>>>>     at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
>>>>>     at scala.util.Success.$anonfun$map$1(Try.scala:255)
>>>>>     at scala.util.Success.map(Try.scala:213)
>>>>>     at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
>>>>>     at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
>>>>>     ... 7 more
>>>>>
>>>>> I found this issue in JIRA,
>>>>> https://issues.apache.org/jira/browse/FLINK-10381, which is still open
>>>>> and talks about a related issue. But we are not submitting multiple jobs;
>>>>> we are submitting just one job, asynchronously, in a Future. I am not
>>>>> clear why this should create the problem that I see.
>>>>>
>>>>> Can anyone please help with an explanation ?
>>>>>
>>>>> regards.
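
One possible reading of the trace above, offered as an assumption rather than a confirmed diagnosis: the "Boxed Error" wrapper suggests that the thrown ProgramAbortException is a java.lang.Error, and a Scala Future boxes such errors in an ExecutionException on the pool thread. If the code that invoked the program's main method expects to catch that abort signal on its own thread, moving execute() into a Future would hide the signal from it. The sketch below reproduces only the boxing behaviour in plain Scala; `AbortSignal` and `BoxedErrorSketch` are hypothetical names and no Flink classes are involved.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Failure

// Hypothetical stand-in for an abort signal that extends java.lang.Error.
class AbortSignal extends Error("plan captured, aborting program")

object BoxedErrorSketch {
  // Stand-in for an execute() that signals "stop here" by throwing.
  def execute(): String = throw new AbortSignal

  // Same thread: the caller can catch the signal directly.
  def runInline(): String =
    try execute() catch { case _: AbortSignal => "caught by caller" }

  // In a Future: the signal is thrown on a pool thread and surfaces
  // only as the failure stored in the Future.
  def runInFuture(): Option[Throwable] = {
    val f = Future(execute())
    Await.ready(f, 5.seconds)
    f.value.collect { case Failure(t) => t }
  }
}
```

Here `runInline()` sees the signal itself, whereas `runInFuture()` yields an ExecutionException whose cause is the signal, matching the shape of the trace above.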
>>>>>
>>>>> On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh <
>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> I think the issue may not be linked with Future. What happens is when
>>>>>> this piece of code is executed ..
>>>>>>
>>>>>> val rides: DataStream[TaxiRide] =
>>>>>>   readStream(inTaxiRide)
>>>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>     .keyBy("rideId")
>>>>>>
>>>>>> val fares: DataStream[TaxiFare] =
>>>>>>   readStream(inTaxiFare)
>>>>>>     .keyBy("rideId")
>>>>>>
>>>>>> val processed: DataStream[TaxiRideFare] =
>>>>>>   rides
>>>>>>     .connect(fares)
>>>>>>     .flatMap(new EnrichmentFunction)
>>>>>>
>>>>>> somehow the ClosureCleaner gets executed, as is evident from the
>>>>>> following trace, and it tries to serialize Avro data. Is there any way
>>>>>> to pass the custom Avro serializer that I am using?
>>>>>>
>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>>     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>>     at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>>     at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>>     at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
>>>>>>     at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
>>>>>>     at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
>>>>>>     at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>>>>     at scala.util.Try$.apply(Try.scala:213)
>>>>>>     at pipelines.runner.Runner$.run(Runner.scala:43)
>>>>>>     at pipelines.runner.Runner$.main(Runner.scala:30)
>>>>>>     at pipelines.runner.Runner.main(Runner.scala)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>>>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>>>>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>     at java.util.ArrayList.writeObject(ArrayList.java:766)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
>>>>>>
>>>>>>
>>>>>> I also tried the following ..
>>>>>>
>>>>>> class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, TaxiRideFare] {
>>>>>>
>>>>>>     @transient var rideState: ValueState[TaxiRide] = null
>>>>>>     @transient var fareState: ValueState[TaxiFare] = null
>>>>>>
>>>>>>     override def open(params: Configuration): Unit = {
>>>>>>       super.open(params)
>>>>>>       rideState = getRuntimeContext.getState(
>>>>>>         new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
>>>>>>       fareState = getRuntimeContext.getState(
>>>>>>         new ValueStateDescriptor[TaxiFare]("saved fare", classOf[TaxiFare]))
>>>>>>     }
>>>>>>
>>>>>> and moved the state initialization into the open function. But I still
>>>>>> get the same result.
>>>>>>
>>>>>> Help ?
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 17, 2019 at 12:28 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Debasish,
>>>>>>>
>>>>>>> I guess the reason is that something is unexpectedly pulled into
>>>>>>> serialization through a reference held by an inner class (an anonymous
>>>>>>> class or a lambda expression).
>>>>>>> When Flink serializes this inner class instance, it also serializes all
>>>>>>> referenced objects, for example the outer class instance. If the outer
>>>>>>> class is not serializable, this error occurs.
>>>>>>>
>>>>>>> You could try moving that piece of code to a named, non-inner class.
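
The capture described above can be reproduced without Flink. The following is a minimal sketch using plain Java serialization; `Mapper`, `PrefixMapper`, `Pipeline` and `CaptureSketch` are hypothetical names chosen for illustration, not classes from this thread.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical mapper interface standing in for a Flink user function.
trait Mapper extends Serializable { def map(in: String): String }

// Named top-level class: it holds only the value it is given.
class PrefixMapper(prefix: String) extends Mapper {
  def map(in: String): String = prefix + in
}

// Hypothetical non-serializable outer class (for example a job-builder object).
class Pipeline(val prefix: String) {
  // The anonymous class reads `prefix` through the enclosing instance,
  // so it keeps a hidden reference to this Pipeline.
  def innerMapper: Mapper = new Mapper {
    def map(in: String): String = prefix + in
  }

  // The named class receives a copy of the value and no outer reference.
  def namedMapper: Mapper = new PrefixMapper(prefix)
}

object CaptureSketch {
  // Returns true if plain Java serialization accepts the object.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Serializing `innerMapper` fails because the enclosing `Pipeline` is not serializable, while `namedMapper` serializes cleanly even though both compute the same result.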
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Biao /'bɪ.aʊ/
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh <
>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> My main question is why serialisation kicks in when I try to
>>>>>>>> execute within a `Future` and not otherwise.
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh <
>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Yes, they are generated from an Avro schema and implement
>>>>>>>>> Serializable ..
>>>>>>>>>
>>>>>>>>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma <
>>>>>>>>> deepakmc...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Does TaxiRide or TaxiRideFare implement Serializable?
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh <
>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello -
>>>>>>>>>>>
>>>>>>>>>>> The following piece of code is an example of connected data
>>>>>>>>>>> streams ..
>>>>>>>>>>>
>>>>>>>>>>> val rides: DataStream[TaxiRide] =
>>>>>>>>>>>   readStream(inTaxiRide)
>>>>>>>>>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>>>>     .keyBy("rideId")
>>>>>>>>>>>
>>>>>>>>>>> val fares: DataStream[TaxiFare] =
>>>>>>>>>>>   readStream(inTaxiFare)
>>>>>>>>>>>     .keyBy("rideId")
>>>>>>>>>>>
>>>>>>>>>>> val processed: DataStream[TaxiRideFare] =
>>>>>>>>>>>   rides
>>>>>>>>>>>     .connect(fares)
>>>>>>>>>>>     .flatMap(new EnrichmentFunction)
>>>>>>>>>>>
>>>>>>>>>>> When I execute the above logic using
>>>>>>>>>>> StreamExecutionEnvironment.execute(..) it runs fine.
>>>>>>>>>>> But if I try to execute the above from within a
>>>>>>>>>>> scala.concurrent.Future, I get the following exception ..
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
>>>>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>>>>>>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>>>>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>>>>>>>>>     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>>>>>>>>>     at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>>>>>>>>>     at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>>>>>>>>>   ...
>>>>>>>>>>>
>>>>>>>>>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>>>>>>
>>>>>>>>>>> Any thoughts why this may happen ?
>>>>>>>>>>>
>>>>>>>>>>> regards.
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>
>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Thanks
>>>>>>>>>> Deepak
>>>>>>>>>> www.bigdatabig.com
>>>>>>>>>> www.keosha.net
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>> --
>>>>>>>> Sent from my iPhone
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
>
