Re: sc.textFile can't recognize '\004'

2014-06-20 Thread Sean Owen
These are actually Scala / Java questions.

On Sat, Jun 21, 2014 at 1:08 AM, anny9699  wrote:
> 1) One of the separators is '\004', which can be recognized by Python, R,
> or Hive; however, Spark doesn't seem to recognize this one and returns a symbol
> that looks like '?'. Also, this symbol is not a question mark and I don't know
> how to parse it.

(The \004 octal syntax appears deprecated, but it works.) It's not
turned into ?, it is just how the shell shows non-printing characters.

scala> val c = '\004'
warning: there were 1 deprecation warning(s); re-run with -deprecation
for details
c: Char = ?

scala> c.toInt
res2: Int = 4

Which is all correct. Is it presenting any problem?

> 2) Some of the separators are composed of several Chars, like "} =>". If I
> use str.split(Array('}', '=>')), it will separate the string but with many
> white spaces left in the middle. Is there a good way that I could
> split by String instead of by an Array of Chars?

Your example doesn't compile, but I assume the argument should be an
array of the 3 chars. String.split will return empty matches between
adjacent separator chars. If you don't want them, you can use
str.split(...).filterNot(_.isEmpty)
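
A minimal sketch of both points (the sample strings are made up, not from the
thread): split on the control character as a plain Char, and split on the
multi-character separator by passing a regex String to split, dropping any
empty tokens afterwards.

object SplitSketch {
  def main(args: Array[String]): Unit = {
    val line = "a\u0004b\u0004c"
    // Splitting on the '\u0004' control character works like any other Char.
    println(line.split('\u0004').toList)    // List(a, b, c)

    val record = "foo} =>bar} =>baz"
    // Split on the literal string "} =>" ('}' escaped for the regex),
    // then drop any empty tokens left over.
    val fields = record.split("""\} =>""").filterNot(_.isEmpty)
    println(fields.toList)                  // List(foo, bar, baz)
  }
}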


How to terminate job from the task code?

2014-06-20 Thread Piotr Kołaczkowski
If a task detects an unrecoverable error, i.e. an error that we can't expect
to fix by retrying or by moving the task to another node, how do we stop the
job / prevent Spark from retrying it?

def process(taskContext: TaskContext, data: Iterator[T]) {
  ...

  if (unrecoverableError) {
    ??? // terminate the job immediately
  }
  ...
}

Somewhere else:
rdd.sparkContext.runJob(rdd, something.process _)
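
A hedged sketch of one common workaround (not an answer from this thread):
throw from the task and cap retries so the failure aborts the job quickly.
spark.task.maxFailures is a real Spark setting; the UnrecoverableError class
and the placeholder check are illustrative only.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

class UnrecoverableError(msg: String) extends RuntimeException(msg)

object FailFastSketch {
  def process[T](taskContext: TaskContext, data: Iterator[T]): Unit = {
    val unrecoverable = false // placeholder for the real check
    if (unrecoverable) {
      // Fail the task; with maxFailures = 1 the scheduler aborts the stage
      // (and hence the job) instead of retrying the task elsewhere.
      throw new UnrecoverableError("giving up, no point retrying")
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fail-fast-sketch")
      .setMaster("local[2]")
      .set("spark.task.maxFailures", "1") // don't retry failed tasks (cluster mode)
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10)
    sc.runJob(rdd, (ctx: TaskContext, it: Iterator[Int]) => process(ctx, it))
    sc.stop()
  }
}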


Thanks,
Piotr


-- 
Piotr Kolaczkowski, Lead Software Engineer
pkola...@datastax.com

http://www.datastax.com/
777 Mariners Island Blvd., Suite 510
San Mateo, CA 94404


Re: Running Spark alongside Hadoop

2014-06-20 Thread Ognen Duzlevski
I only ran HDFS on the same nodes as Spark and that worked out great,
performance- and robustness-wise. However, I did not run Hadoop itself to
do any computations/jobs on the same nodes. My expectation is that if
you actually ran both at the same time with your configuration, the
performance would be pretty bad. It's mostly about memory, really, and
then CPU(s) etc.


OD

On 6/20/14, 2:41 PM, Sameer Tilak wrote:

Dear Spark users,

I have a small 4 node Hadoop cluster. Each node is a VM -- 4 virtual
cores, 8GB memory and 500GB disk. I am currently running Hadoop on it.
I would like to run Spark (in standalone mode) alongside Hadoop on
the same nodes. Given the configuration of my nodes, will that work?
Does anyone have any experience in terms of stability and performance
of running Spark and Hadoop on somewhat resource-constrained nodes? I
was looking at the Spark documentation and there is a way to configure
memory and cores for the worker nodes and memory for the master
node: SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_DAEMON_MEMORY.
Any recommendations on how to share resources between Hadoop and Spark?








sc.textFile can't recognize '\004'

2014-06-20 Thread anny9699
Hi,

I need to parse a file which uses a series of different separators. I used
SparkContext.textFile and I ran into two problems:

1) One of the separators is '\004', which can be recognized by Python, R,
or Hive; however, Spark doesn't seem to recognize this one and returns a symbol
that looks like '?'. Also, this symbol is not a question mark and I don't know
how to parse it.

2) Some of the separators are composed of several Chars, like "} =>". If I
use str.split(Array('}', '=>')), it will separate the string but with many
white spaces left in the middle. Is there a good way that I could
split by String instead of by an Array of Chars?

Thanks a lot!





Re: How do you run your spark app?

2014-06-20 Thread Andrei
Hi Shivani,

Adding JARs to the classpath (e.g. via the "-cp" option) is needed to run your
_local_ Java application, whatever it is. To deliver them to _other
machines_ for execution, you need to add them to the SparkContext. You can
do it in 2 different ways:

1. Add them right from your code (your suggested
"sparkContext.setJars(...)").
2. Use "spark-submit" and pass JARs from command line.

Note that both options are easier if you assemble your code and all of
its dependencies into a single "fat" JAR instead of manually listing all
the needed libraries.
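
A minimal sketch of option 1 (master URL and jar path are assumptions):

import org.apache.spark.{SparkConf, SparkContext}

object SetJarsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("my-app")
      .setMaster("spark://master:7077")                           // assumed master URL
      .setJars(Seq("target/scala-2.10/my-app-assembly-1.0.jar"))  // assumed fat-jar path
    val sc = new SparkContext(conf)
    // ... the actual job ...
    sc.stop()
  }
}

// Option 2 keeps the code untouched and ships the same fat JAR from the
// command line instead, e.g. (paths assumed):
//   spark-submit --class MyApp --master spark://master:7077 \
//     target/scala-2.10/my-app-assembly-1.0.jar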




On Sat, Jun 21, 2014 at 1:47 AM, Shivani Rao  wrote:

> Hello Shrikar,
>
> Thanks for your email. I have been using the same workflow as you did. But
> my question was related to the creation of the sparkContext. My question was:
>
> If I am specifying jars in the "java -cp ", and adding them
> to my build.sbt, do I need to additionally add them in my code while
> creating the sparkContext (sparkContext.setJars(" "))?
>
>
> Thanks,
> Shivani
>
>
> On Fri, Jun 20, 2014 at 11:03 AM, Shrikar archak 
> wrote:
>
>> Hi Shivani,
>>
>> I use sbt assembly to create a fat jar .
>> https://github.com/sbt/sbt-assembly
>>
>> Example of the sbt file is below.
>>
>> import AssemblyKeys._ // put this at the top of the file
>>
>> assemblySettings
>>
>> mainClass in assembly := Some("FifaSparkStreaming")
>>
>>  name := "FifaSparkStreaming"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.10.4"
>>
>> libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0"
>> % "provided",
>> "org.apache.spark" %% "spark-streaming" %
>> "1.0.0" % "provided",
>> ("org.apache.spark" %%
>> "spark-streaming-twitter" %
>> "1.0.0").exclude("org.eclipse.jetty.orbit","javax.transaction")
>>
>>  .exclude("org.eclipse.jetty.orbit","javax.servlet")
>>
>>  .exclude("org.eclipse.jetty.orbit","javax.mail.glassfish")
>>
>>  .exclude("org.eclipse.jetty.orbit","javax.activation")
>>
>>  .exclude("com.esotericsoftware.minlog", "minlog"),
>> ("net.debasishg" % "redisclient_2.10" %
>> "2.12").exclude("com.typesafe.akka","akka-actor_2.10"))
>>
>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>   {
>> case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
>> case PathList("org", "apache", xs @ _*) => MergeStrategy.first
>> case PathList("org", "apache", xs @ _*) => MergeStrategy.first
>> case "application.conf" => MergeStrategy.concat
>> case "unwanted.txt" => MergeStrategy.discard
>> case x => old(x)
>>   }
>> }
>>
>>
>> resolvers += "Akka Repository" at "http://repo.akka.io/releases/";
>>
>>
>> And I run as mentioned below.
>>
>> LOCALLY :
>> 1)  sbt 'run AP1z4IYraYm5fqWhITWArY53x
>> Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6
>> 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN
>> Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014'
>>
>> If you want to submit on the cluster
>>
>> CLUSTER:
>> 2) spark-submit --class FifaSparkStreaming --master
>> "spark://server-8-144:7077" --driver-memory 2048 --deploy-mode cluster
>> FifaSparkStreaming-assembly-1.0.jar AP1z4IYraYm5fqWhITWArY53x
>> Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6
>> 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN
>> Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014
>>
>>
>> Hope this helps.
>>
>> Thanks,
>> Shrikar
>>
>>
>> On Fri, Jun 20, 2014 at 9:16 AM, Shivani Rao 
>> wrote:
>>
>>> Hello Michael,
>>>
>>> I have a quick question for you. Can you clarify the statement " build
>>> fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's
>>> and everything needed to run a Job".  Can you give an example.
>>>
>>> I am using sbt assembly as well to create a fat jar, and supplying the
>>> spark and hadoop locations in the class path. Inside the main() function
>>> where the spark context is created, I use SparkContext.jarOfClass(this).toList
>>> to add the fat jar to my spark context. However, I seem to be running into
>>> issues with this approach. I was wondering if you had any inputs Michael.
>>>
>>> Thanks,
>>> Shivani
>>>
>>>
>>> On Thu, Jun 19, 2014 at 10:57 PM, Sonal Goyal 
>>> wrote:
>>>
 We use maven for building our code and then invoke spark-submit through
 the exec plugin, passing in our parameters. Works well for us.

 Best Regards,
 Sonal
 Nube Technologies 

 




 On Fri, Jun 20, 2014 at 3:26 AM, Michael Cutler 
 wrote:

> P.S. Last but not least we use sbt-assembly to build fat JAR's and
> build dist-style TAR.GZ packages with launch scripts, JAR's and everything
> needed to run a Job.  These are automatically built from source by our
> Jenkins and stored in HDFS.  Our Chronos/Marathon jobs fetch the latest
> release TAR.GZ direct from HDFS, unpack it and launch the appropriate script.

Re: Worker dies while submitting a job

2014-06-20 Thread Shivani Rao
That error typically means that there is a communication error (wrong
ports) between the master and the worker. Also check if the worker has "write"
permissions to create the "work" directory. We were getting this error due
to one of the above two reasons.



On Tue, Jun 17, 2014 at 10:04 AM, Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com> wrote:

> I have been able to submit a job successfully but I had to config my spark
> job this way:
>
>   val sparkConf: SparkConf =
> new SparkConf()
>   .setAppName("TwitterPopularTags")
>   .setMaster("spark://int-spark-master:7077")
>   .setSparkHome("/opt/spark")
>   .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar"))
>
> Now I'm getting this error on my worker:
>
> 4/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient memory
>
>
>
>
> 2014-06-17 17:36 GMT+01:00 Luis Ángel Vicente Sánchez <
> langel.gro...@gmail.com>:
>
> Ok... I was checking the wrong version of that file yesterday. My worker
>> is sending a DriverStateChanged(_, DriverState.FAILED, _) but there is
>> no case branch for that state and the worker is crashing. I still don't
>> know why I'm getting a FAILED state but I'm sure that should kill the actor
>> due to a scala.MatchError.
>>
>> Usually in Scala it is a best practice to use a sealed trait and case
>> classes/objects in a match statement instead of an enumeration (the
>> compiler will complain about missing cases); I think that should be
>> refactored to catch this kind of error at compile time.
>>
>> Now I need to find out why that state changed message is sent... I will
>> continue updating this thread until I find the problem :D
>>
>>
>> 2014-06-16 18:25 GMT+01:00 Luis Ángel Vicente Sánchez <
>> langel.gro...@gmail.com>:
>>
>> I'm playing with a modified version of the TwitterPopularTags example and
>>> when I tried to submit the job to my cluster, workers keep dying with this
>>> message:
>>>
>>> 14/06/16 17:11:16 INFO DriverRunner: Launch Command: "java" "-cp"
>>> "/opt/spark-1.0.0-bin-hadoop1/work/driver-20140616171115-0014/spark-test-0.1-SNAPSHOT.jar:::/opt/spark-1.0.0-bin-hadoop1/conf:/opt/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar"
>>> "-XX:MaxPermSize=128m" "-Xms512M" "-Xmx512M"
>>> "org.apache.spark.deploy.worker.DriverWrapper"
>>> "akka.tcp://sparkWorker@int-spark-worker:51676/user/Worker"
>>> "org.apache.spark.examples.streaming.TwitterPopularTags"
>>> 14/06/16 17:11:17 ERROR OneForOneStrategy: FAILED (of class
>>> scala.Enumeration$Val)
>>> scala.MatchError: FAILED (of class scala.Enumeration$Val)
>>> at
>>> org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:317)
>>>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>  at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>  at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>  at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> 14/06/16 17:11:17 INFO Worker: Starting Spark worker
>>> int-spark-app-ie005d6a3.mclabs.io:51676 with 2 cores, 6.5 GB RAM
>>> 14/06/16 17:11:17 INFO Worker: Spark home: /opt/spark-1.0.0-bin-hadoop1
>>> 14/06/16 17:11:17 INFO WorkerWebUI: Started WorkerWebUI at
>>> http://int-spark-app-ie005d6a3.mclabs.io:8081
>>> 14/06/16 17:11:17 INFO Worker: Connecting to master
>>> spark://int-spark-app-ie005d6a3.mclabs.io:7077...
>>> 14/06/16 17:11:17 ERROR Worker: Worker registration failed: Attempted to
>>> re-register worker at same address: akka.tcp://
>>> sparkwor...@int-spark-app-ie005d6a3.mclabs.io:51676
>>>
>>> This happens when the worker receive a DriverStateChanged(driverId,
>>> state, exception) message.
>>>
>>> To deploy the job I copied the jar file to the temporary folder of
>>> master node and execute the following command:
>>>
>>> ./spark-submit \
>>> --class org.apache.spark.examples.streaming.TwitterPopularTags \
>>> --master spark://int-spark-master:7077 \
>>> --deploy-mode cluster \
>>> file:///tmp/spark-test-0.1-SNAPSHOT.jar
>>>
>>> I don't really know what the problem could be as there is a 'case _'
>>> that should avoid that problem :S
>>>
>>
>>
>


-- 
Software Engineer
Analytics Engineering Team@ Box
Mountain View, CA


Re: How do you run your spark app?

2014-06-20 Thread Shivani Rao
Hello Shrikar,

Thanks for your email. I have been using the same workflow as you did. But
my question was related to the creation of the sparkContext. My question was:

If I am specifying jars in the "java -cp ", and adding them
to my build.sbt, do I need to additionally add them in my code while
creating the sparkContext (sparkContext.setJars(" "))?


Thanks,
Shivani


On Fri, Jun 20, 2014 at 11:03 AM, Shrikar archak 
wrote:

> Hi Shivani,
>
> I use sbt assembly to create a fat jar .
> https://github.com/sbt/sbt-assembly
>
> Example of the sbt file is below.
>
> import AssemblyKeys._ // put this at the top of the file
>
> assemblySettings
>
> mainClass in assembly := Some("FifaSparkStreaming")
>
>  name := "FifaSparkStreaming"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0" %
> "provided",
> "org.apache.spark" %% "spark-streaming" %
> "1.0.0" % "provided",
> ("org.apache.spark" %%
> "spark-streaming-twitter" %
> "1.0.0").exclude("org.eclipse.jetty.orbit","javax.transaction")
>
>  .exclude("org.eclipse.jetty.orbit","javax.servlet")
>
>  .exclude("org.eclipse.jetty.orbit","javax.mail.glassfish")
>
>  .exclude("org.eclipse.jetty.orbit","javax.activation")
>
>  .exclude("com.esotericsoftware.minlog", "minlog"),
> ("net.debasishg" % "redisclient_2.10" %
> "2.12").exclude("com.typesafe.akka","akka-actor_2.10"))
>
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>   {
> case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
> case PathList("org", "apache", xs @ _*) => MergeStrategy.first
> case PathList("org", "apache", xs @ _*) => MergeStrategy.first
> case "application.conf" => MergeStrategy.concat
> case "unwanted.txt" => MergeStrategy.discard
> case x => old(x)
>   }
> }
>
>
> resolvers += "Akka Repository" at "http://repo.akka.io/releases/";
>
>
> And I run as mentioned below.
>
> LOCALLY :
> 1)  sbt 'run AP1z4IYraYm5fqWhITWArY53x
> Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6
> 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN
> Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014'
>
> If you want to submit on the cluster
>
> CLUSTER:
> 2) spark-submit --class FifaSparkStreaming --master
> "spark://server-8-144:7077" --driver-memory 2048 --deploy-mode cluster
> FifaSparkStreaming-assembly-1.0.jar AP1z4IYraYm5fqWhITWArY53x
> Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6
> 115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN
> Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014
>
>
> Hope this helps.
>
> Thanks,
> Shrikar
>
>
> On Fri, Jun 20, 2014 at 9:16 AM, Shivani Rao  wrote:
>
>> Hello Michael,
>>
>> I have a quick question for you. Can you clarify the statement " build
>> fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's
>> and everything needed to run a Job".  Can you give an example.
>>
>> I am using sbt assembly as well to create a fat jar, and supplying the
>> spark and hadoop locations in the class path. Inside the main() function
>> where the spark context is created, I use SparkContext.jarOfClass(this).toList
>> to add the fat jar to my spark context. However, I seem to be running into
>> issues with this approach. I was wondering if you had any inputs Michael.
>>
>> Thanks,
>> Shivani
>>
>>
>> On Thu, Jun 19, 2014 at 10:57 PM, Sonal Goyal 
>> wrote:
>>
>>> We use maven for building our code and then invoke spark-submit through
>>> the exec plugin, passing in our parameters. Works well for us.
>>>
>>> Best Regards,
>>> Sonal
>>> Nube Technologies 
>>>
>>> 
>>>
>>>
>>>
>>>
>>> On Fri, Jun 20, 2014 at 3:26 AM, Michael Cutler 
>>> wrote:
>>>
 P.S. Last but not least we use sbt-assembly to build fat JAR's and
 build dist-style TAR.GZ packages with launch scripts, JAR's and everything
 needed to run a Job.  These are automatically built from source by our
 Jenkins and stored in HDFS.  Our Chronos/Marathon jobs fetch the latest
 release TAR.GZ direct from HDFS, unpack it and launch the appropriate
 script.

 Makes for a much cleaner development / testing / deployment to package
 everything required in one go instead of relying on cluster specific
 classpath additions or any add-jars functionality.


 On 19 June 2014 22:53, Michael Cutler  wrote:

> When you start seriously using Spark in production there are basically
> two things everyone eventually needs:
>
>1. Scheduled Jobs - recurring hourly/daily/weekly jobs.
>2. Always-On Jobs - that require monitoring, restarting etc.
>
> There are lots of ways to implement these requirements, everything
> from crontab through to workflow managers like Oozie.
>
> We opted for the followin

Fwd: Using Spark

2014-06-20 Thread Ricky Thomas
Hi,

Would like to add ourselves to the user list if possible please?

Company: truedash
url: truedash.io

Automatic pulling of all your data into Spark for enterprise
visualisation, predictive analytics and data exploration at a low cost.

Currently in development with a few clients.

Thanks


Re: little confused about SPARK_JAVA_OPTS alternatives

2014-06-20 Thread Andrew Or
Well, even before spark-submit, the standard way of setting spark
configurations is to create a new SparkConf, set the values in the conf,
and pass this to the SparkContext in your application. It's true that this
involves "hard-coding" these configurations in your application, but these
configurations are intended to be application-level settings anyway, rather
than cluster-wide settings. Environment variables are not really ideal for
this purpose, though they are an easy way to change these settings quickly.


2014-06-20 14:03 GMT-07:00 Koert Kuipers :

> thanks for the detailed answer andrew. thats helpful.
>
> i think the main thing thats bugging me is that there is no simple way for
> an admin to always set something on the executors for a production
> environment (an akka timeout comes to mind). yes i could use
> spark-defaults  for that, although that means everything must be submitted
> through spark-submit, which is fairly new and i am not sure how much we
> will use that yet. i will look into that some more.
>
>
> On Thu, Jun 19, 2014 at 6:56 PM, Koert Kuipers  wrote:
>
>> for a jvm application its not very appealing to me to use spark
>> submit my application uses hadoop, so i should use "hadoop jar", and my
>> application uses spark, so it should use "spark-submit". if i add a piece
>> of code that uses some other system there will be yet another suggested way
>> to launch it. thats not very scalable, since i can only launch it one way
>> in the end...
>>
>>
>> On Thu, Jun 19, 2014 at 4:58 PM, Andrew Or  wrote:
>>
>>> Hi Koert and Lukasz,
>>>
>>> The recommended way of not hard-coding configurations in your
>>> application is through conf/spark-defaults.conf as documented here:
>>> http://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties.
>>> However, this is only applicable to
>>> spark-submit, so this may not be useful to you.
>>>
>>> Depending on how you launch your Spark applications, you can workaround
>>> this by manually specifying these configs as -Dspark.x=y
>>> in your java command to launch Spark. This is actually how
>>> SPARK_JAVA_OPTS used to work before 1.0. Note that spark-submit does
>>> essentially the same thing, but sets these properties programmatically
>>> by reading from the conf/spark-defaults.conf file and calling
>>> System.setProperty("spark.x", "y").
>>>
>>> Note that spark.executor.extraJavaOpts is not intended for spark
>>> configuration (see
>>> http://spark.apache.org/docs/latest/configuration.html).
>>>  SPARK_DAEMON_JAVA_OPTS, as you pointed out, is for Spark daemons like
>>> the standalone master, worker, and the history server;
>>> it is also not intended for spark configurations to be picked up by
>>> Spark executors and drivers. In general, any reference to "java opts"
>>> in any variable or config refers to java options, as the name implies,
>>> not Spark configuration. Unfortunately, it just so happened that we
>>> used to mix the two in the same environment variable before 1.0.
>>>
>>> Is there a reason you're not using spark-submit? Is it for legacy
>>> reasons? As of 1.0, most changes to launching Spark applications
>>> will be done through spark-submit, so you may miss out on relevant new
>>> features or bug fixes.
>>>
>>> Andrew
>>>
>>>
>>>
>>> 2014-06-19 7:41 GMT-07:00 Koert Kuipers :
>>>
>>> still struggling with SPARK_JAVA_OPTS being deprecated. i am using spark
 standalone.

 for example if i have a akka timeout setting that i would like to be
 applied to every piece of the spark framework (so spark master, spark
 workers, spark executor sub-processes, spark-shell, etc.). i used to do
 that with SPARK_JAVA_OPTS. now i am unsure.

 SPARK_DAEMON_JAVA_OPTS works for the master and workers, but not for
 the spark-shell i think? i tried using SPARK_DAEMON_JAVA_OPTS, and it does
 not seem that useful. for example for a worker it does not apply the
 settings to the executor sub-processes, while for SPARK_JAVA_OPTS it does
 do that. so seems like SPARK_JAVA_OPTS is my only way to change settings
 for the executors, yet its deprecated?


 On Wed, Jun 11, 2014 at 10:59 PM, elyast 
 wrote:

> Hi,
>
> I tried to use SPARK_JAVA_OPTS in spark-env.sh as well as
> conf/java-opts
> file to set additional java system properties. In this case I could
> connect
> to tachyon without any problem.
>
> However when I tried setting executor and driver extraJavaOptions in
> spark-defaults.conf it doesn't.
>
> I suspect the root cause may be following:
>
> SparkSubmit doesn't fork additional JVM to actually run either driver
> or
> executor process and additional system properties are set after JVM is
> created and other classes are loaded. It may happen that Tachyon
> CommonConf
> class is already being loaded and since its Singleton it won't pick up
> and
> changes to system properties.

Re: Long running Spark Streaming Job increasing executing time per batch

2014-06-20 Thread Tathagata Das
In the Spark web UI, you should see the same pattern of stages repeating
over time, as the same sequence of stages gets computed in every batch. From
that you should be able to get a sense of how long the corresponding stages take
across different batches, and which stage is actually taking more time,
after a while.






On Thu, Jun 19, 2014 at 3:43 PM, Skogberg, Fredrik <
fredrik.skogb...@paddypower.com> wrote:

>
> Hi TD,
>
> >Thats quite odd. Yes, with checkpoint the lineage does not increase. Can
> you tell which stage is the >processing of each batch is causing the
> increase in the processing time?
>
> I haven’t been able to determine exactly what stage that is causing the
> increase in processing time. Any pointers that I should be looking out for?
> I’ve just monitored the “Total delay” and “execution times” part of the
> driver log.
>
> >Also, what is the batch interval, and checkpoint interval?
>
> The batch interval was set to a somewhat conservative 10 seconds, and the
> checkpoint I guess is the the default derived from that since I use
> updateStateByKey (as I understand it, using that function implies that the
> stream will be check pointed)
>
> Regards,
> Fred
>
>


kibana like frontend for spark

2014-06-20 Thread Mohit Jaggi
Folks,
I want to analyse logs and I want to use spark for that. However,
elasticsearch has a fancy frontend in Kibana. Kibana's docs indicate that
it works with elasticsearch only. Is there a similar frontend that can work
with spark?

Mohit.

P.S.: On MapR's spark FAQ I read a statement like "Kibana can use any
ODBC/JDBC backend and Shark has that interface"


Re: options set in spark-env.sh is not reflecting on actual execution

2014-06-20 Thread Andrew Or
Hi Meethu,

Are you using Spark 1.0? If so, you should use spark-submit (
http://spark.apache.org/docs/latest/submitting-applications.html), which
has --executor-memory. If you don't want to specify this every time you
submit an application, you can also specify spark.executor.memory in
$SPARK_HOME/conf/spark-defaults.conf (
http://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties
).

SPARK_WORKER_MEMORY is for the worker daemon, not your individual
application. A worker can launch many executors, and the value of
SPARK_WORKER_MEMORY is shared across all executors running on that worker.
SPARK_EXECUTOR_MEMORY is deprecated and replaced by
"spark.executor.memory". This is the value you should set.
SPARK_DAEMON_JAVA_OPTS should not be used for setting spark configs, but
instead is intended for java options for worker and master instances (not
for Spark applications). Similarly, you shouldn't be setting
SPARK_MASTER_OPTS or SPARK_WORKER_OPTS to configure your application.

The recommended way for setting spark.* configurations is to do it
programmatically by creating a new SparkConf, set these configurations in
the conf, and pass this conf to the SparkContext (see
http://spark.apache.org/docs/latest/configuration.html#spark-properties).
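
A minimal sketch of that recommendation, using the 6g Meethu was aiming for
(the value is just an example):

import org.apache.spark.{SparkConf, SparkContext}

object ExecutorMemorySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("my-app")
      .set("spark.executor.memory", "6g") // replaces SPARK_EXECUTOR_MEMORY
    val sc = new SparkContext(conf)
    // ... the actual job ...
    sc.stop()
  }
}

// Alternatively, keep the code unchanged and pass --executor-memory 6g to
// spark-submit, or put spark.executor.memory in conf/spark-defaults.conf.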

Andrew



2014-06-18 22:21 GMT-07:00 MEETHU MATHEW :

> Hi all,
>
> I have a doubt regarding the options in spark-env.sh. I set the following
> values in the file in master and 2 workers
>
> SPARK_WORKER_MEMORY=7g
> SPARK_EXECUTOR_MEMORY=6g
> SPARK_DAEMON_JAVA_OPTS+="-Dspark.akka.timeout=30
> -Dspark.akka.frameSize=1 -Dspark.blockManagerHeartBeatMs=80
> -Dspark.shuffle.spill=false"
>
> But SPARK_EXECUTOR_MEMORY is showing 4g in web UI.Do I need to change it
> anywhere else to make it 4g and to reflect it in web UI.
>
> A warning is coming that blockManagerHeartBeatMs is exceeding 45 while
> executing a process even though I set it to 80.
>
> So I am not sure whether it should be set as SPARK_MASTER_OPTS
> or SPARK_WORKER_OPTS.
>
> Thanks & Regards,
> Meethu M
>


Re: Set the number/memory of workers under mesos

2014-06-20 Thread Shuo Xiang
Hi Mayur,
  Are you referring to overriding the default sc in spark-shell? Is there
any way to do that before running the shell?


On Fri, Jun 20, 2014 at 1:40 PM, Mayur Rustagi 
wrote:

> You should be able to configure in spark context in Spark shell.
> spark.cores.max & memory.
> Regards
> Mayur
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Fri, Jun 20, 2014 at 4:30 PM, Shuo Xiang 
> wrote:
>
>> Hi, just wondering if anybody knows how to set up the number of workers (and
>> the amount of memory) in mesos, while launching spark-shell? I was trying to
>> edit conf/spark-env.sh and it looks like the environment variables are
>> for YARN or standalone. Thanks!
>>
>>
>>
>>
>


Re: Parallel LogisticRegression?

2014-06-20 Thread Kyle Ellrott
It looks like I was running into
https://issues.apache.org/jira/browse/SPARK-2204
The issues went away when I changed to spark.mesos.coarse.
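
A hedged sketch of the overall pattern discussed in this thread (driver-side
parallel submission of independent training jobs, with the coarse-grained
Mesos workaround); the data layout and names are assumptions, not Kyle's code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint

object ParallelTrainingSketch {
  def main(args: Array[String]): Unit = {
    // Master URL is expected to come from spark-submit; coarse-grained mode
    // is the workaround mentioned above for SPARK-2204.
    val conf = new SparkConf()
      .setAppName("parallel-logreg")
      .set("spark.mesos.coarse", "true")
    val sc = new SparkContext(conf)

    // Assumed: one RDD of labeled points per response set, sharing the features.
    val responseSets: Seq[RDD[LabeledPoint]] = Seq.empty

    // Each training run is independent, so submit them concurrently from the
    // driver via a parallel collection; Spark interleaves the resulting jobs.
    val models = responseSets.par.map { labeled =>
      LogisticRegressionWithSGD.train(labeled, 100)
    }.seq

    println(s"trained ${models.size} models")
    sc.stop()
  }
}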

Kyle


On Fri, Jun 20, 2014 at 10:36 AM, Kyle Ellrott 
wrote:

> I've tried to parallelize the separate regressions using
> allResponses.toParArray.map( x=> do logistic regression against labels in x)
> But I start to see messages like
> 14/06/20 10:10:26 WARN scheduler.TaskSetManager: Lost TID 4193 (task
> 363.0:4)
> 14/06/20 10:10:27 WARN scheduler.TaskSetManager: Loss was due to fetch
> failure from null
> and finally
> 14/06/20 10:10:26 ERROR scheduler.TaskSetManager: Task 363.0:4 failed 4
> times; aborting job
>
> Then
> 14/06/20 10:10:26 ERROR scheduler.DAGSchedulerActorSupervisor:
> eventProcesserActor failed due to the error null; shutting down SparkContext
> 14/06/20 10:10:26 ERROR actor.OneForOneStrategy:
> java.lang.UnsupportedOperationException
> at
> org.apache.spark.scheduler.SchedulerBackend$class.killTask(SchedulerBackend.scala:32)
> at
> org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.killTask(MesosSchedulerBackend.scala:41)
>  at
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:185)
>
>
> This doesn't happen when I don't use toParArray. I read that spark was
> thread safe, but I seem to be running into problems. Am I doing something
> wrong?
>
> Kyle
>
>
>
> On Thu, Jun 19, 2014 at 11:21 AM, Kyle Ellrott 
> wrote:
>
>>
>> I'm working on a problem learning several different sets of responses
>> against the same set of training features. Right now I've written the
>> program to cycle through all of the different label sets, attach them to
>> the training data, and run LogisticRegressionWithSGD on each of them, i.e.
>>
>> foreach curResponseSet in allResponses:
>>  currentRDD : RDD[LabeledPoints] = curResponseSet joined with
>> trainingData
>>  LogisticRegressionWithSGD.train(currentRDD)
>>
>>
>> Each of the different training runs are independent. It seems like I
>> should be parallelize them as well.
>> Is there a better way to do this?
>>
>>
>> Kyle
>>
>
>


Re: little confused about SPARK_JAVA_OPTS alternatives

2014-06-20 Thread Koert Kuipers
thanks for the detailed answer andrew. thats helpful.

i think the main thing thats bugging me is that there is no simple way for
an admin to always set something on the executors for a production
environment (an akka timeout comes to mind). yes i could use
spark-defaults  for that, although that means everything must be submitted
through spark-submit, which is fairly new and i am not sure how much we
will use that yet. i will look into that some more.


On Thu, Jun 19, 2014 at 6:56 PM, Koert Kuipers  wrote:

> for a jvm application its not very appealing to me to use spark submit
> my application uses hadoop, so i should use "hadoop jar", and my
> application uses spark, so it should use "spark-submit". if i add a piece
> of code that uses some other system there will be yet another suggested way
> to launch it. thats not very scalable, since i can only launch it one way
> in the end...
>
>
> On Thu, Jun 19, 2014 at 4:58 PM, Andrew Or  wrote:
>
>> Hi Koert and Lukasz,
>>
>> The recommended way of not hard-coding configurations in your application
>> is through conf/spark-defaults.conf as documented here:
>> http://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties.
>> However, this is only applicable to
>> spark-submit, so this may not be useful to you.
>>
>> Depending on how you launch your Spark applications, you can workaround
>> this by manually specifying these configs as -Dspark.x=y
>> in your java command to launch Spark. This is actually how
>> SPARK_JAVA_OPTS used to work before 1.0. Note that spark-submit does
>> essentially the same thing, but sets these properties programmatically by
>> reading from the conf/spark-defaults.conf file and calling
>> System.setProperty("spark.x", "y").
>>
>> Note that spark.executor.extraJavaOpts is not intended for spark
>> configuration (see http://spark.apache.org/docs/latest/configuration.html
>> ).
>>  SPARK_DAEMON_JAVA_OPTS, as you pointed out, is for Spark daemons like
>> the standalone master, worker, and the history server;
>> it is also not intended for spark configurations to be picked up by Spark
>> executors and drivers. In general, any reference to "java opts"
>> in any variable or config refers to java options, as the name implies,
>> not Spark configuration. Unfortunately, it just so happened that we
>> used to mix the two in the same environment variable before 1.0.
>>
>> Is there a reason you're not using spark-submit? Is it for legacy
>> reasons? As of 1.0, most changes to launching Spark applications
>> will be done through spark-submit, so you may miss out on relevant new
>> features or bug fixes.
>>
>> Andrew
>>
>>
>>
>> 2014-06-19 7:41 GMT-07:00 Koert Kuipers :
>>
>> still struggling with SPARK_JAVA_OPTS being deprecated. i am using spark
>>> standalone.
>>>
>>> for example if i have a akka timeout setting that i would like to be
>>> applied to every piece of the spark framework (so spark master, spark
>>> workers, spark executor sub-processes, spark-shell, etc.). i used to do
>>> that with SPARK_JAVA_OPTS. now i am unsure.
>>>
>>> SPARK_DAEMON_JAVA_OPTS works for the master and workers, but not for the
>>> spark-shell i think? i tried using SPARK_DAEMON_JAVA_OPTS, and it does not
>>> seem that useful. for example for a worker it does not apply the settings
>>> to the executor sub-processes, while for SPARK_JAVA_OPTS it does do that.
>>> so seems like SPARK_JAVA_OPTS is my only way to change settings for the
>>> executors, yet its deprecated?
>>>
>>>
>>> On Wed, Jun 11, 2014 at 10:59 PM, elyast 
>>> wrote:
>>>
 Hi,

 I tried to use SPARK_JAVA_OPTS in spark-env.sh as well as conf/java-opts
 file to set additional java system properties. In this case I could
 connect
 to tachyon without any problem.

 However when I tried setting executor and driver extraJavaOptions in
 spark-defaults.conf it doesn't.

 I suspect the root cause may be following:

 SparkSubmit doesn't fork additional JVM to actually run either driver or
 executor process and additional system properties are set after JVM is
 created and other classes are loaded. It may happen that Tachyon
 CommonConf
 class is already being loaded and since its Singleton it won't pick up
 and
 changes to system properties.

 Please let me know what do u think.

 Can I use conf/java-opts ? since it's not really documented anywhere?

 Best regards
 Lukasz



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/little-confused-about-SPARK-JAVA-OPTS-alternatives-tp5798p7448.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

>>>
>>>
>>
>


Re: Possible approaches for adding extra metadata (Spark Streaming)?

2014-06-20 Thread Shrikar archak
Thanks Mayur and TD for your inputs.

~Shrikar


On Fri, Jun 20, 2014 at 1:20 PM, Tathagata Das 
wrote:

> If the metadata is directly related to each individual record, then it
> can be done either way. Since I am not sure how easy or hard it will be
> for you to add tags before putting the data into spark streaming, it's hard to
> recommend one method over the other.
>
> However, if the metadata is related to each key (based on which you are
> called updateStateByKey) and not every record, then it may be more
> efficient to maintain that per-key metadata in the updateStateByKey's state
> object.
>
> Regarding doing http calls, I would be a bit cautious about performance.
> Doing an http call for every record is going to be quite expensive, and will
> reduce throughput significantly. If it is possible, cache values as much as
> possible to amortize the cost of the http calls.
>
> TD
>
>
>
>
>
> On Fri, Jun 20, 2014 at 11:16 AM, Shrikar archak 
> wrote:
>
>> Hi All,
>>
>> I was curious to know which of the two approaches is better for doing
>> analytics using spark streaming. Let's say we want to add some metadata to
>> the stream which is being processed, like sentiment, tags etc., and then
>> perform some analytics using this added metadata.
>>
>> 1)  Is it ok to make a http call and add some extra information to the
>> stream being processed in the updateByKeyAndWindow operations.
>>
>> 2) Add these sentiment/tags before and then stream through DStreams.
>>
>> Thanks,
>> Shrikar
>>
>>
>


Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-20 Thread Eugen Cepoi
In short, ADD_JARS will add the jar to your driver classpath and also send
it to the workers (similar to what you are doing when you do sc.addJars).

ex: MASTER=master/url ADD_JARS=/path/to/myJob.jar ./bin/spark-shell


You also have SPARK_CLASSPATH var but it does not distribute the code, it
is only used to compute the driver classpath.


BTW, you are not supposed to change the compute-classpath.sh script.


2014-06-20 19:45 GMT+02:00 Shivani Rao :

> Hello Eugene,
>
> You are right about this. I did encounter the "PermGen space" error in the spark
> shell. Can you tell me a little more about "ADD_JARS"? In order to ensure
> my spark-shell has all required jars, I added the jars to the "$CLASSPATH"
> in the compute-classpath.sh script. Is there another way of doing it?
>
> Shivani
>
>
> On Fri, Jun 20, 2014 at 9:47 AM, Eugen Cepoi 
> wrote:
>
>> In my case it was due to a case class I was defining in the spark-shell
>> and not being available on the workers. So packaging it in a jar and adding
>> it with ADD_JARS solved the problem. Note that I don't exactly remember if
>> it was an out of heap space exception or PermGen space. Make sure your
>> jarsPath is correct.
>>
>> Usually to debug this kind of problem I am using the spark-shell (you
>> can do the same in your job but it's more time consuming to repackage,
>> deploy, run, iterate). Try for example
>> 1) read the lines (without any processing) and count them
>> 2) apply processing and count
>>
>>
>>
>> 2014-06-20 17:15 GMT+02:00 Shivani Rao :
>>
>> Hello Abhi, I did try that and it did not work
>>>
>>> And Eugene, Yes I am assembling the argonaut libraries in the fat jar.
>>> So how did you overcome this problem?
>>>
>>> Shivani
>>>
>>>
>>> On Fri, Jun 20, 2014 at 1:59 AM, Eugen Cepoi 
>>> wrote:
>>>

 Le 20 juin 2014 01:46, "Shivani Rao"  a écrit :

 >
 > Hello Andrew,
 >
 > i wish I could share the code, but for proprietary reasons I can't.
 But I can give some idea of what I am trying to do. The job reads a
 file and processes each line of that file. I am not
 doing anything intense in the "processLogs" function
 >
 > import argonaut._
 > import argonaut.Argonaut._
 >
 >
 > /* all of these case classes are created from json strings extracted
 from the line in the processLogs() function
 > *
 > */
 > case class struct1…
 > case class struct2…
 > case class value1(struct1, struct2)
 >
 > def processLogs(line:String): Option[(key1, value1)] {…
 > }
 >
 > def run(sparkMaster, appName, executorMemory, jarsPath) {
 >   val sparkConf = new SparkConf()
 >sparkConf.setMaster(sparkMaster)
 >sparkConf.setAppName(appName)
 >sparkConf.set("spark.executor.memory", executorMemory)
 > sparkConf.setJars(jarsPath) // This includes all the jars
 relevant jars..
 >val sc = new SparkContext(sparkConf)
 >   val rawLogs =
 sc.textFile("hdfs://>>> >
 rawLogs.saveAsTextFile("hdfs://>>> >
 rawLogs.flatMap(processLogs).saveAsTextFile("hdfs://>>> > }
 >
 > If I switch to "local" mode, the code runs just fine, it fails with
 the error I pasted above. In the cluster mode, even writing back the file
 we just read fails
 (rawLogs.saveAsTextFile("hdfs://>>> >
 > I still believe this is a classNotFound error in disguise
 >

 Indeed you are right, this can be the reason. I had similar errors when
 defining case classes in the shell and trying to use them in the RDDs. Are
 you shading argonaut in the fat jar ?

 > Thanks
 > Shivani
 >
 >
 >
 > On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash 
 wrote:
 >>
 >> Wait, so the file only has four lines and the job running out of
 heap space?  Can you share the code you're running that does the
 processing?  I'd guess that you're doing some intense processing on every
 line but just writing parsed case classes back to disk sounds very
 lightweight.
 >>
 >> I
 >>
 >>
 >> On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao 
 wrote:
 >>>
 >>> I am trying to process a file that contains 4 log lines (not very
 long) and then write my parsed out case classes to a destination folder,
 and I get the following error:
 >>>
 >>>
 >>> java.lang.OutOfMemoryError: Java heap space
 >>>
 >>> at
 org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
 >>>
 >>> at
 org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)
 >>>
 >>> at
 org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
 >>>
 >>> at
 org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
 >>>
 >>> at
 org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
 >>>
 >>> at sun.reflect.NativeMethodAcc

Re: Set the number/memory of workers under mesos

2014-06-20 Thread Mayur Rustagi
You should be able to configure in spark context in Spark shell.
spark.cores.max & memory.
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Jun 20, 2014 at 4:30 PM, Shuo Xiang  wrote:

> Hi, just wondering if anybody knows how to set up the number of workers (and
> the amount of memory) in mesos, while launching spark-shell? I was trying to
> edit conf/spark-env.sh and it looks like the environment variables are
> for YARN or standalone. Thanks!
>
>
>
>


Set the number/memory of workers under mesos

2014-06-20 Thread Shuo Xiang
Hi, just wondering if anybody knows how to set up the number of workers (and
the amount of memory) in mesos, while launching spark-shell? I was trying to
edit conf/spark-env.sh and it looks like the environment variables are
for YARN or standalone. Thanks!


Re: Spark and RDF

2014-06-20 Thread andy petrella
yep, would be cool. Even though SPARQL has its drawbacks (vs Cypher vs
Gremlin, I mean), it's still cool for semantic thingies and co.

  aℕdy ℙetrella
about.me/noootsab
[image: aℕdy ℙetrella on about.me]




On Fri, Jun 20, 2014 at 10:03 PM, Mayur Rustagi 
wrote:

> or a seperate RDD for sparql operations ala SchemaRDD .. operators for
> sparql can be defined thr.. not a bad idea :)
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Fri, Jun 20, 2014 at 3:56 PM, andy petrella 
> wrote:
>
>> Maybe some SPARQL features in Shark, then ?
>>
>>  aℕdy ℙetrella
>> about.me/noootsab
>> [image: aℕdy ℙetrella on about.me]
>>
>> 
>>
>>
>> On Fri, Jun 20, 2014 at 9:45 PM, Mayur Rustagi 
>> wrote:
>>
>>> You are looking to create Shark operators for RDF? Since Shark backend
>>> is shifting to SparkSQL it would be slightly hard but much better effort
>>> would be to shift Gremlin to Spark (though a much beefier one :) )
>>>
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi 
>>>
>>>
>>>
>>> On Fri, Jun 20, 2014 at 3:39 PM, andy petrella 
>>> wrote:
>>>
 For RDF, may GraphX be particularly approriated?

  aℕdy ℙetrella
 about.me/noootsab
 [image: aℕdy ℙetrella on about.me]

 


 On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Hi guys,
>
> I'm analyzing the possibility to use Spark to analyze RDF files and
> define reusable Shark operators on them (custom filtering, transforming,
> aggregating, etc). Is that possible? Any hint?
>
> Best,
> Flavio
>


>>>
>>
>


Re: Possible approaches for adding extra metadata (Spark Streaming)?

2014-06-20 Thread Tathagata Das
If the metadata is directly related to each individual record, then it can
be done either way. Since I am not sure how easy or hard it will be for
you to add tags before putting the data into spark streaming, it's hard to
recommend one method over the other.

However, if the metadata is related to each key (based on which you are
calling updateStateByKey) and not to every record, then it may be more
efficient to maintain that per-key metadata in the updateStateByKey's state
object.

Regarding doing http calls, I would be a bit cautious about performance.
Doing an http call for every record is going to be quite expensive, and will
reduce throughput significantly. If it is possible, cache values as much as
possible to amortize the cost of the http calls.
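
A hedged sketch of that caching idea: look up each distinct value at most once
per partition so repeated records don't each pay for a call. fetchSentiment is
a hypothetical stand-in for the real HTTP client, and tweets is an assumed
DStream[String]; neither comes from this thread.

import scala.collection.mutable
import org.apache.spark.streaming.dstream.DStream

object SentimentCacheSketch {
  def fetchSentiment(text: String): String = "neutral" // hypothetical HTTP call

  def tagWithSentiment(tweets: DStream[String]): DStream[(String, String)] =
    tweets.mapPartitions { records =>
      val cache = mutable.HashMap.empty[String, String] // per-partition cache for this batch
      records.map { text =>
        val sentiment = cache.getOrElseUpdate(text, fetchSentiment(text))
        (text, sentiment)
      }
    }
}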

TD





On Fri, Jun 20, 2014 at 11:16 AM, Shrikar archak 
wrote:

> Hi All,
>
> I was curious to know which of the two approaches is better for doing
> analytics using spark streaming. Let's say we want to add some metadata to
> the stream which is being processed, like sentiment, tags etc., and then
> perform some analytics using this added metadata.
>
> 1)  Is it ok to make a http call and add some extra information to the
> stream being processed in the updateByKeyAndWindow operations.
>
> 2) Add these sentiment/tags before and then stream through DStreams.
>
> Thanks,
> Shrikar
>
>


Re: Running Spark alongside Hadoop

2014-06-20 Thread Koert Kuipers
for development/testing i think its fine to run them side by side as you
suggested, using spark standalone. just be realistic about what size data
you can load with limited RAM.


On Fri, Jun 20, 2014 at 3:43 PM, Mayur Rustagi 
wrote:

> The ideal way to do that is to use a cluster manager like YARN or Mesos.
> You can control how many resources to give to which node, etc.
> You should be able to run both together in standalone mode; however, you may
> experience varying latency & performance in the cluster as both MR & Spark
> demand resources from the same machines.
>
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Fri, Jun 20, 2014 at 3:41 PM, Sameer Tilak  wrote:
>
>> Dear Spark users,
>>
>> I have a small 4 node Hadoop cluster. Each node is a VM -- 4 virtual
>> cores, 8GB memory and 500GB disk. I am currently running Hadoop on it. I
>> would like to run Spark (in standalone mode) alongside Hadoop on the same
>> nodes. Given the configuration of my nodes, will that work? Does anyone have
>> any experience in terms of stability and performance of running Spark and
>> Hadoop on somewhat resource-constrained nodes? I was looking at the Spark
>> documentation and there is a way to configure memory and cores for the
>> worker nodes and memory for the master node: SPARK_WORKER_CORES,
>> SPARK_WORKER_MEMORY, SPARK_DAEMON_MEMORY. Any recommendations on how to
>> share resources between Hadoop and Spark?
>>
>>
>>
>>
>


Re: Spark and RDF

2014-06-20 Thread Mayur Rustagi
or a seperate RDD for sparql operations ala SchemaRDD .. operators for
sparql can be defined thr.. not a bad idea :)

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Jun 20, 2014 at 3:56 PM, andy petrella 
wrote:

> Maybe some SPARQL features in Shark, then ?
>
>  aℕdy ℙetrella
> about.me/noootsab
> [image: aℕdy ℙetrella on about.me]
>
> 
>
>
> On Fri, Jun 20, 2014 at 9:45 PM, Mayur Rustagi 
> wrote:
>
>> You are looking to create Shark operators for RDF? Since Shark backend is
>> shifting to SparkSQL it would be slightly hard but much better effort would
>> be to shift Gremlin to Spark (though a much beefier one :) )
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Fri, Jun 20, 2014 at 3:39 PM, andy petrella 
>> wrote:
>>
>>> For RDF, may GraphX be particularly approriated?
>>>
>>>  aℕdy ℙetrella
>>> about.me/noootsab
>>> [image: aℕdy ℙetrella on about.me]
>>>
>>> 
>>>
>>>
>>> On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 Hi guys,

 I'm analyzing the possibility to use Spark to analyze RDF files and
 define reusable Shark operators on them (custom filtering, transforming,
 aggregating, etc). Is that possible? Any hint?

 Best,
 Flavio

>>>
>>>
>>
>


Re: Spark and RDF

2014-06-20 Thread andy petrella
Maybe some SPARQL features in Shark, then?

 aℕdy ℙetrella
about.me/noootsab
[image: aℕdy ℙetrella on about.me]




On Fri, Jun 20, 2014 at 9:45 PM, Mayur Rustagi 
wrote:

> You are looking to create Shark operators for RDF? Since Shark backend is
> shifting to SparkSQL it would be slightly hard but much better effort would
> be to shift Gremlin to Spark (though a much beefier one :) )
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Fri, Jun 20, 2014 at 3:39 PM, andy petrella 
> wrote:
>
>> For RDF, may GraphX be particularly approriated?
>>
>>  aℕdy ℙetrella
>> about.me/noootsab
>> [image: aℕdy ℙetrella on about.me]
>>
>> 
>>
>>
>> On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier > > wrote:
>>
>>> Hi guys,
>>>
>>> I'm analyzing the possibility to use Spark to analyze RDF files and
>>> define reusable Shark operators on them (custom filtering, transforming,
>>> aggregating, etc). Is that possible? Any hint?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>


Re: Spark and RDF

2014-06-20 Thread Mayur Rustagi
You are looking to create Shark operators for RDF? Since the Shark backend is
shifting to SparkSQL it would be slightly hard, but a much better effort would
be to shift Gremlin to Spark (though a much beefier one :) )

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Jun 20, 2014 at 3:39 PM, andy petrella 
wrote:

> For RDF, may GraphX be particularly approriated?
>
>  aℕdy ℙetrella
> about.me/noootsab
> [image: aℕdy ℙetrella on about.me]
>
> 
>
>
> On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier 
> wrote:
>
>> Hi guys,
>>
>> I'm analyzing the possibility to use Spark to analyze RDF files and
>> define reusable Shark operators on them (custom filtering, transforming,
>> aggregating, etc). Is that possible? Any hint?
>>
>> Best,
>> Flavio
>>
>
>


Re: Running Spark alongside Hadoop

2014-06-20 Thread Mayur Rustagi
The ideal way to do that is to use a cluster manager like YARN or Mesos. You
can control how many resources to give to which node, etc.
You should be able to run both together in standalone mode; however, you may
experience varying latency & performance in the cluster as both MR & Spark
demand resources from the same machines.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Jun 20, 2014 at 3:41 PM, Sameer Tilak  wrote:

> Dear Spark users,
>
> I have a small 4 node Hadoop cluster. Each node is a VM -- 4 virtual
> cores, 8GB memory and 500GB disk. I am currently running Hadoop on it. I
> would like to run Spark (in standalone mode) alongside Hadoop on the same
> nodes. Given the configuration of my nodes, will that work? Does anyone have
> any experience in terms of stability and performance of running Spark and
> Hadoop on somewhat resource-constrained nodes? I was looking at the Spark
> documentation and there is a way to configure memory and cores for the
> worker nodes and memory for the master node: SPARK_WORKER_CORES,
> SPARK_WORKER_MEMORY, SPARK_DAEMON_MEMORY. Any recommendations on how to
> share resources between Hadoop and Spark?
>
>
>
>


Running Spark alongside Hadoop

2014-06-20 Thread Sameer Tilak
Dear Spark users,
I have a small 4 node Hadoop cluster. Each node is a VM -- 4 virtual cores, 8GB
memory and 500GB disk. I am currently running Hadoop on it. I would like to run
Spark (in standalone mode) alongside Hadoop on the same nodes. Given the
configuration of my nodes, will that work? Does anyone have any experience in
terms of stability and performance of running Spark and Hadoop on somewhat
resource-constrained nodes? I was looking at the Spark documentation and there
is a way to configure memory and cores for the worker nodes and memory for
the master node: SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_DAEMON_MEMORY.
Any recommendations on how to share resources between Hadoop and Spark?


  

Re: Spark and RDF

2014-06-20 Thread andy petrella
For RDF, might GraphX be particularly appropriate?

 aℕdy ℙetrella
about.me/noootsab
[image: aℕdy ℙetrella on about.me]




On Thu, Jun 19, 2014 at 4:49 PM, Flavio Pompermaier 
wrote:

> Hi guys,
>
> I'm analyzing the possibility to use Spark to analyze RDF files and define
> reusable Shark operators on them (custom filtering, transforming,
> aggregating, etc). Is that possible? Any hint?
>
> Best,
> Flavio
>


Re: Possible approaches for adding extra metadata (Spark Streaming)?

2014-06-20 Thread Mayur Rustagi
You can apply transformations on the RDDs inside DStreams using transform or
any number of other operations.
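
A minimal sketch (names assumed) of that idea: DStream.transform applies an
arbitrary RDD operation to every batch, e.g. joining each batch against a
static RDD of precomputed tags instead of calling out per record.

import org.apache.spark.SparkContext._   // pair-RDD operations such as join
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object TransformSketch {
  // events keyed by user id, userTags a static lookup RDD; both are assumptions.
  def enrich(events: DStream[(String, String)],
             userTags: RDD[(String, String)]): DStream[(String, (String, String))] =
    events.transform(rdd => rdd.join(userTags))
}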

Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Jun 20, 2014 at 2:16 PM, Shrikar archak  wrote:

> Hi All,
>
> I was curious to know which of the two approaches is better for doing
> analytics using spark streaming. Let's say we want to add some metadata to
> the stream which is being processed, like sentiment, tags etc., and then
> perform some analytics using this added metadata.
>
> 1)  Is it ok to make a http call and add some extra information to the
> stream being processed in the updateByKeyAndWindow operations.
>
> 2) Add these sentiment/tags before and then stream through DStreams.
>
> Thanks,
> Shrikar
>
>


Re: spark on yarn is trying to use file:// instead of hdfs://

2014-06-20 Thread Koert Kuipers
ok solved it. as it happened in spark/conf i also had a file called
core.site.xml (with some tachyon related stuff in it) so thats why it
ignored /etc/hadoop/conf/core-site.xml




On Fri, Jun 20, 2014 at 3:24 PM, Koert Kuipers  wrote:

> i put some logging statements in yarn.Client and that confirms its using
> local filesystem:
> 14/06/20 15:20:33 INFO Client: fs.defaultFS is file:///
>
> so somehow fs.defaultFS is not being picked up from
> /etc/hadoop/conf/core-site.xml, but spark does correctly pick up
> yarn.resourcemanager.hostname from /etc/hadoop/conf/yarn-site.xml
>
> strange!
>
>
> On Fri, Jun 20, 2014 at 1:26 PM, Koert Kuipers  wrote:
>
>> in /etc/hadoop/conf/core-site.xml:
>>   
>> fs.defaultFS
>> hdfs://cdh5-yarn.tresata.com:8020
>>   
>>
>>
>> also hdfs seems the default:
>> [koert@cdh5-yarn ~]$ hadoop fs -ls /
>> Found 5 items
>> drwxr-xr-x   - hdfs supergroup  0 2014-06-19 12:31 /data
>> drwxrwxrwt   - hdfs supergroup  0 2014-06-20 12:17 /lib
>> drwxrwxrwt   - hdfs supergroup  0 2014-06-18 14:58 /tmp
>> drwxr-xr-x   - hdfs supergroup  0 2014-06-18 15:02 /user
>> drwxr-xr-x   - hdfs supergroup  0 2014-06-18 14:59 /var
>>
>> and in my spark-env.sh:
>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>
>>
>>
>> On Fri, Jun 20, 2014 at 1:04 PM, bc Wong  wrote:
>>
>>> Koert, is there any chance that your fs.defaultFS isn't setup right?
>>>
>>>
>>> On Fri, Jun 20, 2014 at 9:57 AM, Koert Kuipers 
>>> wrote:
>>>
  yeah sure see below. i strongly suspect its something i misconfigured
 causing yarn to try to use local filesystem mistakenly.

 *

 [koert@cdh5-yarn ~]$ /usr/local/lib/spark/bin/spark-submit --class
 org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3
 --executor-cores 1
 hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar 10
 14/06/20 12:54:40 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/06/20 12:54:40 INFO RMProxy: Connecting to ResourceManager at
 cdh5-yarn.tresata.com/192.168.1.85:8032
 14/06/20 12:54:41 INFO Client: Got Cluster metric info from
 ApplicationsManager (ASM), number of NodeManagers: 1
 14/06/20 12:54:41 INFO Client: Queue info ... queueName: root.default,
 queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0,
   queueApplicationCount = 0, queueChildQueueCount = 0
 14/06/20 12:54:41 INFO Client: Max mem capabililty of a single resource
 in this cluster 8192
 14/06/20 12:54:41 INFO Client: Preparing Local resources
 14/06/20 12:54:41 WARN BlockReaderLocal: The short-circuit local reads
 feature cannot be used because libhadoop cannot be loaded.
 14/06/20 12:54:41 INFO Client: Uploading
 hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar to
 file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar
 14/06/20 12:54:43 INFO Client: Setting up the launch environment
 14/06/20 12:54:43 INFO Client: Setting up container launch context
 14/06/20 12:54:43 INFO Client: Command for starting the Spark
 ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m,
 -Djava.io.tmpdir=$PWD/tmp, -Dspark.akka.retry.wait=\"3\",
 -Dspark.storage.blockManagerTimeoutIntervalMs=\"12\",
 -Dspark.storage.blockManagerHeartBeatMs=\"12\", 
 -Dspark.app.name=\"org.apache.spark.examples.SparkPi\",
 -Dspark.akka.frameSize=\"1\", -Dspark.akka.timeout=\"3\",
 -Dspark.worker.timeout=\"3\",
 -Dspark.akka.logLifecycleEvents=\"true\",
 -Dlog4j.configuration=log4j-spark-container.properties,
 org.apache.spark.deploy.yarn.ApplicationMaster, --class,
 org.apache.spark.examples.SparkPi, --jar ,
 hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar,
 --args  '10' , --executor-memory, 1024, --executor-cores, 1,
 --num-executors , 3, 1>, /stdout, 2>, /stderr)
 14/06/20 12:54:43 INFO Client: Submitting application to ASM
 14/06/20 12:54:43 INFO YarnClientImpl: Submitted application
 application_1403201750110_0060
 14/06/20 12:54:44 INFO Client: Application report from ASM:
  application identifier: application_1403201750110_0060
  appId: 60
  clientToAMToken: null
  appDiagnostics:
  appMasterHost: N/A
  appQueue: root.koert
  appMasterRpcPort: -1
  appStartTime: 1403283283505
  yarnAppState: ACCEPTED
  distributedFinalState: UNDEFINED
  appTrackingUrl:
 http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
  appUser: koert
 14/06/20 12:54:45 INFO Client: Application report from ASM:
  application identifier: application_1403201750110_0060
  appId: 60
  clientToAMToken: null
  appDiagnostics:

Re: spark on yarn is trying to use file:// instead of hdfs://

2014-06-20 Thread Koert Kuipers
i put some logging statements in yarn.Client and that confirms its using
local filesystem:
14/06/20 15:20:33 INFO Client: fs.defaultFS is file:///

so somehow fs.defaultFS is not being picked up from
/etc/hadoop/conf/core-site.xml, but spark does correctly pick up
yarn.resourcemanager.hostname from /etc/hadoop/conf/yarn-site.xml

strange!


On Fri, Jun 20, 2014 at 1:26 PM, Koert Kuipers  wrote:

> in /etc/hadoop/conf/core-site.xml:
>   
> fs.defaultFS
> hdfs://cdh5-yarn.tresata.com:8020
>   
>
>
> also hdfs seems the default:
> [koert@cdh5-yarn ~]$ hadoop fs -ls /
> Found 5 items
> drwxr-xr-x   - hdfs supergroup  0 2014-06-19 12:31 /data
> drwxrwxrwt   - hdfs supergroup  0 2014-06-20 12:17 /lib
> drwxrwxrwt   - hdfs supergroup  0 2014-06-18 14:58 /tmp
> drwxr-xr-x   - hdfs supergroup  0 2014-06-18 15:02 /user
> drwxr-xr-x   - hdfs supergroup  0 2014-06-18 14:59 /var
>
> and in my spark-site.env:
> export HADOOP_CONF_DIR=/etc/hadoop/conf
>
>
>
> On Fri, Jun 20, 2014 at 1:04 PM, bc Wong  wrote:
>
>> Koert, is there any chance that your fs.defaultFS isn't setup right?
>>
>>
>> On Fri, Jun 20, 2014 at 9:57 AM, Koert Kuipers  wrote:
>>
>>>  yeah sure see below. i strongly suspect its something i misconfigured
>>> causing yarn to try to use local filesystem mistakenly.
>>>
>>> *
>>>
>>> [koert@cdh5-yarn ~]$ /usr/local/lib/spark/bin/spark-submit --class
>>> org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3
>>> --executor-cores 1
>>> hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar 10
>>> 14/06/20 12:54:40 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> 14/06/20 12:54:40 INFO RMProxy: Connecting to ResourceManager at
>>> cdh5-yarn.tresata.com/192.168.1.85:8032
>>> 14/06/20 12:54:41 INFO Client: Got Cluster metric info from
>>> ApplicationsManager (ASM), number of NodeManagers: 1
>>> 14/06/20 12:54:41 INFO Client: Queue info ... queueName: root.default,
>>> queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0,
>>>   queueApplicationCount = 0, queueChildQueueCount = 0
>>> 14/06/20 12:54:41 INFO Client: Max mem capabililty of a single resource
>>> in this cluster 8192
>>> 14/06/20 12:54:41 INFO Client: Preparing Local resources
>>> 14/06/20 12:54:41 WARN BlockReaderLocal: The short-circuit local reads
>>> feature cannot be used because libhadoop cannot be loaded.
>>> 14/06/20 12:54:41 INFO Client: Uploading
>>> hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar to
>>> file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar
>>> 14/06/20 12:54:43 INFO Client: Setting up the launch environment
>>> 14/06/20 12:54:43 INFO Client: Setting up container launch context
>>> 14/06/20 12:54:43 INFO Client: Command for starting the Spark
>>> ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m,
>>> -Djava.io.tmpdir=$PWD/tmp, -Dspark.akka.retry.wait=\"3\",
>>> -Dspark.storage.blockManagerTimeoutIntervalMs=\"12\",
>>> -Dspark.storage.blockManagerHeartBeatMs=\"12\", 
>>> -Dspark.app.name=\"org.apache.spark.examples.SparkPi\",
>>> -Dspark.akka.frameSize=\"1\", -Dspark.akka.timeout=\"3\",
>>> -Dspark.worker.timeout=\"3\",
>>> -Dspark.akka.logLifecycleEvents=\"true\",
>>> -Dlog4j.configuration=log4j-spark-container.properties,
>>> org.apache.spark.deploy.yarn.ApplicationMaster, --class,
>>> org.apache.spark.examples.SparkPi, --jar ,
>>> hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar,
>>> --args  '10' , --executor-memory, 1024, --executor-cores, 1,
>>> --num-executors , 3, 1>, /stdout, 2>, /stderr)
>>> 14/06/20 12:54:43 INFO Client: Submitting application to ASM
>>> 14/06/20 12:54:43 INFO YarnClientImpl: Submitted application
>>> application_1403201750110_0060
>>> 14/06/20 12:54:44 INFO Client: Application report from ASM:
>>>  application identifier: application_1403201750110_0060
>>>  appId: 60
>>>  clientToAMToken: null
>>>  appDiagnostics:
>>>  appMasterHost: N/A
>>>  appQueue: root.koert
>>>  appMasterRpcPort: -1
>>>  appStartTime: 1403283283505
>>>  yarnAppState: ACCEPTED
>>>  distributedFinalState: UNDEFINED
>>>  appTrackingUrl:
>>> http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
>>>  appUser: koert
>>> 14/06/20 12:54:45 INFO Client: Application report from ASM:
>>>  application identifier: application_1403201750110_0060
>>>  appId: 60
>>>  clientToAMToken: null
>>>  appDiagnostics:
>>>  appMasterHost: N/A
>>>  appQueue: root.koert
>>>  appMasterRpcPort: -1
>>>  appStartTime: 1403283283505
>>>  yarnAppState: ACCEPTED
>>>  distributedFinalState: UNDEFINED
>>>  appTrackingUrl:
>>> http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
>>>  appUser: koert
>>> 14/06/20 12:54:46 INFO Clie

Re: Problems running Spark job on mesos in fine-grained mode

2014-06-20 Thread Sébastien Rainville
Hi,

this is just a follow-up regarding this issue. Turns out that it's caused
by a bug in Spark. I created a case for it:
https://issues.apache.org/jira/browse/SPARK-2204 and submitted a patch.

Any chance this could be included in the 1.0.1 release?

Thanks,

- Sebastien



On Tue, Jun 17, 2014 at 2:57 PM, Sébastien Rainville <
sebastienrainvi...@gmail.com> wrote:

> Hi,
>
> I'm having trouble running spark on mesos in fine-grained mode. I'm
> running spark 1.0.0 and mesos 0.18.0. The tasks are failing randomly, which
> most of the time, but not always, cause the job to fail. The same code is
> running fine in coarse-grained mode. I see the following exceptions in the
> logs of the spark driver:
>
> W0617 10:57:36.774382  8735 sched.cpp:901] Attempting to launch task 21
> with an unknown offer 20140416-011500-1369465866-5050-26096-52332715
> W0617 10:57:36.774433  8735 sched.cpp:901] Attempting to launch task 22
> with an unknown offer 20140416-011500-1369465866-5050-26096-52332715
> 14/06/17 10:57:36 INFO TaskSetManager: Re-queueing tasks for
> 201311011608-1369465866-5050-9189-46 from TaskSet 0.0
> 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 22 (task 0.0:2)
> 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 19 (task 0.0:0)
> 14/06/17 10:57:36 WARN TaskSetManager: Lost TID 21 (task 0.0:1)
> 14/06/17 10:57:36 INFO DAGScheduler: Executor lost:
> 201311011608-1369465866-5050-9189-46 (epoch 0)
> 14/06/17 10:57:36 INFO BlockManagerMasterActor: Trying to remove executor
> 201311011608-1369465866-5050-9189-46 from BlockManagerMaster.
> 14/06/17 10:57:36 INFO BlockManagerMaster: Removed
> 201311011608-1369465866-5050-9189-46 successfully in removeExecutor
> 14/06/17 10:57:36 DEBUG MapOutputTrackerMaster: Increasing epoch to 1
> 14/06/17 10:57:36 INFO DAGScheduler: Host added was in lost list earlier:
> ca1-dcc1-0065.lab.mtl
>
> I don't see any exceptions in the spark executor logs. The only error
> message I found in mesos itself is warnings in the mesos master:
>
> W0617 10:57:36.816748 26100 master.cpp:1615] Failed to validate task 21 :
> Task 21 attempted to use cpus(*):1 combined with already used cpus(*):1;
> mem(*):2048 is greater than offered mem(*):3216; disk(*):98304;
> ports(*):[11900-11919, 1192
> 1-11995, 11997-11999]; cpus(*):1
> W0617 10:57:36.819807 26100 master.cpp:1615] Failed to validate task 22 :
> Task 22 attempted to use cpus(*):1 combined with already used cpus(*):1;
> mem(*):2048 is greater than offered mem(*):3216; disk(*):98304;
> ports(*):[11900-11919, 1192
> 1-11995, 11997-11999]; cpus(*):1
> W0617 10:57:36.932287 26102 master.cpp:1615] Failed to validate task 28 :
> Task 28 attempted to use cpus(*):1 combined with already used cpus(*):1;
> mem(*):2048 is greater than offered cpus(*):1; mem(*):3216; disk(*):98304;
> ports(*):[11900-
> 11960, 11962-11978, 11980-11999]
> W0617 11:05:52.783133 26098 master.cpp:2106] Ignoring unknown exited
> executor 201311011608-1369465866-5050-9189-46 on slave
> 201311011608-1369465866-5050-9189-46 (ca1-dcc1-0065.lab.mtl)
> W0617 11:05:52.787739 26103 master.cpp:2106] Ignoring unknown exited
> executor 201311011608-1369465866-5050-9189-34 on slave
> 201311011608-1369465866-5050-9189-34 (ca1-dcc1-0053.lab.mtl)
> W0617 11:05:52.790292 26102 master.cpp:2106] Ignoring unknown exited
> executor 201311011608-1369465866-5050-9189-59 on slave
> 201311011608-1369465866-5050-9189-59 (ca1-dcc1-0079.lab.mtl)
> W0617 11:05:52.800649 26099 master.cpp:2106] Ignoring unknown exited
> executor 201311011608-1369465866-5050-9189-18 on slave
> 201311011608-1369465866-5050-9189-18 (ca1-dcc1-0027.lab.mtl)
> ... (more of those "Ignoring unknown exited executor")
>
>
> I analyzed the difference in between the execution of the same job in
> coarse-grained mode and fine-grained mode, and I noticed that in the
> fine-grained mode the tasks get executed on executors different than the
> ones reported in spark, as if spark and mesos get out of sync as to which
> executor is responsible for which task. See the following:
>
>
> Coarse-grained mode:
>
>  (Spark: Task Index | Task ID | Executor | Status  ||  Mesos: Task ID (UI) |
>  Task Name | Task ID (logs) | Executor | State)
>
>  0 | 0 | 66  | SUCCESS  ||  4  | "Task 4"  | 0 | 66  | RUNNING
>  1 | 1 | 59  | SUCCESS  ||  0  | "Task 0"  | 1 | 59  | RUNNING
>  2 | 2 | 54  | SUCCESS  ||  10 | "Task 10" | 2 | 54  | RUNNING
>  3 | 3 | 128 | SUCCESS  ||  6  | "Task 6"  | 3 | 128 | RUNNING
>  ...
>
> Fine-grained mode (same columns):
>
>  0 | 23 | 108 | SUCCESS  ||  23 | "task 0.0:0" | 23 | 27  | FINISHED
>  0 | 19 | 65  | FAILED   ||  19 | "task 0.0:0" | 19 | 86  | FINISHED
>  1 | 21 | 65  | FAILED   ||  (Mesos executor was never created)
>  1 | 24 | 92  | SUCCESS  ||  24 | "task 0.0:1" | 24 | 129 | FINISHED
>  2 | 22 | 65  | FAILED   ||  (Mesos executor was never created)
>  2 | 25 | 100 | SUCCESS  ||  25 | "task 0.0:2" | 25 | 84  | FINISHED
>  3 | 26 | 80  | SUCCESS  ||  26 | "task 0.0:3" | 26 | 124 | FINISHED
>  4 | 27 | 65  | FAILED   ||  27 | "task 0.0:4" | 27 | 108 | FINISHED
>  4 | 29 | 92  | SUCCESS  ||  29 | "task 0.0:4" | 29 | 65  | FINISHED
>  5 | 28 | 65  | FAILED   ||  (Mesos executor was never created)
>  5 | 30 | 77  | SUCCESS  ||  30 | "task 0.0:5" | 30 | 62  | FINISHED
>  6 | 0  | 53  | SUCCESS  ||  0  | "task 0.0:6" | 0  | 41  | FINISHED
>  7 | 1  | 77  | SUCCESS  ||  1  | "task 0.0:7" | 1  | 114 | FINISHED
>  ...
>
>
> Is it normal that th

Possible approaches for adding extra metadata (Spark Streaming)?

2014-06-20 Thread Shrikar archak
Hi All,

I was curious to know which of the two approaches is better for doing
analytics using Spark Streaming. Let's say we want to add some metadata to
the stream being processed, like sentiment, tags etc., and then
perform some analytics using this added metadata.

1) Is it OK to make an HTTP call and add some extra information to the
stream being processed in the updateByKeyAndWindow operations?

2) Add these sentiment/tags beforehand and then stream them through DStreams?

Thanks,
Shrikar


Re: parallel Reduce within a key

2014-06-20 Thread Michael Malak
How about a treeReduceByKey? :-)


On Friday, June 20, 2014 11:55 AM, DB Tsai  wrote:
 


Currently, the reduce operation combines the results from the mappers
sequentially, so it's O(n).

Xiangrui is working on treeReduce, which is O(log(n)). Based on the
benchmark, it dramatically increases performance. You can test the
code in his own branch.
https://github.com/apache/spark/pull/1110

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai



On Fri, Jun 20, 2014 at 6:57 AM, ansriniv  wrote:
> Hi,
>
> I am on Spark 0.9.0
>
> I have a 2 node cluster (2 worker nodes) with 16 cores on each node (so, 32
> cores in the cluster).
> I have an input rdd with 64 partitions.
>
> I am running  "sc.mapPartitions(...).reduce(...)"
>
> I can see that I get full parallelism on the mapper (all my 32 cores are
> busy simultaneously). However, when it comes to reduce(), the outputs of the
> mappers are all reduced SERIALLY. Further, all the reduce processing happens
> only on 1 of the workers.
>
> I was expecting that the outputs of the 16 mappers on node 1 would be
> reduced in parallel in node 1 while the outputs of the 16 mappers on node 2
> would be reduced in parallel on node 2 and there would be 1 final inter-node
> reduce (of node 1 reduced result and node 2 reduced result).
>
> Isn't parallel reduce supported WITHIN a key (in this case I have no key) ?
> (I know that there is parallelism in reduce across keys)
>
> Best Regards
> Anand
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/parallel-Reduce-within-a-key-tp7998.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How do you run your spark app?

2014-06-20 Thread Shrikar archak
Hi Shivani,

I use sbt assembly to create a fat jar .
https://github.com/sbt/sbt-assembly

Example of the sbt file is below.

import AssemblyKeys._ // put this at the top of the file

assemblySettings

mainClass in assembly := Some("FifaSparkStreaming")

name := "FifaSparkStreaming"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0" %
"provided",
"org.apache.spark" %% "spark-streaming" %
"1.0.0" % "provided",
("org.apache.spark" %%
"spark-streaming-twitter" %
"1.0.0").exclude("org.eclipse.jetty.orbit","javax.transaction")

   .exclude("org.eclipse.jetty.orbit","javax.servlet")

   .exclude("org.eclipse.jetty.orbit","javax.mail.glassfish")

   .exclude("org.eclipse.jetty.orbit","javax.activation")

   .exclude("com.esotericsoftware.minlog", "minlog"),
("net.debasishg" % "redisclient_2.10" %
"2.12").exclude("com.typesafe.akka","akka-actor_2.10"))

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", xs @ _*) => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case x => old(x)
  }
}


resolvers += "Akka Repository" at "http://repo.akka.io/releases/";


And I run as mentioned below.

LOCALLY :
1)  sbt 'run AP1z4IYraYm5fqWhITWArY53x
Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6
115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN
Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014'

If you want to submit on the cluster

CLUSTER:
2) spark-submit --class FifaSparkStreaming --master
"spark://server-8-144:7077" --driver-memory 2048 --deploy-mode cluster
FifaSparkStreaming-assembly-1.0.jar AP1z4IYraYm5fqWhITWArY53x
Cyyz3Zr67tVK46G8dus5tSbc83KQOdtMDgYoQ5WLQwH0mTWzB6
115254720-OfJ4yFsUU6C6vBkEOMDlBlkIgslPleFjPwNcxHjN
Qd76y2izncM7fGGYqU1VXYTxg1eseNuzcdZKm2QJyK8d1 fifa fifa2014


Hope this helps.

Thanks,
Shrikar


On Fri, Jun 20, 2014 at 9:16 AM, Shivani Rao  wrote:

> Hello Michael,
>
> I have a quick question for you. Can you clarify the statement " build
> fat JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's
> and everything needed to run a Job".  Can you give an example.
>
> I am using sbt assembly as well to create a fat jar, and supplying the
> spark and hadoop locations in the class path. Inside the main() function
> where spark context is created, I use SparkContext.jarOfClass(this).toList
> add the fat jar to my spark context. However, I seem to be running into
> issues with this approach. I was wondering if you had any inputs Michael.
>
> Thanks,
> Shivani
>
>
> On Thu, Jun 19, 2014 at 10:57 PM, Sonal Goyal 
> wrote:
>
>> We use maven for building our code and then invoke spark-submit through
>> the exec plugin, passing in our parameters. Works well for us.
>>
>> Best Regards,
>> Sonal
>> Nube Technologies 
>>
>> 
>>
>>
>>
>>
>> On Fri, Jun 20, 2014 at 3:26 AM, Michael Cutler 
>> wrote:
>>
>>> P.S. Last but not least we use sbt-assembly to build fat JAR's and build
>>> dist-style TAR.GZ packages with launch scripts, JAR's and everything needed
>>> to run a Job.  These are automatically built from source by our Jenkins and
>>> stored in HDFS.  Our Chronos/Marathon jobs fetch the latest release TAR.GZ
>>> direct from HDFS, unpack it and launch the appropriate script.
>>>
>>> Makes for a much cleaner development / testing / deployment to package
>>> everything required in one go instead of relying on cluster specific
>>> classpath additions or any add-jars functionality.
>>>
>>>
>>> On 19 June 2014 22:53, Michael Cutler  wrote:
>>>
 When you start seriously using Spark in production there are basically
 two things everyone eventually needs:

1. Scheduled Jobs - recurring hourly/daily/weekly jobs.
2. Always-On Jobs - that require monitoring, restarting etc.

 There are lots of ways to implement these requirements, everything from
 crontab through to workflow managers like Oozie.

 We opted for the following stack:

- Apache Mesos  (mesosphere.io distribution)


- Marathon  - init/control
system for starting, stopping, and maintaining always-on applications.


- Chronos  - general-purpose
scheduler for Mesos, supports job dependency graphs.


- ** Spark Job Server  -
primarily for it's ability to reuse shared contexts with multiple jobs

 The majority of our jobs are periodic (batch) jobs run through

Re: What is the best way to handle transformations or actions that takes forever?

2014-06-20 Thread Peng Cheng
Wow, that sounds like a lot of work (needs a mini-thread); thanks a lot for the
answer.
It might be a nice-to-have feature.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-best-way-to-handle-transformations-or-actions-that-takes-forever-tp7664p8024.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallel Reduce within a key

2014-06-20 Thread DB Tsai
Currently, the reduce operation combines the results from the mappers
sequentially, so it's O(n).

Xiangrui is working on treeReduce, which is O(log(n)). Based on the
benchmark, it dramatically increases performance. You can test the
code in his own branch.
https://github.com/apache/spark/pull/1110
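
In the meantime, here is a rough sketch of the same idea built only on the
existing API (just an illustration, not the code in that PR; it assumes the
combine function f is associative and commutative):

import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair-RDD functions
import org.apache.spark.rdd.RDD

def treeReduceSketch[T: ClassTag](rdd: RDD[T], f: (T, T) => T, scale: Int = 4): T = {
  // reduce each partition locally to at most one element
  var partial = rdd.mapPartitions { it =>
    if (it.hasNext) Iterator(it.reduce(f)) else Iterator.empty
  }
  var numPartitions = partial.partitions.length
  // repeatedly shuffle the partial results into ~1/scale as many partitions
  // and combine them there, so the final reduce on the driver only sees a
  // handful of values instead of one per original partition
  while (numPartitions > scale) {
    numPartitions = math.max(numPartitions / scale, 1)
    partial = partial
      .map(x => (Random.nextInt(numPartitions), x))
      .reduceByKey(new HashPartitioner(numPartitions), f)
      .values
  }
  partial.reduce(f)
}

With 64 partitions on 32 cores, this turns the single sequential combine on
the driver into a couple of parallel shuffle rounds plus a tiny final reduce.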

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Fri, Jun 20, 2014 at 6:57 AM, ansriniv  wrote:
> Hi,
>
> I am on Spark 0.9.0
>
> I have a 2 node cluster (2 worker nodes) with 16 cores on each node (so, 32
> cores in the cluster).
> I have an input rdd with 64 partitions.
>
> I am running  "sc.mapPartitions(...).reduce(...)"
>
> I can see that I get full parallelism on the mapper (all my 32 cores are
> busy simultaneously). However, when it comes to reduce(), the outputs of the
> mappers are all reduced SERIALLY. Further, all the reduce processing happens
> only on 1 of the workers.
>
> I was expecting that the outputs of the 16 mappers on node 1 would be
> reduced in parallel in node 1 while the outputs of the 16 mappers on node 2
> would be reduced in parallel on node 2 and there would be 1 final inter-node
> reduce (of node 1 reduced result and node 2 reduced result).
>
> Isn't parallel reduce supported WITHIN a key (in this case I have no key) ?
> (I know that there is parallelism in reduce across keys)
>
> Best Regards
> Anand
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/parallel-Reduce-within-a-key-tp7998.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-20 Thread Shivani Rao
Hello Eugene,

You are right about this. I did encounter the PermGen space error in the
spark-shell. Can you tell me a little more about "ADD_JARS"? In order to ensure
my spark-shell has all the required jars, I added the jars to the "$CLASSPATH"
in the compute_classpath.sh script. Is there another way of doing it?

Shivani


On Fri, Jun 20, 2014 at 9:47 AM, Eugen Cepoi  wrote:

> In my case it was due to a case class I was defining in the spark-shell
> and not being available on the workers. So packaging it in a jar and adding
> it with ADD_JARS solved the problem. Note that I don't exactly remember if
> it was an out of heap space exception or PermGen space. Make sure your
> jarsPath is correct.
>
> Usually to debug this kind of problem I am using the spark-shell (you can
> do the same in your job but it's more time consuming to repackage, deploy,
> run, iterate). Try for example
> 1) read the lines (without any processing) and count them
> 2) apply processing and count
>
>
>
> 2014-06-20 17:15 GMT+02:00 Shivani Rao :
>
> Hello Abhi, I did try that and it did not work
>>
>> And Eugene, Yes I am assembling the argonaut libraries in the fat jar. So
>> how did you overcome this problem?
>>
>> Shivani
>>
>>
>> On Fri, Jun 20, 2014 at 1:59 AM, Eugen Cepoi 
>> wrote:
>>
>>>
>>> On 20 Jun 2014 01:46, "Shivani Rao"  wrote:
>>>
>>> >
>>> > Hello Andrew,
>>> >
>>> > i wish I could share the code, but for proprietary reasons I can't.
>>> But I can give some idea though of what i am trying to do. The job reads a
>>> file and for each line of that file and processors these lines. I am not
>>> doing anything intense in the "processLogs" function
>>> >
>>> > import argonaut._
>>> > import argonaut.Argonaut._
>>> >
>>> >
>>> > /* all of these case classes are created from json strings extracted
>>> from the line in the processLogs() function
>>> > *
>>> > */
>>> > case class struct1…
>>> > case class struct2…
>>> > case class value1(struct1, struct2)
>>> >
>>> > def processLogs(line:String): Option[(key1, value1)] {…
>>> > }
>>> >
>>> > def run(sparkMaster, appName, executorMemory, jarsPath) {
>>> >   val sparkConf = new SparkConf()
>>> >sparkConf.setMaster(sparkMaster)
>>> >sparkConf.setAppName(appName)
>>> >sparkConf.set("spark.executor.memory", executorMemory)
>>> > sparkConf.setJars(jarsPath) // This includes all the jars relevant
>>> jars..
>>> >val sc = new SparkContext(sparkConf)
>>> >   val rawLogs =
>>> sc.textFile("hdfs://>> >
>>> rawLogs.saveAsTextFile("hdfs://>> >
>>> rawLogs.flatMap(processLogs).saveAsTextFile("hdfs://>> > }
>>> >
>>> > If I switch to "local" mode, the code runs just fine, it fails with
>>> the error I pasted above. In the cluster mode, even writing back the file
>>> we just read fails
>>> (rawLogs.saveAsTextFile("hdfs://>> >
>>> > I still believe this is a classNotFound error in disguise
>>> >
>>>
>>> Indeed you are right, this can be the reason. I had similar errors when
>>> defining case classes in the shell and trying to use them in the RDDs. Are
>>> you shading argonaut in the fat jar ?
>>>
>>> > Thanks
>>> > Shivani
>>> >
>>> >
>>> >
>>> > On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash 
>>> wrote:
>>> >>
>>> >> Wait, so the file only has four lines and the job running out of heap
>>> space?  Can you share the code you're running that does the processing?
>>>  I'd guess that you're doing some intense processing on every line but just
>>> writing parsed case classes back to disk sounds very lightweight.
>>> >>
>>> >> I
>>> >>
>>> >>
>>> >> On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao 
>>> wrote:
>>> >>>
>>> >>> I am trying to process a file that contains 4 log lines (not very
>>> long) and then write my parsed out case classes to a destination folder,
>>> and I get the following error:
>>> >>>
>>> >>>
>>> >>> java.lang.OutOfMemoryError: Java heap space
>>> >>>
>>> >>> at
>>> org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
>>> >>>
>>> >>> at
>>> org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)
>>> >>>
>>> >>> at
>>> org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
>>> >>>
>>> >>> at
>>> org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
>>> >>>
>>> >>> at
>>> org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
>>> >>>
>>> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> >>>
>>> >>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>>> >>>
>>> >>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>> >>>
>>> >>> at java.lang.reflect.Method.invoke(Method.java:597)
>>> >>>
>>> >>> at
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
>>> >>>
>>> >>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
>>> >>>
>>> >>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>>> >

Re: Parallel LogisticRegression?

2014-06-20 Thread Kyle Ellrott
I've tried to parallelize the separate regressions using
allResponses.toParArray.map( x=> do logistic regression against labels in x)
But I start to see messages like
14/06/20 10:10:26 WARN scheduler.TaskSetManager: Lost TID 4193 (task
363.0:4)
14/06/20 10:10:27 WARN scheduler.TaskSetManager: Loss was due to fetch
failure from null
and finally
14/06/20 10:10:26 ERROR scheduler.TaskSetManager: Task 363.0:4 failed 4
times; aborting job

Then
14/06/20 10:10:26 ERROR scheduler.DAGSchedulerActorSupervisor:
eventProcesserActor failed due to the error null; shutting down SparkContext
14/06/20 10:10:26 ERROR actor.OneForOneStrategy:
java.lang.UnsupportedOperationException
at
org.apache.spark.scheduler.SchedulerBackend$class.killTask(SchedulerBackend.scala:32)
at
org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.killTask(MesosSchedulerBackend.scala:41)
at
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:185)


This doesn't happen when I don't use toParArray. I read that spark was
thread safe, but I seem to be running into problems. Am I doing something
wrong?

Kyle



On Thu, Jun 19, 2014 at 11:21 AM, Kyle Ellrott 
wrote:

>
> I'm working on a problem learning several different sets of responses
> against the same set of training features. Right now I've written the
> program to cycle through all of the different label sets, attached them to
> the training data and run LogisticRegressionWithSGD on each of them. ie
>
> foreach curResponseSet in allResponses:
>  currentRDD : RDD[LabeledPoints] = curResponseSet joined with
> trainingData
>  LogisticRegressionWithSGD.train(currentRDD)
>
>
> Each of the different training runs is independent. It seems like I
> should be able to parallelize them as well.
> Is there a better way to do this?
>
>
> Kyle
>


Re: 1.0.1 release plan

2014-06-20 Thread Mingyu Kim
Cool. Thanks for the note. Looking forward to it.

Mingyu

From:  Andrew Ash 
Reply-To:  "user@spark.apache.org" 
Date:  Friday, June 20, 2014 at 9:54 AM
To:  "user@spark.apache.org" 
Subject:  Re: 1.0.1 release plan

Sounds good.  Mingyu and I are waiting on 1.0.1 to get the fix for the below
issues without running a patched version of Spark:

https://issues.apache.org/jira/browse/SPARK-1935 -- commons-codec version conflicts for client applications
https://issues.apache.org/jira/browse/SPARK-2043 -- correctness issue with spilling


On Fri, Jun 20, 2014 at 1:04 AM, Patrick Wendell  wrote:
> Hey There,
> 
> I'd like to start voting on this release shortly because there are a
> few important fixes that have queued up. We're just waiting to fix an
> akka issue. I'd guess we'll cut a vote in the next few days.
> 
> - Patrick
> 
> On Thu, Jun 19, 2014 at 10:47 AM, Mingyu Kim  wrote:
>> > Hi all,
>> >
>> > Is there any plan for 1.0.1 release?
>> >
>> > Mingyu







Can not checkpoint Graph object's vertices but could checkpoint edges

2014-06-20 Thread dash
I'm trying to work around the StackOverflowError that occurs when an object has
a long dependency chain; someone said I should use checkpoint to cut off the
dependencies. I wrote some sample code to test it, but I can only checkpoint
the edges, not the vertices. I think I do materialize both vertices and edges
after calling checkpoint, so why do only the edges get checkpointed?

Here is my code; I'd really appreciate it if you could point out what I did wrong.

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Test")
  .setMaster("local[4]")
val sc = new SparkContext(conf)
sc.setCheckpointDir("./checkpoint")
val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L,
2L)))
val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L,
1L), Edge(2L, 0L, 2L)))
var g = Graph(v, e)
val vertexIds = Seq(0L, 1L, 2L)
var prevG: Graph[VertexId, Long] = null
for (i <- 1 to 10) {
  vertexIds.toStream.foreach(id => {
println("generate new graph")
prevG = g
g = Graph(g.vertices, g.edges)

println("uncache vertices")
prevG.unpersistVertices(blocking = false)
println("uncache edges")
prevG.edges.unpersist(blocking = false)

//Third approach, do checkpoint
//Vertices can not be checkpointed, still have StackOverflowError
g.vertices.checkpoint()
g.edges.checkpoint()
println(g.vertices.count()+g.edges.count())
println(g.vertices.isCheckpointed+" "+g.edges.isCheckpointed)

  })

  println(" iter " + i + " finished")
}
  }





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-not-checkpoint-Graph-object-s-vertices-but-could-checkpoint-edges-tp8019.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark on yarn is trying to use file:// instead of hdfs://

2014-06-20 Thread Koert Kuipers
in /etc/hadoop/conf/core-site.xml:
  
fs.defaultFS
hdfs://cdh5-yarn.tresata.com:8020
  


also hdfs seems the default:
[koert@cdh5-yarn ~]$ hadoop fs -ls /
Found 5 items
drwxr-xr-x   - hdfs supergroup  0 2014-06-19 12:31 /data
drwxrwxrwt   - hdfs supergroup  0 2014-06-20 12:17 /lib
drwxrwxrwt   - hdfs supergroup  0 2014-06-18 14:58 /tmp
drwxr-xr-x   - hdfs supergroup  0 2014-06-18 15:02 /user
drwxr-xr-x   - hdfs supergroup  0 2014-06-18 14:59 /var

and in my spark-site.env:
export HADOOP_CONF_DIR=/etc/hadoop/conf



On Fri, Jun 20, 2014 at 1:04 PM, bc Wong  wrote:

> Koert, is there any chance that your fs.defaultFS isn't setup right?
>
>
> On Fri, Jun 20, 2014 at 9:57 AM, Koert Kuipers  wrote:
>
>>  yeah sure see below. i strongly suspect its something i misconfigured
>> causing yarn to try to use local filesystem mistakenly.
>>
>> *
>>
>> [koert@cdh5-yarn ~]$ /usr/local/lib/spark/bin/spark-submit --class
>> org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3
>> --executor-cores 1
>> hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar 10
>> 14/06/20 12:54:40 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 14/06/20 12:54:40 INFO RMProxy: Connecting to ResourceManager at
>> cdh5-yarn.tresata.com/192.168.1.85:8032
>> 14/06/20 12:54:41 INFO Client: Got Cluster metric info from
>> ApplicationsManager (ASM), number of NodeManagers: 1
>> 14/06/20 12:54:41 INFO Client: Queue info ... queueName: root.default,
>> queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0,
>>   queueApplicationCount = 0, queueChildQueueCount = 0
>> 14/06/20 12:54:41 INFO Client: Max mem capabililty of a single resource
>> in this cluster 8192
>> 14/06/20 12:54:41 INFO Client: Preparing Local resources
>> 14/06/20 12:54:41 WARN BlockReaderLocal: The short-circuit local reads
>> feature cannot be used because libhadoop cannot be loaded.
>> 14/06/20 12:54:41 INFO Client: Uploading
>> hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar to
>> file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar
>> 14/06/20 12:54:43 INFO Client: Setting up the launch environment
>> 14/06/20 12:54:43 INFO Client: Setting up container launch context
>> 14/06/20 12:54:43 INFO Client: Command for starting the Spark
>> ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m,
>> -Djava.io.tmpdir=$PWD/tmp, -Dspark.akka.retry.wait=\"3\",
>> -Dspark.storage.blockManagerTimeoutIntervalMs=\"12\",
>> -Dspark.storage.blockManagerHeartBeatMs=\"12\", 
>> -Dspark.app.name=\"org.apache.spark.examples.SparkPi\",
>> -Dspark.akka.frameSize=\"1\", -Dspark.akka.timeout=\"3\",
>> -Dspark.worker.timeout=\"3\",
>> -Dspark.akka.logLifecycleEvents=\"true\",
>> -Dlog4j.configuration=log4j-spark-container.properties,
>> org.apache.spark.deploy.yarn.ApplicationMaster, --class,
>> org.apache.spark.examples.SparkPi, --jar ,
>> hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar,
>> --args  '10' , --executor-memory, 1024, --executor-cores, 1,
>> --num-executors , 3, 1>, /stdout, 2>, /stderr)
>> 14/06/20 12:54:43 INFO Client: Submitting application to ASM
>> 14/06/20 12:54:43 INFO YarnClientImpl: Submitted application
>> application_1403201750110_0060
>> 14/06/20 12:54:44 INFO Client: Application report from ASM:
>>  application identifier: application_1403201750110_0060
>>  appId: 60
>>  clientToAMToken: null
>>  appDiagnostics:
>>  appMasterHost: N/A
>>  appQueue: root.koert
>>  appMasterRpcPort: -1
>>  appStartTime: 1403283283505
>>  yarnAppState: ACCEPTED
>>  distributedFinalState: UNDEFINED
>>  appTrackingUrl:
>> http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
>>  appUser: koert
>> 14/06/20 12:54:45 INFO Client: Application report from ASM:
>>  application identifier: application_1403201750110_0060
>>  appId: 60
>>  clientToAMToken: null
>>  appDiagnostics:
>>  appMasterHost: N/A
>>  appQueue: root.koert
>>  appMasterRpcPort: -1
>>  appStartTime: 1403283283505
>>  yarnAppState: ACCEPTED
>>  distributedFinalState: UNDEFINED
>>  appTrackingUrl:
>> http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
>>  appUser: koert
>> 14/06/20 12:54:46 INFO Client: Application report from ASM:
>>  application identifier: application_1403201750110_0060
>>  appId: 60
>>  clientToAMToken: null
>>  appDiagnostics:
>>  appMasterHost: N/A
>>  appQueue: root.koert
>>  appMasterRpcPort: -1
>>  appStartTime: 1403283283505
>>  yarnAppState: ACCEPTED
>>  distributedFinalState: UNDEFINED
>>  appTrackingUrl:
>> http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
>>  appUser: koert
>> 14/06/20 12:54:47 INFO Client: A

Re: spark on yarn is trying to use file:// instead of hdfs://

2014-06-20 Thread bc Wong
Koert, is there any chance that your fs.defaultFS isn't setup right?


On Fri, Jun 20, 2014 at 9:57 AM, Koert Kuipers  wrote:

>  yeah sure see below. i strongly suspect its something i misconfigured
> causing yarn to try to use local filesystem mistakenly.
>
> *
>
> [koert@cdh5-yarn ~]$ /usr/local/lib/spark/bin/spark-submit --class
> org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3
> --executor-cores 1
> hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar 10
> 14/06/20 12:54:40 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 14/06/20 12:54:40 INFO RMProxy: Connecting to ResourceManager at
> cdh5-yarn.tresata.com/192.168.1.85:8032
> 14/06/20 12:54:41 INFO Client: Got Cluster metric info from
> ApplicationsManager (ASM), number of NodeManagers: 1
> 14/06/20 12:54:41 INFO Client: Queue info ... queueName: root.default,
> queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0,
>   queueApplicationCount = 0, queueChildQueueCount = 0
> 14/06/20 12:54:41 INFO Client: Max mem capabililty of a single resource in
> this cluster 8192
> 14/06/20 12:54:41 INFO Client: Preparing Local resources
> 14/06/20 12:54:41 WARN BlockReaderLocal: The short-circuit local reads
> feature cannot be used because libhadoop cannot be loaded.
> 14/06/20 12:54:41 INFO Client: Uploading
> hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar to
> file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar
> 14/06/20 12:54:43 INFO Client: Setting up the launch environment
> 14/06/20 12:54:43 INFO Client: Setting up container launch context
> 14/06/20 12:54:43 INFO Client: Command for starting the Spark
> ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m,
> -Djava.io.tmpdir=$PWD/tmp, -Dspark.akka.retry.wait=\"3\",
> -Dspark.storage.blockManagerTimeoutIntervalMs=\"12\",
> -Dspark.storage.blockManagerHeartBeatMs=\"12\", 
> -Dspark.app.name=\"org.apache.spark.examples.SparkPi\",
> -Dspark.akka.frameSize=\"1\", -Dspark.akka.timeout=\"3\",
> -Dspark.worker.timeout=\"3\",
> -Dspark.akka.logLifecycleEvents=\"true\",
> -Dlog4j.configuration=log4j-spark-container.properties,
> org.apache.spark.deploy.yarn.ApplicationMaster, --class,
> org.apache.spark.examples.SparkPi, --jar ,
> hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar,
> --args  '10' , --executor-memory, 1024, --executor-cores, 1,
> --num-executors , 3, 1>, /stdout, 2>, /stderr)
> 14/06/20 12:54:43 INFO Client: Submitting application to ASM
> 14/06/20 12:54:43 INFO YarnClientImpl: Submitted application
> application_1403201750110_0060
> 14/06/20 12:54:44 INFO Client: Application report from ASM:
>  application identifier: application_1403201750110_0060
>  appId: 60
>  clientToAMToken: null
>  appDiagnostics:
>  appMasterHost: N/A
>  appQueue: root.koert
>  appMasterRpcPort: -1
>  appStartTime: 1403283283505
>  yarnAppState: ACCEPTED
>  distributedFinalState: UNDEFINED
>  appTrackingUrl:
> http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
>  appUser: koert
> 14/06/20 12:54:45 INFO Client: Application report from ASM:
>  application identifier: application_1403201750110_0060
>  appId: 60
>  clientToAMToken: null
>  appDiagnostics:
>  appMasterHost: N/A
>  appQueue: root.koert
>  appMasterRpcPort: -1
>  appStartTime: 1403283283505
>  yarnAppState: ACCEPTED
>  distributedFinalState: UNDEFINED
>  appTrackingUrl:
> http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
>  appUser: koert
> 14/06/20 12:54:46 INFO Client: Application report from ASM:
>  application identifier: application_1403201750110_0060
>  appId: 60
>  clientToAMToken: null
>  appDiagnostics:
>  appMasterHost: N/A
>  appQueue: root.koert
>  appMasterRpcPort: -1
>  appStartTime: 1403283283505
>  yarnAppState: ACCEPTED
>  distributedFinalState: UNDEFINED
>  appTrackingUrl:
> http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
>  appUser: koert
> 14/06/20 12:54:47 INFO Client: Application report from ASM:
>  application identifier: application_1403201750110_0060
>  appId: 60
>  clientToAMToken: null
>  appDiagnostics: Application application_1403201750110_0060 failed 2
> times due to AM Container for appattempt_1403201750110_0060_02 exited
> with  exitCode: -1000 due to: File
> file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar
> does not exist
> .Failing this attempt.. Failing the application.
>  appMasterHost: N/A
>  appQueue: root.koert
>  appMasterRpcPort: -1
>  appStartTime: 1403283283505
>  yarnAppState: FAILED
>  distributedFinalState: FAILED
>  appTrackingUrl:
> cdh5-ya

Re: Performance problems on SQL JOIN

2014-06-20 Thread mathias
Thanks for your suggestions.

file.count() takes 7s, so that doesn't seem to be the problem.
Moreover, a union with the same code/CSV takes about 15s (SELECT * FROM
rooms2 UNION SELECT * FROM rooms3).

The web status page shows that both stages 'count at joins.scala:216' and
'reduce at joins.scala:219' take up the majority of the time.
Is this due to bad partitioning or caching? Or is there a problem with the
JOIN operator?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001p8016.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark on yarn is trying to use file:// instead of hdfs://

2014-06-20 Thread Koert Kuipers
 yeah sure see below. i strongly suspect its something i misconfigured
causing yarn to try to use local filesystem mistakenly.

*

[koert@cdh5-yarn ~]$ /usr/local/lib/spark/bin/spark-submit --class
org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3
--executor-cores 1
hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar 10
14/06/20 12:54:40 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/06/20 12:54:40 INFO RMProxy: Connecting to ResourceManager at
cdh5-yarn.tresata.com/192.168.1.85:8032
14/06/20 12:54:41 INFO Client: Got Cluster metric info from
ApplicationsManager (ASM), number of NodeManagers: 1
14/06/20 12:54:41 INFO Client: Queue info ... queueName: root.default,
queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0,
  queueApplicationCount = 0, queueChildQueueCount = 0
14/06/20 12:54:41 INFO Client: Max mem capabililty of a single resource in
this cluster 8192
14/06/20 12:54:41 INFO Client: Preparing Local resources
14/06/20 12:54:41 WARN BlockReaderLocal: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.
14/06/20 12:54:41 INFO Client: Uploading
hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar to
file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar
14/06/20 12:54:43 INFO Client: Setting up the launch environment
14/06/20 12:54:43 INFO Client: Setting up container launch context
14/06/20 12:54:43 INFO Client: Command for starting the Spark
ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m,
-Djava.io.tmpdir=$PWD/tmp, -Dspark.akka.retry.wait=\"3\",
-Dspark.storage.blockManagerTimeoutIntervalMs=\"12\",
-Dspark.storage.blockManagerHeartBeatMs=\"12\",
-Dspark.app.name=\"org.apache.spark.examples.SparkPi\",
-Dspark.akka.frameSize=\"1\", -Dspark.akka.timeout=\"3\",
-Dspark.worker.timeout=\"3\",
-Dspark.akka.logLifecycleEvents=\"true\",
-Dlog4j.configuration=log4j-spark-container.properties,
org.apache.spark.deploy.yarn.ApplicationMaster, --class,
org.apache.spark.examples.SparkPi, --jar ,
hdfs://cdh5-yarn/lib/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar,
--args  '10' , --executor-memory, 1024, --executor-cores, 1,
--num-executors , 3, 1>, /stdout, 2>, /stderr)
14/06/20 12:54:43 INFO Client: Submitting application to ASM
14/06/20 12:54:43 INFO YarnClientImpl: Submitted application
application_1403201750110_0060
14/06/20 12:54:44 INFO Client: Application report from ASM:
 application identifier: application_1403201750110_0060
 appId: 60
 clientToAMToken: null
 appDiagnostics:
 appMasterHost: N/A
 appQueue: root.koert
 appMasterRpcPort: -1
 appStartTime: 1403283283505
 yarnAppState: ACCEPTED
 distributedFinalState: UNDEFINED
 appTrackingUrl:
http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
 appUser: koert
14/06/20 12:54:45 INFO Client: Application report from ASM:
 application identifier: application_1403201750110_0060
 appId: 60
 clientToAMToken: null
 appDiagnostics:
 appMasterHost: N/A
 appQueue: root.koert
 appMasterRpcPort: -1
 appStartTime: 1403283283505
 yarnAppState: ACCEPTED
 distributedFinalState: UNDEFINED
 appTrackingUrl:
http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
 appUser: koert
14/06/20 12:54:46 INFO Client: Application report from ASM:
 application identifier: application_1403201750110_0060
 appId: 60
 clientToAMToken: null
 appDiagnostics:
 appMasterHost: N/A
 appQueue: root.koert
 appMasterRpcPort: -1
 appStartTime: 1403283283505
 yarnAppState: ACCEPTED
 distributedFinalState: UNDEFINED
 appTrackingUrl:
http://cdh5-yarn.tresata.com:8088/proxy/application_1403201750110_0060/
 appUser: koert
14/06/20 12:54:47 INFO Client: Application report from ASM:
 application identifier: application_1403201750110_0060
 appId: 60
 clientToAMToken: null
 appDiagnostics: Application application_1403201750110_0060 failed 2
times due to AM Container for appattempt_1403201750110_0060_02 exited
with  exitCode: -1000 due to: File
file:/home/koert/.sparkStaging/application_1403201750110_0060/spark-examples-1.0.0-hadoop2.3.0-cdh5.0.2.jar
does not exist
.Failing this attempt.. Failing the application.
 appMasterHost: N/A
 appQueue: root.koert
 appMasterRpcPort: -1
 appStartTime: 1403283283505
 yarnAppState: FAILED
 distributedFinalState: FAILED
 appTrackingUrl:
cdh5-yarn.tresata.com:8088/cluster/app/application_1403201750110_0060
 appUser: koert




On Fri, Jun 20, 2014 at 12:42 PM, Marcelo Vanzin 
wrote:

> Hi Koert,
>
> Could you provide more details? Job arguments, log messages, errors, etc.
>
> On Fri, Jun 20, 2014 at 9:40 AM, Koert Kuipers  wrote:
> > i noticed that when i submit a job to ya

Re: 1.0.1 release plan

2014-06-20 Thread Andrew Ash
Sounds good.  Mingyu and I are waiting on 1.0.1 to get the fix for the
below issues without running a patched version of Spark:

https://issues.apache.org/jira/browse/SPARK-1935 -- commons-codec version
conflicts for client applications
https://issues.apache.org/jira/browse/SPARK-2043 -- correctness issue with
spilling


On Fri, Jun 20, 2014 at 1:04 AM, Patrick Wendell  wrote:

> Hey There,
>
> I'd like to start voting on this release shortly because there are a
> few important fixes that have queued up. We're just waiting to fix an
> akka issue. I'd guess we'll cut a vote in the next few days.
>
> - Patrick
>
> On Thu, Jun 19, 2014 at 10:47 AM, Mingyu Kim  wrote:
> > Hi all,
> >
> > Is there any plan for 1.0.1 release?
> >
> > Mingyu
>


Re: trying to understand yarn-client mode

2014-06-20 Thread Marcelo Vanzin
On Fri, Jun 20, 2014 at 8:22 AM, Koert Kuipers  wrote:
> thanks! i will try that.
> i guess what i am most confused about is why the executors are trying to
> retrieve the jars directly using the info i provided to add jars to my spark
> context. i mean, thats bound to fail no? i could be on a different machine
> (so my file://) isnt going to work for them, or i could have the jars in a
> directory that is only readable by me.
>
> how come the jars are not just shipped to yarn as part of the job submittal?

They are if they are specified correctly. Check the guide:
http://spark.apache.org/docs/latest/submitting-applications.html

See the "Advanced Dependency Management" section.

Your default filesystem is probably hdfs, which means that if you
provide a path with no protocol, the executors will consider it as an
hdfs path, and it won't work if you're pointing at a file that exists
in your local fs.
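
For example (the class, jar names and paths here are made up):

spark-submit --class com.example.MyApp --master yarn-cluster \
  --jars file:///home/me/lib/extra.jar,hdfs:///libs/shared.jar \
  hdfs:///apps/my-app.jar

The first --jars entry only exists on the submitting machine, so it needs an
explicit file: URI; the second already lives on HDFS.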


-- 
Marcelo


Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-20 Thread Eugen Cepoi
In my case it was due to a case class I was defining in the spark-shell and
not being available on the workers. So packaging it in a jar and adding it
with ADD_JARS solved the problem. Note that I don't exactly remember if it
was an out of heap space exception or PermGen space. Make sure your
jarsPath is correct.
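
For example, something along these lines (the path is made up; ADD_JARS takes
a comma-separated list of jars):

ADD_JARS=/path/to/my-case-classes.jar ./bin/spark-shell

That registers the jar with the shell's SparkContext so it gets shipped to the
executors, without editing compute-classpath.sh.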

Usually to debug this kind of problem I am using the spark-shell (you can
do the same in your job but it's more time consuming to repackage, deploy,
run, iterate). Try for example
1) read the lines (without any processing) and count them
2) apply processing and count



2014-06-20 17:15 GMT+02:00 Shivani Rao :

> Hello Abhi, I did try that and it did not work
>
> And Eugene, Yes I am assembling the argonaut libraries in the fat jar. So
> how did you overcome this problem?
>
> Shivani
>
>
> On Fri, Jun 20, 2014 at 1:59 AM, Eugen Cepoi 
> wrote:
>
>>
>> On 20 Jun 2014 01:46, "Shivani Rao"  wrote:
>>
>> >
>> > Hello Andrew,
>> >
>> > i wish I could share the code, but for proprietary reasons I can't. But
>> I can give some idea though of what i am trying to do. The job reads a file
>> and for each line of that file and processors these lines. I am not doing
>> anything intense in the "processLogs" function
>> >
>> > import argonaut._
>> > import argonaut.Argonaut._
>> >
>> >
>> > /* all of these case classes are created from json strings extracted
>> from the line in the processLogs() function
>> > *
>> > */
>> > case class struct1…
>> > case class struct2…
>> > case class value1(struct1, struct2)
>> >
>> > def processLogs(line:String): Option[(key1, value1)] {…
>> > }
>> >
>> > def run(sparkMaster, appName, executorMemory, jarsPath) {
>> >   val sparkConf = new SparkConf()
>> >sparkConf.setMaster(sparkMaster)
>> >sparkConf.setAppName(appName)
>> >sparkConf.set("spark.executor.memory", executorMemory)
>> > sparkConf.setJars(jarsPath) // This includes all the jars relevant
>> jars..
>> >val sc = new SparkContext(sparkConf)
>> >   val rawLogs =
>> sc.textFile("hdfs://> >
>> rawLogs.saveAsTextFile("hdfs://> >
>> rawLogs.flatMap(processLogs).saveAsTextFile("hdfs://> > }
>> >
>> > If I switch to "local" mode, the code runs just fine, it fails with the
>> error I pasted above. In the cluster mode, even writing back the file we
>> just read fails
>> (rawLogs.saveAsTextFile("hdfs://> >
>> > I still believe this is a classNotFound error in disguise
>> >
>>
>> Indeed you are right, this can be the reason. I had similar errors when
>> defining case classes in the shell and trying to use them in the RDDs. Are
>> you shading argonaut in the fat jar ?
>>
>> > Thanks
>> > Shivani
>> >
>> >
>> >
>> > On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash 
>> wrote:
>> >>
>> >> Wait, so the file only has four lines and the job running out of heap
>> space?  Can you share the code you're running that does the processing?
>>  I'd guess that you're doing some intense processing on every line but just
>> writing parsed case classes back to disk sounds very lightweight.
>> >>
>> >> I
>> >>
>> >>
>> >> On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao 
>> wrote:
>> >>>
>> >>> I am trying to process a file that contains 4 log lines (not very
>> long) and then write my parsed out case classes to a destination folder,
>> and I get the following error:
>> >>>
>> >>>
>> >>> java.lang.OutOfMemoryError: Java heap space
>> >>>
>> >>> at
>> org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
>> >>>
>> >>> at
>> org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)
>> >>>
>> >>> at
>> org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
>> >>>
>> >>> at
>> org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
>> >>>
>> >>> at
>> org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
>> >>>
>> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>
>> >>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>> >>>
>> >>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> >>>
>> >>> at java.lang.reflect.Method.invoke(Method.java:597)
>> >>>
>> >>> at
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
>> >>>
>> >>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
>> >>>
>> >>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>> >>>
>> >>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
>> >>>
>> >>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>> >>>
>> >>> at
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>> >>>
>> >>> at
>> org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
>> >>>
>> >>> at
>> org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
>> >>>
>> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Nat

Re: How to store JavaRDD as a sequence file using spark java API?

2014-06-20 Thread Kan Zhang
Yes, it can if you set the output format to SequenceFileOutputFormat. The
difference is saveAsSequenceFile does the conversion to Writable for you if
needed and then calls saveAsHadoopFile.
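
Roughly, in Scala (the Java API has a matching saveAsHadoopFile on
JavaPairRDD; the path and types below are only for illustration, and sc is
the usual SparkContext):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.SparkContext._   // pair-RDD functions

// build a pair RDD of Writables by hand...
val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
  .map { case (k, v) => (new IntWritable(k), new Text(v)) }

// ...and write it as a sequence file via saveAsHadoopFile; this is the step
// saveAsSequenceFile would otherwise arrange for you
pairs.saveAsHadoopFile(
  "hdfs:///tmp/seqfile-out",
  classOf[IntWritable],
  classOf[Text],
  classOf[SequenceFileOutputFormat[IntWritable, Text]])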


On Fri, Jun 20, 2014 at 12:43 AM, abhiguruvayya 
wrote:

> Does JavaPairRDD.saveAsHadoopFile store data as a sequenceFile? Then what
> is
> the significance of RDD.saveAsSequenceFile?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-JavaRDD-as-a-sequence-file-using-spark-java-API-tp7969p7983.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: spark on yarn is trying to use file:// instead of hdfs://

2014-06-20 Thread Marcelo Vanzin
Hi Koert,

Could you provide more details? Job arguments, log messages, errors, etc.

On Fri, Jun 20, 2014 at 9:40 AM, Koert Kuipers  wrote:
> i noticed that when i submit a job to yarn it mistakenly tries to upload
> files to local filesystem instead of hdfs. what could cause this?
>
> in spark-env.sh i have HADOOP_CONF_DIR set correctly (and spark-submit does
> find yarn), and my core-site.xml has a fs.defaultFS that is hdfs, not local
> filesystem.
>
> thanks! koert



-- 
Marcelo


spark on yarn is trying to use file:// instead of hdfs://

2014-06-20 Thread Koert Kuipers
i noticed that when i submit a job to yarn it mistakenly tries to upload
files to local filesystem instead of hdfs. what could cause this?

in spark-env.sh i have HADOOP_CONF_DIR set correctly (and spark-submit does
find yarn), and my core-site.xml has a fs.defaultFS that is hdfs, not local
filesystem.

thanks! koert


Re: How do you run your spark app?

2014-06-20 Thread Shivani Rao
Hello Michael,

I have a quick question for you. Can you clarify the statement " build fat
JAR's and build dist-style TAR.GZ packages with launch scripts, JAR's and
everything needed to run a Job"? Can you give an example?

I am using sbt assembly as well to create a fat jar, and supplying the
spark and hadoop locations in the class path. Inside the main() function
where spark context is created, I use SparkContext.jarOfClass(this).toList to
add the fat jar to my spark context. However, I seem to be running into
issues with this approach. I was wondering if you had any inputs Michael.

Thanks,
Shivani


On Thu, Jun 19, 2014 at 10:57 PM, Sonal Goyal  wrote:

> We use maven for building our code and then invoke spark-submit through
> the exec plugin, passing in our parameters. Works well for us.
>
> Best Regards,
> Sonal
> Nube Technologies 
>
> 
>
>
>
>
> On Fri, Jun 20, 2014 at 3:26 AM, Michael Cutler  wrote:
>
>> P.S. Last but not least we use sbt-assembly to build fat JAR's and build
>> dist-style TAR.GZ packages with launch scripts, JAR's and everything needed
>> to run a Job.  These are automatically built from source by our Jenkins and
>> stored in HDFS.  Our Chronos/Marathon jobs fetch the latest release TAR.GZ
>> direct from HDFS, unpack it and launch the appropriate script.
>>
>> Makes for a much cleaner development / testing / deployment to package
>> everything required in one go instead of relying on cluster specific
>> classpath additions or any add-jars functionality.
>>
>>
>> On 19 June 2014 22:53, Michael Cutler  wrote:
>>
>>> When you start seriously using Spark in production there are basically
>>> two things everyone eventually needs:
>>>
>>>1. Scheduled Jobs - recurring hourly/daily/weekly jobs.
>>>2. Always-On Jobs - that require monitoring, restarting etc.
>>>
>>> There are lots of ways to implement these requirements, everything from
>>> crontab through to workflow managers like Oozie.
>>>
>>> We opted for the following stack:
>>>
>>>- Apache Mesos  (mesosphere.io distribution)
>>>
>>>
>>>- Marathon  - init/control
>>>system for starting, stopping, and maintaining always-on applications.
>>>
>>>
>>>- Chronos  - general-purpose
>>>scheduler for Mesos, supports job dependency graphs.
>>>
>>>
>>>- ** Spark Job Server  -
>>>primarily for its ability to reuse shared contexts with multiple jobs
>>>
>>> The majority of our jobs are periodic (batch) jobs run through
>>> spark-sumit, and we have several always-on Spark Streaming jobs (also run
>>> through spark-submit).
>>>
>>> We always use "client mode" with spark-submit because the Mesos cluster
>>> has direct connectivity to the Spark cluster and it means all the Spark
>>> stdout/stderr is externalised into Mesos logs which helps diagnosing
>>> problems.
>>>
>>> I thoroughly recommend you explore using Mesos/Marathon/Chronos to run
>>> Spark and manage your Jobs, the Mesosphere tutorials are awesome and you
>>> can be up and running in literally minutes.  The Web UI's for both make it
>>> easy to get started without talking to REST API's etc.
>>>
>>> Best,
>>>
>>> Michael
>>>
>>>
>>>
>>>
>>> On 19 June 2014 19:44, Evan R. Sparks  wrote:
>>>
 I use SBT, create an assembly, and then add the assembly jars when I
 create my spark context. The main executor I run with something like "java
 -cp ... MyDriver".

 That said - as of spark 1.0 the preferred way to run spark applications
 is via spark-submit -
 http://spark.apache.org/docs/latest/submitting-applications.html


 On Thu, Jun 19, 2014 at 11:36 AM, ldmtwo  wrote:

> I want to ask this, not because I can't read endless documentation and
> several tutorials, but because there seem to be many ways of doing
> things
> and I keep having issues. How do you run your spark app?
>
> I had it working when I was only using yarn+hadoop1 (Cloudera), then I
> had
> to get Spark and Shark working and ended up upgrading everything and
> dropped
> CDH support. Anyways, this is what I used with master=yarn-client and
> app_jar being Scala code compiled with Maven.
>
> java -cp $CLASSPATH -Dspark.jars=$APP_JAR -Dspark.master=$MASTER
> $CLASSNAME
> $ARGS
>
> Do you use this? or something else? I could never figure out this
> method.
> SPARK_HOME/bin/spark jar APP_JAR ARGS
>
> For example:
> bin/spark-class jar
>
> /usr/lib/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar
> pi 10 10
>
> Do you use SBT or Maven to compile? or something else?
>
>
> ** It seems that I can't get subscribed to the mailing list and I
> tried both
> my work email and personal.
>
>
>
> --
> View t

broadcast not working in yarn-cluster mode

2014-06-20 Thread Christophe Préaud
Hi,

Since I migrated to spark 1.0.0, a couple of applications that used to work in 
0.9.1 now fail when broadcasting a variable.
Those applications are run on a YARN cluster in yarn-cluster mode (and used to 
run in yarn-standalone mode in 0.9.1)

Here is an extract of the error log:

Exception in thread "Thread-3" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:186)
Caused by: java.lang.NoSuchMethodError: 
org.apache.spark.SparkContext.broadcast(Ljava/lang/Object;)Lorg/apache/spark/broadcast/Broadcast;
at 
kelkoo.MerchantOffersPerformance$.main(MerchantOffersPerformance.scala:289)
at 
kelkoo.MerchantOffersPerformance.main(MerchantOffersPerformance.scala)

Has anyone any idea how to solve this problem?

Thanks,
Christophe.

Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 8, rue du Sentier 75002 Paris
425 093 069 RCS Paris

This message and its attachments are confidential and intended exclusively for 
their addressees. If you are not the intended recipient of this message, please 
delete it and notify the sender.


Re: Performance problems on SQL JOIN

2014-06-20 Thread Evan R. Sparks
Also - you could consider caching your data after the first split (before
the first filter), this will prevent you from retrieving the data from s3
twice.
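
A minimal sketch of that suggestion, reusing the names from the quoted code (the S3 path is elided as in the original, and the case-class mapping is left out):

val file = sc.textFile("s3n:// ...  .csv")
val data = file.map(x => x.split('|')).cache()  // cache right after the split, before the filters

// both filters now reuse the cached splits instead of re-reading from S3
val rooms2raw = data.filter(x => x(0) == "2")
val rooms3raw = data.filter(x => x(0) == "3")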


On Fri, Jun 20, 2014 at 8:32 AM, Xiangrui Meng  wrote:

> Your data source is S3 and data is used twice. m1.large does not have very
> good network performance. Please try file.count() and see how fast it goes.
> -Xiangrui
>
> > On Jun 20, 2014, at 8:16 AM, mathias 
> wrote:
> >
> > Hi there,
> >
> > We're trying out Spark and are experiencing some performance issues using
> > Spark SQL.
> > Anyone who can tell us if our results are normal?
> >
> > We are using the Amazon EC2 scripts to create a cluster with 3
> > workers/executors (m1.large).
> > Tried both spark 1.0.0 as well as the git master; the Scala as well as
> the
> > Python shells.
> >
> > Running the following code takes about 5 minutes, which seems a long time
> > for this query.
> >
> > val file = sc.textFile("s3n:// ...  .csv");
> > val data = file.map(x => x.split('|')); // 300k rows
> >
> > case class BookingInfo(num_rooms: String, hotelId: String, toDate:
> String,
> > ...);
> > val rooms2 = data.filter(x => x(0) == "2").map(x => BookingInfo(x(0),
> x(1),
> > ... , x(9))); // 50k rows
> > val rooms3 = data.filter(x => x(0) == "3").map(x => BookingInfo(x(0),
> x(1),
> > ... , x(9))); // 30k rows
> >
> > rooms2.registerAsTable("rooms2");
> > cacheTable("rooms2");
> > rooms3.registerAsTable("rooms3");
> > cacheTable("rooms3");
> >
> > sql("SELECT * FROM rooms2 LEFT JOIN rooms3 ON rooms2.hotelId =
> > rooms3.hotelId AND rooms2.toDate = rooms3.toDate").count();
> >
> >
> > Are we doing something wrong here?
> > Thanks!
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Performance problems on SQL JOIN

2014-06-20 Thread Xiangrui Meng
Your data source is S3 and data is used twice. m1.large does not have very good 
network performance. Please try file.count() and see how fast it goes. -Xiangrui

> On Jun 20, 2014, at 8:16 AM, mathias  wrote:
> 
> Hi there,
> 
> We're trying out Spark and are experiencing some performance issues using
> Spark SQL.
> Anyone who can tell us if our results are normal?
> 
> We are using the Amazon EC2 scripts to create a cluster with 3
> workers/executors (m1.large).
> Tried both spark 1.0.0 as well as the git master; the Scala as well as the
> Python shells.
> 
> Running the following code takes about 5 minutes, which seems a long time
> for this query.
> 
> val file = sc.textFile("s3n:// ...  .csv");
> val data = file.map(x => x.split('|')); // 300k rows
> 
> case class BookingInfo(num_rooms: String, hotelId: String, toDate: String,
> ...);
> val rooms2 = data.filter(x => x(0) == "2").map(x => BookingInfo(x(0), x(1),
> ... , x(9))); // 50k rows
> val rooms3 = data.filter(x => x(0) == "3").map(x => BookingInfo(x(0), x(1),
> ... , x(9))); // 30k rows
> 
> rooms2.registerAsTable("rooms2");
> cacheTable("rooms2");
> rooms3.registerAsTable("rooms3");
> cacheTable("rooms3");
> 
> sql("SELECT * FROM rooms2 LEFT JOIN rooms3 ON rooms2.hotelId =
> rooms3.hotelId AND rooms2.toDate = rooms3.toDate").count();
> 
> 
> Are we doing something wrong here?
> Thanks!
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Better way to use a large data set?

2014-06-20 Thread Muttineni, Vinay
Hi All,
I have an 8 million row, 500 column data set, which is derived by reading a text 
file and doing a filter, flatMap operation to weed out some anomalies.
Now, I have a process which has to run through all 500 columns, do a couple of 
map, reduce, forEach operations on the data set and return some statistics as 
output. I have thought of the following approaches.
Approach 1:

i) Read the data set from the text file, do some operations, and get an 
RDD. Use toArray or collect on this RDD and broadcast it.

ii) Do a flatMap on a range of numbers, this range being equal to the 
number of columns.

iii) In each flatMap operation, perform the required operations on the 
broadcast variable to derive the stats, and return the array of stats.
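
For concreteness, a minimal sketch of Approach 1 as described above (the source RDD, element type and per-column statistic are all hypothetical):

val dataset: Array[Array[Double]] = derivedRdd.collect()  // step i: collect (or toArray)
val broadcastData = sc.broadcast(dataset)                 // step i: broadcast

val numColumns = 500
val stats = sc.parallelize(0 until numColumns, numColumns).map { col =>  // step ii
  val rows = broadcastData.value
  val mean = rows.map(_(col)).sum / rows.length           // step iii: one per-column statistic
  (col, mean)
}.collect()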

Questions about this approach:

1) Is there a preference between toArray and collect?

2) Can I not directly broadcast an RDD instead of first collecting it and 
broadcasting it? I tried this, but I got a serialization exception.

3) When I use sc.parallelize on the broadcast data set, would it be a 
problem if there isn't enough space to store it in memory?


Approach 2:

Instead of reading the text file, doing some operations and then broadcasting 
it, I was planning to do the read part within each of the 500 steps of the 
flatMap (assuming I have 500 columns).

Is this better than Approach 1? In Approach 1, I'd have to read once and 
broadcast, whilst here I'd have to read 500 times.


Approach 3:
Do a transpose of the dataset and then flatMap on the transposed matrix.

Could someone please point out the best approach from above, or if there's a 
better way to solve this?
Thank you for the help!
Vinay






Re: trying to understand yarn-client mode

2014-06-20 Thread Koert Kuipers
Thanks! I will try that.
I guess what I am most confused about is why the executors are trying to
retrieve the jars directly using the info I provided to add jars to my
spark context. I mean, that's bound to fail, no? I could be on a different
machine (so my file:// path isn't going to work for them), or I could have the
jars in a directory that is only readable by me.

How come the jars are not just shipped to YARN as part of the job submission?

I am worried that I am supposed to put the jars in a "central" location and YARN
is going to fetch them from there, leading to jars in yet another place,
such as on HDFS, which I find pretty messy.


On Thu, Jun 19, 2014 at 2:54 PM, Marcelo Vanzin  wrote:

> Coincidentally, I just ran into the same exception. What's probably
> happening is that you're specifying some jar file in your job as an
> absolute local path (e.g. just
> "/home/koert/test-assembly-0.1-SNAPSHOT.jar"), but your Hadoop config
> has the default FS set to HDFS.
>
> So your driver does not know that it should tell executors to download
> that file from the driver.
>
> If you specify the jar with the "file:" scheme that should solve the
> problem.
>
> On Thu, Jun 19, 2014 at 10:22 AM, Koert Kuipers  wrote:
> > i am trying to understand how yarn-client mode works. i am not using
> > Application application_1403117970283_0014 failed 2 times due to AM
> > Container for appattempt_1403117970283_0014_02 exited with exitCode:
> > -1000 due to: File file:/home/koert/test-assembly-0.1-SNAPSHOT.jar does
> not
> > exist
> > .Failing this attempt.. Failing the application.
>
>
> --
> Marcelo
>
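
A minimal sketch of the quoted suggestion, passing the application jar with an explicit file: URI (the path is the one from the quoted error message):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("test")
  .setJars(Seq("file:/home/koert/test-assembly-0.1-SNAPSHOT.jar"))  // explicit file: scheme
val sc = new SparkContext(conf)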


Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-20 Thread Shivani Rao
Hello Abhi, I did try that and it did not work

And Eugene, Yes I am assembling the argonaut libraries in the fat jar. So
how did you overcome this problem?

Shivani


On Fri, Jun 20, 2014 at 1:59 AM, Eugen Cepoi  wrote:

>
> On 20 June 2014 at 01:46, "Shivani Rao"  wrote:
>
> >
> > Hello Andrew,
> >
> > I wish I could share the code, but for proprietary reasons I can't. But
> I can give some idea of what I am trying to do. The job reads a file
> and processes each line of that file. I am not doing
> anything intense in the "processLogs" function
> >
> > import argonaut._
> > import argonaut.Argonaut._
> >
> >
> > /* all of these case classes are created from json strings extracted
> from the line in the processLogs() function
> > *
> > */
> > case class struct1…
> > case class struct2…
> > case class value1(struct1, struct2)
> >
> > def processLogs(line:String): Option[(key1, value1)] {…
> > }
> >
> > def run(sparkMaster, appName, executorMemory, jarsPath) {
> >   val sparkConf = new SparkConf()
> >sparkConf.setMaster(sparkMaster)
> >sparkConf.setAppName(appName)
> >sparkConf.set("spark.executor.memory", executorMemory)
> > sparkConf.setJars(jarsPath) // This includes all the jars relevant
> jars..
> >val sc = new SparkContext(sparkConf)
> >   val rawLogs = sc.textFile("hdfs:// >
> rawLogs.saveAsTextFile("hdfs:// >
> rawLogs.flatMap(processLogs).saveAsTextFile("hdfs:// > }
> >
> > If I switch to "local" mode, the code runs just fine, it fails with the
> error I pasted above. In the cluster mode, even writing back the file we
> just read fails
> (rawLogs.saveAsTextFile("hdfs:// >
> > I still believe this is a classNotFound error in disguise
> >
>
> Indeed you are right, this can be the reason. I had similar errors when
> defining case classes in the shell and trying to use them in the RDDs. Are
> you shading argonaut in the fat jar ?
>
> > Thanks
> > Shivani
> >
> >
> >
> > On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash 
> wrote:
> >>
> >> Wait, so the file only has four lines and the job running out of heap
> space?  Can you share the code you're running that does the processing?
>  I'd guess that you're doing some intense processing on every line but just
> writing parsed case classes back to disk sounds very lightweight.
> >>
> >> I
> >>
> >>
> >> On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao 
> wrote:
> >>>
> >>> I am trying to process a file that contains 4 log lines (not very
> long) and then write my parsed out case classes to a destination folder,
> and I get the following error:
> >>>
> >>>
> >>> java.lang.OutOfMemoryError: Java heap space
> >>>
> >>> at
> org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
> >>>
> >>> at
> org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)
> >>>
> >>> at
> org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
> >>>
> >>> at
> org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
> >>>
> >>> at
> org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
> >>>
> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>
> >>> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> >>>
> >>> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >>>
> >>> at java.lang.reflect.Method.invoke(Method.java:597)
> >>>
> >>> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
> >>>
> >>> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
> >>>
> >>> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
> >>>
> >>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
> >>>
> >>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
> >>>
> >>> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
> >>>
> >>> at
> org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
> >>>
> >>> at
> org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
> >>>
> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>
> >>> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> >>>
> >>> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >>>
> >>> at java.lang.reflect.Method.invoke(Method.java:597)
> >>>
> >>> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
> >>>
> >>> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
> >>>
> >>> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
> >>>
> >>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
> >>>
> >>> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
> >>>
> >>> at
> java.io.ObjectInputStream.readSerialData(ObjectI

Performance problems on SQL JOIN

2014-06-20 Thread mathias
Hi there,

We're trying out Spark and are experiencing some performance issues using
Spark SQL.
Anyone who can tell us if our results are normal?

We are using the Amazon EC2 scripts to create a cluster with 3
workers/executors (m1.large).
Tried both spark 1.0.0 as well as the git master; the Scala as well as the
Python shells.

Running the following code takes about 5 minutes, which seems a long time
for this query.

val file = sc.textFile("s3n:// ...  .csv");
val data = file.map(x => x.split('|')); // 300k rows

case class BookingInfo(num_rooms: String, hotelId: String, toDate: String,
...);
val rooms2 = data.filter(x => x(0) == "2").map(x => BookingInfo(x(0), x(1),
... , x(9))); // 50k rows
val rooms3 = data.filter(x => x(0) == "3").map(x => BookingInfo(x(0), x(1),
... , x(9))); // 30k rows

rooms2.registerAsTable("rooms2");
cacheTable("rooms2");
rooms3.registerAsTable("rooms3");
cacheTable("rooms3");

sql("SELECT * FROM rooms2 LEFT JOIN rooms3 ON rooms2.hotelId =
rooms3.hotelId AND rooms2.toDate = rooms3.toDate").count();


Are we doing something wrong here?
Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Anything like grid search available for mlbase?

2014-06-20 Thread Xiangrui Meng
This is a planned feature for v1.1. I'm going to work on it after v1.0.1 
release. -Xiangrui

> On Jun 20, 2014, at 6:46 AM, Charles Earl  wrote:
> 
> Looking for something like scikit's grid search module.
> C


java.net.SocketTimeoutException: Read timed out and java.io.IOException: Filesystem closed on Spark 1.0

2014-06-20 Thread Arun Ahuja
Hi all,

I'm running a job that seems to continually fail with the following
exception:

java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:687)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)
at
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1323)
...
org.apache.spark.executor.Executor.org

$apache$spark$executor$Executor$$updateDependencies(Executor.scala:330)

This is running spark-assembly-1.0.0-hadoop2.3.0 through yarn.

The only additional error I see is
14/06/20 10:44:15 WARN NewHadoopRDD: Exception in RecordReader.close()
net.sf.samtools.util.RuntimeIOException: java.io.IOException: Filesystem
closed

I had thought this issue of the file system closed was resolved in
https://issues.apache.org/jira/browse/SPARK-1676.  I've also attempted to
run under a single core to avoid this issue (which seems to help sometimes
as this failure is intermittent)

I saw a previous mail thread:
http://apache-spark-user-list.1001560.n3.nabble.com/Filesystem-closed-while-running-spark-job-td4596.html
with a suggestion to disable caching.

Has anyone seen this before, or does anyone know of a resolution? As I
mentioned, this is intermittent: sometimes the job runs to completion, and
sometimes it fails in this way.

Thanks,
Arun


parallel Reduce within a key

2014-06-20 Thread ansriniv
Hi,

I am on Spark 0.9.0

I have a 2 node cluster (2 worker nodes) with 16 cores on each node (so, 32
cores in the cluster).
I have an input rdd with 64 partitions.

I am running  "sc.mapPartitions(...).reduce(...)"

I can see that I get full parallelism on the mapper (all my 32 cores are
busy simultaneously). However, when it comes to reduce(), the outputs of the
mappers are all reduced SERIALLY. Further, all the reduce processing happens
only on 1 of the workers.

I was expecting that the outputs of the 16 mappers on node 1 would be
reduced in parallel in node 1 while the outputs of the 16 mappers on node 2
would be reduced in parallel on node 2 and there would be 1 final inter-node
reduce (of node 1 reduced result and node 2 reduced result).

Isn't parallel reduce supported WITHIN a key (in this case I have no key) ?
(I know that there is parallelism in reduce across keys)
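
For reference, RDD.aggregate makes the two phases explicit: the first function runs inside each partition's task, and the second merges the per-partition results. A minimal sketch with a made-up sum; note that this alone does not change where the final merge runs:

val rdd = sc.parallelize(1 to 1000000, 64)

val total = rdd.aggregate(0L)(
  (acc, x) => acc + x,  // combine within a partition (runs in parallel in the tasks)
  (a, b) => a + b       // merge the 64 per-partition results
)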

Best Regards
Anand



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallel-Reduce-within-a-key-tp7998.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Anything like grid search available for mlbase?

2014-06-20 Thread Charles Earl
Looking for something like scikit's grid search module.
C


Re: problem about cluster mode of spark 1.0.0

2014-06-20 Thread Gino Bustelo
I've found that the jar will be copied to the worker from HDFS fine, but it is 
not added to the spark context for you. You have to know that the jar will end 
up in the driver's working dir, and so you just add the file name of the jar 
to the context in your program. 

In your example below, just add "test.jar" to the context. 

Btw, the context will not have the master URL either, so add that while you are 
at it. 
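
A minimal sketch of what that looks like, assuming the jar lands as "test.jar" in the driver's working dir and using the master URL from the original command below:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("test")
  .setMaster("spark://master:7077")  // the master URL is not filled in for you either
  .setJars(Seq("test.jar"))          // the jar name, relative to the driver's working dir
val sc = new SparkContext(conf)
// alternatively, after creating the context: sc.addJar("test.jar")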

This is a big issue. I posted about it a week ago and got no replies. Hopefully 
it gets more attention as more people start hitting this. Basically, 
spark-submit on a standalone cluster with cluster deploy mode is broken. 

Gino B.

> On Jun 20, 2014, at 2:46 AM, randylu  wrote:
> 
> In addition, the jar file can be copied to the driver node automatically.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-cluster-mode-of-spark-1-0-0-tp7982p7984.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: MLLib inside Storm : silly or not ?

2014-06-20 Thread Eustache DIEMERT
Yes, learning on a dedicated Spark cluster and predicting inside a Storm
bolt is quite OK :)
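
For anyone reading this in the archives, a minimal sketch of that pattern: train in MLlib, then carry only the learned parameters into the other runtime. The training data and feature count are made up:

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// offline training in Spark
val training = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0, 0.5)),
  LabeledPoint(0.0, Vectors.dense(0.1, 2.0))))
val model = LogisticRegressionWithSGD.train(training, 100)

// only the learned parameters need to leave Spark; scoring is plain JVM code
val w = model.weights.toArray
val b = model.intercept
def score(x: Array[Double]): Double =
  1.0 / (1.0 + math.exp(-(w.zip(x).map { case (wi, xi) => wi * xi }.sum + b)))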

Thanks all for your answers.

I'll post back if/when we experience this solution.

E/



2014-06-19 20:45 GMT+02:00 Shuo Xiang :

> If I'm understanding correctly, you want to use MLlib for offline training
> and then deploy the learned model to Storm? In this case I don't think
> there is any problem. However if you are looking for online model
> update/training, this can be complicated and I guess quite a few algorithms
> in mllib at this time are designed for offline/batch learning.
>
>
> On Thu, Jun 19, 2014 at 12:26 AM, Eustache DIEMERT 
> wrote:
>
>> Hi Sparkers,
>>
>> We have a Storm cluster and looking for a decent execution engine for
>> machine learned models. What I've seen from MLLib is extremely positive,
>> but we can't just throw away our Storm based stack.
>>
>> So my question is: is it feasible/recommended to train models in
>> Spark/MLLib and execute them in another Java environment (Storm in this
>> case) ?
>>
>> Thanks for any insights :)
>>
>> Eustache
>>
>
>


Re: How could I set the number of executor?

2014-06-20 Thread Earthson
--num-executors seems to be available only with YARN.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-set-the-number-of-executor-tp7990p7992.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-20 Thread Eugen Cepoi
On 20 June 2014 at 01:46, "Shivani Rao"  wrote:
>
> Hello Andrew,
>
> I wish I could share the code, but for proprietary reasons I can't. But I
can give some idea of what I am trying to do. The job reads a file
and processes each line of that file. I am not doing
anything intense in the "processLogs" function
>
> import argonaut._
> import argonaut.Argonaut._
>
>
> /* all of these case classes are created from json strings extracted from
the line in the processLogs() function
> *
> */
> case class struct1…
> case class struct2…
> case class value1(struct1, struct2)
>
> def processLogs(line:String): Option[(key1, value1)] {…
> }
>
> def run(sparkMaster, appName, executorMemory, jarsPath) {
>   val sparkConf = new SparkConf()
>sparkConf.setMaster(sparkMaster)
>sparkConf.setAppName(appName)
>sparkConf.set("spark.executor.memory", executorMemory)
> sparkConf.setJars(jarsPath) // This includes all the jars relevant
jars..
>val sc = new SparkContext(sparkConf)
>   val rawLogs = sc.textFile("hdfs://
rawLogs.saveAsTextFile("hdfs://
rawLogs.flatMap(processLogs).saveAsTextFile("hdfs:// }
>
> If I switch to "local" mode, the code runs just fine, it fails with the
error I pasted above. In the cluster mode, even writing back the file we
just read fails
(rawLogs.saveAsTextFile("hdfs://
> I still believe this is a classNotFound error in disguise
>

Indeed you are right, this can be the reason. I had similar errors when
defining case classes in the shell and trying to use them in the RDDs. Are
you shading argonaut in the fat jar ?

> Thanks
> Shivani
>
>
>
> On Wed, Jun 18, 2014 at 2:49 PM, Andrew Ash  wrote:
>>
>> Wait, so the file only has four lines and the job running out of heap
space?  Can you share the code you're running that does the processing?
 I'd guess that you're doing some intense processing on every line but just
writing parsed case classes back to disk sounds very lightweight.
>>
>> I
>>
>>
>> On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao 
wrote:
>>>
>>> I am trying to process a file that contains 4 log lines (not very long)
and then write my parsed out case classes to a destination folder, and I
get the following error:
>>>
>>>
>>> java.lang.OutOfMemoryError: Java heap space
>>>
>>> at
org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
>>>
>>> at
org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)
>>>
>>> at
org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
>>>
>>> at
org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
>>>
>>> at
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>>>
>>> at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:597)
>>>
>>> at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
>>>
>>> at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
>>>
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>>>
>>> at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>>>
>>> at
org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
>>>
>>> at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>>>
>>> at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:597)
>>>
>>> at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
>>>
>>> at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
>>>
>>> at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
>>>
>>> at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
>>>
>>> at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
>>>
>>> at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>>>
>>>
>>> Sadly, there are several folks that have faced this error while trying
to exe

How could I set the number of executor?

2014-06-20 Thread Earthson
"spark-submit" has an arguments "--num-executors" to set the number of
executor, but how could I set it from anywhere else?

We're using Shark, and want to change the number of executor. The number of
executor seems to be same as workers by default?

Shall we configure the executor number manually(Is there an automatically
way?)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-set-the-number-of-executor-tp7990.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: broadcast in spark streaming

2014-06-20 Thread Hahn Jiang
I get it.  thank you


On Fri, Jun 20, 2014 at 4:43 PM, Sourav Chandra <
sourav.chan...@livestream.com> wrote:

> From the StreamingContext object, you can get a reference to the SparkContext,
> using which you can create broadcast variables.
>
>
> On Fri, Jun 20, 2014 at 2:09 PM, Hahn Jiang 
> wrote:
>
>> I want to use broadcast in spark streaming, but I found there is no such
>> function.
>> How can I use a global variable in spark streaming?
>>
>> thanks
>>
>
>
>
> --
>
> Sourav Chandra
>
> Senior Software Engineer
>
> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>
> sourav.chan...@livestream.com
>
> o: +91 80 4121 8723
>
> m: +91 988 699 3746
>
> skype: sourav.chandra
>
> Livestream
>
> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
> Block, Koramangala Industrial Area,
>
> Bangalore 560034
>
> www.livestream.com
>


Re: broadcast in spark streaming

2014-06-20 Thread Sourav Chandra
From the StreamingContext object, you can get a reference to the SparkContext,
using which you can create broadcast variables.
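
For example, a minimal sketch (the app name, batch interval and broadcast contents are made up):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("broadcast-example")
val ssc = new StreamingContext(conf, Seconds(10))
val lookup = ssc.sparkContext.broadcast(Map("a" -> 1, "b" -> 2))

// read it with lookup.value inside DStream transformations, e.g.:
// stream.map(word => lookup.value.getOrElse(word, 0))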


On Fri, Jun 20, 2014 at 2:09 PM, Hahn Jiang 
wrote:

> I want to use broadcast in spark streaming, but I found there is no such
> function.
> How can I use a global variable in spark streaming?
>
> thanks
>



-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


broadcast in spark streaming

2014-06-20 Thread Hahn Jiang
I want to use broadcast in spark streaming, but I found there is no such
function.
How can I use a global variable in spark streaming?

thanks


Re: 1.0.1 release plan

2014-06-20 Thread Patrick Wendell
Hey There,

I'd like to start voting on this release shortly because there are a
few important fixes that have queued up. We're just waiting to fix an
akka issue. I'd guess we'll cut a vote in the next few days.

- Patrick

On Thu, Jun 19, 2014 at 10:47 AM, Mingyu Kim  wrote:
> Hi all,
>
> Is there any plan for 1.0.1 release?
>
> Mingyu


Re: problem about cluster mode of spark 1.0.0

2014-06-20 Thread randylu
In addition, the jar file can be copied to the driver node automatically.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-cluster-mode-of-spark-1-0-0-tp7982p7984.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to store JavaRDD as a sequence file using spark java API?

2014-06-20 Thread abhiguruvayya
Does JavaPairRDD.saveAsHadoopFile store data as a sequenceFile? Then what is
the significance of RDD.saveAsSequenceFile?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-JavaRDD-as-a-sequence-file-using-spark-java-API-tp7969p7983.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


problem about cluster mode of spark 1.0.0

2014-06-20 Thread randylu
My program runs in standalone mode; the command line is like:
/opt/spark-1.0.0/bin/spark-submit \
--verbose \
--class $class_name --master spark://master:7077 \
--driver-memory 15G \
--driver-cores 2 \
--deploy-mode cluster \
hdfs://master:9000/user/root/jartest/test.jar

But test.jar can't be copied to the worker node, so it throws a
java.lang.ClassNotFoundException.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-cluster-mode-of-spark-1-0-0-tp7982.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.