Re: Kryo deserialisation error

2014-07-24 Thread Guillaume Pitel

Hi,

We've got the same problem here (randomly happens) :

Unable to find class: <followed by several lines of corrupted, unprintable bytes where a class name should be>


Clearly a stream corruption problem.

We've been running fine (afaik) on 1.0.0 for two weeks, switched to 1.0.1 
this Monday, and since then this kind of problem has been occurring randomly.



Guillaume Pitel

Not sure if this helps, but it does seem to be part of a name in a
Wikipedia article, and Wikipedia is the data set. So something is
reading this class name from the data.

http://en.wikipedia.org/wiki/Carl_Fridtjof_Rode

On Thu, Jul 17, 2014 at 9:40 AM, Tathagata Das
tathagata.das1...@gmail.com wrote:

Seems like there is some sort of stream corruption, causing the Kryo read to
pick up a weird class name from the stream (the name "arl Fridtjof Rode" in the
exception cannot be a class!).
Not sure how to debug this.

@Patrick: Any idea?



--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



spark streaming actor receiver doesn't play well with kryoserializer

2014-07-24 Thread Alan Ngai
it looks like when you configure SparkConf to use the KryoSerializer in 
combination with an ActorReceiver, bad things happen.  I modified the 
ActorWordCount example program from 

val sparkConf = new SparkConf().setAppName("ActorWordCount")

to

val sparkConf = new SparkConf()
  .setAppName("ActorWordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

and I get the stack trace below.  I figured it might be that Kryo doesn’t know 
how to serialize/deserialize the actor so I added a registry.  I also added a 
default empty constructor to SampleActorReceiver just for kicks

class SerializationRegistry extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
kryo.register(classOf[SampleActorReceiver])
  }
}

…

case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
extends Actor with ActorHelper {
  def this() = this("")
  ...
}

...
val sparkConf = new SparkConf()
  .setAppName("ActorWordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator",
    "org.apache.spark.examples.streaming.SerializationRegistry")


None of this worked, same stack trace.  Any idea what’s going on?  Is this a 
known issue and is there a workaround?  

14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while 
creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher 
[akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
 akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating 
[akka://spark/user/Supervisor0/SampleReceiver] with dispatcher 
[akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
at 
akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
at 
org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.init(ActorReceiver.scala:152)
at 
org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at 
org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
at akka.actor.Props.newActor(Props.scala:339)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public 
akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with 
arguments [class java.lang.Class, class 
org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
at akka.util.Reflect$.instantiate(Reflect.scala:69)
at akka.actor.Props.cachedActorClass(Props.scala:203)
at akka.actor.Props.actorClass(Props.scala:327)
at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
... 24 more



Re: Help in merging a RDD agaisnt itself using the V of a (K,V).

2014-07-24 Thread Sean Owen
Yeah, reduce() will leave you with one big collection of sets on the
driver. Maybe the set of all identifiers isn't so big -- a hundred
million Longs even isn't so much. I'm glad to hear cartesian works, but
can that scale? You're making an RDD of N^2 elements initially, which
is just vast.

On Thu, Jul 24, 2014 at 2:09 AM, Roch Denis rde...@exostatic.com wrote:
 Ah yes, you're quite right, with partitions I could probably process a good
 chunk of the data, but I didn't think a reduce would work. Sorry, I'm still
 new to Spark and map reduce in general, but I thought that the reduce result
 wasn't an RDD and had to fit into memory. If the result of a reduce can be
 any size, then yes I can see how to make it work.

 Sorry for not being certain, the doc is not quite clear on that point, at
 least to me.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Help-in-merging-a-RDD-agaisnt-itself-using-the-V-of-a-K-V-tp10530p10556.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark Function setup and cleanup

2014-07-24 Thread Yosi Botzer
Hi,

I am using the Java api of Spark.

I wanted to know if there is a way to run some code in a manner that is
like the setup() and cleanup() methods of Hadoop Map/Reduce

The reason I need it is because I want to read something from the DB
according to each record I scan in my Function, and I would like to open
the DB connection only once (and close it only once).

Thanks


Re: streaming sequence files?

2014-07-24 Thread Sean Owen
Can you just call fileStream or textFileStream in the second app, to
consume files that appear in HDFS / Tachyon from the first job?
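
For example, a rough sketch in the second app, assuming the first job wrote (String, Int) pairs as SequenceFiles (so Text/IntWritable on disk) into a directory the stream watches (directory and batch interval here are assumptions):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("SequenceFileConsumer")  // hypothetical app name
val ssc = new StreamingContext(conf, Seconds(10))

val counts = ssc
  .fileStream[Text, IntWritable, SequenceFileInputFormat[Text, IntWritable]](
    "tachyon://localhost:19998/files/")                        // directory the first job writes into
  .map { case (word, count) => (word.toString, count.get) }    // back to plain Scala types

counts.print()
ssc.start()
ssc.awaitTermination()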

On Thu, Jul 24, 2014 at 2:43 AM, Barnaby bfa...@outlook.com wrote:
 If I save an RDD as a sequence file such as:

 val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
 wordCounts.foreachRDD( d => {
   d.saveAsSequenceFile("tachyon://localhost:19998/files/WordCounts-" +
     (new SimpleDateFormat("MMdd-HHmmss") format
       Calendar.getInstance.getTime).toString)
 })

 How can I use these results in another Spark app since there is no
 StreamingContext.sequenceFileStream()?

 Or,

 What is the best way to save RDDs of objects to files in one streaming app
 so that another app can stream those files in? Basically, reuse partially
 reduced RDDs for further processing so that it doesn't have to be done more
 than once.




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/streaming-sequence-files-tp10557.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Starting with spark

2014-07-24 Thread Sameer Sayyed
Hello All,

I am a new user of Spark. I am using *cloudera-quickstart-vm-5.0.0-0-vmware*
to execute the sample examples of Spark.
I am very sorry for the silly and basic question.
I am not able to deploy and execute the sample examples of Spark.

Please suggest *how to start with Spark*.

Please help me
Thanks in advance.

Regards,
Sam


save to HDFS

2014-07-24 Thread lmk
Hi,
I have a scala application which I have launched on a spark cluster. I
have the following statement trying to save to a folder on the master:
saveAsHadoopFile[TextOutputFormat[NullWritable,
Text]]("hdfs://masteripaddress:9000/root/test-app/test1/")

The application executes successfully and the log says that the save is
complete. But I am not able to find the file I saved anywhere. Is there a
way I can access this file?

Please advise.

Regards,
lmk



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: save to HDFS

2014-07-24 Thread Akhil Das
Are you sure the RDD that you were saving isn't empty!?

Are you seeing a _SUCCESS file in this location? hdfs://
masteripaddress:9000/root/test-app/test1/
 (Do hadoop fs -ls hdfs://masteripaddress:9000/root/test-app/test1/)


Thanks
Best Regards


On Thu, Jul 24, 2014 at 4:24 PM, lmk lakshmi.muralikrish...@gmail.com
wrote:

 Hi,
 I have a scala application which I have launched into a spark cluster. I
 have the following statement trying to save to a folder in the master:
 saveAsHadoopFile[TextOutputFormat[NullWritable,
 Text]](hdfs://masteripaddress:9000/root/test-app/test1/)

 The application is executed successfully and log says that save is complete
 also. But I am not able to find the file I have saved anywhere. Is there a
 way I can access this file?

 Pls advice.

 Regards,
 lmk



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Starting with spark

2014-07-24 Thread Akhil Das
Here's the complete overview http://spark.apache.org/docs/latest/

And Here's the quick start guidelines
http://spark.apache.org/docs/latest/quick-start.html

I would suggest you downloading the Spark pre-compiled binaries
http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.1/spark-1.0.1-bin-hadoop1.tgz
and
start off yourself.

Thanks
Best Regards


On Thu, Jul 24, 2014 at 4:23 PM, Sameer Sayyed sam.sayyed...@gmail.com
wrote:

 Hello All,

 I am new user of spark, I am using *cloudera-quickstart-vm-5.0.0-0-vmware*
 for execute sample examples of Spark.
 I am very sorry for silly and basic question.
 I am not able to deploy and execute sample examples of spark.

 please suggest me *how to start with spark*.

 Please help me
 Thanks in advance.

 Regards,
 Sam



Re: save to HDFS

2014-07-24 Thread lmk
Hi Akhil,
I am sure that the RDD that I saved is not empty. I have tested it using
take.
But is there no way that I can see this saved physically like we do in the
normal context? Can't I view this folder as I am already logged into the
cluster?
And, should I run hadoop fs -ls
hdfs://masteripaddress:9000/root/test-app/test1/
after I login to the cluster?

Thanks,
lmk



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578p10581.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: save to HDFS

2014-07-24 Thread Akhil Das
This piece of code

saveAsHadoopFile[TextOutputFormat[NullWritable,Text]]("hdfs://
masteripaddress:9000/root/test-app/test1/")

saves the RDD into HDFS, and yes, you can physically see the files using the
hadoop command (hadoop fs -ls /root/test-app/test1 - yes, you need to login
to the cluster). In case you are not able to execute the command (like
hadoop command not found), you can use $HADOOP_HOME/bin/hadoop fs -ls
/root/test-app/test1



Thanks
Best Regards


On Thu, Jul 24, 2014 at 4:34 PM, lmk lakshmi.muralikrish...@gmail.com
wrote:

 Hi Akhil,
 I am sure that the RDD that I saved is not empty. I have tested it using
 take.
 But is there no way that I can see this saved physically like we do in the
 normal context? Can't I view this folder as I am already logged into the
 cluster?
 And, should I run hadoop fs -ls
 hdfs://masteripaddress:9000/root/test-app/test1/
 after I login to the cluster?

 Thanks,
 lmk



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578p10581.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: save to HDFS

2014-07-24 Thread lmk
Thanks Akhil.
I was able to view the files. Actually I was trying to list the same using
regular ls and since it did not show anything I was concerned.
Thanks for showing me the right direction.

Regards,
lmk



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578p10583.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Function setup and cleanup

2014-07-24 Thread Yanbo Liang
If you want to connect to DB in program, you can use JdbcRDD (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
)
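
For illustration, a rough sketch of using JdbcRDD from Scala (the JDBC URL, driver, table and column names are made up; the query needs two '?' placeholders that JdbcRDD fills in with each partition's bounds):

import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// assumes an existing SparkContext named sc and a reachable MySQL database
def createConnection(): Connection = {
  Class.forName("com.mysql.jdbc.Driver")
  DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password")
}

val users = new JdbcRDD(
  sc,
  createConnection _,                                        // connection is opened once per partition
  "SELECT id, name FROM users WHERE id >= ? AND id <= ?",    // the two '?' are the partition bounds
  1, 1000000, 10,                                            // lowerBound, upperBound, numPartitions
  (rs: ResultSet) => (rs.getLong("id"), rs.getString("name")))

users.take(5).foreach(println)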


2014-07-24 18:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com:

 Hi,

 I am using the Java api of Spark.

 I wanted to know if there is a way to run some code in a manner that is
 like the setup() and cleanup() methods of Hadoop Map/Reduce

 The reason I need it is because I want to read something from the DB
 according to each record I scan in my Function, and I would like to open
 the DB connection only once (and close it only once).

 Thanks



Re: new error for me

2014-07-24 Thread phoenix bai
I am currently facing the same problem. Error snapshot below:

14-07-24 19:15:30 WARN [pool-3-thread-1] SendingConnection: Error
finishing connection to r64b22034.tt.net/10.148.129.84:47525
java.net.ConnectException: Connection timed out
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599)
at 
org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318)
at 
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
14-07-24 19:15:30 INFO [pool-3-thread-1] ConnectionManager: Handling
connection error on connection to
ConnectionManagerId(r64b22034.tt.net,47525)
14-07-24 19:15:30 INFO [pool-3-thread-1] ConnectionManager: Removing
SendingConnection to ConnectionManagerId(r64b22034.tt.net,47525)
14-07-24 19:15:30 INFO [pool-3-thread-1] ConnectionManager: Notifying
org.apache.spark.network.ConnectionManager$MessageStatus@1704ebb


Could anyone help shed some light on this?


thanks




On Tue, Jul 22, 2014 at 11:35 AM, Nathan Kronenfeld 
nkronenf...@oculusinfo.com wrote:

 Does anyone know what this error means:
 14/07/21 23:07:22 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
 14/07/21 23:07:22 INFO TaskSetManager: Starting task 3.0:0 as TID 1620 on
 executor 27: r104u05.oculus.local (PROCESS_LOCAL)
 14/07/21 23:07:22 INFO TaskSetManager: Serialized task 3.0:0 as 8620 bytes
 in 1 ms
 14/07/21 23:07:36 INFO BlockManagerInfo: Added taskresult_1620 in memory
 on r104u05.oculus.local:50795 (size: 64.9 MB, free: 18.3 GB)
 14/07/21 23:07:36 INFO SendingConnection: Initiating connection to
 [r104u05.oculus.local/192.168.0.105:50795]
 14/07/21 23:07:57 INFO ConnectionManager: key already cancelled ?
 sun.nio.ch.SelectionKeyImpl@1d86a150
 java.nio.channels.CancelledKeyException
 at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
 at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77)
 at
 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:265)
 at
 org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:115)
 14/07/21 23:07:57 WARN SendingConnection: Error finishing connection to
 r104u05.oculus.local/192.168.0.105:50795
 java.net.ConnectException: Connection timed out
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at
 sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
 at
 org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318)
 at
 org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:202)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)
 14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on
 connection to ConnectionManagerId(r104u05.oculus.local,50795)
 14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(r104u05.oculus.local,50795)
 14/07/21 23:07:57 INFO ConnectionManager: Notifying
 org.apache.spark.network.ConnectionManager$MessageStatus@13ad274d
 14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on
 connection to ConnectionManagerId(r104u05.oculus.local,50795)
 14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(r104u05.oculus.local,50795)
 14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(r104u05.oculus.local,50795)
 14/07/21 23:07:57 WARN TaskSetManager: Lost TID 1620 (task 3.0:0)
 14/07/21 23:07:57 WARN TaskSetManager: Lost result for TID 1620 on host
 r104u05.oculus.local

 I've never seen this one before, and now it's coming up consistently.

 Thanks,
  -Nathan




Re: Configuring Spark Memory

2014-07-24 Thread Martin Goodson
Thank you Nishkam,
I have read your code. So, for the sake of my understanding, it seems that
for each spark context there is one executor per node? Can anyone confirm
this?


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240


On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com wrote:

 See if this helps:

 https://github.com/nishkamravi2/SparkAutoConfig/

 It's a very simple tool for auto-configuring default parameters in Spark.
 Takes as input high-level parameters (like number of nodes, cores per node,
 memory per node, etc) and spits out default configuration, user advice and
 command line. Compile (javac SparkConfigure.java) and run (java
 SparkConfigure).

 Also cc'ing dev in case others are interested in helping evolve this over
 time (by refining the heuristics and adding more parameters).


  On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 Thanks Andrew,

 So if there is only one SparkContext there is only one executor per
 machine? This seems to contradict Aaron's message from the link above:

 If each machine has 16 GB of RAM and 4 cores, for example, you might set
 spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark.)

 Am I reading this incorrectly?

 Anyway our configuration is 21 machines (one master and 20 slaves) each
 with 60Gb. We would like to use 4 cores per machine. This is pyspark so we
 want to leave say 16Gb on each machine for python processes.

 Thanks again for the advice!



 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240


 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com wrote:

 Hi Martin,

 In standalone mode, each SparkContext you initialize gets its own set of
 executors across the cluster.  So for example if you have two shells open,
 they'll each get two JVMs on each worker machine in the cluster.

 As far as the other docs, you can configure the total number of cores
 requested for the SparkContext, the amount of memory for the executor JVM
 on each machine, the amount of memory for the Master/Worker daemons (little
 needed since work is done in executors), and several other settings.

 Which of those are you interested in?  What spec hardware do you have
 and how do you want to configure it?

 Andrew


 On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 We are having difficulties configuring Spark, partly because we still
 don't understand some key concepts. For instance, how many executors are
 there per machine in standalone mode? This is after having closely
 read the documentation several times:

 *http://spark.apache.org/docs/latest/configuration.html
 http://spark.apache.org/docs/latest/configuration.html*
 *http://spark.apache.org/docs/latest/spark-standalone.html
 http://spark.apache.org/docs/latest/spark-standalone.html*
 *http://spark.apache.org/docs/latest/tuning.html
 http://spark.apache.org/docs/latest/tuning.html*
 *http://spark.apache.org/docs/latest/cluster-overview.html
 http://spark.apache.org/docs/latest/cluster-overview.html*

 The cluster overview has some information here about executors but is
 ambiguous about whether there are single executors or multiple executors on
 each machine.

  This message from Aaron Davidson implies that the executor memory
 should be set to total available memory on the machine divided by the
 number of cores:
 *http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E
 http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E*

 But other messages imply that the executor memory should be set to the
 *total* available memory of each machine.

 We would very much appreciate some clarity on this and the myriad of
 other memory settings available (daemon memory, worker memory etc). Perhaps
 a worked example could be added to the docs? I would be happy to provide
 some text as soon as someone can enlighten me on the technicalities!

 Thank you

 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240







Re: Spark Function setup and cleanup

2014-07-24 Thread Yosi Botzer
In my case I want to reach HBase. For every record with userId I want to
get some extra information about the user and add it to result record for
further prcessing


On Thu, Jul 24, 2014 at 9:11 AM, Yanbo Liang yanboha...@gmail.com wrote:

 If you want to connect to DB in program, you can use JdbcRDD (
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
 )


 2014-07-24 18:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com:

 Hi,

 I am using the Java api of Spark.

 I wanted to know if there is a way to run some code in a manner that is
 like the setup() and cleanup() methods of Hadoop Map/Reduce

 The reason I need it is because I want to read something from the DB
 according to each record I scan in my Function, and I would like to open
 the DB connection only once (and close it only once).

 Thanks





GraphX for pyspark?

2014-07-24 Thread Eric Friedman
I understand that GraphX is not yet available for pyspark.  I was wondering
if the Spark team has set a target release and timeframe for doing that
work?

Thank you,
Eric


Re: Starting with spark

2014-07-24 Thread Marco Shaw
First thing...  Go into the Cloudera Manager and make sure that the Spark
service (master?) is started.

Marco


On Thu, Jul 24, 2014 at 7:53 AM, Sameer Sayyed sam.sayyed...@gmail.com
wrote:

 Hello All,

 I am new user of spark, I am using *cloudera-quickstart-vm-5.0.0-0-vmware*
 for execute sample examples of Spark.
 I am very sorry for silly and basic question.
 I am not able to deploy and execute sample examples of spark.

 please suggest me *how to start with spark*.

 Please help me
 Thanks in advance.

 Regards,
 Sam



Spark got stuck with a loop

2014-07-24 Thread Denis RP
Hi,
I ran spark standalone mode on a cluster and it went well for approximately
one hour, then the driver's output stopped with the following:

14/07/24 08:07:36 INFO MapOutputTrackerMasterActor: Asked to send map output
locations for shuffle 36 to spark@worker5.local:47416
14/07/24 08:07:36 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 36 is 265 bytes
14/07/24 08:30:04 INFO MapOutputTrackerMasterActor: Asked to send map output
locations for shuffle 39 to spark@worker5.local:47416
14/07/24 08:30:04 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 39 is 265 bytes

Then I checked the spark UI, found only one active task, then I checked that
worker's stderr, it seemed the worker had fallen into a loop:

14/07/24 09:18:18 INFO BlockManager: Found block rdd_14_3 locally
14/07/24 09:18:18 INFO BlockManager: Found block rdd_14_3 locally
14/07/24 09:18:18 INFO BlockManager: Found block rdd_14_3 locally
14/07/24 09:18:18 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/24 09:18:18 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 0 non-empty blocks out of 28 blocks
14/07/24 09:18:18 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 0 remote fetches in 0 ms

This aberrant info was output repeatedly.

So, what should I do to fix it? I have run the program multiple times
and sooner or later it ends up in this state. I also tried increasing the
memory, which didn't work.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-got-stuck-with-a-loop-tp10590.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-07-24 Thread lihu
Which code did you use? Is the problem caused by your own code or by
something in Spark itself?


On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com hsy...@gmail.com wrote:

 I have the same problem


 On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote:

 Hi,
 Everyone. I have the following piece of code. When I run it,
 the error below occurs. It seems that the SparkContext is not
 serializable, but I do not try to use the SparkContext except for the broadcast.
 [In fact, this code is in MLlib; I just try to broadcast the
  centerArrays.]

 It succeeds in the reduceByKey operation, but fails at the
 collect operation, which confuses me.


 INFO DAGScheduler: Failed to run collect at KMeans.scala:235
 [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
 not serializable: java.io.NotSerializableException:
 org.apache.spark.SparkContext
 org.apache.spark.SparkException: Job aborted: Task not serializable:
 java.io.NotSerializableException: org.apache.spark.SparkContext
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)




  private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {

    @transient val sc = data.sparkContext  // I try to add the transient annotation here, but it doesn't work

    // Initialize each run's center to a random point
    val seed = new XORShiftRandom().nextInt()
    val sample = data.takeSample(true, runs, seed).toSeq
    val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))

    // On each step, sample 2 * k points on average for each run with
    // probability proportional to their squared distance from that run's current centers
    for (step <- 0 until initializationSteps) {
      val centerArrays = sc.broadcast(centers.map(_.toArray))
      val sumCosts = data.flatMap { point =>
        for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
      }.reduceByKey(_ + _).collectAsMap()
      // can pass at this point
      val chosen = data.mapPartitionsWithIndex { (index, points) =>
        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
        for {
          p <- points
          r <- 0 until runs
          if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
        } yield (r, p)
      }.collect()
      // failed at this point.
      for ((r, p) <- chosen) {
        centers(r) += p
      }
    }








-- 
*Best Wishes!*

 *Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences(IIIS
http://iiis.tsinghua.edu.cn/) *
*Tsinghua University, China*

*Email: lihu...@gmail.com lihu...@gmail.com*
*Tel  : +86 15120081920*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
http://iiis.tsinghua.edu.cn/zh/lihu/*


Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-24 Thread Tathagata Das
You will have to define your own stream-to-iterator function and use
socketStream. The function should return custom-delimited objects as bytes
continuously come in; when there is not enough data yet, the function should
block.

TD
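
For illustration, a minimal sketch of such a converter, assuming an ETX (0x03) delimiter and an existing StreamingContext named ssc:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import org.apache.spark.storage.StorageLevel

// Hypothetical converter: splits the incoming byte stream on ETX (0x03) instead of '\n'.
def etxDelimited(in: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))
  new Iterator[String] {
    private var nextRecord = readRecord()
    private def readRecord(): String = {
      val sb = new StringBuilder
      var c = reader.read()              // blocks until more bytes arrive
      while (c != -1 && c != 0x03) {     // 0x03 = ETX delimiter
        sb.append(c.toChar)
        c = reader.read()
      }
      if (c == -1 && sb.isEmpty) null else sb.toString
    }
    def hasNext: Boolean = nextRecord != null
    def next(): String = { val cur = nextRecord; nextRecord = readRecord(); cur }
  }
}

// usage sketch:
// val lines = ssc.socketStream("localhost", 9999, etxDelimited, StorageLevel.MEMORY_AND_DISK_SER_2)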
On Jul 23, 2014 6:52 PM, kytay kaiyang@gmail.com wrote:

 Hi TD

 You are right, I did not include \n to delimit the string flushed. That's
 the reason.

 Is there a way for me to define the delimiter? Like SOH or ETX instead of
 \n

 Regards
 kytay



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p10558.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Configuring Spark Memory

2014-07-24 Thread Martin Goodson
Great - thanks for the clarification Aaron. The offer stands for me to
write some documentation and an example that covers this without leaving
*any* room for ambiguity.




-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240


On Thu, Jul 24, 2014 at 6:09 PM, Aaron Davidson ilike...@gmail.com wrote:

 Whoops, I was mistaken in my original post last year. By default, there is
 one executor per node per Spark Context, as you said.
 spark.executor.memory is the amount of memory that the application
 requests for each of its executors. SPARK_WORKER_MEMORY is the amount of
 memory a Spark Worker is willing to allocate in executors.

 So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your
 cluster, and spark.executor.memory to 4g, you would be able to run 2
 simultaneous Spark Contexts who get 4g per node. Similarly, if
 spark.executor.memory were 8g, you could only run 1 Spark Context at a time
 on the cluster, but it would get all the cluster's memory.


 On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 Thank you Nishkam,
 I have read your code. So, for the sake of my understanding, it seems
 that for each spark context there is one executor per node? Can anyone
 confirm this?


 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240


 On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com wrote:

 See if this helps:

 https://github.com/nishkamravi2/SparkAutoConfig/

 It's a very simple tool for auto-configuring default parameters in
 Spark. Takes as input high-level parameters (like number of nodes, cores
 per node, memory per node, etc) and spits out default configuration, user
 advice and command line. Compile (javac SparkConfigure.java) and run (java
 SparkConfigure).

 Also cc'ing dev in case others are interested in helping evolve this
 over time (by refining the heuristics and adding more parameters).


  On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 Thanks Andrew,

 So if there is only one SparkContext there is only one executor per
 machine? This seems to contradict Aaron's message from the link above:

 If each machine has 16 GB of RAM and 4 cores, for example, you might
 set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by
 Spark.)

 Am I reading this incorrectly?

 Anyway our configuration is 21 machines (one master and 20 slaves) each
 with 60Gb. We would like to use 4 cores per machine. This is pyspark so we
 want to leave say 16Gb on each machine for python processes.

 Thanks again for the advice!



 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240


 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com
 wrote:

 Hi Martin,

 In standalone mode, each SparkContext you initialize gets its own set
 of executors across the cluster.  So for example if you have two shells
 open, they'll each get two JVMs on each worker machine in the cluster.

 As far as the other docs, you can configure the total number of cores
 requested for the SparkContext, the amount of memory for the executor JVM
 on each machine, the amount of memory for the Master/Worker daemons 
 (little
 needed since work is done in executors), and several other settings.

 Which of those are you interested in?  What spec hardware do you have
 and how do you want to configure it?

 Andrew


 On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 We are having difficulties configuring Spark, partly because we still
 don't understand some key concepts. For instance, how many executors are
 there per machine in standalone mode? This is after having closely
 read the documentation several times:

 *http://spark.apache.org/docs/latest/configuration.html
 http://spark.apache.org/docs/latest/configuration.html*
 *http://spark.apache.org/docs/latest/spark-standalone.html
 http://spark.apache.org/docs/latest/spark-standalone.html*
 *http://spark.apache.org/docs/latest/tuning.html
 http://spark.apache.org/docs/latest/tuning.html*
 *http://spark.apache.org/docs/latest/cluster-overview.html
 http://spark.apache.org/docs/latest/cluster-overview.html*

 The cluster overview has some information here about executors but is
 ambiguous about whether there are single executors or multiple executors 
 on
 each machine.

  This message from Aaron Davidson implies that the executor memory
 should be set to total available memory on the machine divided by the
 number of cores:
 *http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E
 http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E*

 But other messages imply that the executor memory should be set to
 the *total* available memory of each machine.

 We would very much appreciate some clarity on this and the myriad of
 

Re: Configuring Spark Memory

2014-07-24 Thread Aaron Davidson
Whoops, I was mistaken in my original post last year. By default, there is
one executor per node per Spark Context, as you said.
spark.executor.memory is the amount of memory that the application
requests for each of its executors. SPARK_WORKER_MEMORY is the amount of
memory a Spark Worker is willing to allocate in executors.

So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your cluster,
and spark.executor.memory to 4g, you would be able to run 2 simultaneous
Spark Contexts who get 4g per node. Similarly, if spark.executor.memory
were 8g, you could only run 1 Spark Context at a time on the cluster, but
it would get all the cluster's memory.
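
As a concrete sketch of that first scenario in standalone mode (the app name is made up):

// In conf/spark-env.sh on every worker node (standalone mode):
//   SPARK_WORKER_MEMORY=8g
//
// Each application then asks for its per-node share via spark.executor.memory:
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyApp")                        // hypothetical application name
  .set("spark.executor.memory", "4g")         // two such apps can run concurrently, 4g each per node
val sc = new SparkContext(conf)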


On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson mar...@skimlinks.com
wrote:

 Thank you Nishkam,
 I have read your code. So, for the sake of my understanding, it seems that
 for each spark context there is one executor per node? Can anyone confirm
 this?


 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240


 On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com wrote:

 See if this helps:

 https://github.com/nishkamravi2/SparkAutoConfig/

 It's a very simple tool for auto-configuring default parameters in Spark.
 Takes as input high-level parameters (like number of nodes, cores per node,
 memory per node, etc) and spits out default configuration, user advice and
 command line. Compile (javac SparkConfigure.java) and run (java
 SparkConfigure).

 Also cc'ing dev in case others are interested in helping evolve this over
 time (by refining the heuristics and adding more parameters).


  On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 Thanks Andrew,

 So if there is only one SparkContext there is only one executor per
 machine? This seems to contradict Aaron's message from the link above:

 If each machine has 16 GB of RAM and 4 cores, for example, you might
 set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by
 Spark.)

 Am I reading this incorrectly?

 Anyway our configuration is 21 machines (one master and 20 slaves) each
 with 60Gb. We would like to use 4 cores per machine. This is pyspark so we
 want to leave say 16Gb on each machine for python processes.

 Thanks again for the advice!



 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240


 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com
 wrote:

 Hi Martin,

 In standalone mode, each SparkContext you initialize gets its own set
 of executors across the cluster.  So for example if you have two shells
 open, they'll each get two JVMs on each worker machine in the cluster.

 As far as the other docs, you can configure the total number of cores
 requested for the SparkContext, the amount of memory for the executor JVM
 on each machine, the amount of memory for the Master/Worker daemons (little
 needed since work is done in executors), and several other settings.

 Which of those are you interested in?  What spec hardware do you have
 and how do you want to configure it?

 Andrew


 On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 We are having difficulties configuring Spark, partly because we still
 don't understand some key concepts. For instance, how many executors are
 there per machine in standalone mode? This is after having closely
 read the documentation several times:

 *http://spark.apache.org/docs/latest/configuration.html
 http://spark.apache.org/docs/latest/configuration.html*
 *http://spark.apache.org/docs/latest/spark-standalone.html
 http://spark.apache.org/docs/latest/spark-standalone.html*
 *http://spark.apache.org/docs/latest/tuning.html
 http://spark.apache.org/docs/latest/tuning.html*
 *http://spark.apache.org/docs/latest/cluster-overview.html
 http://spark.apache.org/docs/latest/cluster-overview.html*

 The cluster overview has some information here about executors but is
 ambiguous about whether there are single executors or multiple executors 
 on
 each machine.

  This message from Aaron Davidson implies that the executor memory
 should be set to total available memory on the machine divided by the
 number of cores:
 *http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E
 http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E*

 But other messages imply that the executor memory should be set to the
 *total* available memory of each machine.

 We would very much appreciate some clarity on this and the myriad of
 other memory settings available (daemon memory, worker memory etc). 
 Perhaps
 a worked example could be added to the docs? I would be happy to provide
 some text as soon as someone can enlighten me on the technicalities!

 Thank you

 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240








Re: Starting with spark

2014-07-24 Thread Jerry
Hi Sameer,

I think it is much easier to start using Spark in standalone mode on a single 
machine. Last time I tried Cloudera Manager to deploy Spark, it wasn't very 
straightforward and I hit a couple of obstacles along the way. However, 
standalone mode makes it very easy to start exploring Spark.

Best Regards,

Jerry

Sent from my iPad

 On Jul 24, 2014, at 6:53 AM, Sameer Sayyed sam.sayyed...@gmail.com wrote:
 
 Hello All,
 
 I am new user of spark, I am using cloudera-quickstart-vm-5.0.0-0-vmware for 
 execute sample examples of Spark. 
 I am very sorry for silly and basic question.
 I am not able to deploy and execute sample examples of spark.
 
 please suggest me how to start with spark.
 
 Please help me
 Thanks in advance.
 
 Regards,  
 Sam


Re: akka 2.3.x?

2014-07-24 Thread yardena
We are also eagerly waiting for akka 2.3.4 support as we use Akka (and Spray)
directly in addition to Spark.

  Yardena



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/akka-2-3-x-tp10513p10597.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Starting with spark

2014-07-24 Thread Kostiantyn Kudriavtsev
Hi Sam,

I tried Spark on Cloudera a couple of months ago, and there were a lot of issues… 
Fortunately, I was able to switch to Hortonworks and everything works perfectly. In 
general, you can try two modes: standalone and via YARN. Personally, I found 
using Spark via YARN more comfortable, especially for administration. You can read 
about my experience with standalone mode: 
http://simpletoad.blogspot.com/2014/04/spark-on-hdp2.html

On Jul 24, 2014, at 8:12 PM, Jerry chiling...@gmail.com wrote:

 Hi Sameer,
 
 I think it is much easier to start using Spark in standalone mode on a single 
 machine. Last time I tried cloudera manager to deploy spark, it wasn't very 
 straight forward and I hit couple of obstacles along the way. However, 
 standalone mode is very easy to start exploring spark.
 
 Best Regards,
 
 Jerry
 
 Sent from my iPad
 
 On Jul 24, 2014, at 6:53 AM, Sameer Sayyed sam.sayyed...@gmail.com wrote:
 
 Hello All,
 
 I am new user of spark, I am using cloudera-quickstart-vm-5.0.0-0-vmware for 
 execute sample examples of Spark. 
 I am very sorry for silly and basic question.
 I am not able to deploy and execute sample examples of spark.
 
 please suggest me how to start with spark.
 
 Please help me
 Thanks in advance.
 
 Regards,  
 Sam



rdd.saveAsTextFile blows up

2014-07-24 Thread Eric Friedman
I'm trying to run a simple pipeline using PySpark, version 1.0.1

I've created an RDD over a parquetFile and am mapping the contents with a
transformer function and now wish to write the data out to HDFS.

All of the executors fail with the same stack trace (below)

I do get a directory on HDFS, but it's empty except for a file named
_temporary.

Any ideas?

java.io.IOException (java.io.IOException: Filesystem closed}
org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:629)
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:735)
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:793)
java.io.DataInputStream.readFully(DataInputStream.java:195)
java.io.DataInputStream.readFully(DataInputStream.java:169)
parquet.hadoop.ParquetFileReader$Chunk.init(ParquetFileReader.java:369)
parquet.hadoop.ParquetFileReader$Chunk.init(ParquetFileReader.java:362)
parquet.hadoop.ParquetFileReader.readColumnChunkPages(ParquetFileReader.java:411)
parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:349)
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966)
scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:293)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)


Re: akka 2.3.x?

2014-07-24 Thread Matei Zaharia
This is being tracked here: https://issues.apache.org/jira/browse/SPARK-1812, 
since it will also be needed for cross-building with Scala 2.11. Maybe we can 
do it before that. Probably too late for 1.1, but you should open an issue for 
1.2.

In that JIRA I linked, there's a pull request from a month ago that adds Akka 
2.3, so that's worth looking at.

Matei

On Jul 24, 2014, at 10:19 AM, yardena ymeym...@gmail.com wrote:

 We are also eagerly waiting for akka 2.3.4 support as we use Akka (and Spray)
 directly in addition to Spark.
 
  Yardena
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/akka-2-3-x-tp10513p10597.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



GraphX canonical conflation issues

2014-07-24 Thread e5c
Hi there,

This issue has been mentioned in:

http://apache-spark-user-list.1001560.n3.nabble.com/Java-IO-Stream-Corrupted-Invalid-Type-AC-td6925.html
 

However I'm starting a new thread since the issue is distinct from the above
topic's designated subject.

I'm test-running canonical conflation on a ~100 MB graph (with hopes to
scale to 10 GB or more). I'm deploying on 5 r3.xlarge machines on AWS EMR
and using default configurations, with the exception of setting
spark.serializer as org.apache.spark.serializer.KryoSerializer.
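
For reference, the configuration described above amounts to roughly this (the app name is made up):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("CanonicalConflation")   // hypothetical name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)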

The full stack-trace from canonical conflation is pasted below; it evidently
fails at: Failed to run reduce at VertexRDD.scala:100. (The same app ran
just fine on very small input locally.) Has there been any progress in
identifying the underlying issues? Thanks!

14/07/24 16:29:37 INFO mapred.FileInputFormat: Total input paths to process
: 1
* About to run connected components *
14/07/24 16:29:37 INFO spark.SparkContext: Starting job: reduce at
VertexRDD.scala:100
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 5
(mapPartitions at VertexRDD.scala:423)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 18
(mapPartitions at VertexRDD.scala:318)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 22
(mapPartitions at VertexRDD.scala:318)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 26
(mapPartitions at GraphImpl.scala:184)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Got job 0 (reduce at
VertexRDD.scala:100) with 1 output partitions (allowLocal=false)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Final stage: Stage 0(reduce
at VertexRDD.scala:100)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Parents of final stage:
List(Stage 1, Stage 2)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Missing parents: List(Stage
1, Stage 2)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Submitting Stage 1
(VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5]
at mapPartitions at VertexRDD.scala:423), which has no missing parents
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
from Stage 1 (VertexRDD.createRoutingTables - vid2pid (aggregation)
MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:423)
14/07/24 16:29:37 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with
1 tasks
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@ip-10-5-147-209.ec2.internal:60530/user/Executor#-2098248966]
with ID 0
14/07/24 16:29:39 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID
0 on executor 0: ip-10-5-147-209.ec2.internal (PROCESS_LOCAL)
14/07/24 16:29:39 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as
2300 bytes in 3 ms
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@ip-10-167-166-70.ec2.internal:53470/user/Executor#-1954387250]
with ID 3
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@ip-10-169-50-78.ec2.internal:37584/user/Executor#-247338355]
with ID 2
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@ip-10-95-161-133.ec2.internal:55718/user/Executor#-2120787048]
with ID 1
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager
ip-10-167-166-70.ec2.internal:52351 with 294.9 MB RAM
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager
ip-10-5-147-209.ec2.internal:34712 with 294.9 MB RAM
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager
ip-10-169-50-78.ec2.internal:35244 with 294.9 MB RAM
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager
ip-10-95-161-133.ec2.internal:44976 with 294.9 MB RAM
14/07/24 16:30:09 INFO cluster.SparkDeploySchedulerBackend: Executor 0
disconnected, so removing it
14/07/24 16:30:09 INFO client.AppClient$ClientActor: Executor updated:
app-20140724162937-0004/0 is now EXITED (Command exited with code 52)
14/07/24 16:30:09 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20140724162937-0004/0 removed: Command exited with code 52
14/07/24 16:30:09 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on
ip-10-5-147-209.ec2.internal: remote Akka client disassociated
14/07/24 16:30:09 INFO scheduler.TaskSetManager: Re-queueing tasks for 0
from TaskSet 1.0
14/07/24 16:30:10 WARN scheduler.TaskSetManager: Lost TID 0 (task 1.0:0)
14/07/24 16:30:10 INFO client.AppClient$ClientActor: Executor added:
app-20140724162937-0004/4 on
worker-20140724151003-ip-10-5-147-209.ec2.internal-55958
(ip-10-5-147-209.ec2.internal:55958) with 4 cores
14/07/24 16:30:10 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID
1 on executor 1: ip-10-95-161-133.ec2.internal (PROCESS_LOCAL)
14/07/24 16:30:10 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20140724162937-0004/4 on hostPort 

continuing processing when errors occur

2014-07-24 Thread Art Peel
Our system works with RDDs generated from Hadoop files. It processes each
record in a Hadoop file and for a subset of those records generates output
that is written to an external system via RDD.foreach. There are no
dependencies between the records that are processed.

If writing to the external system fails (due to a detail of what is being
written) and throws an exception, I see the following behavior:

1. Spark retries the entire partition (thus wasting time and effort),
reaches the problem record and fails again.
2. It repeats step 1 up to the default 4 tries and then gives up. As a
result, the rest of records from that Hadoop file are not processed.
3. The executor where the 4th failure occurred is marked as failed and told
to shut down, so I lose a core for processing the remaining Hadoop
files, slowing down the entire process.

For this particular problem, I know how to prevent the underlying
exception, but I'd still like to get a handle on error handling for future
situations that I haven't yet encountered.

My goal is this:
Retry the problem record only (rather than starting over at the beginning
of the partition) up to N times, then give up and move on to process the
rest of the partition.

As far as I can tell, I need to supply my own retry behavior and if I want
to process records after the problem record I have to swallow exceptions
inside the foreach block.
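
For illustration, a minimal sketch of that do-it-yourself approach (writeToExternalSystem is a hypothetical stand-in for the real write call):

// Retry a single record up to maxTries times, then give up on that record only,
// so the rest of the partition is still processed.
def withRetries[T](record: T, maxTries: Int = 3)(write: T => Unit): Unit = {
  var attempt = 0
  var done = false
  while (!done && attempt < maxTries) {
    try {
      write(record)
      done = true
    } catch {
      case e: Exception =>
        attempt += 1
        if (attempt >= maxTries) {
          // swallow the failure: log it and move on to the next record
          System.err.println("Giving up on record after " + maxTries + " failures: " + e)
        }
    }
  }
}

// rdd.foreach(rec => withRetries(rec)(writeToExternalSystem))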

My 2 questions are:
1. Is there anything I can do to prevent the executor from being shut down
when a failure occurs?

2. Are there ways Spark can help me get closer to my goal of retrying only
the problem record without writing my own re-try code and swallowing
exceptions?

Regards,
Art


Getting the number of slaves

2014-07-24 Thread Nicolas Mai
Hi,

Is there a way to get the number of slaves/workers during runtime?

I searched online but didn't find anything :/ The application I'm working on
will run on different clusters corresponding to different deployment stages
(beta - prod). It would be great to get the number of slaves currently in
use, in order to set the level of parallelism and the number of RDD
partitions based on that number.
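
One possible sketch, if relying on a developer API is acceptable (getExecutorStorageStatus also lists the driver, hence the -1; the factor of 4 is just an assumed partitions-per-executor value):

// assumes an existing SparkContext named sc
val numExecutors = sc.getExecutorStorageStatus.length - 1   // subtract the driver's own entry
val defaultPartitions = numExecutors * 4                    // e.g. 4 partitions per executor (assumption)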

Thanks!
Nicolas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Simple record matching using Spark SQL

2014-07-24 Thread Yin Huai
Hi Sarath,

I will try to reproduce the problem.

Thanks,

Yin


On Wed, Jul 23, 2014 at 11:32 PM, Sarath Chandra 
sarathchandra.jos...@algofusiontech.com wrote:

 Hi Michael,

 Sorry for the delayed response.

 I'm using Spark 1.0.1 (pre-built version for hadoop 1). I'm running spark
 programs on a standalone spark cluster using 2 nodes. One node works as
 both master and worker while the other node is just a worker.

 I didn't quite get what you meant when you asked for a jstack of the driver
 and executor. So I'm attaching the log files generated in $SPARK_HOME/logs
 and the stdout and stderr files for this job in the $SPARK_HOME/work folder
 from both the nodes.

 Also attaching the program which I executed. If I uncomment lines 36 and
 37 the program works fine, otherwise it just keeps running forever.

 ~Sarath.


 On Thu, Jul 17, 2014 at 9:35 PM, Michael Armbrust mich...@databricks.com
 wrote:

 What version are you running?  Could you provide a jstack of the driver
 and executor when it is hanging?


 On Thu, Jul 17, 2014 at 10:55 AM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Added the below 2 lines just before the sql query line -
 ...
 file1_schema.count;
 file2_schema.count;
 ...
 and it started working. But I couldn't figure out the reason.

 Can someone please explain? What was happening earlier and what is
 happening with the addition of these 2 lines?

 ~Sarath


 On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 No Sonal, I'm not doing any explicit call to stop context.

 If you see my previous post to Michael, the commented portion of the
 code is my requirement. When I run this over standalone spark cluster, the
 execution keeps running with no output or error. After waiting for several
 minutes I'm killing it by pressing Ctrl+C in the terminal.

 But the same code runs perfectly when executed from spark shell.

 ~Sarath


 On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal sonalgoy...@gmail.com
 wrote:

 Hi Sarath,

 Are you explicitly stopping the context?

 sc.stop()




 Best Regards,
 Sonal
 Nube Technologies http://www.nubetech.co

 http://in.linkedin.com/in/sonalgoyal




 On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Hi Michael, Soumya,

 Can you please check and let me know what is the issue? what am I
 missing?
 Let me know if you need any logs to analyze.

 ~Sarath


 On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Hi Michael,

 Tried it. It's correctly printing the line counts of both the files.
 Here's what I tried -

 Code:
 package test
 object Test4 {
   case class Test(fld1: String,
                   fld2: String,
                   fld3: String,
                   fld4: String,
                   fld5: String,
                   fld6: Double,
                   fld7: String);
   def main(args: Array[String]) {
     val conf = new SparkConf()
       .setMaster(args(0))
       .setAppName("SQLTest")
       .setSparkHome(args(1))
       .set("spark.executor.memory", "2g");
     val sc = new SparkContext(conf);
     sc.addJar("test1-0.1.jar");
     val file1 = sc.textFile(args(2));
     println(file1.count());
     val file2 = sc.textFile(args(3));
     println(file2.count());
     //val sq = new SQLContext(sc);
     //import sq._
     //val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l =>
     //  Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
     //val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s =>
     //  Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
     //val file1_schema = sq.createSchemaRDD(file1_recs);
     //val file2_schema = sq.createSchemaRDD(file2_recs);
     //file1_schema.registerAsTable("file1_tab");
     //file2_schema.registerAsTable("file2_tab");
     //val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
     //  "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
     //  "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
     //  "l.fld6=s.fld6");
     //matched.collect().foreach(println);
   }
 }

 Execution:
 export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
 export CONFIG_OPTS=-Dspark.jars=test1-0.1.jar
 java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077
 /usr/local/spark-1.0.1-bin-hadoop1
 hdfs://master:54310/user/hduser/file1.csv
 hdfs://master:54310/user/hduser/file2.csv

 ~Sarath

 On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 What if you just run something like:
 sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count()


 On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Yes Soumya, I did it.

 First I tried with the example available in the documentation
 (example using people table and finding teenagers). After successfully
 running it, I moved on to this one which is starting point to a bigger
 requirement for which I'm evaluating Spark SQL.


 On Wed, Jul 16, 2014 at 7:59 PM, Soumya Simanta 

Spark Training at Scala By the Bay with Databricks, Fast Track to Scala

2014-07-24 Thread Alexy Khrabrov
Scala By the Bay (www.scalabythebay.org) is happy to confirm that our
Spark training on August 11-12 will be run by Databricks and By the
Bay together.  It will be focused on Scala, and is the first Spark
Training at a major Scala venue.  Spark is written in Scala, with the
unified data pipeline and interactive REPL and streaming all taking
advantage of powerful Scala DSL capabilities.  Type safety ensures
that your program will not have run-time errors due to typos, and JVM
performance wins for machine learning and compute-intensive
applications.  Type inference keeps code clear and concise in Scala
while keeping all strong typing guarantees, and immutability enables
concurrency on multicores as well as across the cluster.  If you're
serious about performance and enterprise maintainability, you should
use Spark with Scala.

We're offering Typesafe-certified Fast Track to Scala (FTTS) course at
Scala By the Bay.  To celebrate our becoming a Typesafe Certified
Training Partner, we're happy to extend to Spark users a $300 discount
for the FTTS -- use code SPARKSCALABYTHEBAY.  FTTS is taught on August
6-7 by Brendan W. McAdams.  Brendan is an Akka contributor and the
author of MongoDB Scala drivers, including reactive ones, and one of
the best Scala teachers in the world.  His notes on the course are
here: http://www.scalabythebay.org/files/scala_training_deck.pdf.

This is the best intensive ramp-up Scala course you could possibly
take.  If your company is choosing Scala, there's no better way to get
up to speed on the whole Scala ecosystem by attending the training and
then the conference (and possibly Spark training afterwards).  As you
dive deeper into Spark, you will want to use it in Scala.

Our trainings are fairly intimate affairs, limited to 25 people each.
It means you will get individual attention for your skill level.
Please share this course information with your colleagues who are
planning to learn Scala and Spark.

Full training info: http://www.scalabythebay.org/training.html#training.
Group discounts are available on 5 or more registrations;
contact registrat...@scalabythebay.org for more information about those
or if you have any other questions.

Cheers,
A+ (Alexy) and Scala By the Bay


Re: Getting the number of slaves

2014-07-24 Thread Evan R. Sparks
Try sc.getExecutorStorageStatus.length

SparkContext's getExecutorMemoryStatus or getExecutorStorageStatus will
give you back an object per executor - the StorageStatus objects are what
drives a lot of the Spark Web UI.

https://spark.apache.org/docs/1.0.1/api/scala/index.html#org.apache.spark.SparkContext
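
For example, a minimal sketch against the 1.0.x API (the driver's own block
manager shows up in this list too, so subtract one to count only executors;
the multiplier below is just an illustrative choice):

    // Number of executors currently registered with this SparkContext
    val numExecutors = sc.getExecutorStorageStatus.length - 1
    // e.g. derive a partition count from it
    val numPartitions = numExecutors * 4
    val data = sc.parallelize(1 to 1000000, numPartitions)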


On Thu, Jul 24, 2014 at 11:16 AM, Nicolas Mai nicolas@gmail.com wrote:

 Hi,

 Is there a way to get the number of slaves/workers during runtime?

 I searched online but didn't find anything :/ The application I'm working on
 will run on different clusters corresponding to different deployment stages
 (beta - prod). It would be great to get the number of slaves currently in
 use, in order to set the level of parallelism and RDD partitions based on
 that number.

 Thanks!
 Nicolas



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Emacs Setup Anyone?

2014-07-24 Thread Steve Nunez
Anyone out there have a good configuration for emacs? Scala-mode sort of
works, but I'd love to see a fully-supported spark-mode with an inferior
shell. Searching didn't turn up much of anything.

Any emacs users out there? What setup are you using?

Cheers,
- SteveN







Kmeans: set initial centers explicitly

2014-07-24 Thread SK

Hi,

The mllib.clustering.KMeans implementation supports a random or parallel
initialization mode to pick the initial centers. Is there a way to specify
the initial centers explicitly? It would be useful to have a setCenters()
method where we can explicitly specify the initial centers. (For example, R
allows us to specify the initial centers.)

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-set-initial-centers-explicitly-tp10609.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: continuing processing when errors occur

2014-07-24 Thread Imran Rashid
Hi Art,

I have some advice that isn't spark-specific at all, so it doesn't
*exactly* address your questions, but you might still find it helpful.  I
think using an implicit to add your retrying behavior might be useful.  I
can think of two options:

1. enriching RDD itself, eg. to add a .retryForeach, which would have the
desired behavior.

2. enriching Function to create a variant with retry behavior.

I prefer option 2, because it could be useful outside of spark, and even
within spark, you might realize you want to do something similar for more
than just foreach.

Here's an example.  (probably there is a more functional way to do this, to
avoid the while loop, but my brain isn't working and that's not the point
of this anyway)

Lets say we have this function:

def tenDiv(x:Int) = println(10 / x)

and we try applying it to a normal old Range:

scala> (-10 to 10).foreach{tenDiv}
-1
-1
-1
-1
-1
-2
-2
-3
-5
-10
java.lang.ArithmeticException: / by zero
at .tenDiv(<console>:7)


We can enrich Function to add some retry behavior:

class RetryFunction[-A](nTries: Int, f: A => Unit) extends (A => Unit) {
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while(!success && tries < nTries) {
      tries += 1
      try {
        f(a)
      } catch {
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on try $tries with $ex")
      }
    }
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
}



We activate this behavior by calling .retryable(nTries) on our method.
Like so:

scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
-5
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
10
5

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-5
-5
-5
-10
-10
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
failed on try 2 with java.lang.ArithmeticException: / by zero
failed on try 3 with java.lang.ArithmeticException: / by zero
10
10
10
5
5
5


You could do the same thing on closures you pass to RDD.foreach.
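
For instance, a rough sketch using the definitions above (writeToExternalSystem
is a stand-in for whatever your foreach body does, and rdd for your Hadoop-backed
RDD; note that to ship the wrapper inside a Spark task, RetryFunction would also
need to be Serializable):

    def writeToExternalSystem(record: String): Unit = {
      // stand-in for the real write that occasionally throws
    }

    // Retry each problem record up to 3 times, then move on to the next record,
    // instead of letting the whole partition fail.
    rdd.foreach((writeToExternalSystem _).retryable(3))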

I should add that I'm often very hesitant to use implicits because it can
make it harder to follow what's going on in the code.  I think this version
is OK, though, b/c somebody coming along later and looking at the code at
least can see the call to retryable as a clue.  (I really dislike
implicit conversions that happen without any hints in the actual code.)
Hopefully that's enough of a hint for others to figure out what is going
on.  Eg., intellij will know where that method came from and jump to it,
and also if you make the name unique enough, you can probably find it with
plain text search / c-tags.  But, its definitely worth considering for
yourself.

hope this helps,
Imran



On Thu, Jul 24, 2014 at 1:12 PM, Art Peel found...@gmail.com wrote:

 Our system works with RDDs generated from Hadoop files. It processes each
 record in a Hadoop file and for a subset of those records generates output
 that is written to an external system via RDD.foreach. There are no
 dependencies between the records that are processed.

 If writing to the external system fails (due to a detail of what is being
 written) and throws an exception, I see the following behavior:

 1. Spark retries the entire partition (thus wasting time and effort),
 reaches the problem record and fails again.
 2. It repeats step 1 up to the default 4 tries and then gives up. As a
 result, the rest of records from that Hadoop file are not processed.
 3. The executor where the 4th failure occurred is marked as failed and
 told to shut down and thus I lose a core for processing the remaining
 Hadoop files, thus slowing down the entire process.


 For this particular problem, I know how to prevent the underlying
 exception, but I'd still like to get a handle on error handling for future
 situations that I haven't yet encountered.

 My goal is this:
 Retry the problem record only (rather than starting over at the beginning
 of the partition) up to N times, then give up and move on to process the
 rest of the partition.

 As far as I can tell, I need to supply my own retry behavior and if I want
 to process records after the problem record I have to swallow exceptions
 inside the foreach block.

 My 2 questions are:
 1. Is there anything I can do to prevent the executor from being shut down
 when a failure occurs?


 2. Are there ways Spark can help me get closer to my goal of retrying only
 the problem record without writing my own re-try code and swallowing
 exceptions?

 Regards,
 Art




Re: Configuring Spark Memory

2014-07-24 Thread John Omernik
So this is good information for standalone mode, but how is memory distributed
within Mesos? There's coarse-grained mode where the executor stays active, and
there's fine-grained mode where it appears each task is its own process in
Mesos. How do memory allocations work in these cases? Thanks!



On Thu, Jul 24, 2014 at 12:14 PM, Martin Goodson mar...@skimlinks.com
wrote:

 Great - thanks for the clarification Aaron. The offer stands for me to
 write some documentation and an example that covers this without leaving
 *any* room for ambiguity.




 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240
 [image: Inline image 1]


 On Thu, Jul 24, 2014 at 6:09 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 Whoops, I was mistaken in my original post last year. By default, there
 is one executor per node per Spark Context, as you said.
 spark.executor.memory is the amount of memory that the application
 requests for each of its executors. SPARK_WORKER_MEMORY is the amount of
 memory a Spark Worker is willing to allocate in executors.

 So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your
 cluster, and spark.executor.memory to 4g, you would be able to run 2
 simultaneous Spark Contexts who get 4g per node. Similarly, if
 spark.executor.memory were 8g, you could only run 1 Spark Context at a time
 on the cluster, but it would get all the cluster's memory.
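
 As a concrete sketch of that split (values are illustrative only): the workers
 would be started with SPARK_WORKER_MEMORY=8g in conf/spark-env.sh, and each
 application then requests its executor size per app:

     import org.apache.spark.{SparkConf, SparkContext}

     // Each executor this application gets on a worker will use 4g,
     // leaving room on an 8g worker for a second concurrent application.
     val conf = new SparkConf()
       .setAppName("MemoryExample")
       .set("spark.executor.memory", "4g")
     val sc = new SparkContext(conf)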


 On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 Thank you Nishkam,
 I have read your code. So, for the sake of my understanding, it seems
 that for each spark context there is one executor per node? Can anyone
 confirm this?


 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240
 [image: Inline image 1]


 On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com
 wrote:

 See if this helps:

 https://github.com/nishkamravi2/SparkAutoConfig/

 It's a very simple tool for auto-configuring default parameters in
 Spark. Takes as input high-level parameters (like number of nodes, cores
 per node, memory per node, etc) and spits out default configuration, user
 advice and command line. Compile (javac SparkConfigure.java) and run (java
 SparkConfigure).

 Also cc'ing dev in case others are interested in helping evolve this
 over time (by refining the heuristics and adding more parameters).


  On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 Thanks Andrew,

 So if there is only one SparkContext there is only one executor per
 machine? This seems to contradict Aaron's message from the link above:

 If each machine has 16 GB of RAM and 4 cores, for example, you might
 set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by
 Spark.)

 Am I reading this incorrectly?

 Anyway our configuration is 21 machines (one master and 20 slaves)
 each with 60Gb. We would like to use 4 cores per machine. This is pyspark
 so we want to leave say 16Gb on each machine for python processes.

 Thanks again for the advice!



 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240
 [image: Inline image 1]


 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com
 wrote:

 Hi Martin,

 In standalone mode, each SparkContext you initialize gets its own set
 of executors across the cluster.  So for example if you have two shells
 open, they'll each get two JVMs on each worker machine in the cluster.

 As far as the other docs, you can configure the total number of cores
 requested for the SparkContext, the amount of memory for the executor JVM
 on each machine, the amount of memory for the Master/Worker daemons 
 (little
 needed since work is done in executors), and several other settings.

 Which of those are you interested in?  What spec hardware do you have
 and how do you want to configure it?

 Andrew


 On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com
  wrote:

 We are having difficulties configuring Spark, partly because we
 still don't understand some key concepts. For instance, how many 
 executors
 are there per machine in standalone mode? This is after having
 closely read the documentation several times:

  http://spark.apache.org/docs/latest/configuration.html
  http://spark.apache.org/docs/latest/spark-standalone.html
  http://spark.apache.org/docs/latest/tuning.html
  http://spark.apache.org/docs/latest/cluster-overview.html

 The cluster overview has some information here about executors but
 is ambiguous about whether there are single executors or multiple 
 executors
 on each machine.

  This message from Aaron Davidson implies that the executor memory
 should be set to total available memory on the machine divided by the
 number of cores:
 

Re: continuing processing when errors occur

2014-07-24 Thread Imran Rashid
whoops!  just realized I was retrying the function even on success.  didn't
pay enough attention to the output from my calls.  Slightly updated
definitions:

class RetryFunction[-A](nTries: Int, f: A => Unit) extends (A => Unit) {
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while(!success && tries < nTries) {
      tries += 1
      try {
        f(a)
        success = true
      } catch {
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on input $a, try $tries with $ex")
      }
    }
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
}


def tenDiv(x: Int) = println(x + " --- " + (10 / x))


and example usage:

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-2 --- -5
-1 --- -10
failed on input 0, try 1 with java.lang.ArithmeticException: / by zero
failed on input 0, try 2 with java.lang.ArithmeticException: / by zero
failed on input 0, try 3 with java.lang.ArithmeticException: / by zero
1 --- 10
2 --- 5





On Thu, Jul 24, 2014 at 2:58 PM, Imran Rashid im...@therashids.com wrote:

 Hi Art,

  I have some advice that isn't spark-specific at all, so it doesn't
  *exactly* address your questions, but you might still find it helpful.  I
  think using an implicit to add your retrying behavior might be useful.  I
  can think of two options:

 1. enriching RDD itself, eg. to add a .retryForeach, which would have the
 desired behavior.

 2. enriching Function to create a variant with retry behavior.

 I prefer option 2, because it could be useful outside of spark, and even
 within spark, you might realize you want to do something similar for more
 than just foreach.

 Here's an example.  (probably there is a more functional way to do this,
 to avoid the while loop, but my brain isn't working and that's not the
 point of this anyway)

 Lets say we have this function:

 def tenDiv(x:Int) = println(10 / x)

 and we try applying it to a normal old Range:

  scala> (-10 to 10).foreach{tenDiv}
 -1
 -1
 -1
 -1
 -1
 -2
 -2
 -3
 -5
 -10
 java.lang.ArithmeticException: / by zero
  at .tenDiv(<console>:7)


  We can enrich Function to add some retry behavior:

  class RetryFunction[-A](nTries: Int, f: A => Unit) extends (A => Unit) {
    def apply(a: A): Unit = {
      var tries = 0
      var success = false
      while(!success && tries < nTries) {
        tries += 1
        try {
          f(a)
        } catch {
          case scala.util.control.NonFatal(ex) =>
            println(s"failed on try $tries with $ex")
        }
      }
    }
  }

  implicit class Retryable[A](f: A => Unit) {
    def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
  }



 We activate this behavior by calling .retryable(nTries) on our method.
 Like so:

  scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
 -5
 -10
 failed on try 1 with java.lang.ArithmeticException: / by zero
 10
 5

  scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
 -5
 -5
 -5
 -10
 -10
 -10
 failed on try 1 with java.lang.ArithmeticException: / by zero
 failed on try 2 with java.lang.ArithmeticException: / by zero
 failed on try 3 with java.lang.ArithmeticException: / by zero
 10
 10
 10
 5
 5
 5


 You could do the same thing on closures you pass to RDD.foreach.

  I should add that I'm often very hesitant to use implicits because it can
 make it harder to follow what's going on in the code.  I think this version
 is OK, though, b/c somebody coming along later and looking at the code at
 least can see the call to retryable as a clue.  (I really dislike
 implicit conversions that happen without any hints in the actual code.)
 Hopefully that's enough of a hint for others to figure out what is going
 on.  Eg., intellij will know where that method came from and jump to it,
 and also if you make the name unique enough, you can probably find it with
 plain text search / c-tags.  But, its definitely worth considering for
 yourself.

 hope this helps,
 Imran



 On Thu, Jul 24, 2014 at 1:12 PM, Art Peel found...@gmail.com wrote:

 Our system works with RDDs generated from Hadoop files. It processes each
 record in a Hadoop file and for a subset of those records generates output
 that is written to an external system via RDD.foreach. There are no
 dependencies between the records that are processed.

 If writing to the external system fails (due to a detail of what is being
 written) and throws an exception, I see the following behavior:

 1. Spark retries the entire partition (thus wasting time and effort),
 reaches the problem record and fails again.
 2. It repeats step 1 up to the default 4 tries and then gives up. As a
 result, the rest of records from that Hadoop file are not processed.
 3. The executor where the 4th failure occurred is marked as failed and
 told to shut down and thus I lose a core for processing the remaining
 Hadoop files, thus slowing down the entire process.


 For this particular problem, I know how to prevent the underlying
 

KMeans: expensiveness of large vectors

2014-07-24 Thread durin
As a source, I have a textfile with n rows that each contain m
comma-separated integers. 
Each row is then converted into a feature vector with m features each.

I've noticed that, given the same total filesize and number of features, a
larger number of columns is much more expensive for training a KMeans model
than a large number of rows.

To give an example:
10k rows X 1k columns took 21 seconds on my cluster, whereas 1k rows X 10k
columns took 1min47s. Both files had a size of 238M.

Can someone explain what in the implementation of KMeans causes large
vectors to be so much more expensive than having many of these vectors?
A pointer to the exact part of the source would be fantastic, but even a
general explanation would help me.


Best regards,
Simon 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-expensiveness-of-large-vectors-tp10614.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


cache changes precision

2014-07-24 Thread Ron Gonzalez
Hi,
  I'm doing the following:

  def main(args: Array[String]) = {
    val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = new Configuration()
    val job = new Job(conf)
    val path = new Path("/tmp/a.avro");
    val schema = AvroUtils.getSchema(conf, path);

    AvroJob.setInputKeySchema(job, schema);
    
    val rdd = sc.newAPIHadoopFile(
       path.toString(),
       classOf[AvroKeyInputFormat[GenericRecord]],
       classOf[AvroKey[GenericRecord]],
       classOf[NullWritable], conf).map(x => x._1.datum())
    val sum = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
    val avg = sum / rdd.count()
    println(s"Sum = $sum")
    println(s"Avg = $avg")
  }

If I run this, it works as expected. When I add .cache() to

val rdd = sc.newAPIHadoopFile(
       path.toString(),
       classOf[AvroKeyInputFormat[GenericRecord]],
       classOf[AvroKey[GenericRecord]],
       classOf[NullWritable], conf).map(x => x._1.datum()).cache()

then the command rounds up the average.

Any idea why this works this way? Any tips on how to fix this?

Thanks,
Ron
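
Not a diagnosis, but one pitfall worth ruling out (an assumption, since Hadoop
input formats often reuse the same record object): caching the raw datum() can
end up caching many references to one mutable object. A sketch that extracts the
field into an immutable value before cache(), reusing the names above:

    val widths = sc.newAPIHadoopFile(
        path.toString(),
        classOf[AvroKeyInputFormat[GenericRecord]],
        classOf[AvroKey[GenericRecord]],
        classOf[NullWritable], conf)
      .map(x => x._1.datum().get("SEPAL_WIDTH").asInstanceOf[Float])  // copy out the field
      .cache()
    val sum = widths.reduce(_ + _)
    val avg = sum / widths.count()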


mapToPair vs flatMapToPair vs flatMap function usage.

2014-07-24 Thread abhiguruvayya
Can anyone help me understand the key differences between the mapToPair,
flatMapToPair, and flatMap functions, and also when to apply each of these
functions in particular.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mapToPair-vs-flatMapToPair-vs-flatMap-function-usage-tp10617.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-07-24 Thread Tathagata Das
You can set the Java option -Dsun.io.serialization.extendedDebugInfo=true to
have more information about the object printed. It will help you trace
down how the SparkContext is getting included in some kind of closure.

TD
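
For reference, one way to pass that flag through (a sketch: spark.executor.extraJavaOptions
is the Spark 1.0 setting for extra executor JVM flags, and the driver JVM needs the
flag added to its own java command line):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("SerializationDebugging")
      // Executors will print the reference chain that pulled in the
      // non-serializable object when a serialization error occurs.
      .set("spark.executor.extraJavaOptions",
           "-Dsun.io.serialization.extendedDebugInfo=true")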


On Thu, Jul 24, 2014 at 9:48 AM, lihu lihu...@gmail.com wrote:

 Which code did you use? Is the problem caused by your own code or by
 something in Spark itself?


 On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com hsy...@gmail.com
 wrote:

 I have the same problem


 On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote:

  Hi, everyone.  I have the following piece of code. When I run it,
  the error below occurred. It seems that the SparkContext is not
  serializable, but I do not try to use the SparkContext except for the broadcast.
  [In fact, this code is in MLlib; I just try to broadcast the
   centerArrays.]

  It succeeds at the reduceByKey operation, but fails at the
  collect operation, and this confused me.


 INFO DAGScheduler: Failed to run collect at KMeans.scala:235
 [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
 not serializable: java.io.NotSerializableException:
 org.apache.spark.SparkContext
 org.apache.spark.SparkException: Job aborted: Task not serializable:
 java.io.NotSerializableException: org.apache.spark.SparkContext
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)




  private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {

    @transient val sc = data.sparkContext   // I try to add the transient annotation here, but it doesn't work

    // Initialize each run's center to a random point
    val seed = new XORShiftRandom().nextInt()
    val sample = data.takeSample(true, runs, seed).toSeq
    val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))

    // On each step, sample 2 * k points on average for each run with probability proportional
    // to their squared distance from that run's current centers
    for (step <- 0 until initializationSteps) {
      val centerArrays = sc.broadcast(centers.map(_.toArray))
      val sumCosts = data.flatMap { point =>
        for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
      }.reduceByKey(_ + _).collectAsMap()
      // can pass at this point
      val chosen = data.mapPartitionsWithIndex { (index, points) =>
        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
        for {
          p <- points
          r <- 0 until runs
          if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
        } yield (r, p)
      }.collect()
      // failed at this point.
      for ((r, p) <- chosen) {
        centers(r) += p
      }
    }








 --
  Best Wishes!

  Li Hu(李浒) | Graduate Student
  Institute for Interdisciplinary Information Sciences (IIIS, http://iiis.tsinghua.edu.cn/)
  Tsinghua University, China

  Email: lihu...@gmail.com
  Tel: +86 15120081920
  Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/





Re: Getting the number of slaves

2014-07-24 Thread Nicolas Mai
Thanks, this is what I needed :) I should have searched more...

Something I noticed though: after the SparkContext is initialized, I had to
wait for a few seconds until sc.getExecutorStorageStatus.length returns the
correct number of workers in my cluster (otherwise it returns 1, for the
driver)...
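
One way to avoid relying on a fixed sleep is to poll until the expected number of
executors has registered (a sketch; expectedWorkers is a hypothetical value you
would supply per deployment stage):

    // getExecutorStorageStatus includes the driver, hence the - 1
    val expectedWorkers = 4  // hypothetical, depends on the cluster
    val deadline = System.currentTimeMillis + 30000
    while (sc.getExecutorStorageStatus.length - 1 < expectedWorkers &&
           System.currentTimeMillis < deadline) {
      Thread.sleep(200)
    }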



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604p10619.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: streaming sequence files?

2014-07-24 Thread Barnaby
I have the streaming program writing sequence files. I can find one of the
files and load it in the shell using:

scala> val rdd = sc.sequenceFile[String,
Int]("tachyon://localhost:19998/files/WordCounts/20140724-213930")
14/07/24 21:47:50 INFO storage.MemoryStore: ensureFreeSpace(32856) called
with curMem=0, maxMem=309225062
14/07/24 21:47:50 INFO storage.MemoryStore: Block broadcast_0 stored as
values to memory (estimated size 32.1 KB, free 294.9 MB)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1] at sequenceFile
at <console>:12

So I got some type information, seems good.

It took a while to research but I got the following streaming code to
compile and run:

val wordCounts = ssc.fileStream[String, Int, SequenceFileInputFormat[String,
Int]](args(0))

It works now and I offer this for reference to anybody else who may be
curious about saving sequence files and then streaming them back in.

Question:
When running both streaming programs at the same time using spark-submit, I
noticed that only one app would really run. To get one app to continue, I
had to stop the other app. Is there a way to get these running
simultaneously?
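
If this is a standalone cluster, one common reason (an assumption about your
setup, not a certainty) is that the first application grabs all available cores,
leaving none for the second. Capping each application's core usage lets them run
side by side:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("SequenceFileWriter")
      // Leave cores free for the other streaming app (illustrative value)
      .set("spark.cores.max", "2")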



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/streaming-sequence-files-tp10557p10620.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: NotSerializableException in Spark Streaming

2014-07-24 Thread Nicholas Chammas
Yep, here goes!

Here are my environment vitals:

   - Spark 1.0.0
   - EC2 cluster with 1 slave spun up using spark-ec2
   - twitter4j 3.0.3
   - spark-shell called with --jars argument to load
   spark-streaming-twitter_2.10-1.0.0.jar as well as all the twitter4j
   jars.

Now, while I’m in the Spark shell, I enter the following:

import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
def getAuth(): Option[Authorization] = {

  System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
  System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
  System.setProperty("twitter4j.oauth.accessToken", accessToken)
  System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

  Some(new OAuthAuthorization(new ConfigurationBuilder().build()))

}
def noop(a: Any): Any = {
  a
}
val ssc = new StreamingContext(sc, Seconds(5))
val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
val liveTweets = liveTweetObjects.map(_.getText)

liveTweets.map(t => noop(t)).print()

ssc.start()

So basically, I’m just printing Tweets as-is, but first I’m mapping them to
themselves via noop(). The Tweets will start to flow just fine for a minute
or so, and then, this:

14/07/24 23:13:30 ERROR JobScheduler: Error running job streaming job
140624361 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure:
Task not serializable: java.io.NotSerializableException:
org.apache.spark.streaming.StreamingContext
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The time-to-first-error is variable.

This is the simplest repro I can show at this time. Doing more complex
things with liveTweets that involve a KMeansModel, for example, will be
interrupted quicker by this java.io.NotSerializableException. I don’t know
if the root cause is the same, but the error certainly is.

By the way, trying to reproduce this on 1.0.1 doesn’t raise the same error,
but I can’t dig deeper to make sure this is really resolved (e.g. by trying
more complex things that need data) due to SPARK-2471
https://issues.apache.org/jira/browse/SPARK-2471. I see that that issue
has been resolved, so I’ll try this whole process again using the latest
from master and see how it goes.

Nick


On Tue, Jul 15, 2014 at 5:58 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

I am very curious though. Can you post a concise code example which we can
 run to reproduce this problem?

 TD


 On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 I am not entirely sure off the top of my head. But a possible (usually
 works) workaround is to define the function as a val instead of a def. For
 example

 def func(i: Int): Boolean = { true }

 can be written as

 val func = (i: Int) => { true }

 Hope this helps for now.

 TD


 On Tue, Jul 15, 2014 at 9:21 AM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Hey Diana,

 Did you ever figure this out?

 I’m running into the same exception, except in my case the function I’m
 

Re: Simple record matching using Spark SQL

2014-07-24 Thread Yin Huai
Hi Sarath,

Have you tried the current branch 1.0? If not, can you give it a try and
see if the problem can be resolved?

Thanks,

Yin


On Thu, Jul 24, 2014 at 11:17 AM, Yin Huai yh...@databricks.com wrote:

 Hi Sarath,

 I will try to reproduce the problem.

 Thanks,

 Yin


 On Wed, Jul 23, 2014 at 11:32 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Hi Michael,

 Sorry for the delayed response.

 I'm using Spark 1.0.1 (pre-built version for hadoop 1). I'm running spark
 programs on a standalone spark cluster using 2 nodes. One node works as
 both master and worker while the other node is just a worker.

 I didn't quite get what you meant by a jstack of the driver and
 executor, so I'm attaching the log files generated in $SPARK_HOME/logs and
 the stdout and stderr files for this job from the $SPARK_HOME/work folder on
 both the nodes.

 Also attaching the program which I executed. If I uncomment lines 36
 & 37 the program works fine, otherwise it just keeps running forever.

 ~Sarath.


 On Thu, Jul 17, 2014 at 9:35 PM, Michael Armbrust mich...@databricks.com
  wrote:

 What version are you running?  Could you provide a jstack of the driver
 and executor when it is hanging?


 On Thu, Jul 17, 2014 at 10:55 AM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Added the below 2 lines just before the sql query line -
 ...
 file1_schema.count;
 file2_schema.count;
 ...
 and it started working. But I couldn't figure out the reason.

 Can someone please explain? What was happening earlier, and what
 changes with the addition of these 2 lines?

 ~Sarath


 On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 No Sonal, I'm not doing any explicit call to stop context.

 If you see my previous post to Michael, the commented portion of the
 code is my requirement. When I run this over standalone spark cluster, the
 execution keeps running with no output or error. After waiting for several
 minutes I'm killing it by pressing Ctrl+C in the terminal.

 But the same code runs perfectly when executed from spark shell.

 ~Sarath


 On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal sonalgoy...@gmail.com
 wrote:

 Hi Sarath,

 Are you explicitly stopping the context?

 sc.stop()




 Best Regards,
 Sonal
 Nube Technologies http://www.nubetech.co

 http://in.linkedin.com/in/sonalgoyal




 On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Hi Michael, Soumya,

 Can you please check and let me know what is the issue? what am I
 missing?
 Let me know if you need any logs to analyze.

 ~Sarath


 On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Hi Michael,

 Tried it. It's correctly printing the line counts of both the
 files. Here's what I tried -

 Code:
 package test
 object Test4 {
   case class Test(fld1: String,
                   fld2: String,
                   fld3: String,
                   fld4: String,
                   fld5: String,
                   fld6: Double,
                   fld7: String);
   def main(args: Array[String]) {
     val conf = new SparkConf()
                  .setMaster(args(0))
                  .setAppName("SQLTest")
                  .setSparkHome(args(1))
                  .set("spark.executor.memory", "2g");
     val sc = new SparkContext(conf);
     sc.addJar("test1-0.1.jar");
     val file1 = sc.textFile(args(2));
     println(file1.count());
     val file2 = sc.textFile(args(3));
     println(file2.count());
     //val sq = new SQLContext(sc);
     //import sq._
     //val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l =>
     //  Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
     //val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s =>
     //  Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
     //val file1_schema = sq.createSchemaRDD(file1_recs);
     //val file2_schema = sq.createSchemaRDD(file2_recs);
     //file1_schema.registerAsTable("file1_tab");
     //file2_schema.registerAsTable("file2_tab");
     //val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
     //  "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
     //  "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
     //  "l.fld6=s.fld6");
     //matched.collect().foreach(println);
   }
 }

 Execution:
 export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
 export CONFIG_OPTS="-Dspark.jars=test1-0.1.jar"
 java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 \
   /usr/local/spark-1.0.1-bin-hadoop1 \
   hdfs://master:54310/user/hduser/file1.csv \
   hdfs://master:54310/user/hduser/file2.csv

 ~Sarath

 On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 What if you just run something like:
 sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count()


 On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra 
 sarathchandra.jos...@algofusiontech.com wrote:

 Yes Soumya, I did it.

 First I tried with the example available in the documentation
 (example using people 

Re: mapToPair vs flatMapToPair vs flatMap function usage.

2014-07-24 Thread Matei Zaharia
The Pair ones return a JavaPairRDD, which has additional operations on 
key-value pairs. Take a look at 
http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs
 for details.

Matei

On Jul 24, 2014, at 3:41 PM, abhiguruvayya sharath.abhis...@gmail.com wrote:

 Can anyone help me understand the key differences between the mapToPair,
 flatMapToPair, and flatMap functions, and also when to apply each of these
 functions in particular.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/mapToPair-vs-flatMapToPair-vs-flatMap-function-usage-tp10617.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark Function setup and cleanup

2014-07-24 Thread Yanbo Liang
You can refer to this topic:
http://www.mapr.com/developercentral/code/loading-hbase-tables-spark


2014-07-24 22:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com:

 In my case I want to reach HBase. For every record with a userId I want to
 get some extra information about the user and add it to the result record
 for further processing


 On Thu, Jul 24, 2014 at 9:11 AM, Yanbo Liang yanboha...@gmail.com wrote:

 If you want to connect to a DB in your program, you can use JdbcRDD (
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
 )


 2014-07-24 18:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com:

 Hi,

 I am using the Java api of Spark.

 I wanted to know if there is a way to run some code in a manner that is
 like the setup() and cleanup() methods of Hadoop Map/Reduce

 The reason I need it is that I want to read something from the DB for
 each record I scan in my Function, and I would like to open the DB
 connection only once (and close it only once).

 Thanks






Need help, got java.lang.ExceptionInInitializerError in Yarn-Client/Cluster mode

2014-07-24 Thread Jianshi Huang
I can successfully run my code in local mode using spark-submit (--master
local[4]), but I got ExceptionInInitializerError errors in Yarn-client mode.

Any hints as to what the problem is? Is it a closure serialization problem? How
can I debug it? Your answers would be very helpful.

14/07/25 11:48:14 WARN scheduler.TaskSetManager: Loss was due to
java.lang.ExceptionInInitializerError
java.lang.ExceptionInInitializerError
at
com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scal
a:40)
at
com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scal
a:36)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1016)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/
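
One generic way to surface the real failure (a sketch, not specific to this code;
doWork and rdd are placeholders): an ExceptionInInitializerError means a static or
object initializer threw on the executor, and the underlying exception is available
via getCause, which can be logged from inside the task:

    rdd.map { x =>
      try {
        doWork(x)  // placeholder for the call that first touches the failing object
      } catch {
        case e: ExceptionInInitializerError =>
          // The nested cause names the real failure (missing config, class, etc.)
          System.err.println("Initializer failed: " + e.getCause)
          throw e
      }
    }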


Re: GraphX Pragel implementation

2014-07-24 Thread Ankur Dave
On Thu, Jul 24, 2014 at 9:52 AM, Arun Kumar toga...@gmail.com wrote:

 While using the Pregel API for iterations, how do I figure out which superstep
 the iteration is currently in?


The Pregel API doesn't currently expose this, but it's very straightforward
to modify Pregel.scala
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala#L112
to do so. Let me know if you'd like help doing this.

Ankur http://www.ankurdave.com/
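
For reference, one workaround that avoids modifying Pregel.scala (a sketch with an
Int vertex attribute; the max-propagation logic is only illustrative) is to carry a
superstep counter in the vertex attribute and bump it inside vprog:

    import org.apache.spark.graphx._

    // Pair each vertex attribute with a counter; vprog increments it every time it
    // runs, so attr._2 tells you which superstep you are in (vprog also runs once
    // with the initial message before the first real superstep).
    def withSuperstep(graph: Graph[Int, Int]): Graph[(Int, Int), Int] = {
      val init = graph.mapVertices { case (_, attr) => (attr, 0) }
      init.pregel(0, 10)(
        (id, attr, msg) => (math.max(attr._1, msg), attr._2 + 1),   // vprog
        triplet => Iterator((triplet.dstId, triplet.srcAttr._1)),   // sendMsg
        (a, b) => math.max(a, b)                                    // mergeMsg
      )
    }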