Re: How to share a NonSerializable variable among tasks in the same worker node?

2015-01-21 Thread Sean Owen
Singletons aren't hacks; they can be an entirely appropriate pattern for
this. What exception do you get? From Spark or from your code? I think this
pattern is orthogonal to using Spark.
On Jan 21, 2015 8:11 AM, "octavian.ganea" 
wrote:

> In case someone has the same problem:
>
> The singleton hack works for me only sometimes in Spark 1.2.0; that is,
> sometimes I get a NullPointerException. Anyway, if you really need to work
> with big indexes, want the smallest amount of communication between master
> and nodes, and have RAM available for only one instance of the index data
> per machine, then I suggest you use Spark with memcached.


Re: How to share a NonSerializable variable among tasks in the same worker node?

2015-01-21 Thread octavian.ganea
In case someone has the same problem:

The singleton hack works for me only sometimes in Spark 1.2.0; that is,
sometimes I get a NullPointerException. Anyway, if you really need to work
with big indexes, want the smallest amount of communication between master
and nodes, and have RAM available for only one instance of the index data per
machine, then I suggest you use Spark with memcached.
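
To make the suggestion concrete, here is a minimal sketch of what "Spark with
memcached" can look like. It assumes the spymemcached client, a memcached
daemon listening on localhost:11211 on every worker node, and hypothetical
names (MemcachedLookupJob, loadFromIndex); it is an illustration of the idea,
not the exact setup used above.

import java.net.InetSocketAddress
import net.spy.memcached.MemcachedClient
import org.apache.spark.{SparkConf, SparkContext}

object MemcachedLookupJob {
  // One client per executor JVM; assumes memcached runs on every worker node.
  lazy val client = new MemcachedClient(new InetSocketAddress("localhost", 11211))

  // Placeholder for the expensive index lookup that motivates the cache.
  def loadFromIndex(key: String): String = "value-for-" + key

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("memcached-sketch"))

    val enriched = sc.textFile(args(0)).mapPartitions { iter =>
      val mc = MemcachedLookupJob.client     // resolved on the executor, never serialized
      iter.map { key =>
        val cached = mc.get(key)             // null on a cache miss
        val value =
          if (cached != null) cached.toString
          else { val v = MemcachedLookupJob.loadFromIndex(key); mc.set(key, 0, v); v }
        (key, value)
      }
    }
    enriched.saveAsTextFile(args(1))
  }
}

Note that memcached stores bytes and strings, so this keeps one copy of the
data per machine; it does not by itself make an arbitrary non-serializable
object shareable.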






Re: How to share a NonSerializable variable among tasks in the same worker node?

2015-01-20 Thread Fengyun RAO
We are currently migrating from 1.1 to 1.2 and found our program runs 3x
slower, maybe due to the singleton hack?

Could you explain in detail why or how "the singleton hack works very
differently in Spark 1.2.0"?

Thanks!

2015-01-18 20:56 GMT+08:00 octavian.ganea :

> The singleton hack works very differently in Spark 1.2.0 (it does not work
> if the program has multiple map-reduce jobs in the same program). I think
> there should be official documentation on how to have each machine/node do
> an init step locally before executing any other instructions (e.g. loading
> a very big object locally, once at the beginning, so it can be used in all
> further map jobs assigned to that worker).


Re: How to share a NonSerializable variable among tasks in the same worker node?

2015-01-18 Thread octavian.ganea
The singleton hack works very differently in Spark 1.2.0 (it does not work if
the program has multiple map-reduce jobs in the same program). I think there
should be official documentation on how to have each machine/node do an init
step locally before executing any other instructions (e.g. loading a very big
object locally, once at the beginning, so it can be used in all further map
jobs assigned to that worker).
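
Until such documentation exists, the usual way to get this behaviour is a
lazily initialized singleton held in a Scala object, so the big object is
built at most once per executor JVM and then reused by every task and every
later job that lands on that worker. A minimal sketch, with hypothetical
names (BigIndex, NodeLocalIndex) and a hypothetical local path; whether the
instance really survives across jobs depends on the executor JVMs staying up
for the lifetime of the application:

import org.apache.spark.{SparkConf, SparkContext}

class BigIndex(path: String) {                     // deliberately NOT Serializable
  def lookup(key: String): String = "hit:" + key   // placeholder lookup
}

object NodeLocalIndex {
  // Initialized lazily, at most once per executor JVM, on first use by a task.
  // Nothing here is serialized or shipped from the driver.
  lazy val index: BigIndex = new BigIndex("/data/index")   // hypothetical local path
}

object NodeLocalInitDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("node-local-init"))

    // Job 1: the first task on each executor triggers the one-time load.
    sc.textFile(args(0)).map(k => NodeLocalIndex.index.lookup(k)).count()

    // Job 2: runs in the same executor JVMs, so the already-built index is reused.
    sc.textFile(args(1)).map(k => NodeLocalIndex.index.lookup(k)).count()
  }
}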






Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-10 Thread DB Tsai
Spark caches the RDD in the JVM, so presumably, yes, the singleton trick
should work.

Sent from my Google Nexus 5
On Aug 9, 2014 11:00 AM, "Kevin James Matzen" 
wrote:

> I have a related question.  With Hadoop, I would do the same thing for
> non-serializable objects and setup().  I also had a use case where it
> was so expensive to initialize the non-serializable object that I
> would make it a static member of the mapper, turn on JVM reuse across
> tasks, and then prevent the reinitialization for every task on the
> same node.  Is that easy to do with Spark?  Assuming Spark reuses the
> JVM across tasks by default, then taking raofengyun's factory method
> and having it return a singleton should work, right?  Does Spark reuse JVMs
> across tasks?


Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-09 Thread Kevin James Matzen
I have a related question.  With Hadoop, I would do the same thing for
non-serializable objects and setup().  I also had a use case where it
was so expensive to initialize the non-serializable object that I
would make it a static member of the mapper, turn on JVM reuse across
tasks, and then prevent the reinitialization for every task on the
same node.  Is that easy to do with Spark?  Assuming Spark reuses the
JVM across tasks by default, then taking raofengyun's factory method
and having it return a singleton should work, right?  Does Spark reuse JVMs
across tasks?

On Sat, Aug 9, 2014 at 7:48 AM, Fengyun RAO  wrote:
> Although nobody has answered the two questions, in my practice it seems the
> answer to both is yes.




Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-09 Thread Fengyun RAO
Although nobody has answered the two questions, in my practice it seems the
answer to both is yes.


2014-08-04 19:50 GMT+08:00 Fengyun RAO :

> object LogParserWrapper {
>   private val logParser = {
>     val settings = new ...
>     val builders = new ...
>     new LogParser(builders, settings)
>   }
>   def getParser = logParser
> }
>
> object MySparkJob {
>   def main(args: Array[String]) {
>     val sc = new SparkContext()
>     val lines = sc.textFile(args(0))
>
>     val parsed = lines.map(line =>
>       LogParserWrapper.getParser.parse(line))
>     ...
>   }
> }
>
> Q1: Is this the right way to share a LogParser instance among all tasks on
> the same worker, if LogParser is not serializable?
>
> Q2: LogParser is read-only, but can LogParser hold a cache field such as a
> ConcurrentHashMap where all tasks on the same worker try to get() and put()
> items?


Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-04 Thread Fengyun RAO
object LogParserWrapper {
  private val logParser = {
    val settings = new ...
    val builders = new ...
    new LogParser(builders, settings)
  }
  def getParser = logParser
}

object MySparkJob {
  def main(args: Array[String]) {
    val sc = new SparkContext()
    val lines = sc.textFile(args(0))

    val parsed = lines.map(line =>
      LogParserWrapper.getParser.parse(line))
    ...
  }
}

Q1: Is this the right way to share a LogParser instance among all tasks on
the same worker, if LogParser is not serializable?

Q2: LogParser is read-only, but can LogParser hold a cache field such as a
ConcurrentHashMap where all tasks on the same worker try to get() and put()
items?
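
On Q2, here is a minimal sketch of what such a cache field could look like;
the parser internals are hypothetical. java.util.concurrent.ConcurrentHashMap
is safe for concurrent get()/put() from the multiple task threads that share
one executor JVM, and because the parser is a per-JVM singleton, all of those
tasks see the same map. The sharing is per executor JVM rather than per
physical node, so a node running several executors would hold several caches.

import java.util.concurrent.ConcurrentHashMap

class LogParser(settings: String) {
  // Shared by every task thread that uses the singleton parser in this JVM.
  private val cache = new ConcurrentHashMap[String, String]()

  def parse(line: String): String = {
    val cached = cache.get(line)
    if (cached != null) cached
    else {
      val parsed = expensiveParse(line)
      cache.putIfAbsent(line, parsed)   // avoids clobbering a value another task just wrote
      parsed
    }
  }

  private def expensiveParse(line: String): String = line.toUpperCase   // placeholder
}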


2014-08-04 19:29 GMT+08:00 Sean Owen :

> The issue is that it's not clear what "parser" is. It's not shown in
> your code. The snippet you show does not appear to contain a parser
> object.
>
> On Mon, Aug 4, 2014 at 10:01 AM, Fengyun RAO  wrote:
> > Thanks, Sean!
> >
> > It works, but as the link in 2 - Why Is My Spark Job so Slow and Only
> Using
> > a Single Thread? says " parser instance is now a singleton created in the
> > scope of our driver program" which I thought was in the scope of
> executor.
> > Am I wrong, or why?
> >
> > I didn't want the equivalent of "setup()" method, since I want to share
> the
> > "parser" among tasks in the same worker node. It takes tens of seconds to
> > initialize a "parser". What's more, I want to know if the "parser" could
> > have a field such as ConcurrentHashMap which all tasks in the node may
> get()
> > of put() items.
> >
> >
> >
> >
> > 2014-08-04 16:35 GMT+08:00 Sean Owen :
> >
> >> The parser does not need to be serializable. In the line:
> >>
> >> lines.map(line => JSONParser.parse(line))
> >>
> >> ... the parser is called but there is no parser object that with state
> >> that can be serialized. Are you sure it does not work?
> >>
> >> The error message alluded to originally refers to an object not shown
> >> in the code, so I'm not 100% sure this was the original issue.
> >>
> >> If you want, the equivalent of "setup()" is really "writing some code
> >> at the start of a call to mapPartitions()"
> >>
> >> On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO 
> wrote:
> >> > Thanks, Ron.
> >> >
> >> > The problem is that the "parser" is written in another package which
> is
> >> > not
> >> > serializable.
> >> >
> >> > In mapreduce, I could create the "parser" in the map setup() method.
> >> >
> >> > Now in spark, I want to create it for each worker, and share it among
> >> > all
> >> > the tasks on the same work node.
> >> >
> >> > I know different workers run on different machine, but it doesn't have
> >> > to
> >> > communicate between workers.
> >
> >
>


Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-04 Thread DB Tsai
You can try to define a wrapper class for your parser, and create an
instance of your parser in the companion object as a singleton.
Thus, even if you create a wrapper object in mapPartitions every time,
each JVM will have only a single instance of your parser object.
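
A minimal sketch of that suggestion; the names (Parser, ParserWrapper, rdd)
are illustrative rather than taken from the original code:

class Parser {                                   // stand-in for the non-serializable parser
  def parse(line: String): String = line.trim
}

class ParserWrapper extends Serializable {
  // The wrapper is cheap and serializable; it only delegates to the per-JVM singleton.
  def parser: Parser = ParserWrapper.instance
}

object ParserWrapper {
  // Companion object: initialized at most once per executor JVM, never serialized.
  lazy val instance: Parser = new Parser
}

// Usage sketch: a new wrapper in every mapPartitions call, but one Parser per JVM.
val parsed = rdd.mapPartitions { iter =>
  val p = (new ParserWrapper).parser
  iter.map(p.parse)
}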

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Mon, Aug 4, 2014 at 2:01 AM, Fengyun RAO  wrote:
> Thanks, Sean!
>
> It works, but as the link in 2 - Why Is My Spark Job so Slow and Only Using
> a Single Thread? says, "parser instance is now a singleton created in the
> scope of our driver program", which I thought was in the scope of the
> executor. Am I wrong, or why?
>
> I didn't want the equivalent of the "setup()" method, since I want to share
> the "parser" among tasks in the same worker node. It takes tens of seconds
> to initialize a "parser". What's more, I want to know if the "parser" could
> have a field such as a ConcurrentHashMap on which all tasks in the node may
> get() or put() items.




Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-04 Thread Fengyun RAO
Thanks, Sean!

It works, but as the link in 2 - Why Is My Spark Job so Slow and Only Using
a Single Thread? says, "parser instance is now a singleton created in the
scope of our driver program", which I thought was in the scope of the
executor. Am I wrong, or why?

I didn't want the equivalent of the "setup()" method, since I want to share
the "parser" among tasks in the same worker node. It takes tens of seconds
to initialize a "parser". What's more, I want to know if the "parser" could
have a field such as a ConcurrentHashMap on which all tasks in the node may
get() or put() items.




2014-08-04 16:35 GMT+08:00 Sean Owen :

> The parser does not need to be serializable. In the line:
>
> lines.map(line => JSONParser.parse(line))
>
> ... the parser is called, but there is no parser object with state that
> needs to be serialized. Are you sure it does not work?
>
> The error message alluded to originally refers to an object not shown
> in the code, so I'm not 100% sure this was the original issue.
>
> If you want, the equivalent of "setup()" is really "writing some code
> at the start of a call to mapPartitions()"


Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-04 Thread Sean Owen
The parser does not need to be serializable. In the line:

lines.map(line => JSONParser.parse(line))

... the parser is called, but there is no parser object with state that
needs to be serialized. Are you sure it does not work?

The error message alluded to originally refers to an object not shown
in the code, so I'm not 100% sure this was the original issue.

If you want, the equivalent of "setup()" is really "writing some code
at the start of a call to mapPartitions()"
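
For concreteness, a sketch of that setup()-style pattern; NonSerializableParser
is a hypothetical stand-in for the expensive object. It is created on the
executor, once per partition, so nothing non-serializable ever leaves the
driver:

val parsed = lines.mapPartitions { iter =>
  // "setup()": runs once per partition, on the executor, before any records
  val parser = new NonSerializableParser()
  iter.map(line => parser.parse(line))
  // there is no built-in "cleanup()"; wrap the iterator if resources must be released
}

This initializes once per partition rather than once per JVM; if even that is
too expensive, the per-JVM singleton discussed elsewhere in this thread avoids
repeating the work.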

On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO  wrote:
> Thanks, Ron.
>
> The problem is that the "parser" is written in another package which is not
> serializable.
>
> In MapReduce, I could create the "parser" in the map setup() method.
>
> Now in Spark, I want to create it for each worker, and share it among all
> the tasks on the same worker node.
>
> I know different workers run on different machines, but there is no need to
> communicate between workers.




Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-04 Thread Fengyun RAO
Thanks, Ron.

The problem is that the "parser" is written in another package which is not
serializable.

In MapReduce, I could create the "parser" in the map setup() method.

Now in Spark, I want to create it for each worker, and share it among all
the tasks on the same worker node.

I know different workers run on different machines, but there is no need to
communicate between workers.



2014-08-04 10:51 GMT+08:00 Ron's Yahoo! :

> I think you’re going to have to make it serializable by registering it
> with the Kryo registrator. I think multiple workers are running as separate
> VMs so it might need to be able to serialize and deserialize broadcast
> variables to the different executors.
>
> Thanks,
> Ron


Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-03 Thread Ron's Yahoo!
I think you're going to have to make it serializable by registering it with
the Kryo registrator. I think multiple workers are running as separate VMs,
so it might need to be able to serialize and deserialize broadcast variables
to the different executors.
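
If one does go the serialization route, here is a minimal sketch of wiring up
Kryo. MyRegistrator, KryoSetupSketch and LogParser are illustrative names, and
whether registration alone is enough depends on whether Kryo can actually
reconstruct the parser's internals:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

class LogParser                                   // stand-in for the real parser class

class MyRegistrator extends KryoRegistrator {
  // Tells Kryo about the class so instances can be shipped to executors,
  // e.g. inside a broadcast variable or a closure.
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[LogParser])
  }
}

object KryoSetupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kryo-sketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator")   // fully qualified name in real code
    val sc = new SparkContext(conf)
    // ... jobs that ship LogParser instances would go here
  }
}

(On Spark 1.2 and later, SparkConf.registerKryoClasses can replace the
registrator class.)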

Thanks,
Ron

On Aug 3, 2014, at 6:38 PM, Fengyun RAO  wrote:

> Could anybody help?
> 
> I wonder if I asked a stupid question or I didn't make the question clear?
> 



Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-03 Thread Fengyun RAO
Could anybody help?

I wonder if I asked a stupid question or I didn't make the question clear?


2014-07-31 21:47 GMT+08:00 Fengyun RAO :

> As shown here:
> 2 - Why Is My Spark Job so Slow and Only Using a Single Thread?
> 
>
>
> object JSONParser {
>   def parse(raw: String): String = ...
> }
>
> object MyFirstSparkJob {
>   def main(args: Array[String]) {
>     val sc = new SparkContext()
>
>     val lines = sc.textFileStream("beacons.txt")
>     lines.map(line => JSONParser.parse(line))
>     lines.foreach(line => println(line))
>
>     ssc.start()
>   }
> }
>
> It says "parser instance is now a singleton created in the scope of our
> driver program", which I thought was in the scope of the executor. Am I
> wrong, or why?
>
> What if the parser is not serializable, and I want to share it among
> tasks in the same worker node?
>
>
>
>
>


How to share a NonSerializable variable among tasks in the same worker node?

2014-07-31 Thread Fengyun RAO
As shown here:
2 - Why Is My Spark Job so Slow and Only Using a Single Thread?



object JSONParser {
  def parse(raw: String): String = ...
}

object MyFirstSparkJob {
  def main(args: Array[String]) {
    val sc = new SparkContext()

    val lines = sc.textFileStream("beacons.txt")
    lines.map(line => JSONParser.parse(line))
    lines.foreach(line => println(line))

    ssc.start()
  }
}

It says "parser instance is now a singleton created in the scope of our
driver program", which I thought was in the scope of the executor. Am I
wrong, or why?

What if the parser is not serializable, and I want to share it among tasks
in the same worker node?