Re: Executor shutdown hook and initialization
Have a look at this ancient JIRA for a lot more discussion about this: https://issues.apache.org/jira/browse/SPARK-650 You have exactly the same issue described by another user. For your context, your approach is sound: you can set a shutdown hook using the normal Java Runtime API. You may not even need it; if your only resource is some data in memory or a daemon thread, it will take care of itself.

You can also consider rearchitecting to avoid needing global state. Per-partition resource management is easy: use mapPartitions, open resources per partition at the start of the function, close them in a finally block, and do your work on the iterator over the data in between.

On Thu, Oct 27, 2016 at 3:44 PM Walter rakoff wrote:

> Thanks for the info Sean.
>
> I'm initializing them in a singleton but Scala objects are evaluated lazily,
> so it gets initialized only when the first task is run (and makes use of the
> object). The plan is to start a background thread in the object that does
> periodic cache refresh too. I'm trying to see if this init can be done right
> when the executor is created.
>
> Btw, this is for a Spark streaming app, so doing this per partition during
> each batch isn't ideal. I'd like to keep them (connections & cache) across
> batches.
>
> Finally, how do I set up the shutdown hook on an executor? Except for
> operations on RDDs, everything else is executed in the driver. All I can
> think of is something like this:
>
> sc.makeRDD((1 until sc.defaultParallelism), sc.defaultParallelism)
>   .foreachPartition(sys.ShutdownHookThread { Singleton.DoCleanup() })
>
> Walt
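The per-partition pattern Sean describes can be sketched as below. `Resource` is a hypothetical stand-in for a real connection, and `processPartition` is the kind of function you would hand to `rdd.mapPartitions`:

```scala
// Hypothetical stand-in for a real resource (DB connection, client, etc.).
class Resource {
  var open = true
  def lookup(x: Int): Int = x * 2  // placeholder per-record work
  def close(): Unit = { open = false }
}

// Open the resource at the start of the function, do the work on the
// iterator in between, and close the resource in a finally block.
def processPartition(rows: Iterator[Int]): Iterator[Int] = {
  val res = new Resource
  try {
    // Materialize inside the try block: Scala iterators are lazy, so
    // returning rows.map(res.lookup) directly would run the lookups
    // only after the finally block had already closed the resource.
    rows.map(res.lookup).toList.iterator
  } finally {
    res.close()
  }
}
```

With this shape there is no global state at all; each partition's computation owns the full lifecycle of its resource.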
Re: Executor shutdown hook and initialization
Hi Sean,

Could you please elaborate on how this can be done on a per-partition basis?

Regards,
Sumit Chawla

On Thu, Oct 27, 2016 at 7:44 AM, Walter rakoff wrote:

> Thanks for the info Sean.
>
> I'm initializing them in a singleton but Scala objects are evaluated lazily,
> so it gets initialized only when the first task is run (and makes use of the
> object). The plan is to start a background thread in the object that does
> periodic cache refresh too. I'm trying to see if this init can be done right
> when the executor is created.
>
> Btw, this is for a Spark streaming app, so doing this per partition during
> each batch isn't ideal. I'd like to keep them (connections & cache) across
> batches.
>
> Finally, how do I set up the shutdown hook on an executor? Except for
> operations on RDDs, everything else is executed in the driver. All I can
> think of is something like this:
>
> sc.makeRDD((1 until sc.defaultParallelism), sc.defaultParallelism)
>   .foreachPartition(sys.ShutdownHookThread { Singleton.DoCleanup() })
>
> Walt
>
> On Thu, Oct 27, 2016 at 3:05 AM, Sean Owen wrote:
>
>> Init is easy -- initialize them in your singleton. Shutdown is harder; a
>> shutdown hook is probably the only reliable way to go. Global state is not
>> ideal in Spark. Consider initializing things like connections per
>> partition, and open/close them with the lifecycle of a computation on a
>> partition instead.
>>
>> On Wed, Oct 26, 2016 at 9:27 PM Walter rakoff wrote:
>>
>>> Hello,
>>>
>>> Is there a way I can add an init() call when an executor is created? I'd
>>> like to initialize a few connections that are part of my singleton
>>> object. Preferably this happens before it runs the first task.
>>>
>>> On the same line, how can I provide a shutdown hook that cleans up these
>>> connections on termination?
>>>
>>> Thanks
>>> Walt
Re: Executor shutdown hook and initialization
Thanks for the info Sean.

I'm initializing them in a singleton but Scala objects are evaluated lazily, so it gets initialized only when the first task is run (and makes use of the object). The plan is to start a background thread in the object that does periodic cache refresh too. I'm trying to see if this init can be done right when the executor is created.

Btw, this is for a Spark streaming app, so doing this per partition during each batch isn't ideal. I'd like to keep them (connections & cache) across batches.

Finally, how do I set up the shutdown hook on an executor? Except for operations on RDDs, everything else is executed in the driver. All I can think of is something like this:

sc.makeRDD((1 until sc.defaultParallelism), sc.defaultParallelism)
  .foreachPartition(sys.ShutdownHookThread { Singleton.DoCleanup() })

Walt

On Thu, Oct 27, 2016 at 3:05 AM, Sean Owen wrote:

> Init is easy -- initialize them in your singleton. Shutdown is harder; a
> shutdown hook is probably the only reliable way to go. Global state is not
> ideal in Spark. Consider initializing things like connections per
> partition, and open/close them with the lifecycle of a computation on a
> partition instead.
>
> On Wed, Oct 26, 2016 at 9:27 PM Walter rakoff wrote:
>
>> Hello,
>>
>> Is there a way I can add an init() call when an executor is created? I'd
>> like to initialize a few connections that are part of my singleton
>> object. Preferably this happens before it runs the first task.
>>
>> On the same line, how can I provide a shutdown hook that cleans up these
>> connections on termination?
>>
>> Thanks
>> Walt
Re: Executor shutdown hook and initialization
Init is easy -- initialize them in your singleton. Shutdown is harder; a shutdown hook is probably the only reliable way to go.

Global state is not ideal in Spark. Consider initializing things like connections per partition, and open/close them with the lifecycle of a computation on a partition instead.

On Wed, Oct 26, 2016 at 9:27 PM Walter rakoff wrote:

> Hello,
>
> Is there a way I can add an init() call when an executor is created? I'd
> like to initialize a few connections that are part of my singleton object.
> Preferably this happens before it runs the first task.
>
> On the same line, how can I provide a shutdown hook that cleans up these
> connections on termination?
>
> Thanks
> Walt
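A minimal sketch of the singleton approach combined with a JVM shutdown hook, with placeholder connection handling (`openConnections`/`closeConnections` are hypothetical names, not a real API):

```scala
// Hypothetical executor-side singleton. Scala objects are initialized
// lazily, so the body of `init` runs once per JVM -- i.e. once per
// executor -- on the first task that touches it.
object ConnectionManager {
  @volatile private var open = false
  def isOpen: Boolean = open

  private def openConnections(): Unit = { open = true }   // placeholder
  private def closeConnections(): Unit = { open = false } // placeholder

  // Forcing this lazy val opens the connections and registers a shutdown
  // hook via the normal Java Runtime API (scala.sys.addShutdownHook wraps
  // Runtime.addShutdownHook) to close them when the executor JVM exits.
  lazy val init: Unit = {
    openConnections()
    sys.addShutdownHook { closeConnections() }
  }
}
```

Each task (or each function passed to mapPartitions) would start by referencing `ConnectionManager.init`; the lazy val guarantees the body runs only once per executor JVM, and the hook fires at JVM termination rather than per batch.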
Executor shutdown hook and initialization
Hello,

Is there a way I can add an init() call when an executor is created? I'd like to initialize a few connections that are part of my singleton object. Preferably this happens before it runs the first task.

On the same line, how can I provide a shutdown hook that cleans up these connections on termination?

Thanks
Walt