Re: Running synchronized JRI code

2016-02-15 Thread Simon Hafner
2016-02-15 14:02 GMT+01:00 Sun, Rui :
> On computation, RRDD launches one R process for each partition, so there
> won't be a thread-safety issue.
>
> Could you give more details on your new environment?

Running on EC2, I start the executors via:

 /usr/bin/R CMD javareconf -e "/usr/lib/spark/sbin/start-master.sh"

I invoke R via roughly the following:

import org.rosuda.REngine.{REngine, REXPDouble}
import org.rosuda.REngine.JRI.JRIEngine

object R {
  case class Element(value: Double)

  // One shared engine per JVM; JRI can only host a single R instance.
  // fit.R (bundled as a resource) defines the fit() function used below.
  lazy val re: REngine = Option(REngine.getLastEngine()).getOrElse {
    val eng = new JRIEngine()
    val script = scala.io.Source
      .fromInputStream(getClass.getClassLoader.getResourceAsStream("r/fit.R"))
      .mkString
    eng.parseAndEval(script)
    eng
  }

  def fit(curve: Seq[Element]): Option[Fitting] = {
    // R is single-threaded, so every call into the engine is serialized here.
    synchronized {
      val env = re.newEnvironment(null, false)
      re.assign("curve", new REXPDouble(curve.map(_.value).toArray), env)
      val df = re.parseAndEval("data.frame(curve=curve)", env, true)
      re.assign("df", df, env)
      val fitted = re.parseAndEval("fit(df)", env, true).asList
      if (fitted.keys == null) {
        None
      } else {
        val map = fitted.keys.map(key => (key, fitted.at(key).asDouble)).toMap
        Some(Fitting(map("values")))  // Fitting is defined elsewhere in the project
      }
    }
  }
}

where `fit` is wrapped in a UDAF.
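
For reference, a minimal sketch of how such a UDAF wrapper could look with
Spark's UserDefinedAggregateFunction API; the class name FitCurveUDAF, the
schemas, and the Boolean result type are illustrative assumptions, not the
actual code:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Collects the group's values into a buffer and calls R.fit once per group in
// evaluate(), so the synchronized block in R.fit is hit per group, not per row.
class FitCurveUDAF extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("values", ArrayType(DoubleType)) :: Nil)
  def dataType: DataType = BooleanType   // placeholder; a real wrapper would unpack Fitting
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[Double]

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer(0) = buffer.getSeq[Double](0) :+ input.getDouble(0)

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[Double](0) ++ buffer2.getSeq[Double](0)

  def evaluate(buffer: Row): Any = {
    val curve = buffer.getSeq[Double](0).map(R.Element(_))
    R.fit(curve).isDefined   // true if the fit produced a result
  }
}

It is then used like any other aggregate, e.g.
df.groupBy("id").agg(new FitCurveUDAF()(col("value"))).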




RE: Running synchronized JRI code

2016-02-15 Thread Sun, Rui
On computation, RRDD launches one R process for each partition, so there won't
be a thread-safety issue.

Could you give more details on your new environment?

-Original Message-
From: Simon Hafner [mailto:reactorm...@gmail.com] 
Sent: Monday, February 15, 2016 7:31 PM
To: Sun, Rui <rui@intel.com>
Cc: user <user@spark.apache.org>
Subject: Re: Running synchronized JRI code

2016-02-15 4:35 GMT+01:00 Sun, Rui <rui@intel.com>:
> Yes, JRI loads an R dynamic library into the executor JVM, which faces a
> thread-safety issue when there are multiple task threads within the executor.
>
> I am thinking that if demand like yours (calling R code in RDD
> transformations) is widespread, we may consider refactoring RRDD for this
> purpose, although it is currently intended for internal use by SparkR and is
> not a public API.
So the RRDDs don't have that thread-safety issue? I'm currently creating a new
environment for each call, but it still crashes.




Re: Running synchronized JRI code

2016-02-15 Thread Simon Hafner
2016-02-15 4:35 GMT+01:00 Sun, Rui :
> Yes, JRI loads an R dynamic library into the executor JVM, which faces a
> thread-safety issue when there are multiple task threads within the executor.
>
> I am thinking that if demand like yours (calling R code in RDD
> transformations) is widespread, we may consider refactoring RRDD for this
> purpose, although it is currently intended for internal use by SparkR and is
> not a public API.
So the RRDDs don't have that thread-safety issue? I'm currently creating a new
environment for each call, but it still crashes.




RE: Running synchronized JRI code

2016-02-14 Thread Sun, Rui
For YARN mode, you can set --executor-cores 1.
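
For example (the application class, jar, and executor count below are
placeholders):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 16 \
  --executor-cores 1 \
  --class com.example.FitJob \
  fit-job.jar

With one core per executor, only one task thread runs in each JVM, so the JRI
calls are no longer contended.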

-Original Message-
From: Sun, Rui [mailto:rui@intel.com] 
Sent: Monday, February 15, 2016 11:35 AM
To: Simon Hafner <reactorm...@gmail.com>; user <user@spark.apache.org>
Subject: RE: Running synchronized JRI code

Yes, JRI loads an R dynamic library into the executor JVM, which faces a
thread-safety issue when there are multiple task threads within the executor.

If you are running Spark in standalone mode, it is possible to run multiple
workers per node and, at the same time, limit the cores per worker to 1.
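
For example, in conf/spark-env.sh on each worker node (8 single-core workers is
just an illustration; size it to the machine):

# Run several single-core workers per node instead of one multi-core worker
export SPARK_WORKER_INSTANCES=8
export SPARK_WORKER_CORES=1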

You could use RDD.pipe(), but you may need to handle binary-to-text conversion,
as the input/output to/from the R process is string-based.
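
A rough sketch of the pipe-based approach, assuming an RDD[Element] named
curves and a script fit_stdin.R that reads one number per line from stdin and
prints its results to stdout (both names are assumptions for illustration):

import org.apache.spark.rdd.RDD

// pipe() forks one external process per partition, writes the partition's
// elements to its stdin as text lines, and returns its stdout lines as an RDD.
val fitted: RDD[String] =
  curves.map(_.value.toString)                  // serialize each element as a line of text
        .pipe("Rscript --vanilla fit_stdin.R")  // one R process per partition
// fitted holds the script's output lines; parse them back into numbers as needed.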

I am thinking that if demand like yours (calling R code in RDD transformations)
is widespread, we may consider refactoring RRDD for this purpose, although it
is currently intended for internal use by SparkR and is not a public API.

-Original Message-
From: Simon Hafner [mailto:reactorm...@gmail.com] 
Sent: Monday, February 15, 2016 5:09 AM
To: user <user@spark.apache.org>
Subject: Running synchronized JRI code

Hello

I'm currently running R code in an executor via JRI. Because R is
single-threaded, any call to R needs to be wrapped in a `synchronized` block.
As a result, I can only use a bit more than one core per executor, which is
undesirable. Is there a way to tell Spark that this specific application (or
even a specific UDF) needs multiple JVMs? Or should I switch from JRI to a
pipe-based (slower) setup?

Cheers,
Simon






RE: Running synchronized JRI code

2016-02-14 Thread Sun, Rui
Yes, JRI loads an R dynamic library into the executor JVM, which faces a
thread-safety issue when there are multiple task threads within the executor.

If you are running Spark in standalone mode, it is possible to run multiple
workers per node and, at the same time, limit the cores per worker to 1.

You could use RDD.pipe(), but you may need to handle binary-to-text conversion,
as the input/output to/from the R process is string-based.

I am thinking that if demand like yours (calling R code in RDD transformations)
is widespread, we may consider refactoring RRDD for this purpose, although it
is currently intended for internal use by SparkR and is not a public API.

-Original Message-
From: Simon Hafner [mailto:reactorm...@gmail.com] 
Sent: Monday, February 15, 2016 5:09 AM
To: user 
Subject: Running synchronized JRI code

Hello

I'm currently running R code in an executor via JRI. Because R is
single-threaded, any call to R needs to be wrapped in a `synchronized` block.
As a result, I can only use a bit more than one core per executor, which is
undesirable. Is there a way to tell Spark that this specific application (or
even a specific UDF) needs multiple JVMs? Or should I switch from JRI to a
pipe-based (slower) setup?

Cheers,
Simon
