Re: How to process one partition at a time?

2016-04-06 Thread Andrei
I'm writing a kind of sampler which in most cases will require only 1
partition, sometimes 2 and very rarely more. So it doesn't make sense to
process all partitions in parallel. What is the easiest way to limit
computations to one partition only?

So far the best idea I came to is to create a custom partition whose
`compute` method looks something like:

def compute(split: Partition, context: TaskContext) = {
if (split.index == targetPartition) {
// do computation
} else {
   // return empty iterator
}
}



But it's quite ugly and I'm unlikely to be the first person with such a
need. Is there easier way to do it?


Re: How to process one partition at a time?

2016-04-07 Thread Andrei
Thanks everyone, both - `submitJob` and `PartitionPrunningRDD` - work for
me.

On Thu, Apr 7, 2016 at 8:22 AM, Hemant Bhanawat 
wrote:

> Apparently, there is another way to do it. You can try creating a
> PartitionPruningRDD and pass a partition filter function to it. This RDD
> will do the same thing that I suggested in my mail and you will not have to
> create a new RDD.
>
> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
> www.snappydata.io
>
> On Wed, Apr 6, 2016 at 5:35 PM, Sun, Rui  wrote:
>
>> Maybe you can try SparkContext.submitJob:
>>
>> *def **submitJob**[T, U, R](rdd: RDD
>> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html>[T],
>>  processPartition:
>> (Iterator[T]) **⇒** U, partitions: Seq[Int], resultHandler: (Int, U) **⇒** 
>> Unit, resultFunc:
>> **⇒** R): SimpleFutureAction
>> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/SimpleFutureAction.html>[R]*
>>
>>
>>
>>
>>
>> *From:* Hemant Bhanawat [mailto:hemant9...@gmail.com]
>> *Sent:* Wednesday, April 6, 2016 7:16 PM
>> *To:* Andrei 
>> *Cc:* user 
>> *Subject:* Re: How to process one partition at a time?
>>
>>
>>
>> Instead of doing it in compute, you could rather override getPartitions
>> method of your RDD and return only the target partitions. This way tasks
>> for only target partitions will be created. Currently in your case, tasks
>> for all the partitions are getting created.
>>
>> I hope it helps. I would like to hear if you take some other approach.
>>
>>
>> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
>>
>> www.snappydata.io
>>
>>
>>
>> On Wed, Apr 6, 2016 at 3:49 PM, Andrei  wrote:
>>
>> I'm writing a kind of sampler which in most cases will require only 1
>> partition, sometimes 2 and very rarely more. So it doesn't make sense to
>> process all partitions in parallel. What is the easiest way to limit
>> computations to one partition only?
>>
>>
>>
>> So far the best idea I came to is to create a custom partition whose
>> `compute` method looks something like:
>>
>>
>>
>> def compute(split: Partition, context: TaskContext) = {
>>
>> if (split.index == targetPartition) {
>>
>> // do computation
>>
>> } else {
>>
>>// return empty iterator
>>
>> }
>>
>> }
>>
>>
>>
>>
>>
>> But it's quite ugly and I'm unlikely to be the first person with such a
>> need. Is there easier way to do it?
>>
>>
>>
>>
>>
>>
>>
>
>


How does spark-submit handle Python scripts (and how to repeat it)?

2016-04-11 Thread Andrei
I'm working on a wrapper [1] around Spark for the Julia programming
language [2] similar to PySpark. I've got it working with Spark Standalone
server by creating local JVM and setting master programmatically. However,
this approach doesn't work with YARN (and probably Mesos), which require
running via `spark-submit`.

In `SparkSubmit` class I see that for Python a special class `PythonRunner`
is launched, so I tried to do similar `JuliaRunner`, which essentially does
the following:

val pb = new ProcessBuilder(Seq("julia", juliaScript))
val process = pb.start()
process.waitFor()


where `juliaScript` itself creates new JVM and `SparkContext` inside it
WITHOUT setting master URL. I then tried to launch this class using

spark-submit --master yarn \
  --class o.a.s.a.j.JuliaRunner \
  project.jar my_script.jl

I expected that `spark-submit` would set environment variables or something
that SparkContext would then read and connect to appropriate master. This
didn't happen, however, and process failed while trying to instantiate
`SparkContext`, saying that master is not specified.

So what am I missing? How can use `spark-submit` to run driver in a non-JVM
language?


[1]: https://github.com/dfdx/Sparta.jl
[2]: http://julialang.org/


Re: How does spark-submit handle Python scripts (and how to repeat it)?

2016-04-12 Thread Andrei
>
> One part is passing the command line options, like “--master”, from the
> JVM launched by spark-submit to the JVM where SparkContext resides


Since I have full control over both - JVM and Julia parts - I can pass
whatever options to both. But what exactly should be passed? Currently
pipeline looks like this:

spark-submit JVM -> JuliaRunner -> julia process -> new JVM with
SparkContext


 I want to make the last JVM's SparkContext to understand that it should
run on YARN. Obviously, I can't pass `--master yarn` option to JVM itself.
Instead, I can pass system property "spark.master" = "yarn-client", but
this results in an error:

Retrying connect to server: 0.0.0.0/0.0.0.0:8032



So it's definitely not enough. I tried to set manually all system
properties that `spark-submit` adds to the JVM (including
"spark-submit=true", "spark.submit.deployMode=client", etc.), but it didn't
help too. Source code is always good, but for a stranger like me it's a
little bit hard to grasp control flow in SparkSubmit class.


For pySpark & SparkR, when running scripts in client deployment modes
> (standalone client and yarn client), the JVM is the same (py4j/RBackend
> running as a thread in the JVM launched by spark-submit)


Can you elaborate on this? Does it mean that `spark-submit` creates new
Python/R process that connects back to that same JVM and creates
SparkContext in it?


On Tue, Apr 12, 2016 at 2:04 PM, Sun, Rui  wrote:

> There is much deployment preparation work handling different deployment
> modes for pyspark and SparkR in SparkSubmit. It is difficult to summarize
> it briefly, you had better refer to the source code.
>
>
>
> Supporting running Julia scripts in SparkSubmit is more than implementing
> a ‘JuliaRunner’. One part is passing the command line options, like
> “--master”, from the JVM launched by spark-submit to the JVM where
> SparkContext resides, in the case that the two JVMs are not the same. For
> pySpark & SparkR, when running scripts in client deployment modes
> (standalone client and yarn client), the JVM is the same (py4j/RBackend
> running as a thread in the JVM launched by spark-submit) , so no need to
> pass the command line options around. However, in your case, Julia
> interpreter launches an in-process JVM for SparkContext, which is a
> separate JVM from the one launched by spark-submit. So you need a way,
> typically an environment environment variable, like “SPARKR_SUBMIT_ARGS”
> for SparkR or “PYSPARK_SUBMIT_ARGS” for pyspark, to pass command line args
> to the in-process JVM in the Julia interpreter so that SparkConf can pick
> the options.
>
>
>
> *From:* Andrei [mailto:faithlessfri...@gmail.com]
> *Sent:* Tuesday, April 12, 2016 3:48 AM
> *To:* user 
> *Subject:* How does spark-submit handle Python scripts (and how to repeat
> it)?
>
>
>
> I'm working on a wrapper [1] around Spark for the Julia programming
> language [2] similar to PySpark. I've got it working with Spark Standalone
> server by creating local JVM and setting master programmatically. However,
> this approach doesn't work with YARN (and probably Mesos), which require
> running via `spark-submit`.
>
>
>
> In `SparkSubmit` class I see that for Python a special class
> `PythonRunner` is launched, so I tried to do similar `JuliaRunner`, which
> essentially does the following:
>
>
>
> val pb = new ProcessBuilder(Seq("julia", juliaScript))
>
> val process = pb.start()
>
> process.waitFor()
>
>
>
> where `juliaScript` itself creates new JVM and `SparkContext` inside it
> WITHOUT setting master URL. I then tried to launch this class using
>
>
>
> spark-submit --master yarn \
>
>   --class o.a.s.a.j.JuliaRunner \
>
>   project.jar my_script.jl
>
>
>
> I expected that `spark-submit` would set environment variables or
> something that SparkContext would then read and connect to appropriate
> master. This didn't happen, however, and process failed while trying to
> instantiate `SparkContext`, saying that master is not specified.
>
>
>
> So what am I missing? How can use `spark-submit` to run driver in a
> non-JVM language?
>
>
>
>
>
> [1]: https://github.com/dfdx/Sparta.jl
>
> [2]: http://julialang.org/
>


Re: How does spark-submit handle Python scripts (and how to repeat it)?

2016-04-13 Thread Andrei
>
> Julia can pick the env var, and set the system properties or directly fill
> the configurations into a SparkConf, and then create a SparkContext


That's the point - just setting master to "yarn-client" doesn't work, even
in Java/Scala. E.g. following code in *Scala*:


val conf = new SparkConf().setAppName("My App").setMaster("yarn-client")
val sc = new SparkContext(conf)
sc.parallelize(1 to 10).collect()
sc.stop()


results in an error:

Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032


I think for now we can even put Julia aside and concentrate the following
question: how does submitting application via `spark-submit` with
"yarn-client" mode differ from setting the same mode directly in
`SparkConf`?



On Wed, Apr 13, 2016 at 5:06 AM, Sun, Rui  wrote:

> Spark configurations specified at the command line for spark-submit should
> be passed to the JVM inside Julia process. You can refer to
> https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L267
> and
> https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L295
>
> Generally,
>
> spark-submit JVM -> JuliaRunner -> Env var like
> “JULIA_SUBMIT_ARGS” -> julia process -> new JVM with SparkContext
>
>   Julia can pick the env var, and set the system properties or directly
> fill the configurations into a SparkConf, and then create a SparkContext
>
>
>
> Yes, you are right, `spark-submit` creates new Python/R process that
> connects back to that same JVM and creates SparkContext in it.
>
> Refer to
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala#L47
> and
>
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/RRunner.scala#L65
>
>
>
>
>
> *From:* Andrei [mailto:faithlessfri...@gmail.com]
> *Sent:* Wednesday, April 13, 2016 4:32 AM
> *To:* Sun, Rui 
> *Cc:* user 
> *Subject:* Re: How does spark-submit handle Python scripts (and how to
> repeat it)?
>
>
>
> One part is passing the command line options, like “--master”, from the
> JVM launched by spark-submit to the JVM where SparkContext resides
>
>
>
> Since I have full control over both - JVM and Julia parts - I can pass
> whatever options to both. But what exactly should be passed? Currently
> pipeline looks like this:
>
>
>
> spark-submit JVM -> JuliaRunner -> julia process -> new JVM with
> SparkContext
>
>
>
>  I want to make the last JVM's SparkContext to understand that it should
> run on YARN. Obviously, I can't pass `--master yarn` option to JVM itself.
> Instead, I can pass system property "spark.master" = "yarn-client", but
> this results in an error:
>
>
>
> Retrying connect to server: 0.0.0.0/0.0.0.0:8032
>
>
>
>
>
> So it's definitely not enough. I tried to set manually all system
> properties that `spark-submit` adds to the JVM (including
> "spark-submit=true", "spark.submit.deployMode=client", etc.), but it didn't
> help too. Source code is always good, but for a stranger like me it's a
> little bit hard to grasp control flow in SparkSubmit class.
>
>
>
>
>
> For pySpark & SparkR, when running scripts in client deployment modes
> (standalone client and yarn client), the JVM is the same (py4j/RBackend
> running as a thread in the JVM launched by spark-submit)
>
>
>
> Can you elaborate on this? Does it mean that `spark-submit` creates new
> Python/R process that connects back to that same JVM and creates
> SparkContext in it?
>
>
>
>
>
> On Tue, Apr 12, 2016 at 2:04 PM, Sun, Rui  wrote:
>
> There is much deployment preparation work handling different deployment
> modes for pyspark and SparkR in SparkSubmit. It is difficult to summarize
> it briefly, you had better refer to the source code.
>
>
>
> Supporting running Julia scripts in SparkSubmit is more than implementing
> a ‘JuliaRunner’. One part is passing the command line options, like
> “--master”, from the JVM launched by spark-submit to the JVM where
> SparkContext resides, in the case that the two JVMs are not the same. For
> pySpark & SparkR, when running scripts in client deployment modes
> (standalone client and yarn client), the JVM is the same (py4j/RBackend
> running as a thread in the JVM launched by spark-submit) , so no need to
> pass the command line options around. However, in your case, Julia
> interpreter launches an in-process JVM for SparkContext, which is a
> separate JVM from th

Re: How does spark-submit handle Python scripts (and how to repeat it)?

2016-04-14 Thread Andrei
Yes, I tried setting YARN_CONF_DIR, but with no luck. I will play around
with environment variables and system properties and post back in case of
success. Thanks for your help so far!

On Thu, Apr 14, 2016 at 5:48 AM, Sun, Rui  wrote:

> In SparkSubmit, there is less work for yarn-client than that for
> yarn-cluster. Basically prepare some spark configurations into system prop
> , for example, information on additional resources required by the
> application that need to be distributed to the cluster. These
> configurations will be used in SparkContext initialization later.
>
>
>
> So generally for yarn-client, maybe you can skip spark-submit and directly
> launching the spark application with some configurations setup before new
> SparkContext.
>
>
>
> Not sure about your error, have you setup YARN_CONF_DIR?
>
>
>
> *From:* Andrei [mailto:faithlessfri...@gmail.com]
> *Sent:* Thursday, April 14, 2016 5:45 AM
>
> *To:* Sun, Rui 
> *Cc:* user 
> *Subject:* Re: How does spark-submit handle Python scripts (and how to
> repeat it)?
>
>
>
> Julia can pick the env var, and set the system properties or directly fill
> the configurations into a SparkConf, and then create a SparkContext
>
>
>
> That's the point - just setting master to "yarn-client" doesn't work, even
> in Java/Scala. E.g. following code in *Scala*:
>
>
> val conf = new SparkConf().setAppName("My App").setMaster("yarn-client")
> val sc = new SparkContext(conf)
> sc.parallelize(1 to 10).collect()
> sc.stop()
>
>
>
> results in an error:
>
>
>
> Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032
>
>
>
> I think for now we can even put Julia aside and concentrate the following
> question: how does submitting application via `spark-submit` with
> "yarn-client" mode differ from setting the same mode directly in
> `SparkConf`?
>
>
>
>
>
>
>
> On Wed, Apr 13, 2016 at 5:06 AM, Sun, Rui  wrote:
>
> Spark configurations specified at the command line for spark-submit should
> be passed to the JVM inside Julia process. You can refer to
> https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L267
> and
> https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L295
>
> Generally,
>
> spark-submit JVM -> JuliaRunner -> Env var like
> “JULIA_SUBMIT_ARGS” -> julia process -> new JVM with SparkContext
>
>   Julia can pick the env var, and set the system properties or directly
> fill the configurations into a SparkConf, and then create a SparkContext
>
>
>
> Yes, you are right, `spark-submit` creates new Python/R process that
> connects back to that same JVM and creates SparkContext in it.
>
> Refer to
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala#L47
> and
>
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/RRunner.scala#L65
>
>
>
>
>
> *From:* Andrei [mailto:faithlessfri...@gmail.com]
> *Sent:* Wednesday, April 13, 2016 4:32 AM
> *To:* Sun, Rui 
> *Cc:* user 
> *Subject:* Re: How does spark-submit handle Python scripts (and how to
> repeat it)?
>
>
>
> One part is passing the command line options, like “--master”, from the
> JVM launched by spark-submit to the JVM where SparkContext resides
>
>
>
> Since I have full control over both - JVM and Julia parts - I can pass
> whatever options to both. But what exactly should be passed? Currently
> pipeline looks like this:
>
>
>
> spark-submit JVM -> JuliaRunner -> julia process -> new JVM with
> SparkContext
>
>
>
>  I want to make the last JVM's SparkContext to understand that it should
> run on YARN. Obviously, I can't pass `--master yarn` option to JVM itself.
> Instead, I can pass system property "spark.master" = "yarn-client", but
> this results in an error:
>
>
>
> Retrying connect to server: 0.0.0.0/0.0.0.0:8032
>
>
>
>
>
> So it's definitely not enough. I tried to set manually all system
> properties that `spark-submit` adds to the JVM (including
> "spark-submit=true", "spark.submit.deployMode=client", etc.), but it didn't
> help too. Source code is always good, but for a stranger like me it's a
> little bit hard to grasp control flow in SparkSubmit class.
>
>
>
>
>
> For pySpark & SparkR, when running scripts in client deployment modes
> (standalone client and yarn client), the JVM is t

Re: DeepLearning and Spark ?

2015-01-09 Thread Andrei
Does it makes sense to use Spark's actor system (e.g. via
SparkContext.env.actorSystem) to create parameter server?

On Fri, Jan 9, 2015 at 10:09 PM, Peng Cheng  wrote:

> You are not the first :) probably not the fifth to have the question.
> parameter server is not included in spark framework and I've seen all
> kinds of hacking to improvise it: REST api, HDFS, tachyon, etc.
> Not sure if an 'official' benchmark & implementation will be released soon
>
> On 9 January 2015 at 10:59, Marco Shaw  wrote:
>
>> Pretty vague on details:
>>
>>
>> http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199
>>
>>
>> On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa  wrote:
>>
>> Hi all,
>>
>> DeepLearning algorithms are popular and achieve many state of the art
>> performance in several real world machine learning problems. Currently
>> there are no DL implementation in spark and I wonder if there is an ongoing
>> work on this topics.
>>
>> We can do DL in spark Sparkling water and H2O but this adds an additional
>> software stack.
>>
>> Deeplearning4j seems to implements a distributed version of many popural
>> DL algorithm. Porting DL4j in Spark can be interesting.
>>
>> Google describes an implementation of a large scale DL in this paper
>> http://research.google.com/archive/large_deep_networks_nips2012.html.
>> Based on model parallelism and data parallelism.
>>
>> So, I'm trying to imaging what should be a good design for DL algorithm
>> in Spark ? Spark already have RDD (for data parallelism). Can GraphX be
>> used for the model parallelism (as DNN are generally designed as DAG) ? And
>> what about using GPUs to do local parallelism (mecanism to push partition
>> into GPU memory ) ?
>>
>>
>> What do you think about this ?
>>
>>
>> Cheers,
>>
>> Jao
>>
>>
>


Re: NLP with Spark

2014-03-12 Thread Andrei
In my experience, choice of tools for NLP mostly depends on concrete tasks.
For example, for named entity recognition (NER) there's nice  Java library
called GATE [1]. It allows you to annotate your text with special marks
(e.g. part of speech tags, "time", "name", etc.) and write regex-like rules
to capture even very complicated patterns. On other hand, Stanford NLP
Parser [2] gives unique possibility to extract sentense structure, feature,
that is not available in any other library known to me. And in Python world
there's NLTK, NumPy, SciKit Learn, easy integration with TreeTagger [3] and
super cool ecosystem for statistical text analysis. Each of these tools and
their combination has its pros and cons, so final choice really depends on
your specific needs and personal preferences.

As for Spark (and distributed computations in general), most of the NLP
tasks may be performed locally on workers (e.g. you don't need 1Tb dataset
to find out part of speech tags for particular sentense - you need only
this specific sentence and maybe some little context). Some tasks, however,
do require entire dataset at once. Most popular of them, such as KMeans
clustering or collaborative filtering, are already implemented in MLlib.
But it's always worth to check for specific algos you may need before
taking a final decision.

Let me know if you need advice on specific NLP or ML tasks.

[1]: https://gate.ac.uk/
[2]: http://nlp.stanford.edu/software/lex-parser.shtml
[3]: http://www.cis.uni-muenchen.de/~schmid/tools/TreeTagger/

Best Regards,
Andrei



On Wed, Mar 12, 2014 at 10:12 PM, Brian O'Neill wrote:

>
> Please let us know how you make out.  We have NLP  requirements on the
> horizon.  I've used NLTK before, but never on Spark.  I'd love to hear if
> that works out for you.
>
> -brian
>
> ---
>
> Brian O'Neill
>
> Chief Technology Officer
>
>
> *Health Market Science*
>
> *The Science of Better Results*
>
> 2700 Horizon Drive * King of Prussia, PA * 19406
>
> M: 215.588.6024 * @boneill42 <http://www.twitter.com/boneill42>  *
>
> healthmarketscience.com
>
>
> This information transmitted in this email message is for the intended
> recipient only and may contain confidential and/or privileged material. If
> you received this email in error and are not the intended recipient, or the
> person responsible to deliver it to the intended recipient, please contact
> the sender at the email above and delete this email and any attachments and
> destroy any copies thereof. Any review, retransmission, dissemination,
> copying or other use of, or taking any action in reliance upon, this
> information by persons or entities other than the intended recipient is
> strictly prohibited.
>
>
>
>
> From: Mayur Rustagi 
> Reply-To: 
> Date: Wednesday, March 12, 2014 at 2:38 PM
> To: 
> Cc: "u...@spark.incubator.apache.org" 
> Subject: Re: NLP with Spark
>
> Would love to know if somebody has tried this, only possible problem I can
> forsee is non-serializable libraries, else no reason it should not work.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Wed, Mar 12, 2014 at 11:10 AM, shankark  wrote:
>
>> (apologies if this was sent out multiple times before)
>>
>> We are about to start a large-scale text-processing research project and
>> are debating between two alternatives for our cluster -- Spark and Hadoop.
>> I've researched possibilities of using NLTK with Hadoop and see that
>> there's some precedent (
>> http://blog.cloudera.com/blog/2010/03/natural-language-processing-with-hadoop-and-python/).
>> I wanted to know how easy it might be to use NLTK with pyspark, or if
>> scalanlp is mature enough to be used with the Scala API for Spark/mllib.
>>
>> Thanks!
>>
>> --
>> View this message in context: NLP with 
>> Spark<http://apache-spark-user-list.1001560.n3.nabble.com/NLP-with-Spark-tp2612.html>
>> Sent from the Apache Spark User List mailing list 
>> archive<http://apache-spark-user-list.1001560.n3.nabble.com/>at Nabble.com.
>>
>
>


Proper way to create standalone app with custom Spark version

2014-05-14 Thread Andrei
We can create standalone Spark application by simply adding
"spark-core_2.x" to build.sbt/pom.xml and connecting it to Spark master.

We can also compile custom version of Spark (e.g. compiled against Hadoop
2.x) from source and deploy it to cluster manually.

But what is a proper way to use _custom version_ of Spark in _standalone
application_? We can't simply include custom version into
build.sbt/pom.xml, since it's not in central repository.



I'm currently trying to deploy custom version to local Maven repository and
add it to SBT project. Another option is to add Spark as local jar to every
project. But both of these ways look overcomplicated and in general wrong.

What is an implied way to solve this issue?

Thanks,
Andrei


Proper way to create standalone app with custom Spark version

2014-05-16 Thread Andrei
(Sorry if you have already seen this message - it seems like there were
some issues delivering messages to the list yesterday)

We can create standalone Spark application by simply adding
"spark-core_2.x" to build.sbt/pom.xml and connecting it to Spark master.

We can also build custom version of Spark (e.g. compiled against Hadoop
2.x) from source and deploy it to cluster manually.

But what is a proper way to use _custom version_ of Spark in _standalone
application_?


I'm currently trying to deploy custom version to local Maven repository and
add it to SBT project. Another option is to add Spark as local jar to every
project. But both of these ways look overcomplicated and in general wrong.

So what is the implied way to do it?

Thanks,
Andrei


Re: Computing cosine similiarity using pyspark

2014-05-23 Thread Andrei
Do you need cosine distance and correlation between vectors or between
variables (elements of vector)? It would be helpful if you could tell us
details of your task.


On Thu, May 22, 2014 at 5:49 PM, jamal sasha  wrote:

> Hi,
>   I have bunch of vectors like
> [0.1234,-0.231,0.23131]
>  and so on.
>
> and  I want to compute cosine similarity and pearson correlation using
> pyspark..
> How do I do this?
> Any ideas?
> Thanks
>


Is uberjar a recommended way of running Spark/Scala applications?

2014-05-29 Thread Andrei
I'm using Spark 1.0 and sbt assembly plugin to create uberjar of my
application. However, when I run assembly command, I get a number of errors
like this:

java.lang.RuntimeException: deduplicate: different file contents found in
the following:
/home/username/.ivy2/cache/com.esotericsoftware.kryo/kryo/bundles/kryo-2.21.jar:com/esotericsoftware/minlog/Log$Logger.class
/home/username/.ivy2/cache/com.esotericsoftware.minlog/minlog/jars/minlog-1.2.jar:com/esotericsoftware/minlog/Log$Logger.class
...

As far as I can see, Spark Core depends on both - Minlog and Kryo, and the
latter includes Minlog classes itself. Classes are binary different, so
assembly can't combine them. And there's a number of such conflicts - I
fixed some of them manually via mergeStrategy, but list of exceptions
becomes larger and larger. I can continues, but it just does't look like
the right way.

My questions are:

1. Is an uberjar a recommended way of running Spark applications?
2. If so, should I include Spark itself into this large jar?
3. If not, what is a recommended way to do both - development and
deployment (assuming ordinary sbt project).

Thanks,
Andrei


Re: Is uberjar a recommended way of running Spark/Scala applications?

2014-05-29 Thread Andrei
Thanks, Jordi, your gist looks pretty much like what I have in my project
currently (with few exceptions that I'm going to borrow).

I like the idea of using "sbt package", since it doesn't require third
party plugins and, most important, doesn't create a mess of classes and
resources. But in this case I'll have to handle jar list manually via Spark
context. Is there a way to automate this process? E.g. when I was a Clojure
guy, I could run "lein deps" (lein is a build tool similar to sbt) to
download all dependencies and then just enumerate them from my app. Maybe
you have heard of something like that for Spark/SBT?

Thanks,
Andrei


On Thu, May 29, 2014 at 3:48 PM, jaranda  wrote:

> Hi Andrei,
>
> I think the preferred way to deploy Spark jobs is by using the sbt package
> task instead of using the sbt assembly plugin. In any case, as you comment,
> the mergeStrategy in combination with some dependency exlusions should fix
> your problems. Have a look at  this gist
> <https://gist.github.com/JordiAranda/bdbad58d128c14277a05>   for further
> details (I just followed some recommendations commented in the sbt assembly
> plugin documentation).
>
> Up to now I haven't found a proper way to combine my development/deployment
> phases, although I must say my experience in Spark is pretty poor (it
> really
> depends in your deployment requirements as well). In this case, I think
> someone else could give you some further insights.
>
> Best,
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-uberjar-a-recommended-way-of-running-Spark-Scala-applications-tp6518p6520.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Is uberjar a recommended way of running Spark/Scala applications?

2014-05-30 Thread Andrei
Thanks, Stephen. I have eventually decided to go with assembly, but put
away Spark and Hadoop jars, and instead use `spark-submit` to automatically
provide these dependencies. This way no resource conflicts arise and
mergeStrategy needs no modification. To memorize this stable setup and also
share it with the community I've crafted a project [1] with minimal working
config. It is SBT project with assembly plugin, Spark 1.0 and Cloudera's
Hadoop client. Hope, it will help somebody to take Spark setup quicker.

Though I'm fine with this setup for final builds, I'm still looking for a
more interactive dev setup - something that doesn't require full rebuild.

[1]: https://github.com/faithlessfriend/sample-spark-project

Thanks and have a good weekend,
Andrei

On Thu, May 29, 2014 at 8:27 PM, Stephen Boesch  wrote:

>
> The MergeStrategy combined with sbt assembly did work for me.  This is not
> painless: some trial and error and the assembly may take multiple minutes.
>
> You will likely want to filter out some additional classes from the
> generated jar file.  Here is an SOF answer to explain that and with IMHO
> the best answer snippet included here (in this case the OP understandably
> did not want to not include javax.servlet.Servlet)
>
> http://stackoverflow.com/questions/7819066/sbt-exclude-class-from-jar
>
>
> mappings in (Compile,packageBin) ~= { (ms: Seq[(File, String)]) => ms
> filter { case (file, toPath) => toPath != "javax/servlet/Servlet.class" }
> }
>
> There is a setting to not include the project files in the assembly but I
> do not recall it at this moment.
>
>
>
> 2014-05-29 10:13 GMT-07:00 Andrei :
>
> Thanks, Jordi, your gist looks pretty much like what I have in my project
>> currently (with few exceptions that I'm going to borrow).
>>
>> I like the idea of using "sbt package", since it doesn't require third
>> party plugins and, most important, doesn't create a mess of classes and
>> resources. But in this case I'll have to handle jar list manually via Spark
>> context. Is there a way to automate this process? E.g. when I was a Clojure
>> guy, I could run "lein deps" (lein is a build tool similar to sbt) to
>> download all dependencies and then just enumerate them from my app. Maybe
>> you have heard of something like that for Spark/SBT?
>>
>> Thanks,
>> Andrei
>>
>>
>> On Thu, May 29, 2014 at 3:48 PM, jaranda  wrote:
>>
>>> Hi Andrei,
>>>
>>> I think the preferred way to deploy Spark jobs is by using the sbt
>>> package
>>> task instead of using the sbt assembly plugin. In any case, as you
>>> comment,
>>> the mergeStrategy in combination with some dependency exlusions should
>>> fix
>>> your problems. Have a look at  this gist
>>> <https://gist.github.com/JordiAranda/bdbad58d128c14277a05>   for further
>>> details (I just followed some recommendations commented in the sbt
>>> assembly
>>> plugin documentation).
>>>
>>> Up to now I haven't found a proper way to combine my
>>> development/deployment
>>> phases, although I must say my experience in Spark is pretty poor (it
>>> really
>>> depends in your deployment requirements as well). In this case, I think
>>> someone else could give you some further insights.
>>>
>>> Best,
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-uberjar-a-recommended-way-of-running-Spark-Scala-applications-tp6518p6520.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
>>
>


Re: Is uberjar a recommended way of running Spark/Scala applications?

2014-06-02 Thread Andrei
Thanks! This is even closer to what I am looking for. I'm in a trip now, so
I'm going to give it a try when I come back.


On Mon, Jun 2, 2014 at 5:12 AM, Ngoc Dao  wrote:

> Alternative solution:
> https://github.com/xitrum-framework/xitrum-package
>
> It collects all dependency .jar files in your Scala program into a
> directory. It doesn't merge the .jar files together, the .jar files
> are left "as is".
>
>
> On Sat, May 31, 2014 at 3:42 AM, Andrei  wrote:
> > Thanks, Stephen. I have eventually decided to go with assembly, but put
> away
> > Spark and Hadoop jars, and instead use `spark-submit` to automatically
> > provide these dependencies. This way no resource conflicts arise and
> > mergeStrategy needs no modification. To memorize this stable setup and
> also
> > share it with the community I've crafted a project [1] with minimal
> working
> > config. It is SBT project with assembly plugin, Spark 1.0 and Cloudera's
> > Hadoop client. Hope, it will help somebody to take Spark setup quicker.
> >
> > Though I'm fine with this setup for final builds, I'm still looking for a
> > more interactive dev setup - something that doesn't require full rebuild.
> >
> > [1]: https://github.com/faithlessfriend/sample-spark-project
> >
> > Thanks and have a good weekend,
> > Andrei
> >
> > On Thu, May 29, 2014 at 8:27 PM, Stephen Boesch 
> wrote:
> >>
> >>
> >> The MergeStrategy combined with sbt assembly did work for me.  This is
> not
> >> painless: some trial and error and the assembly may take multiple
> minutes.
> >>
> >> You will likely want to filter out some additional classes from the
> >> generated jar file.  Here is an SOF answer to explain that and with
> IMHO the
> >> best answer snippet included here (in this case the OP understandably
> did
> >> not want to not include javax.servlet.Servlet)
> >>
> >> http://stackoverflow.com/questions/7819066/sbt-exclude-class-from-jar
> >>
> >>
> >> mappings in (Compile,packageBin) ~= { (ms: Seq[(File, String)]) => ms
> >> filter { case (file, toPath) => toPath != "javax/servlet/Servlet.class"
> } }
> >>
> >> There is a setting to not include the project files in the assembly but
> I
> >> do not recall it at this moment.
> >>
> >>
> >>
> >> 2014-05-29 10:13 GMT-07:00 Andrei :
> >>
> >>> Thanks, Jordi, your gist looks pretty much like what I have in my
> project
> >>> currently (with few exceptions that I'm going to borrow).
> >>>
> >>> I like the idea of using "sbt package", since it doesn't require third
> >>> party plugins and, most important, doesn't create a mess of classes and
> >>> resources. But in this case I'll have to handle jar list manually via
> Spark
> >>> context. Is there a way to automate this process? E.g. when I was a
> Clojure
> >>> guy, I could run "lein deps" (lein is a build tool similar to sbt) to
> >>> download all dependencies and then just enumerate them from my app.
> Maybe
> >>> you have heard of something like that for Spark/SBT?
> >>>
> >>> Thanks,
> >>> Andrei
> >>>
> >>>
> >>> On Thu, May 29, 2014 at 3:48 PM, jaranda  wrote:
> >>>>
> >>>> Hi Andrei,
> >>>>
> >>>> I think the preferred way to deploy Spark jobs is by using the sbt
> >>>> package
> >>>> task instead of using the sbt assembly plugin. In any case, as you
> >>>> comment,
> >>>> the mergeStrategy in combination with some dependency exlusions should
> >>>> fix
> >>>> your problems. Have a look at  this gist
> >>>> <https://gist.github.com/JordiAranda/bdbad58d128c14277a05>   for
> further
> >>>> details (I just followed some recommendations commented in the sbt
> >>>> assembly
> >>>> plugin documentation).
> >>>>
> >>>> Up to now I haven't found a proper way to combine my
> >>>> development/deployment
> >>>> phases, although I must say my experience in Spark is pretty poor (it
> >>>> really
> >>>> depends in your deployment requirements as well). In this case, I
> think
> >>>> someone else could give you some further insights.
> >>>>
> >>>> Best,
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> View this message in context:
> >>>>
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-uberjar-a-recommended-way-of-running-Spark-Scala-applications-tp6518p6520.html
> >>>> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>
> >>>
> >>
> >
>


Re: Loading Python libraries into Spark

2014-06-05 Thread Andrei
For third party libraries the simplest way is to use Puppet [1] or Chef [2]
or any similar automation tool to install packages (either from PIP [2] or
from distribution's repository). It's easy because if you manage your
cluster's software you are most probably already using one of these
automation tools, and thus need only to write one more recipe to keep your
Python packages healthy.

For your own libraries it may be inconvenient to publish them to PIP just
to deploy to your server. So you can also "attach" your Python lib to
SparkContext via "pyFiles" option or "addPyFile" method. For example, if
you want to attach single Python file, you can do the following:

conf = SparkConf(...) ...
sc = SparkContext(...)
sc.addPyFile("/path/to/yourmodule.py")

And for entire packages (in Python package is any directory with
"__init__.py" and maybe several more "*.py" files) you can use a trick and
pack them into zip archive. I used following code more my library:

import dictconfig
import zipfile

def ziplib():
libpath = os.path.dirname(__file__)  # this should
point to your packages directory
zippath = '/tmp/mylib-' + rand_str(6) + '.zip'  # some random
filename in writable directory
zf = zipfile.PyZipFile(zippath, mode='w')
try:
zf.debug = 3  #
making it verbose, good for debugging
zf.writepy(libpath)
return zippath #
return path to generated zip archive
finally:
zf.close()

...
zip_path = ziplib()   #
generate zip archive containing your lib
sc.addPyFile(zip_path)   # add the
entire archive to SparkContext
...
os.remove(zip_path)   # don't
forget to remove temporary file, preferably in "finally" clause


Python has one nice feature: it can import code not only from modules
(simple "*.py" files) and packages, but also from a variety of other
formats including zip archives. So when you write in your distributed code
something like "import mylib", Python finds "mylib.zip" attached to
SparkContext and imports required modules.

HTH,
Andrei







[1]: http://puppetlabs.com/
[2]: http://www.getchef.com/chef/
[3]: https://pypi.python.org/pypi/pip ; if you program in Python and still
don't use PIP, you should definitely give it a try


On Thu, Jun 5, 2014 at 5:29 PM, mrm  wrote:

> Hi,
>
> I am new to Spark (and almost-new in python!). How can I download and
> install a Python library in my cluster so I can just import it later?
>
> Any help would be much appreciated.
>
> Thanks!
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Loading-Python-libraries-into-Spark-tp7059.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Loading Python libraries into Spark

2014-06-05 Thread Andrei
In my answer I assumed you run your program with "pyspark" command (e.g.
"pyspark mymainscript.py", pyspark should be on your path). In this case
workflow is as follows:

1. You create SparkConf object that simply contains your app's options.
2. You create SparkContext, which initializes your application. At this
point application connects to master and asks for resources.
3. You modify SparkContext object to include everything you want to make
available for mappers on other hosts, e.g. other "*.py" files.
4. You create RDD (e.g. with "sc.textFile") and run actual commands (e.g.
"map", "filter", etc.). SparkContext knows about your additional files, so
these commands are aware of your library code.

So, yes, in these settings you need to create "sc" (SparkContext object)
beforehand and make "*.py" files available on application's host.

With pyspark shell you already do have "sc" object initialized for you (try
running "pyspark" and typing "sc" + Enter - shell will print spark context
details). You can also use spark-submit [1], which will initialize
SparkContext from command line options. But essentially idea is always the
same: there's driver application running on one host that creates
SparkContext, collects dependencies, controls program flow, etc., and there
are workers - applications on slave hosts, that use created SparkContext
and all serialized data to perform driver's commands. Driver should know
about everything and let workers know about what they need to know (e.g.
your library code).


[1]: http://spark.apache.org/docs/latest/submitting-applications.html





On Thu, Jun 5, 2014 at 8:10 PM, mrm  wrote:

> Hi Andrei,
>
> Thank you for your help! Just to make sure I understand, when I run this
> command sc.addPyFile("/path/to/yourmodule.py"), I need to be already logged
> into the master node and have my python files somewhere, is that correct?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Loading-Python-libraries-into-Spark-tp7059p7073.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: How do you run your spark app?

2014-06-20 Thread Andrei
Hi Shivani,

Adding JARs to classpath (e.g. via "-cp" option) is needed to run your
_local_ Java application, whatever it is. To deliver them to _other
machines_ for execution you need to add them to SparkContext. And you can
do it in 2 different ways:

1. Add them right from your code (your suggested
"sparkContext.setJars(...)").
2. Use "spark-submit" and pass JARs from command line.

Note, that both options are easier to do if you assemble your code and all
its dependencies into a single "fat" JAR instead of manually listing all
needed libraries.




On Sat, Jun 21, 2014 at 1:47 AM, Shivani Rao  wrote:

> Hello Shrikar,
>
> Thanks for your email. I have been using the same workflow as you did. But
> my questions was related to creation of the sparkContext. My question was
>
> If I am specifying jars in the "java -cp ", and adding to them
> to my build.sbt, do I need to additionally add them in my code while
> creating the sparkContext (sparkContext.setJars(" "))??
>
>
> Thanks,
> Shivani
>
>
> On Fri, Jun 20, 2014 at 11:03 AM, Shrikar archak 
> wrote:
>
>> Hi Shivani,
>>
>> I use sbt assembly to create a fat jar .
>> https://github.com/sbt/sbt-assembly
>>
>> Example of the sbt file is below.
>>
>> import AssemblyKeys._ // put this at the top of the file
>>
>> assemblySettings
>>
>> mainClass in assembly := Some("FifaSparkStreaming")
>>
>>  name := "FifaSparkStreaming"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.10.4"
>>
>> libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0"
>> % "provided",
>> "org.apache.spark" %% "spark-streaming" %
>> "1.0.0" % "provided",
>> ("org.apache.spark" %%
>> "spark-streaming-twitter" %
>> "1.0.0").exclude("org.eclipse.jetty.orbit","javax.transaction")
>>
>>  .exclude("org.eclipse.jetty.orbit","javax.servlet")
>>
>>  .exclude("org.eclipse.jetty.orbit","javax.mail.glassfish")
>>
>>  .exclude("org.eclipse.jetty.orbit","javax.activation")
>>
>>  .exclude("com.esotericsoftware.minlog", "minlog"),
>> ("net.debasishg" % "redisclient_2.10" %
>> "2.12").exclude("com.typesafe.akka","akka-actor_2.10"))
>>
>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>   {
>> case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
>> case PathList("org", "apache", xs @ _*) => MergeStrategy.first
>> case PathList("org", "apache", xs @ _*) => MergeStrategy.first
>> case "application.conf" => MergeStrategy.concat
>> case "unwanted.txt" => MergeStrategy.discard
>> case x => old(x)
>>   }
>> }
>>
>>
>> resolvers += "Akka Repository" at "http://repo.akka.io/releases/";
>>
>>
>> And I run as mentioned below.
>>
>> LOCALLY :
>> 1)  sbt 'run AP1z4IYraYm5fqWhITWArY53x
>> Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6
>> 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN
>> Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014'
>>
>> If you want to submit on the cluster
>>
>> CLUSTER:
>> 2) spark-submit --class FifaSparkStreaming --master
>> "spark://server-8-144:7077" --driver-memory 2048 --deploy-mode cluster
>> FifaSparkStreaming-assembly-1.0.jar AP1z4IYraYm5fqWhITWArY53x
>> Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6
>> 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN
>> Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014
>>
>>
>> Hope this helps.
>>
>> Thanks,
>> Shrikar
>>
>>
>> On Fri, Jun 20, 2014 at 9:16 AM, Shivani Rao 
>> wrote:
>>
>>> Hello Michael,
>>>
>>> I have a quick question for you. Can you clarify the statement " build
>>> fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's
>>> and everything needed to run a Job".  Can you give an example.
>>>
>>> I am using sbt assembly as well to create a fat jar, and supplying the
>>> spark and hadoop locations in the class path. Inside the main() function
>>> where spark context is created, I use SparkContext.jarOfClass(this).toList
>>> add the fat jar to my spark context. However, I seem to be running into
>>> issues with this approach. I was wondering if you had any inputs Michael.
>>>
>>> Thanks,
>>> Shivani
>>>
>>>
>>> On Thu, Jun 19, 2014 at 10:57 PM, Sonal Goyal 
>>> wrote:
>>>
 We use maven for building our code and then invoke spark-submit through
 the exec plugin, passing in our parameters. Works well for us.

 Best Regards,
 Sonal
 Nube Technologies 

 




 On Fri, Jun 20, 2014 at 3:26 AM, Michael Cutler 
 wrote:

> P.S. Last but not least we use sbt-assembly to build fat JAR's and
> build dist-style TAR.GZ packages with launch scripts, JAR's and everything
> needed to run a Job.  These are automatically built from source by our
> Jenkins and stored in HDFS.  Our Chronos/Marathon jobs fetch the latest
> release TAR.GZ direct from HDFS, unpa

Re: Purpose of spark-submit?

2014-07-09 Thread Andrei
One another +1. For me it's a question of embedding. With
SparkConf/SparkContext I can easily create larger projects with Spark as a
separate service (just like MySQL and JDBC, for example). With spark-submit
I'm bound to Spark as a main framework that defines how my application
should look like. In my humble opinion, using Spark as embeddable library
rather than main framework and runtime is much easier.




On Wed, Jul 9, 2014 at 5:14 PM, Jerry Lam  wrote:

> +1 as well for being able to submit jobs programmatically without using
> shell script.
>
> we also experience issues of submitting jobs programmatically without
> using spark-submit. In fact, even in the Hadoop World, I rarely used
> "hadoop jar" to submit jobs in shell.
>
>
>
> On Wed, Jul 9, 2014 at 9:47 AM, Robert James 
> wrote:
>
>> +1 to be able to do anything via SparkConf/SparkContext.  Our app
>> worked fine in Spark 0.9, but, after several days of wrestling with
>> uber jars and spark-submit, and so far failing to get Spark 1.0
>> working, we'd like to go back to doing it ourself with SparkConf.
>>
>> As the previous poster said, a few scripts should be able to give us
>> the classpath and any other params we need, and be a lot more
>> transparent and debuggable.
>>
>> On 7/9/14, Surendranauth Hiraman  wrote:
>> > Are there any gaps beyond convenience and code/config separation in
>> using
>> > spark-submit versus SparkConf/SparkContext if you are willing to set
>> your
>> > own config?
>> >
>> > If there are any gaps, +1 on having parity within SparkConf/SparkContext
>> > where possible. In my use case, we launch our jobs programmatically. In
>> > theory, we could shell out to spark-submit but it's not the best option
>> for
>> > us.
>> >
>> > So far, we are only using Standalone Cluster mode, so I'm not
>> knowledgeable
>> > on the complexities of other modes, though.
>> >
>> > -Suren
>> >
>> >
>> >
>> > On Wed, Jul 9, 2014 at 8:20 AM, Koert Kuipers 
>> wrote:
>> >
>> >> not sure I understand why unifying how you submit app for different
>> >> platforms and dynamic configuration cannot be part of SparkConf and
>> >> SparkContext?
>> >>
>> >> for classpath a simple script similar to "hadoop classpath" that shows
>> >> what needs to be added should be sufficient.
>> >>
>> >> on spark standalone I can launch a program just fine with just
>> SparkConf
>> >> and SparkContext. not on yarn, so the spark-launch script must be
>> doing a
>> >> few things extra there I am missing... which makes things more
>> difficult
>> >> because I am not sure its realistic to expect every application that
>> >> needs
>> >> to run something on spark to be launched using spark-submit.
>> >>  On Jul 9, 2014 3:45 AM, "Patrick Wendell"  wrote:
>> >>
>> >>> It fulfills a few different functions. The main one is giving users a
>> >>> way to inject Spark as a runtime dependency separately from their
>> >>> program and make sure they get exactly the right version of Spark. So
>> >>> a user can bundle an application and then use spark-submit to send it
>> >>> to different types of clusters (or using different versions of Spark).
>> >>>
>> >>> It also unifies the way you bundle and submit an app for Yarn, Mesos,
>> >>> etc... this was something that became very fragmented over time before
>> >>> this was added.
>> >>>
>> >>> Another feature is allowing users to set configuration values
>> >>> dynamically rather than compile them inside of their program. That's
>> >>> the one you mention here. You can choose to use this feature or not.
>> >>> If you know your configs are not going to change, then you don't need
>> >>> to set them with spark-submit.
>> >>>
>> >>>
>> >>> On Wed, Jul 9, 2014 at 10:22 AM, Robert James > >
>> >>> wrote:
>> >>> > What is the purpose of spark-submit? Does it do anything outside of
>> >>> > the standard val conf = new SparkConf ... val sc = new SparkContext
>> >>> > ... ?
>> >>>
>> >>
>> >
>> >
>> > --
>> >
>> > SUREN HIRAMAN, VP TECHNOLOGY
>> > Velos
>> > Accelerating Machine Learning
>> >
>> > 440 NINTH AVENUE, 11TH FLOOR
>> > NEW YORK, NY 10001
>> > O: (917) 525-2466 ext. 105
>> > F: 646.349.4063
>> > E: suren.hiraman@v elos.io
>> > W: www.velos.io
>> >
>>
>
>


Re: Recommended pipeline automation tool? Oozie?

2014-07-10 Thread Andrei
I used both - Oozie and Luigi - but found them inflexible and still
overcomplicated, especially in presence of Spark.

Oozie has a fixed list of building blocks, which is pretty limiting. For
example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are
out of scope (of course, you can always write wrapper as Java or Shell
action, but does it really need to be so complicated?). Another issue with
Oozie is passing variables between actions. There's Oozie context that is
suitable for passing key-value pairs (both strings) between actions, but
for more complex objects (say, FileInputStream that should be closed at
last step only) you have to do some advanced kung fu.

Luigi, on other hand, has its niche - complicated dataflows with many tasks
that depend on each other. Basically, there are tasks (this is where you
define computations) and targets (something that can "exist" - file on
disk, entry in ZooKeeper, etc.). You ask Luigi to get some target, and it
creates a plan for achieving this. Luigi is really shiny when your workflow
fits this model, but one step away and you are in trouble. For example,
consider simple pipeline: run MR job and output temporary data, run another
MR job and output final data, clean temporary data. You can make target
Clean, that depends on target MRJob2 that, in its turn, depends on MRJob1,
right? Not so easy. How do you check that Clean task is achieved? If you
just test whether temporary directory is empty or not, you catch both cases
- when all tasks are done and when they are not even started yet. Luigi
allows you to specify all 3 actions - MRJob1, MRJob2, Clean - in a single
"run()" method, but ruins the entire idea.

And of course, both of these frameworks are optimized for standard
MapReduce jobs, which is probably not what you want on Spark mailing list
:)

Experience with these frameworks, however, gave me some insights about
typical data pipelines.

1. Pipelines are mostly linear. Oozie, Luigi and number of other frameworks
allow branching, but most pipelines actually consist of moving data from
source to destination with possibly some transformations in between (I'll
be glad if somebody share use cases when you really need branching).
2. Transactional logic is important. Either everything, or nothing.
Otherwise it's really easy to get into inconsistent state.
3. Extensibility is important. You never know what will need in a week or
two.

So eventually I decided that it is much easier to create your own pipeline
instead of trying to adopt your code to existing frameworks. My latest
pipeline incarnation simply consists of a list of steps that are started
sequentially. Each step is a class with at least these methods:

 * run() - launch this step
 * fail() - what to do if step fails
 * finalize() - (optional) what to do when all steps are done

For example, if you want to add possibility to run Spark jobs, you just
create SparkStep and configure it with required code. If you want Hive
query - just create HiveStep and configure it with Hive connection
settings. I use YAML file to configure steps and Context (basically,
Map[String, Any]) to pass variables between them. I also use configurable
Reporter available for all steps to report the progress.

Hopefully, this will give you some insights about best pipeline for your
specific case.



On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown  wrote:

>
> We use Luigi for this purpose.  (Our pipelines are typically on AWS (no
> EMR) backed by S3 and using combinations of Python jobs, non-Spark
> Java/Scala, and Spark.  We run Spark jobs by connecting drivers/clients to
> the master, and those are what is invoked from Luigi.)
>
> —
> p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
>
>
> On Thu, Jul 10, 2014 at 10:20 AM, k.tham  wrote:
>
>> I'm just wondering what's the general recommendation for data pipeline
>> automation.
>>
>> Say, I want to run Spark Job A, then B, then invoke script C, then do D,
>> and
>> if D fails, do E, and if Job A fails, send email F, etc...
>>
>> It looks like Oozie might be the best choice. But I'd like some
>> advice/suggestions.
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: Emacs Setup Anyone?

2014-07-25 Thread Andrei
I have never tried Spark REPL from within Emacs, but I remember that
switching from normal Python to Pyspark was as simple as changing
interpreter name at the beginning of session. Seems like ensime [1]
(together with ensime-emacs [2]) should be a good point to start. For
example, take a look at ensime-sbt.el [3] that defines a number of
Scala/SBT commands.

[1]: https://github.com/ensime/ensime-server
[2]: https://github.com/ensime/ensime-emacs
[3]: https://github.com/ensime/ensime-emacs/blob/master/ensime-sbt.el




On Thu, Jul 24, 2014 at 10:14 PM, Steve Nunez 
wrote:

> Anyone out there have a good configuration for emacs? Scala-mode sort of
> works, but I’d love to see a fully-supported spark-mode with an inferior
> shell. Searching didn’t turn up much of anything.
>
> Any emacs users out there? What setup are you using?
>
> Cheers,
> - SteveN
>
>
>
> CONFIDENTIALITY NOTICE
> NOTICE: This message is intended for the use of the individual or entity
> to which it is addressed and may contain information that is confidential,
> privileged and exempt from disclosure under applicable law. If the reader
> of this message is not the intended recipient, you are hereby notified that
> any printing, copying, dissemination, distribution, disclosure or
> forwarding of this communication is strictly prohibited. If you have
> received this communication in error, please contact the sender immediately
> and delete it from your system. Thank You.


Iterator over RDD in PySpark

2014-08-01 Thread Andrei
Is there a way to get iterator from RDD? Something like rdd.collect(), but
returning lazy sequence and not single array.

Context: I need to GZip processed data to upload it to Amazon S3. Since
archive should be a single file, I want to iterate over RDD, writing each
line to a local .gz file. File is small enough to fit local disk, but still
large enough not to fit into memory.


Re: Iterator over RDD in PySpark

2014-08-01 Thread Andrei
Thanks, Aaron, it should be fine with partitions (I can repartition it
anyway, right?).
But rdd.toLocalIterator is purely Java/Scala method. Is there Python
interface to it?
I can get Java iterator though rdd._jrdd, but it isn't converted to Python
iterator automatically. E.g.:

  >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
  >>> it = rdd._jrdd.toLocalIterator()
  >>> next(it)
  14/08/02 01:02:32 INFO SparkContext: Starting job: apply at
Iterator.scala:371
  ...
  14/08/02 01:02:32 INFO SparkContext: Job finished: apply at
Iterator.scala:371, took 0.02064317 s
  bytearray(b'\x80\x02K\x01.')

I understand that returned byte array somehow corresponds to actual data,
but how can I get it?



On Fri, Aug 1, 2014 at 8:49 PM, Aaron Davidson  wrote:

> rdd.toLocalIterator will do almost what you want, but requires that each
> individual partition fits in memory (rather than each individual line).
> Hopefully that's sufficient, though.
>
>
> On Fri, Aug 1, 2014 at 1:38 AM, Andrei  wrote:
>
>> Is there a way to get iterator from RDD? Something like rdd.collect(),
>> but returning lazy sequence and not single array.
>>
>> Context: I need to GZip processed data to upload it to Amazon S3. Since
>> archive should be a single file, I want to iterate over RDD, writing each
>> line to a local .gz file. File is small enough to fit local disk, but still
>> large enough not to fit into memory.
>>
>
>


Re: Iterator over RDD in PySpark

2014-08-02 Thread Andrei
Excellent, thank you!


On Sat, Aug 2, 2014 at 4:46 AM, Aaron Davidson  wrote:

> Ah, that's unfortunate, that definitely should be added. Using a
> pyspark-internal method, you could try something like
>
> javaIterator = rdd._jrdd.toLocalIterator()
> it = rdd._collect_iterator_through_file(javaIterator)
>
>
> On Fri, Aug 1, 2014 at 3:04 PM, Andrei  wrote:
>
>> Thanks, Aaron, it should be fine with partitions (I can repartition it
>> anyway, right?).
>> But rdd.toLocalIterator is purely Java/Scala method. Is there Python
>> interface to it?
>> I can get Java iterator though rdd._jrdd, but it isn't converted to
>> Python iterator automatically. E.g.:
>>
>>   >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>   >>> it = rdd._jrdd.toLocalIterator()
>>   >>> next(it)
>>   14/08/02 01:02:32 INFO SparkContext: Starting job: apply at
>> Iterator.scala:371
>>   ...
>>   14/08/02 01:02:32 INFO SparkContext: Job finished: apply at
>> Iterator.scala:371, took 0.02064317 s
>>   bytearray(b'\x80\x02K\x01.')
>>
>> I understand that returned byte array somehow corresponds to actual data,
>> but how can I get it?
>>
>>
>>
>> On Fri, Aug 1, 2014 at 8:49 PM, Aaron Davidson 
>> wrote:
>>
>>> rdd.toLocalIterator will do almost what you want, but requires that each
>>> individual partition fits in memory (rather than each individual line).
>>> Hopefully that's sufficient, though.
>>>
>>>
>>> On Fri, Aug 1, 2014 at 1:38 AM, Andrei 
>>> wrote:
>>>
>>>> Is there a way to get iterator from RDD? Something like rdd.collect(),
>>>> but returning lazy sequence and not single array.
>>>>
>>>> Context: I need to GZip processed data to upload it to Amazon S3. Since
>>>> archive should be a single file, I want to iterate over RDD, writing each
>>>> line to a local .gz file. File is small enough to fit local disk, but still
>>>> large enough not to fit into memory.
>>>>
>>>
>>>
>>
>


Re: Spark S3 Performance

2014-11-22 Thread Andrei
Not that I'm professional user of Amazon services, but I have a guess about
your performance issues. From [1], there are two different filesystems over
S3:

 - native that behaves just like regular files (schema: s3n)
 - block-based that looks more like HDFS (schema: s3)

Since you use "s3n" in your URL, each Spark worker seems to treat the file
as unsplittable piece of data and downloads it all (though, probably,
applies functions to specific regions only). If I understand it right,
using "s3" instead will allow Spark workers see data as a sequence of
blocks and download each block separately.

But anyway, using s3 Implies loss of data locality, so data will be
transferred to workers instead of code being transferred to data. Given
data size of 1.2Gb, consider also storing data in Hadoop's HDFS instead of
S3 (as far as I remember, Amazon allows using both at the same time).

Please, let us know if it works.


[1]: https://wiki.apache.org/hadoop/AmazonS3

On Sat, Nov 22, 2014 at 6:21 PM, Nitay Joffe  wrote:

> Err I meant #1 :)
>
> - Nitay
> Founder & CTO
>
>
> On Sat, Nov 22, 2014 at 10:20 AM, Nitay Joffe  wrote:
>
>> Anyone have any thoughts on this? Trying to understand especially #2 if
>> it's a legit bug or something I'm doing wrong.
>>
>> - Nitay
>> Founder & CTO
>>
>>
>> On Thu, Nov 20, 2014 at 11:54 AM, Nitay Joffe  wrote:
>>
>>> I have a simple S3 job to read a text file and do a line count.
>>> Specifically I'm doing *sc.textFile("s3n://mybucket/myfile").count*.The
>>> file is about 1.2GB. My setup is standalone spark cluster with 4 workers
>>> each with 2 cores / 16GB ram. I'm using branch-1.2 code built against
>>> hadoop 2.4 (though I'm not actually using HDFS, just straight S3 => Spark).
>>>
>>> The whole count is taking on the order of a couple of minutes, which
>>> seems extremely slow.
>>> I've been looking into it and so far have noticed two things, hoping the
>>> community has seen this before and knows what to do...
>>>
>>> 1) Every executor seems to make an S3 call to read the *entire file* before
>>> making another call to read just it's split. Here's a paste I've cleaned up
>>> to show just one task: http://goo.gl/XCfyZA. I've verified this happens
>>> in every task. It is taking a long time (40-50 seconds), I don't see why it
>>> is doing this?
>>> 2) I've tried a few numPartitions parameters. When I make the parameter
>>> anything below 21 it seems to get ignored. Under the hood FileInputFormat
>>> is doing something that always ends up with at least 21 partitions of ~64MB
>>> or so. I've also tried 40, 60, and 100 partitions and have seen that the
>>> performance only gets worse as I increase it beyond 21. I would like to try
>>> 8 just to see, but again I don't see how to force it to go below 21.
>>>
>>> Thanks for the help,
>>> - Nitay
>>> Founder & CTO
>>>
>>>
>>
>


Re: Spark S3 Performance

2014-11-22 Thread Andrei
Concerning your second question, I believe you try to set number of
partitions with something like this:

rdd = sc.textFile(..., 8)

but things like `textFile()` don't actually take fixed number of
partitions. Instead, they expect *minimal* number of partitions. Since in
your file you have 21 blocks of data, it creates exactly 21 worker (which
is greater than 8, as expected). To set exact number of partitions, use
`repartition()` or its full version - `coalesce()` (see example [1])

[1]:
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#coalesce



On Sat, Nov 22, 2014 at 10:04 PM, Ashish Rangole  wrote:

> What makes you think that each executor is reading the whole file? If that
> is the case then the count value returned to the driver will be actual X
> NumOfExecutors. Is that the case when compared with actual lines in the
> input file? If the count returned is same as actual then you probably don't
> have an extra read problem.
>
> I also see this in your logs which indicates that it is a read that starts
> from an offset and reading one split size (64MB) worth of data:
>
> 14/11/20 15:39:45 [Executor task launch worker-1 ] INFO HadoopRDD: Input
> split: s3n://mybucket/myfile:335544320+67108864
> On Nov 22, 2014 7:23 AM, "Nitay Joffe"  wrote:
>
>> Err I meant #1 :)
>>
>> - Nitay
>> Founder & CTO
>>
>>
>> On Sat, Nov 22, 2014 at 10:20 AM, Nitay Joffe  wrote:
>>
>>> Anyone have any thoughts on this? Trying to understand especially #2 if
>>> it's a legit bug or something I'm doing wrong.
>>>
>>> - Nitay
>>> Founder & CTO
>>>
>>>
>>> On Thu, Nov 20, 2014 at 11:54 AM, Nitay Joffe  wrote:
>>>
 I have a simple S3 job to read a text file and do a line count.
 Specifically I'm doing *sc.textFile("s3n://mybucket/myfile").count*.The
 file is about 1.2GB. My setup is standalone spark cluster with 4 workers
 each with 2 cores / 16GB ram. I'm using branch-1.2 code built against
 hadoop 2.4 (though I'm not actually using HDFS, just straight S3 => Spark).

 The whole count is taking on the order of a couple of minutes, which
 seems extremely slow.
 I've been looking into it and so far have noticed two things, hoping
 the community has seen this before and knows what to do...

 1) Every executor seems to make an S3 call to read the *entire file* before
 making another call to read just it's split. Here's a paste I've cleaned up
 to show just one task: http://goo.gl/XCfyZA. I've verified this
 happens in every task. It is taking a long time (40-50 seconds), I don't
 see why it is doing this?
 2) I've tried a few numPartitions parameters. When I make the parameter
 anything below 21 it seems to get ignored. Under the hood FileInputFormat
 is doing something that always ends up with at least 21 partitions of ~64MB
 or so. I've also tried 40, 60, and 100 partitions and have seen that the
 performance only gets worse as I increase it beyond 21. I would like to try
 8 just to see, but again I don't see how to force it to go below 21.

 Thanks for the help,
 - Nitay
 Founder & CTO


>>>
>>


Spark 2.0 History Server Storage

2016-08-01 Thread Andrei Ivanov
Hi all,

I've just tried upgrading Spark to 2.0 and so far it looks generally good.

But there is at least one issue I see right away - jon histories are
missing storage information (persisted RRDs).
This info is also missing from pre upgrade jobs.

Does anyone have a clue what can be wrong?

Thanks, Andrei Ivanov.


Re: Spark 2.0 History Server Storage

2016-08-02 Thread Andrei Ivanov
OK, answering myself - this is broken since 1.6.2 by SPARK-13845
<https://issues.apache.org/jira/browse/SPARK-13845>

On Tue, Aug 2, 2016 at 12:10 AM, Andrei Ivanov  wrote:

> Hi all,
>
> I've just tried upgrading Spark to 2.0 and so far it looks generally good.
>
> But there is at least one issue I see right away - jon histories are
> missing storage information (persisted RRDs).
> This info is also missing from pre upgrade jobs.
>
> Does anyone have a clue what can be wrong?
>
> Thanks, Andrei Ivanov.
>


Re: Spark 2.0 History Server Storage

2016-08-02 Thread Andrei Ivanov
   1. SPARK-16859 <https://issues.apache.org/jira/browse/SPARK-16859>
submitted


On Tue, Aug 2, 2016 at 9:07 PM, Andrei Ivanov  wrote:

> OK, answering myself - this is broken since 1.6.2 by SPARK-13845
> <https://issues.apache.org/jira/browse/SPARK-13845>
>
> On Tue, Aug 2, 2016 at 12:10 AM, Andrei Ivanov 
> wrote:
>
>> Hi all,
>>
>> I've just tried upgrading Spark to 2.0 and so far it looks generally good.
>>
>> But there is at least one issue I see right away - jon histories are
>> missing storage information (persisted RRDs).
>> This info is also missing from pre upgrade jobs.
>>
>> Does anyone have a clue what can be wrong?
>>
>> Thanks, Andrei Ivanov.
>>
>
>


unsubscribe

2018-03-27 Thread Andrei Balici
-- 
Andrei Balici
Student at the School of Computer Science,
University of Manchester


Developing a spark streaming application

2014-08-27 Thread Filip Andrei
Hey guys, so the problem i'm trying to tackle is the following:

- I need a data source that emits messages at a certain frequency
- There are N neural nets that need to process each message individually
- The outputs from all neural nets are aggregated and only when all N
outputs for each message are collected, should a message be declared fully
processed
- At the end i should measure the time it took for a message to be fully
processed (time between when it was emitted and when all N neural net
outputs from that message have been collected)


What i'm mostly interested in is if i approached the problem correctly in
the first place and if so some best practice pointers on my approach.






And my current implementation if the following:


For a data source i created the class
public class JavaRandomReceiver extends Receiver>

As i decided a key-value store would be best suited to holding emitted data.


The onStart() method initializes a custom random sequence generator and
starts a thread that
continuously generates new neural net inputs and stores them as following:

SensorData sdata = generator.createSensorData();

Map result = new HashMap();

result.put("msgNo", sdata.getMsgNo());
result.put("sensorTime", sdata.getSampleTime());
result.put("list", sdata.getPayload());
result.put("timeOfProc", sdata.getCreationTime());

store(result);

// sleeps for a given amount of time set at generator creation
generator.waitForNextTuple();

The msgNo here is incremented for each newly created message and is used to
keep 


The neural net functionality is added by creating a custom mapper
public class NeuralNetMapper implements Function,
Map>

whose call function basically just takes the input map, plugs its "list"
object as the input to the neural net object, replaces the map's initial
list with the neural net output and returns the modified map.




The aggregator is implemented as a single class that has the following form

public class JavaSyncBarrier implements
Function>, Void>



This class maintains a google guava cache of neural net outputs that it has
received in the form of
>>, where the Long value is the msgNo
and the list contains all maps containing said message number.

When a new map is received, it is added to the cache, its list's length is
compared to to the total number of neural nets and, if these numbers match,
that message number is said to be fully processed and a difference between
timeOfProc (all maps with the same msgNo have the same timeOfProc) and the
current system time is displayed as the total time necessary for processing.





Now the way all these components are linked together is the following:

public static void main(String[] args) {


SparkConf conf = new SparkConf();
conf.setAppName("SimpleSparkStreamingTest");


JavaStreamingContext jssc = new JavaStreamingContext(conf, new
Duration(1000));

jssc.checkpoint("/tmp/spark-tempdir");

// Generator config goes here
// Set to emit new message every 1 second
// ---

// Neural net config goes here
// ---  

JavaReceiverInputDStream> rndLists = jssc
.receiverStream(new JavaRandomReceiver(generatorConfig);

List>> neuralNetOutputStreams = 
new
ArrayList>>();

for(int i = 0; i < numberOfNets; i++){

neuralNetOutputStreams .add(
rndLists.map(new NeuralNetMapper(neuralNetConfig))
);
}

JavaDStream> joined = 
joinStreams(neuralNetOutputs);

joined.foreach(new JavaSyncBarrier(numberOfNets));

jssc.start();
jssc.awaitTermination();
}

where joinStreams unifies a list of streams:
public static  JavaDStream joinStreams(List>
streams) {

JavaDStream result = streams.get(0);
for (int i = 1; i < streams.size(); i++) {
result = result.union(streams.get(i));
}

return result;
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-tp12893.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Ensuring object in spark streaming runs on specific node

2014-08-29 Thread Filip Andrei
Say you have a spark streaming setup such as

JavaReceiverInputDStream<...> rndLists = jssc.receiverStream(new
JavaRandomReceiver(...));

rndLists.map(new NeuralNetMapper(...))
.foreach(new JavaSyncBarrier(...));

Is there any way of ensuring that, say, a JavaRandomReceiver and
JavaSyncBarrier get distributed to the same node ? Or is this even a
question that makes sense ?

Some information as to how spark-streaming distributes work across a cluster
would also be greatly appreciated.

( i've also asked this question on stackoverflow at
http://stackoverflow.com/questions/25564356/ensuring-object-in-spark-streaming-runs-on-specific-node
)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Ensuring-object-in-spark-streaming-runs-on-specific-node-tp13114.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Odd error when using a rdd map within a stream map

2014-09-18 Thread Filip Andrei
here i wrote a simpler version of the code to get an understanding of how it
works:

final List nns = new ArrayList(); 
for(int i = 0; i < numberOfNets; i++){ 
nns.add(NeuralNet.createFrom(...)); 
} 

final JavaRDD nnRdd = sc.parallelize(nns);   
JavaDStream results = rndLists.flatMap(new
FlatMapFunction, Float>() { 
@Override 
public Iterable call(Map input) 
throws Exception { 

Float f = nnRdd.map(new Function() { 

@Override 
public Float call(NeuralNet nn) throws Exception { 

return 1.0f; 
} 
}).reduce(new Function2() { 

@Override 
public Float call(Float left, Float right) throws Exception { 

return left + right; 
} 
}); 

return Arrays.asList(f); 
} 
}); 
results.print();


This works as expected and print() simply shows the number of neural nets i
have
If instead a print() i use

results.foreach(new Function, Void>() { 

@Override 
public Void call(JavaRDD arg0) throws Exception { 


for(Float f : arg0.collect()){ 
System.out.println(f); 
} 
return null; 
} 
});

It fails with the following exception
org.apache.spark.SparkException: Job aborted due to stage failure: Task
1.0:0 failed 1 times, most recent failure: Exception failure in TID 1 on
host localhost: java.lang.NullPointerException 
org.apache.spark.rdd.RDD.map(RDD.scala:270)

This is weird to me since the same code executes as expected in one case and
doesn't in the other, any idea what's going on here ?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Odd-error-when-using-a-rdd-map-within-a-stream-map-tp14551.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Odd error when using a rdd map within a stream map

2014-09-19 Thread Filip Andrei
Hey, i don't think that's the issue, foreach is called on 'results' which is
a DStream of floats, so naturally it passes RDDs to its function.

And either way, changing the code in the first mapper to comment out the map
reduce process on the RDD

Float f = 1.0f; //nnRdd.map(new Function() {

//  /**
//   * 
//   */
//  private static final long 
serialVersionUID = 876245667956566483L;
//
//  @Override
//  public Float call(NeuralNet nn) 
throws Exception {
//  
//  return 1.0f;
//  }
//  }).reduce(new Function2() {
//  
//  /**
//   * 
//   */
//  private static final long 
serialVersionUID = 5461230777841578072L;
//
//  @Override
//  public Float call(Float left, 
Float right) throws Exception {
//  
//  return left + right;
//  }
//  });

return Arrays.asList(f);

works as expected, so it's most likely  running that RDD.map().reduce()
that's the issue somehow, i just don't get why it works when there's a
.print() and the end and not a .foreach()



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Odd-error-when-using-a-rdd-map-within-a-stream-map-tp14551p14652.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[K8S] Divergense in dockerfiles between official repositories.

2024-06-17 Thread Andrei L
Hello Spark Team and users.

I'm currently exploring custom spark image building.
I've found two places containing dockerfiles from the "original" apache
project.
The first one is well known github mirror
https://github.com/apache/spark/tree/v3.5.1/resource-managers/kubernetes/docker/src/main/dockerfiles/spark
The second one is related to SPARK-40513:
https://github.com/apache/spark-docker
Also documentation at
https://spark.apache.org/docs/latest/running-on-kubernetes.html#docker-images
notes dockerfile in distribution tar (which originates from the first repo)
as good starting point for customization.
However, starting with version 3.3.3 and SPARK-42505 the second repo's
entrypoint script and Dockerfile diverge significantly from the main repo.

E.g. "pass through" command is no longer executed under tini supervision.

Could you please guide me with the following questions:
1) Is the difference in dockerfile and entrypoint intended? Or just not
synced yet?
2) Which one is better to use as a starting point for a custom image?
3) Why is tini not used as a supervisor for "pass through" command mode in
the second repo? Really can't find any info behind this decision.

Sincerely, Andrei Lopukhov.